From 908a93a470cafcb3fad8f7412e438e7f5b939d04 Mon Sep 17 00:00:00 2001 From: Todd Gamblin Date: Sat, 24 Oct 2015 19:54:52 -0700 Subject: Add a multiprocess Barrier class to use for testing parallel code. --- lib/spack/spack/util/multiproc.py | 50 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/lib/spack/spack/util/multiproc.py b/lib/spack/spack/util/multiproc.py index 9e045a090f..21cd6f543d 100644 --- a/lib/spack/spack/util/multiproc.py +++ b/lib/spack/spack/util/multiproc.py @@ -27,9 +27,11 @@ This implements a parallel map operation but it can accept more values than multiprocessing.Pool.apply() can. For example, apply() will fail to pickle functions if they're passed indirectly as parameters. """ -from multiprocessing import Process, Pipe +from multiprocessing import Process, Pipe, Semaphore, Value from itertools import izip +__all__ = ['spawn', 'parmap', 'Barrier'] + def spawn(f): def fun(pipe,x): pipe.send(f(x)) @@ -43,3 +45,49 @@ def parmap(f,X): [p.join() for p in proc] return [p.recv() for (p,c) in pipe] + +class Barrier: + """Simple reusable semaphore barrier. + + Python 2.6 doesn't have multiprocessing barriers so we implement this. + + See http://greenteapress.com/semaphores/downey08semaphores.pdf, p. 41. + """ + def __init__(self, n, timeout=None): + self.n = n + self.to = timeout + self.count = Value('i', 0) + self.mutex = Semaphore(1) + self.turnstile1 = Semaphore(0) + self.turnstile2 = Semaphore(1) + + + def wait(self): + if not self.mutex.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.count.value += 1 + if self.count.value == self.n: + if not self.turnstile2.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile1.release() + self.mutex.release() + + if not self.turnstile1.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile1.release() + + if not self.mutex.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.count.value -= 1 + if self.count.value == 0: + if not self.turnstile1.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile2.release() + self.mutex.release() + + if not self.turnstile2.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile2.release() + + +class BarrierTimeoutError: pass -- cgit v1.2.3-70-g09d2