From 370051ba24643bef2d544f5f45520d6c90b41b82 Mon Sep 17 00:00:00 2001 From: Mike McKerns Date: Fri, 28 Jun 2013 10:17:37 -0700 Subject: [PATCH 1/4] Initial commit --- README.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..451c857 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +pyina +===== + +a MPI-based parallel mapper and launcher From 8016476376341730468ea4df6b27ca1c85c34ced Mon Sep 17 00:00:00 2001 From: Mike McKerns Date: Thu, 11 Jul 2013 10:28:19 -0700 Subject: [PATCH 2/4] merged changes to README.md from svn --- README.md | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 451c857..2780e20 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,90 @@ pyina ===== - a MPI-based parallel mapper and launcher + +About Pyina +----------- +The pyina package provides several basic tools to make MPI-based +high-performance computing more accessible to the end user. The goal +of pyina is to allow the user to extend their own code to MPI-based +high-performance computing with minimal refactoring. + +The central element of pyina is the parallel map-reduce algorithm. +Pyina currently provides two strategies for executing the parallel-map, +where a strategy is the algorithm for distributing the work list of +jobs across the available nodes. These strategies can be used "in-the-raw" +(i.e. directly) to provide map-reduce to a user's own mpi-aware code. +Further, pyina provides the "ez_map" interface, which is a map-reduce +implementation that hides the MPI internals from the user. With ez_map, +the user can launch their code in parallel batch mode -- using standard +python and without ever having to write a line of parallel python or MPI code. + +There are several ways that a user would typically launch their code in +parallel -- directly with "mpirun" or "mpiexec", or through the use of a +scheduler such as torque or slurm. Pyina encapsulates several of these +'launchers', and provides a common interface to the different methods of +launching an MPI job. + +Pyina is part of pathos, a python framework for heterogeneous computing. +Pyina is in the early development stages, and any user feedback is +highly appreciated. Contact Mike McKerns [mmckerns at caltech dot edu] +with comments, suggestions, and any bugs you may find. A list of known +issues is maintained at http://trac.mystic.cacr.caltech.edu/project/pathos/query. + + +Major Features +-------------- +Pyina provides a highly configurable parallel map-reduce interface +to running MPI jobs, with:: + * a map-reduce interface that extends the python 'map' standard + * the ability to submit batch jobs to a selection of schedulers + * the ability to customize node and process launch configurations + * the ability to launch parallel MPI jobs with standard python + * ease in selecting different strategies for processing a work list + + +Current Release +--------------- +The latest released version of pyina is available from:: + http://trac.mystic.cacr.caltech.edu/project/pathos + +Pyina is distributed under a modified BSD license. + +Development Release +------------------- +You can get the latest development release with all the shiny new features at:: + http://dev.danse.us/packages.
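As a concrete illustration of the two strategies described above, here is a minimal sketch; it assumes MpiPool and MpiScatter are importable from pyina.launchers, as the examples/test_ezmap*.py scripts later in this series suggest::

    from pyina.launchers import MpiPool, MpiScatter

    def host(id):
        import socket
        return "Rank: %d -- %s" % (id, socket.gethostname())

    pool = MpiPool(4)     # worker-pool strategy: workers pull jobs from a shared queue
    scat = MpiScatter(4)  # scatter-gather strategy: the work list is split evenly up front
    print('\n'.join(pool.map(host, range(10))))
    print('\n'.join(scat.map(host, range(10))))

The ez_map interface wraps the same machinery behind a single call, as applications/parallel_batch.py later in this series does with ez_map(runshell, inputlist, nodes=nnodes).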
+ +or even better, fork us on our github mirror of the svn trunk:: + https://github.com/uqfoundation + +Citation +-------- +If you use pyina to do research that leads to publication, we ask that you +acknowledge use of pyina by citing the following in your publication:: + + M.M. McKerns, L. Strand, T. Sullivan, A. Fang, M.A.G. Aivazis, + "Building a framework for predictive science", Proceedings of + the 10th Python in Science Conference, 2011; + http://arxiv.org/pdf/1202.1056 + + Michael McKerns and Michael Aivazis, + "pathos: a framework for heterogeneous computing", 2010- ; + http://trac.mystic.cacr.caltech.edu/project/pathos + +More Information +---------------- +Probably the best way to get started is to look at a few of the +examples provided within pyina. See `pyina.examples` for a +set of scripts that demonstrate the configuration and launching of +mpi-based parallel jobs using the `ez_map` interface. Also see +`pyina.examples_other` for a set of scripts that test the more raw +internals of pyina. The source code is also generally well documented, +so further questions may be resolved by inspecting the code itself, or through +browsing the reference manual. For those who like to leap before +they look, you can jump right to the installation instructions. If the aforementioned documents +do not adequately address your needs, please send us feedback. + +Pyina is an active research tool. There are a growing number of publications and presentations that +discuss real-world examples and new features of pyina in greater detail than presented in the user's guide. +If you would like to share how you use pyina in your work, please send us a link. From bd70091b3988c8c583eb9dad3232d67c0c31bb4a Mon Sep 17 00:00:00 2001 From: Mike McKerns Date: Thu, 11 Jul 2013 11:24:17 -0700 Subject: [PATCH 3/4] fixed formatting in README.md --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 2780e20..153dfab 100644 --- a/README.md +++ b/README.md @@ -36,11 +36,12 @@ Major Features -------------- Pyina provides a highly configurable parallel map-reduce interface to running MPI jobs, with:: - * a map-reduce interface that extends the python 'map' standard - * the ability to submit batch jobs to a selection of schedulers - * the ability to customize node and process launch configurations - * the ability to launch parallel MPI jobs with standard python - * ease in selecting different strategies for processing a work list + +* a map-reduce interface that extends the python 'map' standard +* the ability to submit batch jobs to a selection of schedulers +* the ability to customize node and process launch configurations +* the ability to launch parallel MPI jobs with standard python +* ease in selecting different strategies for processing a work list Current Release From 5936cac302dfd9848875dd47b34a591ca47065cf Mon Sep 17 00:00:00 2001 From: Lee Kamentsky Date: Wed, 4 Apr 2018 14:27:12 -0400 Subject: [PATCH 4/4] Unit tests pass on Python 3.6 --- .gitignore | 4 +++ .travis.yml | 1 + applications/machines_raw.py | 4 +-- applications/parallel_batch.py | 4 +-- applications/parallel_batch_raw.py | 4 +-- examples/nodes.py | 2 +- examples/pypi.py | 4 +-- examples/pypi_pmap.py | 4 +-- examples/test_ezmap.py | 46 +++++++++++++-------------- examples/test_ezmap1.py | 16 +++++----- examples/test_ezmap2.py | 16 +++++----- examples/test_ezmap3.py | 16 +++++----- examples/test_ezmap4.py | 8 ++--- examples/test_ezmap5.py | 16 +++++----- examples/test_ezmap6.py | 8 
++--- examples/test_ezmap7.py | 10 +++--- examples/test_ezmap8.py | 10 +++--- examples/test_launch.py | 42 ++++++++++++------------- examples/test_pmap.py | 4 +-- examples/which.py | 10 +++--- examples_other/hello.py | 2 +- examples_other/mpd_trace.py | 6 ++-- examples_other/mpipyre1.py | 2 +- examples_other/mpipyre2.py | 4 +-- examples_other/test_parallelmap.py | 6 ++-- examples_other/test_ports.py | 2 +- pyina/__init__.py | 19 ++++++------ pyina/ez_map.py | 50 +++++++++++++++--------------- pyina/launchers.py | 20 ++++++------ pyina/mappers.py | 2 +- pyina/mpi.py | 8 ++--- pyina/mpi_pool.py | 22 +++++++------ pyina/mpi_scatter.py | 17 +++++----- pyina/schedulers.py | 14 ++++----- pyina/tools.py | 4 +-- scripts/ezpool.py | 6 ++-- scripts/ezscatter.py | 6 ++-- scripts/machines.py | 4 +-- scripts/mpi_world.py | 14 ++++----- setup.py | 12 +++---- tests/test_ezmap.py | 18 +++++------ tests/test_map.py | 14 ++++----- tests/test_pool.py | 10 ++++-- tests/test_star.py | 34 ++++++++++---------- tests/test_with.py | 6 ++-- 45 files changed, 272 insertions(+), 259 deletions(-) diff --git a/.gitignore b/.gitignore index a1eb928..c085e87 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ .cache/ *.egg-info/ *.pyc +pyina/info.py +dist/ +README +.idea/ diff --git a/.travis.yml b/.travis.yml index 77e3cb6..1431913 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ matrix: - python: '2.5' - python: '2.6' - python: '2.7' + - python: '3.5' allow_failures: - python: '2.5' fast_finish: true diff --git a/applications/machines_raw.py b/applications/machines_raw.py index 5f69e7d..7170bed 100755 --- a/applications/machines_raw.py +++ b/applications/machines_raw.py @@ -29,9 +29,9 @@ def host(id): hostnames = parallel_map(host, range(world.size)) if world.rank == 0: - print '\n'.join(hostnames) + print('\n'.join(hostnames)) except: - print __doc__ + print(__doc__) # end of file diff --git a/applications/parallel_batch.py b/applications/parallel_batch.py index 336c35d..23a196d 100755 --- a/applications/parallel_batch.py +++ b/applications/parallel_batch.py @@ -20,7 +20,7 @@ def runshell(input): """ import socket from subprocess import Popen, PIPE - print "%s executing: %s" % (socket.gethostname(), input) + print("%s executing: %s" % (socket.gethostname(), input)) pipe = Popen(input, shell=True, stdout=PIPE).stdout pipe.readlines() return 0 @@ -40,7 +40,7 @@ def runshell(input): inputlist = open(batchfile).readlines() out = ez_map(runshell, inputlist, nodes=nnodes) except: - print __doc__ + print(__doc__) # End of file diff --git a/applications/parallel_batch_raw.py b/applications/parallel_batch_raw.py index d4859de..884b4e0 100755 --- a/applications/parallel_batch_raw.py +++ b/applications/parallel_batch_raw.py @@ -21,7 +21,7 @@ def runshell(input): """ from pyina import mpi from subprocess import Popen, PIPE - print "%d of %d: executing: %s" % (mpi.world.rank, mpi.world.size, input) + print("%d of %d: executing: %s" % (mpi.world.rank, mpi.world.size, input)) pipe = Popen(input, shell=True, stdout=PIPE).stdout pipe.readlines() return 0 @@ -42,7 +42,7 @@ def runshell(input): inputlist = open(batchfile).readlines() out = parallel_map2(runshell, inputlist) except: - print __doc__ + print(__doc__) # End of file diff --git a/examples/nodes.py b/examples/nodes.py index 6eab92b..034aa0f 100755 --- a/examples/nodes.py +++ b/examples/nodes.py @@ -17,6 +17,6 @@ from pyina import mpi world = mpi.world -print "Node (%d) of %d " % (world.rank, world.size) +print("Node (%d) of %d " % (world.rank, world.size)) # End 
of file diff --git a/examples/pypi.py b/examples/pypi.py index 9a67ad4..89389b4 100755 --- a/examples/pypi.py +++ b/examples/pypi.py @@ -48,8 +48,8 @@ def f(x): from pyina import mpi if mpi.world.rank == 0: - print "approxmiate pi : ", sum(out)/n - print "calculated on %d nodes: " % work.nodes + print("approxmiate pi : ", sum(out)/n) + print("calculated on %d nodes: " % work.nodes) # end of file diff --git a/examples/pypi_pmap.py b/examples/pypi_pmap.py index 53f633a..f48d6e8 100755 --- a/examples/pypi_pmap.py +++ b/examples/pypi_pmap.py @@ -48,8 +48,8 @@ def f(x): from pyina import mpi if mpi.world.rank == 0: - print "approxmiate pi : ", sum(out)/n - print "calculated on %d nodes: " % mpi.world.size + print("approxmiate pi : ", sum(out)/n) + print("calculated on %d nodes: " % mpi.world.size) # end of file diff --git a/examples/test_ezmap.py b/examples/test_ezmap.py index 3af1536..f1d02f9 100755 --- a/examples/test_ezmap.py +++ b/examples/test_ezmap.py @@ -12,46 +12,46 @@ def host(id): import socket return "Rank: %d -- %s" % (id, socket.gethostname()) -print "Evaluate 10 items on 3 nodes using a worker pool:" +print("Evaluate 10 items on 3 nodes using a worker pool:") pool = MpiPool(3) res1 = pool.map(host, range(10)) -print pool -print '\n'.join(res1) -print '' +print(pool) +print('\n'.join(res1)) +print('') -print "Evaluate 10 items on 3 nodes using scatter-gather:" +print("Evaluate 10 items on 3 nodes using scatter-gather:") scat = MpiScatter(3) res2 = scat.map(host, range(10)) -print scat -print '\n'.join(res2) -print '' +print(scat) +print('\n'.join(res2)) +print('') -print "Evaluate 5 items on 2 nodes using a worker pool:" +print("Evaluate 5 items on 2 nodes using a worker pool:") pool.nodes = 2 res3 = pool.map(host, range(5)) -print pool -print '\n'.join(res3) -print '' +print(pool) +print('\n'.join(res3)) +print('') -print "Evaluate 5 items on 2 nodes using scatter-gather:" +print("Evaluate 5 items on 2 nodes using scatter-gather:") scat.nodes = 2 res4 = scat.map(host, range(5)) -print scat -print '\n'.join(res4) -print '' +print(scat) +print('\n'.join(res4)) +print('') #NOTE: bug? does worker pool perform correctly when nnodes > range ??? 
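The bulk of the example changes in this patch replace Python 2 print statements with the print() function. For code that must keep running on the 2.6/2.7 interpreters still listed in .travis.yml, the standard compatibility idiom is a __future__ import; a tiny sketch (not what this patch does, just the usual alternative)::

    from __future__ import print_function  # makes print() a function on Python 2.6+ as well
    print("Rank: %d -- %s" % (0, "localhost"))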
-print "Evaluate 5 items on 10 nodes using worker pool:" +print("Evaluate 5 items on 10 nodes using worker pool:") pool.nodes = 10 res5 = pool.map(host, range(5)) -print pool -print '\n'.join(res5) -print '' +print(pool) +print('\n'.join(res5)) +print('') -print "Evaluate 5 items on 10 nodes using scatter-gather:" +print("Evaluate 5 items on 10 nodes using scatter-gather:") scat.nodes = 10 res6 = scat.map(host, range(5)) -print scat -print '\n'.join(res6) +print(scat) +print('\n'.join(res6)) # end of file diff --git a/examples/test_ezmap1.py b/examples/test_ezmap1.py index 7b2b3d8..92adb20 100755 --- a/examples/test_ezmap1.py +++ b/examples/test_ezmap1.py @@ -12,18 +12,18 @@ def host(id): import socket return "Rank: %d -- %s" % (id, socket.gethostname()) -print "Evaluate 10 items on 1 node (w/ 1 ppn) using scatter-gather:" +print("Evaluate 10 items on 1 node (w/ 1 ppn) using scatter-gather:") scat = MpiScatter('1:ppn=1') res1 = scat.map(host, range(10)) -print scat -print '\n'.join(res1) -print '' +print(scat) +print('\n'.join(res1)) +print('') -print "Evaluate 10 items on 1 node (w/ 2 ppn) using scatter-gather:" +print("Evaluate 10 items on 1 node (w/ 2 ppn) using scatter-gather:") scat.nodes = '1:ppn=2' res2 = scat.map(host, range(10)) -print scat -print '\n'.join(res2) -print '' +print(scat) +print('\n'.join(res2)) +print('') # end of file diff --git a/examples/test_ezmap2.py b/examples/test_ezmap2.py index 90c8725..85c1408 100755 --- a/examples/test_ezmap2.py +++ b/examples/test_ezmap2.py @@ -21,19 +21,19 @@ def play2(id,l): arg1 = [ i for i in range(5) ] arg2 = [ range(3)*i for i in range(5) ] -print "Using 12 nodes and a worker pool..." -print 'Evaluate a function that expects a n-tuple argument "map(f,args)"' +print("Using 12 nodes and a worker pool...") +print('Evaluate a function that expects a n-tuple argument "map(f,args)"') pool = MpiPool(12) res1 = pool.map(play, args) #res1 = map(play, args) -print pool -print '\n'.join(res1) -print '' +print(pool) +print('\n'.join(res1)) +print('') -print 'Evaluate a function that expects n arguments "map(f,arg1,arg2)"' +print('Evaluate a function that expects n arguments "map(f,arg1,arg2)"') res2 = pool.map(play2, arg1, arg2) #res2 = map(play2, arg1, arg2) -print pool -print '\n'.join(res2) +print(pool) +print('\n'.join(res2)) # end of file diff --git a/examples/test_ezmap3.py b/examples/test_ezmap3.py index 73c3e08..9b362eb 100755 --- a/examples/test_ezmap3.py +++ b/examples/test_ezmap3.py @@ -12,18 +12,18 @@ def host(id): import socket return "Rank: %d -- %s" % (id, socket.gethostname()) -print "Explicitly using the MPI launcher, we will execute..." +print("Explicitly using the MPI launcher, we will execute...") pool = Mpi(4) -print "10 items on 4 nodes using a worker pool:" +print("10 items on 4 nodes using a worker pool:") res1 = pool.map(host, range(10)) -print pool -print '\n'.join(res1) -print '' +print(pool) +print('\n'.join(res1)) +print('') -print "10 items on 4 nodes using scatter-gather:" +print("10 items on 4 nodes using scatter-gather:") pool.scatter = True res2 = pool.map(host, range(10)) -print pool -print '\n'.join(res2) +print(pool) +print('\n'.join(res2)) # end of file diff --git a/examples/test_ezmap4.py b/examples/test_ezmap4.py index 6ac2e4e..4db61e0 100755 --- a/examples/test_ezmap4.py +++ b/examples/test_ezmap4.py @@ -15,12 +15,12 @@ def host(coeffs): from mystic.models import rosen as func return "rosen%s = %s" % (coeffs, func(coeffs)) -print "Evaluate an imported function (the rosenbrock function)..." 
-print "For 10 items on 4 nodes, using the default mapping strategy" +print("Evaluate an imported function (the rosenbrock function)...") +print("For 10 items on 4 nodes, using the default mapping strategy") params = [(i,i,i) for i in range(10)] pool = Mpi(4) res = pool.map(host, params) -print pool -print '\n'.join(res) +print(pool) +print('\n'.join(res)) # end of file diff --git a/examples/test_ezmap5.py b/examples/test_ezmap5.py index 072bab1..f32d464 100755 --- a/examples/test_ezmap5.py +++ b/examples/test_ezmap5.py @@ -19,18 +19,18 @@ def host(coeffs): params = [(i,0,-2*i,0,4*i,0,-2*i,0,i) for i in range(10)] pool = Mpi() -print "Evaluate the 8th order Chebyshev polynomial..." -print "Using 'dill' for 10 combinations over 4 nodes" +print("Evaluate the 8th order Chebyshev polynomial...") +print("Using 'dill' for 10 combinations over 4 nodes") pool.nodes = 4 res1 = pool.map(host, params) -print pool -print '\n'.join(res1) -print '' +print(pool) +print('\n'.join(res1)) +print('') -print "Using 'dill.source' for 10 combinations over 4 nodes" +print("Using 'dill.source' for 10 combinations over 4 nodes") pool.source = True res2 = pool.map(host, params) -print pool -print '\n'.join(res2) +print(pool) +print('\n'.join(res2)) # end of file diff --git a/examples/test_ezmap6.py b/examples/test_ezmap6.py index cabdf50..525008d 100755 --- a/examples/test_ezmap6.py +++ b/examples/test_ezmap6.py @@ -16,12 +16,12 @@ def host(id): import socket return "Rank: %d -- %s" % (id, socket.gethostname()) -print "Submit a non-parallel job to torque in the 'productionQ' queue..." -print "Using 5 items over 1 nodes and the default mapping strategy" +print("Submit a non-parallel job to torque in the 'productionQ' queue...") +print("Using 5 items over 1 nodes and the default mapping strategy") torque = Torque(queue='productionQ', timelimit='20:00:00', workdir='.') pool = SerialMapper(scheduler=torque) res = pool.map(host, range(5)) -print pool -print '\n'.join(res) +print(pool) +print('\n'.join(res)) # end of file diff --git a/examples/test_ezmap7.py b/examples/test_ezmap7.py index 37e5c88..32c6041 100755 --- a/examples/test_ezmap7.py +++ b/examples/test_ezmap7.py @@ -16,14 +16,14 @@ def host(id): import socket return "Rank: %d -- %s" % (id, socket.gethostname()) -print "Submit an mpi job to torque in the 'productionQ' queue..." -print "Using 15 items over 5 nodes and the scatter-gather strategy" +print("Submit an mpi job to torque in the 'productionQ' queue...") +print("Using 15 items over 5 nodes and the scatter-gather strategy") torque = Torque('5:ppn=2', queue='productionQ', timelimit='20:00:00', workdir='.') pool = Mpi(scheduler=torque, scatter=True) res = pool.map(host, range(15)) -print pool -print '\n'.join(res) +print(pool) +print('\n'.join(res)) -print "hello from master" +print("hello from master") # end of file diff --git a/examples/test_ezmap8.py b/examples/test_ezmap8.py index 3359f63..de8cab0 100755 --- a/examples/test_ezmap8.py +++ b/examples/test_ezmap8.py @@ -15,13 +15,13 @@ def host(id): import socket return "Rank: %d -- %s" % (id, socket.gethostname()) -print "Submit an mpi job to torque in the 'productionQ' queue..." 
-print "Using 15 items over 5 nodes and the worker pool strategy" +print("Submit an mpi job to torque in the 'productionQ' queue...") +print("Using 15 items over 5 nodes and the worker pool strategy") pool = Launcher('5:ppn=2', queue='productionQ', timelimit='20:00:00', workdir='.') res = pool.map(host, range(15)) -print pool -print '\n'.join(res) +print(pool) +print('\n'.join(res)) -print "hello from master" +print("hello from master") # end of file diff --git a/examples/test_launch.py b/examples/test_launch.py index 03f3988..b2fcd3b 100644 --- a/examples/test_launch.py +++ b/examples/test_launch.py @@ -13,53 +13,53 @@ def test_launches(): # default launch commands for all launchers - print "***** defaults for all launchers *****" - print all_launches() - print "**************************************", "\n" + print("***** defaults for all launchers *****") + print(all_launches()) + print("**************************************", "\n") def test_launcher(): # configured launch commands for selected launchers serial = SerialMapper() - print "non-python serial launch:", serial + print("non-python serial launch:", serial) settings = {'python':'', 'program':"hostname"} - print serial._launcher(settings), "\n" + print(serial._launcher(settings), "\n") - print "serial python launch:", serial + print("serial python launch:", serial) defaults['program'] = "tools.py" defaults['progargs'] = "12345" - print serial._launcher(defaults), "\n" + print(serial._launcher(defaults), "\n") qsub = Torque() serial.scheduler = qsub - print "scheduled serial launch:", serial + print("scheduled serial launch:", serial) settings = {'program':"tools.py", 'progargs':'12345'} - print serial._launcher(settings), "\n" + print(serial._launcher(settings), "\n") mpi = Mpi() - print "non-scheduled parallel launch:", mpi - print mpi._launcher(settings), "\n" + print("non-scheduled parallel launch:", mpi) + print(mpi._launcher(settings), "\n") qsub.nodes = '4:ppn=2' mpi.nodes = mpi.njobs(qsub.nodes) - print "scheduled parallel launch:", mpi, "| Torque" - print qsub._submit(mpi._launcher(settings)), "\n" + print("scheduled parallel launch:", mpi, "| Torque") + print(qsub._submit(mpi._launcher(settings)), "\n") mpi.scheduler = qsub - print "scheduled parallel launch:", mpi - print mpi._launcher(settings), "\n" + print("scheduled parallel launch:", mpi) + print(mpi._launcher(settings), "\n") _mpi = Mpi(scheduler=Torque(nodes='4:ppn=2')) - print "scheduled parallel launch:", _mpi - print _mpi._launcher(settings), "\n" + print("scheduled parallel launch:", _mpi) + print(_mpi._launcher(settings), "\n") _mpi = TorqueMpi(nodes='4:ppn=2') - print "scheduled parallel launch:", _mpi - print _mpi._launcher(settings), "\n" + print("scheduled parallel launch:", _mpi) + print(_mpi._launcher(settings), "\n") qsub.nodes = 1 serial = SerialMapper() - print "scheduled serial launch:", serial, "| Torque" - print qsub._submit(serial._launcher(settings)), "\n" + print("scheduled serial launch:", serial, "| Torque") + print(qsub._submit(serial._launcher(settings)), "\n") if __name__ == '__main__': diff --git a/examples/test_pmap.py b/examples/test_pmap.py index f420c6a..05f9b41 100755 --- a/examples/test_pmap.py +++ b/examples/test_pmap.py @@ -38,11 +38,11 @@ def func(input): for i in range(20): if world.rank == 0: - print "iteration %d" % i + print("iteration %d" % i) out = parallel_map(func, inputlist, comm = world) if world.rank == 0: - print ''.join(out) + print(''.join(out)) # End of file diff --git a/examples/which.py b/examples/which.py index 
a720a33..92900ad 100755 --- a/examples/which.py +++ b/examples/which.py @@ -14,12 +14,12 @@ """ import sys -print(sys.version) -print(sys.executable) +print((sys.version)) +print((sys.executable)) try: import mpi4py - print("mpi4py %s is installed" % getattr(mpi4py, '__version__', '')) + print(("mpi4py %s is installed" % getattr(mpi4py, '__version__', ''))) except ImportError: print("mpi4py not installed") exit() @@ -29,7 +29,7 @@ try: import pathos - print("pathos %s is installed" % getattr(pathos, '__version__', '')) + print(("pathos %s is installed" % getattr(pathos, '__version__', ''))) except ImportError: print("pathos not installed") exit() @@ -51,7 +51,7 @@ try: import pyina - print("pyina %s is installed" % getattr(pyina, '__version__', '')) + print(("pyina %s is installed" % getattr(pyina, '__version__', ''))) except ImportError: print("pyina not installed") except: diff --git a/examples_other/hello.py b/examples_other/hello.py index 0ebb07b..7642806 100755 --- a/examples_other/hello.py +++ b/examples_other/hello.py @@ -32,7 +32,7 @@ def _configure(self): def main(self, *args, **kwargs): from pyina import mpi - print "hello from mpi.world.rank --> %s " % mpi.world.rank + print("hello from mpi.world.rank --> %s " % mpi.world.rank) return diff --git a/examples_other/mpd_trace.py b/examples_other/mpd_trace.py index ddae6f7..3afa2e6 100755 --- a/examples_other/mpd_trace.py +++ b/examples_other/mpd_trace.py @@ -13,15 +13,15 @@ CPI = "~/src/mpich-1.0.7/examples/cpi" command = 'mpdtrace' -print "\nlaunch: %s" % command +print("\nlaunch: %s" % command) subprocess.call(command, shell=True) command = 'mpdringtest 10' -print "\nlaunch: %s" % command +print("\nlaunch: %s" % command) subprocess.call(command, shell=True) command = 'mpiexec -n 4 hostname' -print "\nlaunch: %s" % command +print("\nlaunch: %s" % command) subprocess.call(command, shell=True) #command = 'mpiexec -n 1 %s' % CPI diff --git a/examples_other/mpipyre1.py b/examples_other/mpipyre1.py index 1894d40..dfeb54f 100755 --- a/examples_other/mpipyre1.py +++ b/examples_other/mpipyre1.py @@ -35,7 +35,7 @@ def main(self): for peer in range(1, world.size): status = mpi.Status() message = world.recv(source=MPI_ANY_SOURCE,tag=17) - print "[%d/%d]: received {%s}" % (world.rank, world.size, message) + print("[%d/%d]: received {%s}" % (world.rank, world.size, message)) else: s = "My message is this: I am node %d " % world.rank logging.debug("%s" % s) diff --git a/examples_other/mpipyre2.py b/examples_other/mpipyre2.py index e9e0c16..3b52740 100755 --- a/examples_other/mpipyre2.py +++ b/examples_other/mpipyre2.py @@ -27,10 +27,10 @@ def main(self): if world.rank == root: str = "hello world" nn = world.bcast(str, root) - print "Master has: %s " % nn + print("Master has: %s " % nn) else: nn = world.bcast("", root) - print "Slave (%d) has: %s " % (world.rank, nn) + print("Slave (%d) has: %s " % (world.rank, nn)) return def _defaults(self): diff --git a/examples_other/test_parallelmap.py b/examples_other/test_parallelmap.py index 8c3e66b..f0fbeb6 100755 --- a/examples_other/test_parallelmap.py +++ b/examples_other/test_parallelmap.py @@ -34,9 +34,9 @@ def func(input): """ from subprocess import call if type(input) == list: - print "Executing: %s" % ' '.join(input) + print("Executing: %s" % ' '.join(input)) else: - print "Executing: %s" % input + print("Executing: %s" % input) retcode = call(input) return retcode @@ -46,7 +46,7 @@ def runshell(input): """ from pyina import mpi from subprocess import Popen, PIPE - print "%d of %d: executing: %s" 
% (mpi.world.rank, mpi.world.size, input) + print("%d of %d: executing: %s" % (mpi.world.rank, mpi.world.size, input)) pipe = Popen(input, shell=True, stdout=PIPE).stdout pipe.readlines() return 0 diff --git a/examples_other/test_ports.py b/examples_other/test_ports.py index e61ea64..28d4202 100755 --- a/examples_other/test_ports.py +++ b/examples_other/test_ports.py @@ -28,7 +28,7 @@ def main(self): #FIXME: How to set up a port in mpi4py? port = world.port(mpi.ANY_SOURCE, tag=17) message = port.receive() - print "[%d/%d]: received {%s}" % (world.rank, world.size, message) + print("[%d/%d]: received {%s}" % (world.rank, world.size, message)) else: s = "My message is this: I am node %d " % world.rank logging.debug("%s" % s) diff --git a/pyina/__init__.py b/pyina/__init__.py index 7347d19..42ef584 100644 --- a/pyina/__init__.py +++ b/pyina/__init__.py @@ -8,8 +8,8 @@ # get version numbers, license, and long description try: - from info import this_version as __version__ - from info import readme as __doc__, license as __license__ + from pyina.info import this_version as __version__ + from pyina.info import readme as __doc__, license as __license__ except ImportError: msg = """First run 'python setup.py build' to build pyina.""" raise ImportError(msg) @@ -25,17 +25,18 @@ # shortcuts # launchers -import launchers -import schedulers +import pyina.launchers as launchers +import pyina.schedulers as schedulers # mappers -import mpi +import pyina.mpi as mpi # strategies -import mpi_scatter, mpi_pool +import pyina.mpi_scatter as mpi_scatter +import pyina.mpi_pool as mpi_pool # tools -from tools import * +from .tools import * # backward compatibility parallel_map = mpi_pool @@ -48,12 +49,12 @@ def license(): """print license""" - print __license__ + print(__license__) return def citation(): """print citation""" - print __doc__[-499:-140] + print((__doc__[-499:-140])) return # end of file diff --git a/pyina/ez_map.py b/pyina/ez_map.py index dc5b7fc..9c094e8 100755 --- a/pyina/ez_map.py +++ b/pyina/ez_map.py @@ -59,10 +59,10 @@ from pyina.mpi import defaults as ezdefaults ezdefaults.update(defaults) -from launchers import launch, mpirun_tasks, srun_tasks, aprun_tasks -from launchers import serial_launcher, mpirun_launcher, srun_launcher -from launchers import aprun_launcher, torque_launcher, moab_launcher -from schedulers import torque_scheduler, moab_scheduler +from .launchers import launch, mpirun_tasks, srun_tasks, aprun_tasks +from .launchers import serial_launcher, mpirun_launcher, srun_launcher +from .launchers import aprun_launcher, torque_launcher, moab_launcher +from .schedulers import torque_scheduler, moab_scheduler HOLD = [] sleeptime = 30 #XXX: the time between checking for results @@ -87,24 +87,24 @@ def ez_map(func, *arglist, **kwds): import os.path, tempfile, subprocess from pyina.tools import which_strategy # mapper = None (allow for use of default mapper) - if kwds.has_key('mapper'): + if 'mapper' in kwds: mapper = kwds['mapper'] if mapper() == "mpi_pool": scatter = False elif mapper() == "mpi_scatter": scatter = True - else: raise NotImplementedError, "Mapper '%s' not found." % mapper() + else: raise NotImplementedError("Mapper '%s' not found." 
% mapper()) ezdefaults['program'] = which_strategy(scatter, lazy=True) # override the defaults - if kwds.has_key('nnodes'): ezdefaults['nodes'] = kwds['nnodes'] - if kwds.has_key('nodes'): ezdefaults['nodes'] = kwds['nodes'] - if kwds.has_key('timelimit'): ezdefaults['timelimit'] = kwds['timelimit'] - if kwds.has_key('queue'): ezdefaults['queue'] = kwds['queue'] + if 'nnodes' in kwds: ezdefaults['nodes'] = kwds['nnodes'] + if 'nodes' in kwds: ezdefaults['nodes'] = kwds['nodes'] + if 'timelimit' in kwds: ezdefaults['timelimit'] = kwds['timelimit'] + if 'queue' in kwds: ezdefaults['queue'] = kwds['queue'] # set the scheduler & launcher (or use the given default) - if kwds.has_key('launcher'): launcher = kwds['launcher'] + if 'launcher' in kwds: launcher = kwds['launcher'] else: launcher = mpirun_launcher #XXX: default = non_mpi? - if kwds.has_key('scheduler'): scheduler = kwds['scheduler'] + if 'scheduler' in kwds: scheduler = kwds['scheduler'] else: scheduler = '' # set scratch directory (most often required for queue launcher) - if kwds.has_key('workdir'): ezdefaults['workdir'] = kwds['workdir'] + if 'workdir' in kwds: ezdefaults['workdir'] = kwds['workdir'] else: if launcher in [torque_launcher, moab_launcher] \ or scheduler in [torque_scheduler, moab_scheduler]: @@ -184,7 +184,7 @@ def ez_map(func, *arglist, **kwds): #subprocess.call('cp -f %s resfile.py' % resfilename, shell=True) # pickled list of output # read result back - res = pickle.load(open(resfilename,'r')) + res = pickle.load(open(resfilename,'rb')) subprocess.call('rm -f %s' % resfilename, shell=True) subprocess.call('rm -f %sc' % modfile.name, shell=True) return res @@ -210,24 +210,24 @@ def ez_map2(func, *arglist, **kwds): import os.path, tempfile, subprocess from pyina.tools import which_strategy # mapper = None (allow for use of default mapper) - if kwds.has_key('mapper'): + if 'mapper' in kwds: mapper = kwds['mapper'] if mapper() == "mpi_pool": scatter = False elif mapper() == "mpi_scatter": scatter = True - else: raise NotImplementedError, "Mapper '%s' not found." % mapper() + else: raise NotImplementedError("Mapper '%s' not found." % mapper()) ezdefaults['program'] = which_strategy(scatter, lazy=True) # override the defaults - if kwds.has_key('nnodes'): ezdefaults['nodes'] = kwds['nnodes'] - if kwds.has_key('nodes'): ezdefaults['nodes'] = kwds['nodes'] - if kwds.has_key('timelimit'): ezdefaults['timelimit'] = kwds['timelimit'] - if kwds.has_key('queue'): ezdefaults['queue'] = kwds['queue'] + if 'nnodes' in kwds: ezdefaults['nodes'] = kwds['nnodes'] + if 'nodes' in kwds: ezdefaults['nodes'] = kwds['nodes'] + if 'timelimit' in kwds: ezdefaults['timelimit'] = kwds['timelimit'] + if 'queue' in kwds: ezdefaults['queue'] = kwds['queue'] # set the scheduler & launcher (or use the given default) - if kwds.has_key('launcher'): launcher = kwds['launcher'] + if 'launcher' in kwds: launcher = kwds['launcher'] else: launcher = mpirun_launcher #XXX: default = non_mpi? 
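A compact sketch of the interpreter-compatibility idioms this ez_map.py hunk applies, shown on a toy dict rather than pyina's own keywords::

    kwds = {'nodes': 4}
    # Python 2 only:  if kwds.has_key('nodes'): ...
    if 'nodes' in kwds:                      # runs on 2.x and 3.x alike
        nodes = kwds['nodes']
    # Python 2 only:  raise IOError, "launch failed: %s" % cmd
    # both:           raise IOError("launch failed: %s" % cmd)
    # Python 3 also drops implicit relative imports, hence the switch to
    # "from .launchers import ..." and "import pyina.launchers as launchers"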
- if kwds.has_key('scheduler'): scheduler = kwds['scheduler'] + if 'scheduler' in kwds: scheduler = kwds['scheduler'] else: scheduler = '' # set scratch directory (most often required for queue launcher) - if kwds.has_key('workdir'): ezdefaults['workdir'] = kwds['workdir'] + if 'workdir' in kwds: ezdefaults['workdir'] = kwds['workdir'] else: if launcher in [torque_launcher, moab_launcher] \ or scheduler in [torque_scheduler, moab_scheduler]: @@ -299,12 +299,12 @@ def ez_map2(func, *arglist, **kwds): subprocess.call('rm -f %s' % errfilename, shell=True) # read result back - res = pickle.load(open(resfilename,'r')) + res = pickle.load(open(resfilename,'rb')) subprocess.call('rm -f %s' % resfilename, shell=True) return res if __name__ == '__main__': - print "simple tests are in examples/test_ezmap*.py" + print("simple tests are in examples/test_ezmap*.py") # end of file diff --git a/pyina/launchers.py b/pyina/launchers.py index bda342e..3ed2865 100755 --- a/pyina/launchers.py +++ b/pyina/launchers.py @@ -85,7 +85,7 @@ from pyina.mpi import Mapper, defaults from pathos.abstract_launcher import AbstractWorkerPool from pathos.helpers import cpu_count -from schedulers import Torque, Moab, Lsf +from pyina.schedulers import Torque, Moab, Lsf import logging log = logging.getLogger("launchers") @@ -146,7 +146,7 @@ def __init__(self, *args, **kwds): Mapper.__init__(self, *args, **kwds) self.scatter = bool(kwds.get('scatter', False)) #XXX: hang w/ nodes=1 ? #self.nodes = kwds.get('nodes', None) - if not len(args) and not kwds.has_key('nodes'): + if not len(args) and 'nodes' not in kwds: if self.scheduler: self.nodes = self.scheduler.nodes else: @@ -447,7 +447,7 @@ def launch(command): subproc = mapper._Mapper__launch(command) #pid = subproc.pid error = subproc.wait() # block until all done - if error: raise IOError, "launch failed: %s" % command + if error: raise IOError("launch failed: %s" % command) return error @@ -545,7 +545,7 @@ def torque_launcher(kdict={}): #FIXME: update """ mydict = defaults.copy() mydict.update(kdict) - from schedulers import torque_scheduler + from .schedulers import torque_scheduler torque = torque_scheduler() #FIXME: hackery if mydict['scheduler'] == torque.srun: mydict['tasks'] = srun_tasks(mydict['nodes']) @@ -575,7 +575,7 @@ def moab_launcher(kdict={}): #FIXME: update """ mydict = defaults.copy() mydict.update(kdict) - from schedulers import moab_scheduler + from .schedulers import moab_scheduler moab = moab_scheduler() #FIXME: hackery if mydict['scheduler'] == moab.mpirun: mydict['tasks'] = mpirun_tasks(mydict['nodes']) @@ -622,13 +622,13 @@ def lsfgm_launcher(kdict={}): #FIXME: update def all_launchers(): - import launchers + import pyina.launchers as launchers L = ["launchers.%s" % f for f in dir(launchers) if f[-8:] == "launcher"] return L def all_launches(kdict = {}): - import launchers, traceback, os.path + import pyina.launchers as launchers, traceback, os.path stack = traceback.extract_stack() caller = stack[ -min(len(stack),2) ][0] # @@ -636,7 +636,7 @@ def all_launches(kdict = {}): defaults['progname'] = os.path.basename(caller) # for key in defaults.keys(): - if not kdict.has_key(key): + if key not in kdict: kdict[key] = defaults[key] L = all_launchers() # @@ -681,11 +681,11 @@ def __launch(): # from mystic import helputil # helputil.paginate(__launch()) - print "python launch" + print("python launch") defaults['program'] = "tools.py" launch(serial_launcher(defaults)) - print "serial launch" + print("serial launch") settings = {'python':'', 
'program':"hostname"} launch(serial_launcher(settings)) diff --git a/pyina/mappers.py b/pyina/mappers.py index afec8c9..ee078fd 100755 --- a/pyina/mappers.py +++ b/pyina/mappers.py @@ -43,6 +43,6 @@ def all_mappers(): if __name__=='__main__': - print all_mappers() + print(all_mappers()) # EOF diff --git a/pyina/mpi.py b/pyina/mpi.py index fd11229..85263e9 100644 --- a/pyina/mpi.py +++ b/pyina/mpi.py @@ -169,7 +169,7 @@ def __launch(self, command): executable = command.split("|")[-1].split()[0] from pox import which if not which(executable): - raise IOError, "launch failed: %s not found" % executable + raise IOError("launch failed: %s not found" % executable) return Popen([command], shell=True) #FIXME: shell=True is insecure def _launcher(self, kdict={}): """prepare launch command based on current settings @@ -302,11 +302,11 @@ def map(self, func, *args, **kwds): from time import sleep sleep(1); counter += 1 if counter >= maxcount: - print "Warning: exceeded timeout (%s s)" % maxcount + print("Warning: exceeded timeout (%s s)" % maxcount) break #print "after wait" # read result back - res = dill.load(open(resfilename,'r')) + res = dill.load(open(resfilename,'rb')) #print "got result" except: error = True @@ -319,7 +319,7 @@ def map(self, func, *args, **kwds): self._cleanup(resfilename, modfile.name, argfile.name) if self.scheduler and not _SAVE[0]: self.scheduler._cleanup() if error: - raise IOError, "launch failed: %s" % command + raise IOError("launch failed: %s" % command) return res #def imap(self, func, *args, **kwds): # """'non-blocking' and 'ordered' diff --git a/pyina/mpi_pool.py b/pyina/mpi_pool.py index e859313..bf8ac4c 100644 --- a/pyina/mpi_pool.py +++ b/pyina/mpi_pool.py @@ -6,7 +6,9 @@ # License: 3-clause BSD. The full license text is available at: # - https://github.com/uqfoundation/pyina/blob/master/LICENSE -from itertools import izip +import sys +if sys.version < "3": + from itertools import izip as zip from mpi4py import MPI as mpi import dill try: @@ -35,7 +37,7 @@ def _debug(boolean): def __queue(*inputs): "iterator that groups inputs by index (i.e. [(x[0], a[0]),(x[1], a[1])])" - return izip(*inputs) + return zip(*inputs) def __index(*inputs): """build an index iterator for the given inputs""" @@ -56,20 +58,20 @@ def parallel_map(func, *seq, **kwds): if rank == master: log.info("size: %s, NJOBS: %s, nodes: %s, skip: %s" % (size, NJOBS, nodes, skip)) if nodes == 1: # the pool is just the master - if skip: raise ValueError, "There must be at least one worker node" + if skip: raise ValueError("There must be at least one worker node") return map(func, *seq) # spawn a separate process for jobs running on the master if not skip: pool = MPool(1) #XXX: poor pickling... use iSend/iRecv instead? #input = queue.next() #XXX: receiving the *data* - input = lookup(seq, queue.next()) #XXX: receives an *index* + input = lookup(seq, next(queue)) #XXX: receives an *index* log.info("MASTER SEND'ING(0)") mresult, mjobid = pool.apply_async(func, args=input), 0 # farm out to workers: 1-N for indexing, 0 reserved for termination for worker in range(1, nodes): #XXX: don't run on master... 
# master send next job to worker 'worker' with tag='worker' log.info("WORKER SEND'ING(%s)" % (worker-skip,)) - comm.send(queue.next(), worker, worker) + comm.send(next(queue), worker, worker) # start receiving recvjob = 0; donejob = 0 @@ -89,7 +91,7 @@ def parallel_map(func, *seq, **kwds): if (sendjob-skip < NJOBS): # then workers are not done # master send next job to worker 'sender' with tag='jobid' log.info("WORKER SEND'ING(%s)" % (sendjob-skip)) - input = queue.next() + input = next(queue) comm.send(input, sender, sendjob) sendjob += 1 else: # workers are done @@ -108,7 +110,7 @@ def parallel_map(func, *seq, **kwds): if (sendjob < NJOBS): log.info("MASTER SEND'ING(%s)" % sendjob) #input = queue.next() #XXX: receiving the *data* - input = lookup(seq, queue.next()) #XXX: receives an *index* + input = lookup(seq, next(queue)) #XXX: receives an *index* mresult, mjobid = pool.apply_async(func, args=input),sendjob sendjob += 1 else: mresult.ready = lambda : False @@ -143,9 +145,9 @@ def squared(x): return x**2 x = range(10) y = parallel_map(squared, x)#, onall=False) if rank == master: - print "f: %s" % squared.__name__ - print "x: %s" % x - print "y: %s" % y + print(("f: %s" % squared.__name__)) + print(("x: %s" % x)) + print(("y: %s" % y)) # EOF diff --git a/pyina/mpi_scatter.py b/pyina/mpi_scatter.py index f4ebbf5..efc5214 100644 --- a/pyina/mpi_scatter.py +++ b/pyina/mpi_scatter.py @@ -6,7 +6,6 @@ # License: 3-clause BSD. The full license text is available at: # - https://github.com/uqfoundation/pyina/blob/master/LICENSE -from itertools import izip from mpi4py import MPI as mpi import dill try: @@ -30,7 +29,7 @@ def __queue(*inputs): #NJOBS = len(inputs[0]) #return (lookup(inputs, *get_workload(i, size, NJOBS, skip=__SKIP[0])) for i in range(size)) load = __index(*inputs) - return (lookup(inputs, load.next()) for i in range(size)) + return (lookup(inputs, next(load)) for i in range(size)) def __index(*inputs): """build an index iterator for the given inputs""" @@ -45,7 +44,7 @@ def parallel_map(func, *seq, **kwds): if skip is False: skip = None else: if size is 1: - raise ValueError, "There must be at least one worker node" + raise ValueError("There must be at least one worker node") skip = master __SKIP[0] = skip @@ -56,11 +55,11 @@ def parallel_map(func, *seq, **kwds): if rank == master: # each processor needs to do its set of jobs. 
- message = queue.next() + message = next(queue) # send jobs to workers for worker in range(1, size): # master sending seq[ib:ie] to worker 'worker' - comm.send(queue.next(), worker, 0) + comm.send(next(queue), worker, 0) else: # worker 'rank' receiving job status = mpi.Status() @@ -69,7 +68,7 @@ def parallel_map(func, *seq, **kwds): # now message is the part of seq that each worker has to do # result = map(func, *message) #XXX: receiving the *data* - result = map(func, *lookup(seq, *message)) #XXX: receives an *index* + result = list(map(func, *lookup(seq, *message))) #XXX: receives an *index* if rank == master: _b, _e = get_workload(rank, size, NJOBS, skip=skip) @@ -103,9 +102,9 @@ def squared(x): return x**2 x = range(10) y = parallel_map(squared, x)#, onall=False) if rank == master: - print "f: %s" % squared.__name__ - print "x: %s" % x - print "y: %s" % y + print("f: %s" % squared.__name__) + print("x: %s" % x) + print("y: %s" % y) # EOF diff --git a/pyina/schedulers.py b/pyina/schedulers.py index 05d3110..d737ee1 100755 --- a/pyina/schedulers.py +++ b/pyina/schedulers.py @@ -110,7 +110,7 @@ def __init(self, *args, **kwds): try: nodes = kwds['nodes'] msg = "got multiple values for keyword argument 'nodes'" - raise TypeError, msg + raise TypeError(msg) except KeyError: nodes = args[0] else: nodes = kwds.get('nodes', self.__nodes) @@ -145,11 +145,11 @@ def fetch(self, outfile, subproc=None): #FIXME: call fetch after submit??? """fetch result from the results file""" try: error = subproc.wait() # block until all done - res = pickle.load(open(outfile,'r')) + res = pickle.load(open(outfile,'rb')) except: error = True if error: - raise IOError, "fetch failed: %s" % outfile + raise IOError("fetch failed: %s" % outfile) return res def _submit(self, command, kdict={}): """prepare the given command for the scheduler @@ -168,7 +168,7 @@ def submit(self, command): subproc = self.__launch(command) #pid = subproc.pid error = subproc.wait() # block until all done - if error: raise IOError, "launch failed: %s" % command + if error: raise IOError("launch failed: %s" % command) return error #self._cleanup() return @@ -178,7 +178,7 @@ def __launch(self, command): executable = command.split("|")[-1].split()[0] from pox.shutils import which if not which(executable): - raise IOError, "launch failed: %s not found" % executable + raise IOError("launch failed: %s not found" % executable) return Popen([command], shell=True) #FIXME: shell=True is insecure def __repr__(self): subargs = (self.__class__.__name__, self.nodes, self.timelimit, self.queue) @@ -323,12 +323,12 @@ class moab_scheduler(object): pass def all_schedulers(): - import schedulers + import pyina.schedulers as schedulers L = ["schedulers.%s" % f for f in dir(schedulers) if f[-9:] == "scheduler"] return L if __name__=='__main__': - print all_schedulers() + print(all_schedulers()) # EOF diff --git a/pyina/tools.py b/pyina/tools.py index 397649b..3935960 100755 --- a/pyina/tools.py +++ b/pyina/tools.py @@ -27,7 +27,7 @@ def ensure_mpi(size = 1, doc = None): mpisize = world.Get_size() mpirank = world.Get_rank() if mpisize < size: - if mpirank == 0: print doc + if mpirank == 0: print(doc) import sys sys.exit() return @@ -38,7 +38,7 @@ def mpiprint(string="", end="\n", rank=0, comm=None): if comm is None: comm = world if not hasattr(rank, '__len__'): rank = (rank,) if comm.rank in rank: - print string+end, + print((string+end,)) #XXX: has light load on *last* proc, heavy/equal on first proc diff --git a/scripts/ezpool.py b/scripts/ezpool.py index 
c73cf76..9573a99 100755 --- a/scripts/ezpool.py +++ b/scripts/ezpool.py @@ -40,7 +40,7 @@ def _debug(boolean): if funcname.endswith('.pik'): # used pickled func workdir = None - func = pickle.load(open(funcname,'r')) + func = pickle.load(open(funcname,'rb')) else: # used tempfile for func workdir = sys.argv[4] sys.path = [workdir] + sys.path @@ -48,7 +48,7 @@ def _debug(boolean): module = __import__(modname) sys.path.pop(0) func = module.FUNC - args,kwds = pickle.load(open(argfilename,'r')) + args,kwds = pickle.load(open(argfilename,'rb')) if world.rank == 0: log.info('funcname: %s' % funcname) # sys.argv[1] @@ -62,7 +62,7 @@ def _debug(boolean): if world.rank == 0: log.info('res: %s' % str(res)) - pickle.dump(res, open(outfilename,'w')) + pickle.dump(res, open(outfilename,'wb')) # end of file diff --git a/scripts/ezscatter.py b/scripts/ezscatter.py index 21e8cf0..d85bc8c 100755 --- a/scripts/ezscatter.py +++ b/scripts/ezscatter.py @@ -40,7 +40,7 @@ def _debug(boolean): if funcname.endswith('.pik'): # used pickled func workdir = None - func = pickle.load(open(funcname,'r')) + func = pickle.load(open(funcname,'rb')) else: # used tempfile for func workdir = sys.argv[4] sys.path = [workdir] + sys.path @@ -48,7 +48,7 @@ def _debug(boolean): module = __import__(modname) sys.path.pop(0) func = module.FUNC - args,kwds = pickle.load(open(argfilename,'r')) + args,kwds = pickle.load(open(argfilename,'rb')) if world.rank == 0: log.info('funcname: %s' % funcname) # sys.argv[1] @@ -62,7 +62,7 @@ def _debug(boolean): if world.rank == 0: log.info('res: %s' % str(res)) - pickle.dump(res, open(outfilename,'w')) + pickle.dump(res, open(outfilename,'wb')) # end of file diff --git a/scripts/machines.py b/scripts/machines.py index a883a07..59c2776 100755 --- a/scripts/machines.py +++ b/scripts/machines.py @@ -36,9 +36,9 @@ def host(id): pool = MpiPool() pool.nodes = nnodes hostnames = pool.map(host, range(nnodes)) - print '\n'.join(hostnames) + print('\n'.join(hostnames)) except: - print __doc__ + print(__doc__) # end of file diff --git a/scripts/mpi_world.py b/scripts/mpi_world.py index f005ae2..51f0fa0 100755 --- a/scripts/mpi_world.py +++ b/scripts/mpi_world.py @@ -34,7 +34,7 @@ def launch(command,quiet=True): "launch a os.system command; if quiet, don't grab the output" - print "launch: %s" % command + print("launch: %s" % command) p = Popen(command, **popen4) p.stdin.close() if quiet is True: @@ -50,7 +50,7 @@ def alias(nnodes): node = str(nnodes) alias = "mpython%s='mpiexec -np %s `which python`'" % (node,node) command = "alias %s" % alias - print command + print(command) raise NotImplementedError #FIXME: alias doesn't stick to user's console try: launch(command) @@ -68,7 +68,7 @@ def set_master(): MASTERINFO = [master,int(port)] except: err = "did you run 'mpd &' first?" - raise Exception, err + raise (Exception, err) return MASTERINFO def set_slaves(nodelist,masterinfo=MASTERINFO): @@ -92,11 +92,11 @@ def kill_all(): if __name__=="__main__": import sys if sys.argv[-1] == "-kill": - print "killing all..." + print("killing all...") kill_all() elif len(sys.argv) > 2: if sys.argv[-2] == "-slaves": - print "seting up mpi..." 
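The scripts above also switch every pickle file to binary mode; a short sketch of why, using a throwaway temp file rather than pyina's result files::

    import os, pickle, tempfile
    fd, name = tempfile.mkstemp(suffix='.pik')
    os.close(fd)
    with open(name, 'wb') as f:     # pickle streams are bytes: 'wb', not 'w'
        pickle.dump([0, 1, 4, 9], f)
    with open(name, 'rb') as f:     # and 'rb', not 'r', when reading back
        res = pickle.load(f)
    assert res == [0, 1, 4, 9]
    os.remove(name)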
+ print("seting up mpi...") MASTERINFO = set_master() nodes = sys.argv[-1] nodes = nodes.strip('[()]').split(',') @@ -108,9 +108,9 @@ def kill_all(): # for node in nodes: # alias(int(node)) else: # "-help" - print __doc__ + print(__doc__) else: # "-help" - print __doc__ + print(__doc__) # End of file diff --git a/setup.py b/setup.py index 8387011..b68d5e7 100644 --- a/setup.py +++ b/setup.py @@ -328,18 +328,18 @@ def write_info_py(filename='pyina/info.py'): except ImportError: print("\n***********************************************************") print("WARNING: One of the following dependencies may be unresolved:") - print(" numpy %s" % numpy_version) - print(" dill %s" % dill_version) - print(" pox %s" % pox_version) - print(" pathos %s" % pathos_version) - print(" mpi4py %s" % mpi4py_version) + print((" numpy %s" % numpy_version)) + print((" dill %s" % dill_version)) + print((" pox %s" % pox_version)) + print((" pathos %s" % pathos_version)) + print((" mpi4py %s" % mpi4py_version)) # print(" pypar %s (optional)" % pypar_version) print("***********************************************************\n") if sdkroot_set: print("\n***********************************************************") print("WARNING: One of following variables was set to a default:") - print(" SDKROOT %s" % sdkroot) + print((" SDKROOT %s" % sdkroot)) print("***********************************************************\n") else: pass diff --git a/tests/test_ezmap.py b/tests/test_ezmap.py index 54e805d..7f012f8 100644 --- a/tests/test_ezmap.py +++ b/tests/test_ezmap.py @@ -39,23 +39,23 @@ def quad(x): square_plus_one = quad_factory(2,0,1) -x2 = map(squared, x) +x2 = list(map(squared, x)) def check_sanity(_map, nodes, verbose=False): if verbose: - print _map - print "x: %s\n" % str(x) + print(_map) + print(("x: %s\n" % str(x))) - print type, _map.__name__ + print((type, _map.__name__)) _config = {'type':"blocking", 'threads':False, 'nproc':nodes, 'ncpus':nodes} mapconfig = {'nodes':nodes} start = time.time() res = _map(squared, x, **mapconfig) end = time.time() - start if verbose: - print "time to results:", end - print "y: %s\n" % str(res) + print(( "time to results:", end)) + print(( "y: %s\n" % str(res))) assert res == x2 mapconfig.update(_config) @@ -73,20 +73,20 @@ def check_sanity(_map, nodes, verbose=False): def check_maps(_map, nodes, items=4, delay=0 ): - _x = range(-items/2,items/2,2) + _x = range(int(-items/2), int(items/2),2) _y = range(len(_x)) _d = [delay]*len(_x) _z = [0]*len(_x) #print map - res1 = map(busy_squared, _x) + res1 = list(map(busy_squared, _x)) mapconfig = {'nodes':nodes} #print _map _res1 = _map(busy_squared, _x, **mapconfig) assert _res1 == res1 - res2 = map(busy_add, _x, _y, _d) + res2 = list(map(busy_add, _x, _y, _d)) _res2 = _map(busy_add, _x, _y, _d, **mapconfig) assert _res2 == res2 #print "" diff --git a/tests/test_map.py b/tests/test_map.py index f840ba7..51aff61 100644 --- a/tests/test_map.py +++ b/tests/test_map.py @@ -22,20 +22,20 @@ def busy_add(x,y, delay=0.01): def timed_pool(pool, items=100, delay=0.1, verbose=False): - _x = range(-items/2,items/2,2) + _x = range(int(-items/2), int(items/2), 2) _y = range(len(_x)) _d = [delay]*len(_x) - if verbose: print pool + if verbose: print( pool) import time start = time.time() res = pool.map(busy_add, _x, _y, _d) _t = time.time() - start - if verbose: print "time to queue:", _t + if verbose: print(("time to queue:", _t)) start = time.time() _sol_ = list(res) t_ = time.time() - start - if verbose: print "time to results:", t_, "\n" + if 
verbose: print(("time to results:", t_, "\n")) return _sol_ @@ -82,9 +82,9 @@ def test_source(): #_debug(True) if verbose: - print "CONFIG: delay = %s" % delay - print "CONFIG: items = %s" % items - print "" + print(("CONFIG: delay = %s" % delay)) + print(("CONFIG: items = %s" % items)) + print("") test_nosource() test_source() diff --git a/tests/test_pool.py b/tests/test_pool.py index 53dbee3..4039f6b 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -11,8 +11,12 @@ def run_source(obj): _obj = source._wrap(obj) assert _obj(1.57) == obj(1.57) src = source.importable(obj, alias='_f') - exec src in globals(), locals() - assert _f(1.57) == obj(1.57) + # LEEK: for 3.x, locals may not be modified + # (see https://docs.python.org/3.6/library/functions.html#locals) + # + my_locals = locals() + exec(src, globals(), my_locals) + assert my_locals["_f"](1.57) == obj(1.57) name = source.getname(obj) assert name == obj.__name__ or src.split("=",1)[0].strip() @@ -25,7 +29,7 @@ def run_pool(obj): from pyina.launchers import Mpi p = Mpi(2) x = [1,2,3] - y = map(obj, x) + y = list(map(obj, x)) p.scatter = False assert p.map(obj, x) == y p.source = True diff --git a/tests/test_star.py b/tests/test_star.py index 2e48a6c..17f50f1 100644 --- a/tests/test_star.py +++ b/tests/test_star.py @@ -38,23 +38,23 @@ def quad(x): square_plus_one = quad_factory(2,0,1) -x2 = map(squared, x) +x2 = list(map(squared, x)) def check_sanity(pool, verbose=False): if verbose: - print pool - print "x: %s\n" % str(x) + print(pool) + print("x: %s\n" % str(x)) - print pool.map.__name__ + print(pool.map.__name__) # blocking map start = time.time() res = pool.map(squared, x) end = time.time() - start assert res == x2 if verbose: - print "time to results:", end - print "y: %s\n" % str(res) + print("time to results:", end) + print("y: %s\n" % str(res)) # print pool.imap.__name__ # iterative map @@ -88,14 +88,14 @@ def check_sanity(pool, verbose=False): def check_maps(pool, items=4, delay=0): - _x = range(-items/2,items/2,2) + _x = range(int(-items/2),int(items/2),2) _y = range(len(_x)) _d = [delay]*len(_x) _z = [0]*len(_x) #print map - res1 = map(squared, _x) - res2 = map(busy_add, _x, _y, _z) + res1 = list(map(squared, _x)) + res2 = list(map(busy_add, _x, _y, _z)) #print pool.map _res1 = pool.map(squared, _x) @@ -125,10 +125,10 @@ def check_maps(pool, items=4, delay=0): def check_dill(pool, verbose=False): # test function that should fail in pickle if verbose: - print pool - print "x: %s\n" % str(x) + print(pool) + print("x: %s\n" % str(x)) - print pool.map.__name__ + print(pool.map.__name__) #start = time.time() try: res = pool.map(square_plus_one, x) @@ -136,12 +136,12 @@ def check_dill(pool, verbose=False): # test function that should fail in pickle assert False # should use a smarter test here... 
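The arithmetic rewrites in these tests follow from Python 3's division and lazy map; in isolation::

    items = 4
    # '/' is true division on Python 3 and range() rejects floats,
    # hence range(int(-items/2), int(items/2), 2) in the tests
    assert list(range(int(-items/2), int(items/2), 2)) == [-2, 0]
    # map() is lazy on Python 3, so results get wrapped in list() before reuse
    x2 = list(map(lambda i: i**2, range(10)))
    assert x2 == [i**2 for i in range(10)]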
#end = time.time() - start # print "time to results:", end - print "y: %s\n" % str(res) + print("y: %s\n" % str(res)) assert True def check_ready(pool, maxtries, delay, verbose=True): - if verbose: print pool + if verbose: print(pool) m = pool.amap(busy_squared, x)# x) # print m.ready() @@ -150,14 +150,14 @@ def check_ready(pool, maxtries, delay, verbose=True): while not m.ready(): time.sleep(delay) tries += 1 - if verbose: print "TRY: %s" % tries + if verbose: print("TRY: %s" % tries) if tries >= maxtries: - if verbose: print "TIMEOUT" + if verbose: print("TIMEOUT") break #print m.ready() # print m.get(0) res = m.get() - if verbose: print res + if verbose: print(res) z = [0]*len(x) assert res == map(squared, x)# x, z) assert tries > 0 diff --git a/tests/test_with.py b/tests/test_with.py index 63b15f1..a13aa8c 100644 --- a/tests/test_with.py +++ b/tests/test_with.py @@ -8,7 +8,9 @@ from __future__ import with_statement from time import sleep -from itertools import izip +import sys +if sys.version < "3": + from itertools import izip as zip PRIMES = [ @@ -47,7 +49,7 @@ def run_with_multipool(Pool): #XXX: amap and imap -- NotImplementedError with Pool() as pool3: #for number, prime in izip(PRIMES, pool3.imap(is_prime, PRIMES)): - for number, prime in izip(PRIMES, pool3.map(is_prime, PRIMES)): + for number, prime in zip(PRIMES, pool3.map(is_prime, PRIMES)): assert prime if number != PRIMES[-1] else not prime #print ('%d is prime: %s' % (number, prime))
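The LEEK comment in tests/test_pool.py above points at a real Python 3 behavior: exec() into locals() inside a function does not rebind the function's local names, so the generated object has to be pulled back out of an explicit mapping. A self-contained sketch of that workaround (the _f name is only illustrative)::

    def run(src):
        my_locals = {}
        exec(src, globals(), my_locals)   # the compiled def lands in my_locals,
        return my_locals["_f"]            # not in run()'s own local namespace
    f = run("def _f(x):\n    return x * 2")
    assert f(3) == 6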