summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTodd Gamblin <tgamblin@llnl.gov>2015-10-24 19:54:52 -0700
committerTodd Gamblin <tgamblin@llnl.gov>2015-10-24 19:54:52 -0700
commit908a93a470cafcb3fad8f7412e438e7f5b939d04 (patch)
tree95e3b4996d2ed4f3837a34a3df90b3a506d42441
parent5fda7daf57e2362c803d4f2152da93ed270818d9 (diff)
downloadspack-908a93a470cafcb3fad8f7412e438e7f5b939d04.tar.gz
spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.tar.bz2
spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.tar.xz
spack-908a93a470cafcb3fad8f7412e438e7f5b939d04.zip
Add a multiprocess Barrier class to use for testing parallel code.
-rw-r--r--lib/spack/spack/util/multiproc.py50
1 files changed, 49 insertions, 1 deletions
diff --git a/lib/spack/spack/util/multiproc.py b/lib/spack/spack/util/multiproc.py
index 9e045a090f..21cd6f543d 100644
--- a/lib/spack/spack/util/multiproc.py
+++ b/lib/spack/spack/util/multiproc.py
@@ -27,9 +27,11 @@ This implements a parallel map operation but it can accept more values
than multiprocessing.Pool.apply() can. For example, apply() will fail
to pickle functions if they're passed indirectly as parameters.
"""
-from multiprocessing import Process, Pipe
+from multiprocessing import Process, Pipe, Semaphore, Value
from itertools import izip
+__all__ = ['spawn', 'parmap', 'Barrier']
+
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
@@ -43,3 +45,49 @@ def parmap(f,X):
[p.join() for p in proc]
return [p.recv() for (p,c) in pipe]
+
+class Barrier:
+ """Simple reusable semaphore barrier.
+
+ Python 2.6 doesn't have multiprocessing barriers so we implement this.
+
+ See http://greenteapress.com/semaphores/downey08semaphores.pdf, p. 41.
+ """
+ def __init__(self, n, timeout=None):
+ self.n = n
+ self.to = timeout
+ self.count = Value('i', 0)
+ self.mutex = Semaphore(1)
+ self.turnstile1 = Semaphore(0)
+ self.turnstile2 = Semaphore(1)
+
+
+ def wait(self):
+ if not self.mutex.acquire(timeout=self.to):
+ raise BarrierTimeoutError()
+ self.count.value += 1
+ if self.count.value == self.n:
+ if not self.turnstile2.acquire(timeout=self.to):
+ raise BarrierTimeoutError()
+ self.turnstile1.release()
+ self.mutex.release()
+
+ if not self.turnstile1.acquire(timeout=self.to):
+ raise BarrierTimeoutError()
+ self.turnstile1.release()
+
+ if not self.mutex.acquire(timeout=self.to):
+ raise BarrierTimeoutError()
+ self.count.value -= 1
+ if self.count.value == 0:
+ if not self.turnstile1.acquire(timeout=self.to):
+ raise BarrierTimeoutError()
+ self.turnstile2.release()
+ self.mutex.release()
+
+ if not self.turnstile2.acquire(timeout=self.to):
+ raise BarrierTimeoutError()
+ self.turnstile2.release()
+
+
+class BarrierTimeoutError: pass