From 7626ec4579c15371d73ac8485adbb9675ca4b7c3 Mon Sep 17 00:00:00 2001 From: Todd Gamblin Date: Fri, 29 Jun 2018 18:10:36 -0400 Subject: refactor: move spack.util.multiproc to llnl.util.multiproc - multiproc doesn't depend on Spack - llnl.util.lock test uses it, but shouldn't use parts of Spack. --- lib/spack/llnl/util/multiproc.py | 96 ++++++++++++++++++++++++++++++++ lib/spack/spack/architecture.py | 12 ++-- lib/spack/spack/compiler.py | 4 +- lib/spack/spack/operating_systems/cnl.py | 4 +- lib/spack/spack/test/llnl/util/lock.py | 4 +- lib/spack/spack/util/multiproc.py | 96 -------------------------------- 6 files changed, 108 insertions(+), 108 deletions(-) create mode 100644 lib/spack/llnl/util/multiproc.py delete mode 100644 lib/spack/spack/util/multiproc.py diff --git a/lib/spack/llnl/util/multiproc.py b/lib/spack/llnl/util/multiproc.py new file mode 100644 index 0000000000..2bf5d1a200 --- /dev/null +++ b/lib/spack/llnl/util/multiproc.py @@ -0,0 +1,96 @@ +############################################################################## +# Copyright (c) 2013-2018, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory. +# +# This file is part of Spack. +# Created by Todd Gamblin, tgamblin@llnl.gov, All rights reserved. +# LLNL-CODE-647188 +# +# For details, see https://github.com/spack/spack +# Please also see the NOTICE and LICENSE files for our notice and the LGPL. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License (as +# published by the Free Software Foundation) version 2.1, February 1999. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the IMPLIED WARRANTY OF +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the terms and +# conditions of the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +############################################################################## +""" +This implements a parallel map operation but it can accept more values +than multiprocessing.Pool.apply() can. For example, apply() will fail +to pickle functions if they're passed indirectly as parameters. +""" +from multiprocessing import Process, Pipe, Semaphore, Value + +__all__ = ['spawn', 'parmap', 'Barrier'] + + +def spawn(f): + def fun(pipe, x): + pipe.send(f(x)) + pipe.close() + return fun + + +def parmap(f, X): + pipe = [Pipe() for x in X] + proc = [Process(target=spawn(f), args=(c, x)) + for x, (p, c) in zip(X, pipe)] + [p.start() for p in proc] + [p.join() for p in proc] + return [p.recv() for (p, c) in pipe] + + +class Barrier: + """Simple reusable semaphore barrier. + + Python 2.6 doesn't have multiprocessing barriers so we implement this. + + See http://greenteapress.com/semaphores/downey08semaphores.pdf, p. 41. + """ + + def __init__(self, n, timeout=None): + self.n = n + self.to = timeout + self.count = Value('i', 0) + self.mutex = Semaphore(1) + self.turnstile1 = Semaphore(0) + self.turnstile2 = Semaphore(1) + + def wait(self): + if not self.mutex.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.count.value += 1 + if self.count.value == self.n: + if not self.turnstile2.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile1.release() + self.mutex.release() + + if not self.turnstile1.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile1.release() + + if not self.mutex.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.count.value -= 1 + if self.count.value == 0: + if not self.turnstile1.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile2.release() + self.mutex.release() + + if not self.turnstile2.acquire(timeout=self.to): + raise BarrierTimeoutError() + self.turnstile2.release() + + +class BarrierTimeoutError(Exception): + pass diff --git a/lib/spack/spack/architecture.py b/lib/spack/spack/architecture.py index f7b1946c46..4ef61d569f 100644 --- a/lib/spack/spack/architecture.py +++ b/lib/spack/spack/architecture.py @@ -79,14 +79,14 @@ import os import inspect import platform as py_platform -from llnl.util.lang import memoized, list_modules, key_ordering +import llnl.util.multiproc as mp import llnl.util.tty as tty +from llnl.util.lang import memoized, list_modules, key_ordering import spack.paths import spack.error as serr from spack.util.naming import mod_to_class from spack.util.environment import get_path -from spack.util.multiproc import parmap from spack.util.spack_yaml import syaml_dict @@ -280,9 +280,9 @@ class OperatingSystem(object): # NOTE: we import spack.compilers here to avoid init order cycles import spack.compilers types = spack.compilers.all_compiler_types() - compiler_lists = parmap(lambda cmp_cls: - self.find_compiler(cmp_cls, *filtered_path), - types) + compiler_lists = mp.parmap( + lambda cmp_cls: self.find_compiler(cmp_cls, *filtered_path), + types) # ensure all the version calls we made are cached in the parent # process, as well. This speeds up Spack a lot. @@ -300,7 +300,7 @@ class OperatingSystem(object): prefixes, suffixes, and versions. e.g., gcc-mp-4.7 would be grouped with g++-mp-4.7 and gfortran-mp-4.7. """ - dicts = parmap( + dicts = mp.parmap( lambda t: cmp_cls._find_matches_in_path(*t), [(cmp_cls.cc_names, cmp_cls.cc_version) + tuple(path), (cmp_cls.cxx_names, cmp_cls.cxx_version) + tuple(path), diff --git a/lib/spack/spack/compiler.py b/lib/spack/spack/compiler.py index b219eee09d..afb18ddcac 100644 --- a/lib/spack/spack/compiler.py +++ b/lib/spack/spack/compiler.py @@ -27,11 +27,11 @@ import re import itertools import llnl.util.tty as tty +import llnl.util.multiproc as mp import spack.error import spack.spec import spack.architecture -from spack.util.multiproc import parmap from spack.util.executable import Executable, ProcessError from spack.util.environment import get_path @@ -280,7 +280,7 @@ class Compiler(object): key = (full_path,) + match.groups() + (detect_version,) checks.append(key) - successful = [k for k in parmap(_get_versioned_tuple, checks) + successful = [k for k in mp.parmap(_get_versioned_tuple, checks) if k is not None] # The 'successful' list is ordered like the input paths. diff --git a/lib/spack/spack/operating_systems/cnl.py b/lib/spack/spack/operating_systems/cnl.py index d5e8f46077..9be7fe641a 100644 --- a/lib/spack/spack/operating_systems/cnl.py +++ b/lib/spack/spack/operating_systems/cnl.py @@ -25,9 +25,9 @@ import re import llnl.util.tty as tty +import llnl.util.multiproc as mp from spack.architecture import OperatingSystem -from spack.util.multiproc import parmap from spack.util.module_cmd import get_module_cmd @@ -60,7 +60,7 @@ class Cnl(OperatingSystem): import spack.compilers types = spack.compilers.all_compiler_types() - compiler_lists = parmap( + compiler_lists = mp.parmap( lambda cmp_cls: self.find_compiler(cmp_cls, *paths), types) # ensure all the version calls we made are cached in the parent diff --git a/lib/spack/spack/test/llnl/util/lock.py b/lib/spack/spack/test/llnl/util/lock.py index 0060bc9273..9b32b9fd15 100644 --- a/lib/spack/spack/test/llnl/util/lock.py +++ b/lib/spack/spack/test/llnl/util/lock.py @@ -72,10 +72,10 @@ from multiprocessing import Process import pytest +import llnl.util.multiproc as mp from llnl.util.filesystem import touch, group_ids import spack.util.lock -from spack.util.multiproc import Barrier from spack.util.lock import Lock, WriteTransaction, ReadTransaction, LockError @@ -205,7 +205,7 @@ def lock_path(lock_dir): def local_multiproc_test(*functions): """Order some processes using simple barrier synchronization.""" - b = Barrier(len(functions), timeout=barrier_timeout) + b = mp.Barrier(len(functions), timeout=barrier_timeout) procs = [Process(target=f, args=(b,)) for f in functions] for p in procs: diff --git a/lib/spack/spack/util/multiproc.py b/lib/spack/spack/util/multiproc.py deleted file mode 100644 index 2bf5d1a200..0000000000 --- a/lib/spack/spack/util/multiproc.py +++ /dev/null @@ -1,96 +0,0 @@ -############################################################################## -# Copyright (c) 2013-2018, Lawrence Livermore National Security, LLC. -# Produced at the Lawrence Livermore National Laboratory. -# -# This file is part of Spack. -# Created by Todd Gamblin, tgamblin@llnl.gov, All rights reserved. -# LLNL-CODE-647188 -# -# For details, see https://github.com/spack/spack -# Please also see the NOTICE and LICENSE files for our notice and the LGPL. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License (as -# published by the Free Software Foundation) version 2.1, February 1999. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the IMPLIED WARRANTY OF -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the terms and -# conditions of the GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -############################################################################## -""" -This implements a parallel map operation but it can accept more values -than multiprocessing.Pool.apply() can. For example, apply() will fail -to pickle functions if they're passed indirectly as parameters. -""" -from multiprocessing import Process, Pipe, Semaphore, Value - -__all__ = ['spawn', 'parmap', 'Barrier'] - - -def spawn(f): - def fun(pipe, x): - pipe.send(f(x)) - pipe.close() - return fun - - -def parmap(f, X): - pipe = [Pipe() for x in X] - proc = [Process(target=spawn(f), args=(c, x)) - for x, (p, c) in zip(X, pipe)] - [p.start() for p in proc] - [p.join() for p in proc] - return [p.recv() for (p, c) in pipe] - - -class Barrier: - """Simple reusable semaphore barrier. - - Python 2.6 doesn't have multiprocessing barriers so we implement this. - - See http://greenteapress.com/semaphores/downey08semaphores.pdf, p. 41. - """ - - def __init__(self, n, timeout=None): - self.n = n - self.to = timeout - self.count = Value('i', 0) - self.mutex = Semaphore(1) - self.turnstile1 = Semaphore(0) - self.turnstile2 = Semaphore(1) - - def wait(self): - if not self.mutex.acquire(timeout=self.to): - raise BarrierTimeoutError() - self.count.value += 1 - if self.count.value == self.n: - if not self.turnstile2.acquire(timeout=self.to): - raise BarrierTimeoutError() - self.turnstile1.release() - self.mutex.release() - - if not self.turnstile1.acquire(timeout=self.to): - raise BarrierTimeoutError() - self.turnstile1.release() - - if not self.mutex.acquire(timeout=self.to): - raise BarrierTimeoutError() - self.count.value -= 1 - if self.count.value == 0: - if not self.turnstile1.acquire(timeout=self.to): - raise BarrierTimeoutError() - self.turnstile2.release() - self.mutex.release() - - if not self.turnstile2.acquire(timeout=self.to): - raise BarrierTimeoutError() - self.turnstile2.release() - - -class BarrierTimeoutError(Exception): - pass -- cgit v1.2.3-70-g09d2