summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMassimiliano Culpo <massimiliano.culpo@gmail.com>2021-10-19 17:09:34 +0200
committerGitHub <noreply@github.com>2021-10-19 10:09:34 -0500
commit2d45a9d6176eda9375515b77699a17cfd5221111 (patch)
treede860f32704a0f075b43d5c8a80ae3a05b843a08 /lib
parent64a323b22d7953ea644cf7a8f6861de800e42117 (diff)
downloadspack-2d45a9d6176eda9375515b77699a17cfd5221111.tar.gz
spack-2d45a9d6176eda9375515b77699a17cfd5221111.tar.bz2
spack-2d45a9d6176eda9375515b77699a17cfd5221111.tar.xz
spack-2d45a9d6176eda9375515b77699a17cfd5221111.zip
Speed-up environment concretization on linux with a process pool (#26264)
* Speed-up environment concretization with a process pool

  We can exploit the fact that the environment is concretized
  separately and use a pool of processes to concretize it.

* Add module spack.util.parallel

  Module includes `pool` and `parallel_map` abstractions, along
  with implementation details for both.

* Add a new hash type to pass specs across processes

* Add tty msg with concretization time
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/spack/environment/environment.py67
-rw-r--r--lib/spack/spack/fetch_strategy.py16
-rw-r--r--lib/spack/spack/hash_types.py7
-rw-r--r--lib/spack/spack/modules/common.py12
-rw-r--r--lib/spack/spack/solver/asp.py2
-rw-r--r--lib/spack/spack/spec.py18
-rw-r--r--lib/spack/spack/test/cmd/dev_build.py2
-rw-r--r--lib/spack/spack/user_environment.py6
-rw-r--r--lib/spack/spack/util/parallel.py129
9 files changed, 232 insertions, 27 deletions
diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py
index 5a271fc6dd..5fce76118c 100644
--- a/lib/spack/spack/environment/environment.py
+++ b/lib/spack/spack/environment/environment.py
@@ -9,6 +9,7 @@ import os
import re
import shutil
import sys
+import time
import ruamel.yaml as yaml
import six
@@ -17,6 +18,8 @@ from ordereddict_backport import OrderedDict
import llnl.util.filesystem as fs
import llnl.util.tty as tty
+import spack.bootstrap
+import spack.compilers
import spack.concretize
import spack.config
import spack.error
@@ -28,10 +31,13 @@ import spack.schema.env
import spack.spec
import spack.stage
import spack.store
+import spack.subprocess_context
import spack.user_environment as uenv
+import spack.util.cpus
import spack.util.environment
import spack.util.hash
import spack.util.lock as lk
+import spack.util.parallel
import spack.util.path
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
@@ -1111,14 +1117,57 @@ class Environment(object):
self._add_concrete_spec(s, concrete, new=False)
# Concretize any new user specs that we haven't concretized yet
- concretized_specs = []
+ arguments, root_specs = [], []
for uspec, uspec_constraints in zip(
- self.user_specs, self.user_specs.specs_as_constraints):
+ self.user_specs, self.user_specs.specs_as_constraints
+ ):
if uspec not in old_concretized_user_specs:
- concrete = _concretize_from_constraints(uspec_constraints, tests=tests)
- self._add_concrete_spec(uspec, concrete)
- concretized_specs.append((uspec, concrete))
- return concretized_specs
+ root_specs.append(uspec)
+ arguments.append((uspec_constraints, tests))
+
+ # Ensure we don't try to bootstrap clingo in parallel
+ if spack.config.get('config:concretizer') == 'clingo':
+ with spack.bootstrap.ensure_bootstrap_configuration():
+ spack.bootstrap.ensure_clingo_importable_or_raise()
+
+ # Ensure all the indexes have been built or updated, since
+ # otherwise the processes in the pool may timeout on waiting
+ # for a write lock. We do this indirectly by retrieving the
+ # provider index, which should in turn trigger the update of
+ # all the indexes if there's any need for that.
+ _ = spack.repo.path.provider_index
+
+ # Ensure we have compilers in compilers.yaml to avoid that
+ # processes try to write the config file in parallel
+ _ = spack.compilers.get_compiler_config()
+
+ # Solve the environment in parallel on Linux
+ start = time.time()
+ max_processes = min(
+ max(len(arguments), 1), # Number of specs
+ 16 # Cap on 16 cores
+ )
+
+ # TODO: revisit this print as soon as darwin is parallel too
+ msg = 'Starting concretization'
+ if sys.platform != 'darwin':
+ msg = msg + ' pool with {0} processes'.format(
+ spack.util.parallel.num_processes(max_processes=max_processes)
+ )
+ tty.msg(msg)
+
+ concretized_root_specs = spack.util.parallel.parallel_map(
+ _concretize_task, arguments, max_processes=max_processes
+ )
+
+ finish = time.time()
+ tty.msg('Environment concretized in {0} sec.'.format(finish - start))
+ results = []
+ for abstract, concrete in zip(root_specs, concretized_root_specs):
+ self._add_concrete_spec(abstract, concrete)
+ results.append((abstract, concrete))
+
+ return results
def concretize_and_add(self, user_spec, concrete_spec=None, tests=False):
"""Concretize and add a single spec to the environment.
@@ -1962,6 +2011,12 @@ def _concretize_from_constraints(spec_constraints, tests=False):
invalid_constraints.extend(inv_variant_constraints)
+def _concretize_task(packed_arguments):
+    """Concretize one set of spec constraints from a single packed argument.
+
+    The constraints and the ``tests`` flag arrive bundled in one tuple, so
+    that this function can be mapped over a list of such tuples.
+
+    Args:
+        packed_arguments (tuple): pair ``(spec_constraints, tests)``
+
+    Returns:
+        Whatever ``_concretize_from_constraints`` returns for the unpacked
+        arguments.
+    """
+    constraints, run_tests = packed_arguments
+    # The work is done inside tty.SuppressOutput(msg_enabled=False)
+    with tty.SuppressOutput(msg_enabled=False):
+        concrete = _concretize_from_constraints(constraints, run_tests)
+    return concrete
+
+
def make_repo_path(root):
"""Make a RepoPath from the repo subdirectories in an environment."""
path = spack.repo.RepoPath()
diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py
index 6db41e2328..436fbf39bf 100644
--- a/lib/spack/spack/fetch_strategy.py
+++ b/lib/spack/spack/fetch_strategy.py
@@ -48,7 +48,7 @@ import spack.error
import spack.util.crypto as crypto
import spack.util.pattern as pattern
import spack.util.url as url_util
-import spack.util.web as web_util
+import spack.util.web
import spack.version
from spack.util.compression import decompressor_for, extension
from spack.util.executable import CommandNotFoundError, which
@@ -350,8 +350,8 @@ class URLFetchStrategy(FetchStrategy):
else:
# Telling urllib to check if url is accessible
try:
- url, headers, response = web_util.read_from_url(url)
- except web_util.SpackWebError:
+ url, headers, response = spack.util.web.read_from_url(url)
+ except spack.util.web.SpackWebError:
msg = "Urllib fetch failed to verify url {0}".format(url)
raise FailedDownloadError(url, msg)
return (response.getcode() is None or response.getcode() == 200)
@@ -380,8 +380,8 @@ class URLFetchStrategy(FetchStrategy):
# Run urllib but grab the mime type from the http headers
try:
- url, headers, response = web_util.read_from_url(url)
- except web_util.SpackWebError as e:
+ url, headers, response = spack.util.web.read_from_url(url)
+ except spack.util.web.SpackWebError as e:
# clean up archive on failure.
if self.archive_file:
os.remove(self.archive_file)
@@ -571,7 +571,7 @@ class URLFetchStrategy(FetchStrategy):
if not self.archive_file:
raise NoArchiveFileError("Cannot call archive() before fetching.")
- web_util.push_to_url(
+ spack.util.web.push_to_url(
self.archive_file,
destination,
keep_original=True)
@@ -1388,12 +1388,12 @@ class S3FetchStrategy(URLFetchStrategy):
basename = os.path.basename(parsed_url.path)
with working_dir(self.stage.path):
- _, headers, stream = web_util.read_from_url(self.url)
+ _, headers, stream = spack.util.web.read_from_url(self.url)
with open(basename, 'wb') as f:
shutil.copyfileobj(stream, f)
- content_type = web_util.get_header(headers, 'Content-type')
+ content_type = spack.util.web.get_header(headers, 'Content-type')
if content_type == 'text/html':
warn_content_type_mismatch(self.archive_file or "the archive")
diff --git a/lib/spack/spack/hash_types.py b/lib/spack/spack/hash_types.py
index 12a57640bd..ab41e1869e 100644
--- a/lib/spack/spack/hash_types.py
+++ b/lib/spack/spack/hash_types.py
@@ -44,6 +44,13 @@ build_hash = SpecHashDescriptor(
deptype=('build', 'link', 'run'), package_hash=False, name='build_hash')
+#: Hash descriptor used only to transfer a DAG, as is, across processes.
+#: Compared with ``build_hash`` it also lists 'test' among its dependency
+#: types; like ``build_hash`` it is created with ``package_hash=False``.
+process_hash = SpecHashDescriptor(
+ deptype=('build', 'link', 'run', 'test'),
+ package_hash=False,
+ name='process_hash'
+)
+
#: Full hash used in build pipelines to determine when to rebuild packages.
full_hash = SpecHashDescriptor(
deptype=('build', 'link', 'run'), package_hash=True, name='full_hash')
diff --git a/lib/spack/spack/modules/common.py b/lib/spack/spack/modules/common.py
index 21cb527b65..0e9c8eac68 100644
--- a/lib/spack/spack/modules/common.py
+++ b/lib/spack/spack/modules/common.py
@@ -2,7 +2,6 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
-
"""Here we consolidate the logic for creating an abstract description
of the information that module systems need.
@@ -43,7 +42,7 @@ from llnl.util.lang import dedupe
import spack.build_environment as build_environment
import spack.config
-import spack.environment as ev
+import spack.environment
import spack.error
import spack.paths
import spack.projections as proj
@@ -698,12 +697,13 @@ class BaseContext(tengine.Context):
spec = self.spec.copy() # defensive copy before setting prefix
if use_view:
if use_view is True:
- use_view = ev.default_view_name
+ use_view = spack.environment.default_view_name
- env = ev.active_environment()
+ env = spack.environment.active_environment()
if not env:
- raise ev.SpackEnvironmentViewError("Module generation with views "
- "requires active environment")
+ raise spack.environment.SpackEnvironmentViewError(
+ "Module generation with views requires active environment"
+ )
view = env.views[use_view]
diff --git a/lib/spack/spack/solver/asp.py b/lib/spack/spack/solver/asp.py
index c091d0b89d..8bafa075db 100644
--- a/lib/spack/spack/solver/asp.py
+++ b/lib/spack/spack/solver/asp.py
@@ -334,7 +334,7 @@ class PyclingoDriver(object):
self.control.configuration.asp.trans_ext = 'all'
self.control.configuration.asp.eq = '5'
self.control.configuration.configuration = 'tweety'
- self.control.configuration.solve.parallel_mode = '2'
+ self.control.configuration.solve.parallel_mode = '1'
self.control.configuration.solver.opt_strategy = "usc,one"
# set up the problem -- this generates facts and rules
diff --git a/lib/spack/spack/spec.py b/lib/spack/spack/spec.py
index 8b13374430..a153ffe50f 100644
--- a/lib/spack/spack/spec.py
+++ b/lib/spack/spack/spec.py
@@ -1567,6 +1567,14 @@ class Spec(object):
"""
return self._cached_hash(ht.build_hash, length)
+ def process_hash(self, length=None):
+ """Hash used to serialize a spec and pass it among processes.
+
+ This hash includes build and test dependencies and is only used to
+ serialize a spec and pass it around among processes.
+
+ Args:
+ length: forwarded unchanged to ``_cached_hash`` (presumably the
+ number of leading characters of the hash to keep -- TODO confirm)
+ """
+ return self._cached_hash(ht.process_hash, length)
+
def full_hash(self, length=None):
"""Hash to determine when to rebuild packages in the build pipeline.
@@ -1832,6 +1840,7 @@ class Spec(object):
not self._hashes_final) # lazily compute
if write_full_hash:
node[ht.full_hash.name] = self.full_hash()
+
write_build_hash = 'build' in hash.deptype and (
self._hashes_final and self._build_hash or # cached and final
not self._hashes_final) # lazily compute
@@ -1839,8 +1848,12 @@ class Spec(object):
node[ht.build_hash.name] = self.build_hash()
else:
node['concrete'] = False
+
if hash.name == 'build_hash':
node[hash.name] = self.build_hash()
+ elif hash.name == 'process_hash':
+ node[hash.name] = self.process_hash()
+
return node
def to_yaml(self, stream=None, hash=ht.dag_hash):
@@ -1974,7 +1987,8 @@ class Spec(object):
# new format: elements of dependency spec are keyed.
for key in (ht.full_hash.name,
ht.build_hash.name,
- ht.dag_hash.name):
+ ht.dag_hash.name,
+ ht.process_hash.name):
if key in elt:
dep_hash, deptypes = elt[key], elt['type']
hash_type = key
@@ -4430,7 +4444,7 @@ class Spec(object):
return hash(lang.tuplify(self._cmp_iter))
def __reduce__(self):
- return _spec_from_dict, (self.to_dict(hash=ht.build_hash),)
+ return _spec_from_dict, (self.to_dict(hash=ht.process_hash),)
def merge_abstract_anonymous_specs(*abstract_specs):
diff --git a/lib/spack/spack/test/cmd/dev_build.py b/lib/spack/spack/test/cmd/dev_build.py
index f8be2364d9..765d4dc81b 100644
--- a/lib/spack/spack/test/cmd/dev_build.py
+++ b/lib/spack/spack/test/cmd/dev_build.py
@@ -245,7 +245,7 @@ env:
env('create', 'test', './spack.yaml')
with ev.read('test'):
- with pytest.raises(spack.spec.UnsatisfiableVersionSpecError):
+ with pytest.raises(RuntimeError):
install()
diff --git a/lib/spack/spack/user_environment.py b/lib/spack/spack/user_environment.py
index 23fb6529d8..55f516b732 100644
--- a/lib/spack/spack/user_environment.py
+++ b/lib/spack/spack/user_environment.py
@@ -5,7 +5,7 @@
import os
import sys
-import spack.build_environment as build_env
+import spack.build_environment
import spack.config
import spack.util.environment as environment
import spack.util.prefix as prefix
@@ -85,13 +85,13 @@ def environment_modifications_for_spec(spec, view=None):
# Let the extendee/dependency modify their extensions/dependents
# before asking for package-specific modifications
env.extend(
- build_env.modifications_from_dependencies(
+ spack.build_environment.modifications_from_dependencies(
spec, context='run'
)
)
# Package specific modifications
- build_env.set_module_variables_for_package(spec.package)
+ spack.build_environment.set_module_variables_for_package(spec.package)
spec.package.setup_run_environment(env)
return env
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
new file mode 100644
index 0000000000..04562a1801
--- /dev/null
+++ b/lib/spack/spack/util/parallel.py
@@ -0,0 +1,129 @@
+# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
+# Spack Project Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+from __future__ import print_function
+
+import contextlib
+import multiprocessing
+import os
+import sys
+import traceback
+
+import six
+
+from .cpus import cpus_available
+
+
+class ErrorFromWorker(object):
+    """Wrapper used to report an error raised in a worker process."""
+
+    def __init__(self, exc_cls, exc, tb):
+        """Record an exception as plain data that is easy to send over a pipe.
+
+        Only the id of the current process and the fully formatted
+        traceback text are stored; the exception object itself is not kept.
+
+        Args:
+            exc_cls: class of the exception raised in the worker process
+            exc: exception instance raised in the worker process
+            tb: traceback object associated with the exception
+        """
+        formatted_lines = traceback.format_exception(exc_cls, exc, tb)
+        self.error_message = ''.join(formatted_lines)
+        self.pid = os.getpid()
+
+    def __str__(self):
+        return "[PID={0}] {1}".format(self.pid, self.error_message)
+
+
+class Task(object):
+    """Callable wrapper that never lets an ``Exception`` escape.
+
+    Any ``Exception`` raised by the wrapped function is trapped and handed
+    back to the caller as an ``ErrorFromWorker`` object instead.
+
+    A class is used rather than a decorator because the class is
+    pickleable, while a decorator with an inner closure is not.
+    """
+
+    def __init__(self, func):
+        self.func = func
+
+    def __call__(self, *args, **kwargs):
+        try:
+            return self.func(*args, **kwargs)
+        except Exception:
+            # Report the failure as a value rather than raising it
+            return ErrorFromWorker(*sys.exc_info())
+
+
+def raise_if_errors(*results):
+    """Raise if any of the results is an ``ErrorFromWorker`` object.
+
+    When at least one error is found, the text of every error is gathered
+    into a single report, followed by the stack of the current (parent)
+    process, and the report becomes the message of the raised exception.
+    When no error is found the function returns without doing anything.
+
+    Args:
+        *results: results from worker processes
+
+    Raise:
+        RuntimeError: if ErrorFromWorker objects are in the results
+    """
+    failures = [res for res in results if isinstance(res, ErrorFromWorker)]
+    if not failures:
+        return
+
+    # Accumulate the whole report in memory, it is used as error message
+    report = six.StringIO()
+    for failure in failures:
+        print(failure, file=report)
+
+    print('[PARENT PROCESS]:', file=report)
+    traceback.print_stack(file=report)
+    raise RuntimeError(
+        'errors occurred in worker processes:\n{0}'.format(report.getvalue())
+    )
+
+
+@contextlib.contextmanager
+def pool(*args, **kwargs):
+    """Context manager to start and terminate a pool of processes, similar
+    to the default one provided in Python 3.X
+
+    Arguments are forwarded to the multiprocessing.Pool.__init__ method.
+
+    Yields:
+        the ``multiprocessing.Pool`` object that was created
+
+    On exit, whether or not the body raised, the pool is terminated
+    and joined.
+    """
+    # Create the pool *before* entering the try block. If the constructor
+    # raises there is nothing to clean up, and the original exception must
+    # propagate rather than being masked by an UnboundLocalError on ``p``
+    # in the finally clause.
+    p = multiprocessing.Pool(*args, **kwargs)
+    try:
+        yield p
+    finally:
+        p.terminate()
+        p.join()
+
+
+def num_processes(max_processes=None):
+    """Return the number of processes in a pool.
+
+    Currently the function returns the minimum between the maximum number
+    of processes and the cpus available.
+
+    When a maximum number of processes is not specified return the cpus
+    available.
+
+    Args:
+        max_processes (int or None): maximum number of processes allowed
+
+    Returns:
+        ``cpus_available()`` if ``max_processes`` is None, otherwise the
+        smaller of ``cpus_available()`` and ``max_processes``.
+    """
+    available = cpus_available()
+    # Handle the "no maximum" case explicitly: min() cannot compare an
+    # int with None on Python 3. Integer inputs behave exactly as before.
+    if max_processes is None:
+        return available
+    return min(available, max_processes)
+
+
+def parallel_map(func, arguments, max_processes=None):
+    """Apply a function to each item in a list of arguments and return the
+    list of results.
+
+    The function is wrapped in a ``Task`` before being called. On every
+    platform except darwin the calls are dispatched to a pool of
+    processes; on darwin they are performed one after the other in the
+    current process.
+
+    Args:
+        func: callable invoked with one item of ``arguments`` at a time
+        arguments (list): list of arguments for the task
+        max_processes (int or None): maximum number of processes allowed
+
+    Returns:
+        list with the result of each call
+
+    Raises:
+        RuntimeError: if any error occurred in the worker processes
+    """
+    wrapped = Task(func)
+    if sys.platform == 'darwin':
+        results = [wrapped(item) for item in arguments]
+    else:
+        nprocs = num_processes(max_processes=max_processes)
+        with pool(processes=nprocs) as workers:
+            results = workers.map(wrapped, arguments)
+    raise_if_errors(*results)
+    return results