diff options
author | Todd Gamblin <tgamblin@llnl.gov> | 2015-10-24 19:54:52 -0700 |
---|---|---|
committer | Todd Gamblin <tgamblin@llnl.gov> | 2015-10-24 19:54:52 -0700 |
commit | 908a93a470cafcb3fad8f7412e438e7f5b939d04 (patch) | |
tree | 95e3b4996d2ed4f3837a34a3df90b3a506d42441 | |
parent | 5fda7daf57e2362c803d4f2152da93ed270818d9 (diff) | |
download | spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.tar.gz spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.tar.bz2 spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.tar.xz spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.zip |
Add a multiprocess Barrier class to use for testing parallel code.
-rw-r--r-- | lib/spack/spack/util/multiproc.py | 50 |
1 files changed, 49 insertions, 1 deletions
diff --git a/lib/spack/spack/util/multiproc.py b/lib/spack/spack/util/multiproc.py index 9e045a090f..21cd6f543d 100644 --- a/lib/spack/spack/util/multiproc.py +++ b/lib/spack/spack/util/multiproc.py @@ -27,9 +27,11 @@ This implements a parallel map operation but it can accept more values than multiprocessing.Pool.apply() can. For example, apply() will fail to pickle functions if they're passed indirectly as parameters. """ -from multiprocessing import Process, Pipe +from multiprocessing import Process, Pipe, Semaphore, Value from itertools import izip +__all__ = ['spawn', 'parmap', 'Barrier'] + def spawn(f): def fun(pipe,x): pipe.send(f(x)) @@ -43,3 +45,49 @@ def parmap(f,X): [p.join() for p in proc] return [p.recv() for (p,c) in pipe] + +class Barrier: + """Simple reusable semaphore barrier. + + Python 2.6 doesn't have multiprocessing barriers so we implement this. + + See http://greenteapress.com/semaphores/downey08semaphores.pdf, p. 41. + """ + def __init__(self, n, timeout=None): + self.n = n + self.to = timeout + self.count = Value('i', 0) + self.mutex = Semaphore(1) + self.turnstile1 = Semaphore(0) + self.turnstile2 = Semaphore(1) + + + def wait(self): + if not self.mutex.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.count.value += 1 + if self.count.value == self.n: + if not self.turnstile2.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile1.release() + self.mutex.release() + + if not self.turnstile1.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile1.release() + + if not self.mutex.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.count.value -= 1 + if self.count.value == 0: + if not self.turnstile1.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile2.release() + self.mutex.release() + + if not self.turnstile2.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile2.release() + + +class BarrierTimeoutError: pass |