From 2d45a9d6176eda9375515b77699a17cfd5221111 Mon Sep 17 00:00:00 2001
From: Massimiliano Culpo
Date: Tue, 19 Oct 2021 17:09:34 +0200
Subject: Speed-up environment concretization on linux with a process pool (#26264)

* Speed-up environment concretization with a process pool

  We can exploit the fact that each root spec in the environment is
  concretized independently of the others, and use a pool of processes
  to concretize them in parallel.

* Add module spack.util.parallel

  Module includes `pool` and `parallel_map` abstractions, along with
  implementation details for both.

* Add a new hash type to pass specs across processes

* Add tty msg with concretization time
---
 lib/spack/spack/environment/environment.py |  67 +++++++++++++--
 lib/spack/spack/fetch_strategy.py          |  16 ++--
 lib/spack/spack/hash_types.py              |   7 ++
 lib/spack/spack/modules/common.py          |  12 +--
 lib/spack/spack/solver/asp.py              |   2 +-
 lib/spack/spack/spec.py                    |  18 +++-
 lib/spack/spack/test/cmd/dev_build.py      |   2 +-
 lib/spack/spack/user_environment.py        |   6 +-
 lib/spack/spack/util/parallel.py           | 129 +++++++++++++++++++++++++++++
 9 files changed, 232 insertions(+), 27 deletions(-)
 create mode 100644 lib/spack/spack/util/parallel.py

diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py
index 5a271fc6dd..5fce76118c 100644
--- a/lib/spack/spack/environment/environment.py
+++ b/lib/spack/spack/environment/environment.py
@@ -9,6 +9,7 @@ import os
 import re
 import shutil
 import sys
+import time
 
 import ruamel.yaml as yaml
 import six
@@ -17,6 +18,8 @@ from ordereddict_backport import OrderedDict
 import llnl.util.filesystem as fs
 import llnl.util.tty as tty
 
+import spack.bootstrap
+import spack.compilers
 import spack.concretize
 import spack.config
 import spack.error
@@ -28,10 +31,13 @@ import spack.schema.env
 import spack.spec
 import spack.stage
 import spack.store
+import spack.subprocess_context
 import spack.user_environment as uenv
+import spack.util.cpus
 import spack.util.environment
 import spack.util.hash
 import spack.util.lock as lk
+import spack.util.parallel
 import spack.util.path
 import spack.util.spack_json as sjson
 import spack.util.spack_yaml as syaml
@@ -1111,14 +1117,57 @@ class Environment(object):
             self._add_concrete_spec(s, concrete, new=False)
 
         # Concretize any new user specs that we haven't concretized yet
-        concretized_specs = []
+        arguments, root_specs = [], []
         for uspec, uspec_constraints in zip(
-                self.user_specs, self.user_specs.specs_as_constraints):
+            self.user_specs, self.user_specs.specs_as_constraints
+        ):
             if uspec not in old_concretized_user_specs:
-                concrete = _concretize_from_constraints(uspec_constraints, tests=tests)
-                self._add_concrete_spec(uspec, concrete)
-                concretized_specs.append((uspec, concrete))
-        return concretized_specs
+                root_specs.append(uspec)
+                arguments.append((uspec_constraints, tests))
+
+        # Ensure we don't try to bootstrap clingo in parallel
+        if spack.config.get('config:concretizer') == 'clingo':
+            with spack.bootstrap.ensure_bootstrap_configuration():
+                spack.bootstrap.ensure_clingo_importable_or_raise()
+
+        # Ensure all the indexes have been built or updated, since
+        # otherwise the processes in the pool may time out waiting
+        # for a write lock. We do this indirectly by retrieving the
+        # provider index, which should in turn trigger the update of
+        # all the indexes if there's any need for that.
+        _ = spack.repo.path.provider_index
+
+        # Ensure compilers.yaml is already populated, so that worker
+        # processes don't all try to write the config file in parallel
+        _ = spack.compilers.get_compiler_config()
+
+        # Solve the environment in parallel on Linux
+        start = time.time()
+        max_processes = min(
+            max(len(arguments), 1),  # Number of specs
+            16  # Cap at 16 processes
+        )
+
+        # TODO: revisit this print as soon as darwin is parallel too
+        msg = 'Starting concretization'
+        if sys.platform != 'darwin':
+            msg = msg + ' pool with {0} processes'.format(
+                spack.util.parallel.num_processes(max_processes=max_processes)
+            )
+        tty.msg(msg)
+
+        concretized_root_specs = spack.util.parallel.parallel_map(
+            _concretize_task, arguments, max_processes=max_processes
+        )
+
+        finish = time.time()
+        tty.msg('Environment concretized in {0} sec.'.format(finish - start))
+        results = []
+        for abstract, concrete in zip(root_specs, concretized_root_specs):
+            self._add_concrete_spec(abstract, concrete)
+            results.append((abstract, concrete))
+
+        return results
 
     def concretize_and_add(self, user_spec, concrete_spec=None, tests=False):
         """Concretize and add a single spec to the environment.
@@ -1962,6 +2011,12 @@ def _concretize_from_constraints(spec_constraints, tests=False):
             invalid_constraints.extend(inv_variant_constraints)
 
 
+def _concretize_task(packed_arguments):
+    spec_constraints, tests = packed_arguments
+    with tty.SuppressOutput(msg_enabled=False):
+        return _concretize_from_constraints(spec_constraints, tests)
+
+
 def make_repo_path(root):
     """Make a RepoPath from the repo subdirectories in an environment."""
     path = spack.repo.RepoPath()
diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py
index 6db41e2328..436fbf39bf 100644
--- a/lib/spack/spack/fetch_strategy.py
+++ b/lib/spack/spack/fetch_strategy.py
@@ -48,7 +48,7 @@ import spack.error
 import spack.util.crypto as crypto
 import spack.util.pattern as pattern
 import spack.util.url as url_util
-import spack.util.web as web_util
+import spack.util.web
 import spack.version
 from spack.util.compression import decompressor_for, extension
 from spack.util.executable import CommandNotFoundError, which
@@ -350,8 +350,8 @@ class URLFetchStrategy(FetchStrategy):
         else:
             # Telling urllib to check if url is accessible
             try:
-                url, headers, response = web_util.read_from_url(url)
-            except web_util.SpackWebError:
+                url, headers, response = spack.util.web.read_from_url(url)
+            except spack.util.web.SpackWebError:
                 msg = "Urllib fetch failed to verify url {0}".format(url)
                 raise FailedDownloadError(url, msg)
             return (response.getcode() is None or response.getcode() == 200)
@@ -380,8 +380,8 @@ class URLFetchStrategy(FetchStrategy):
 
         # Run urllib but grab the mime type from the http headers
         try:
-            url, headers, response = web_util.read_from_url(url)
-        except web_util.SpackWebError as e:
+            url, headers, response = spack.util.web.read_from_url(url)
+        except spack.util.web.SpackWebError as e:
             # clean up archive on failure.
if self.archive_file: os.remove(self.archive_file) @@ -571,7 +571,7 @@ class URLFetchStrategy(FetchStrategy): if not self.archive_file: raise NoArchiveFileError("Cannot call archive() before fetching.") - web_util.push_to_url( + spack.util.web.push_to_url( self.archive_file, destination, keep_original=True) @@ -1388,12 +1388,12 @@ class S3FetchStrategy(URLFetchStrategy): basename = os.path.basename(parsed_url.path) with working_dir(self.stage.path): - _, headers, stream = web_util.read_from_url(self.url) + _, headers, stream = spack.util.web.read_from_url(self.url) with open(basename, 'wb') as f: shutil.copyfileobj(stream, f) - content_type = web_util.get_header(headers, 'Content-type') + content_type = spack.util.web.get_header(headers, 'Content-type') if content_type == 'text/html': warn_content_type_mismatch(self.archive_file or "the archive") diff --git a/lib/spack/spack/hash_types.py b/lib/spack/spack/hash_types.py index 12a57640bd..ab41e1869e 100644 --- a/lib/spack/spack/hash_types.py +++ b/lib/spack/spack/hash_types.py @@ -44,6 +44,13 @@ build_hash = SpecHashDescriptor( deptype=('build', 'link', 'run'), package_hash=False, name='build_hash') +#: Hash descriptor used only to transfer a DAG, as is, across processes +process_hash = SpecHashDescriptor( + deptype=('build', 'link', 'run', 'test'), + package_hash=False, + name='process_hash' +) + #: Full hash used in build pipelines to determine when to rebuild packages. full_hash = SpecHashDescriptor( deptype=('build', 'link', 'run'), package_hash=True, name='full_hash') diff --git a/lib/spack/spack/modules/common.py b/lib/spack/spack/modules/common.py index 21cb527b65..0e9c8eac68 100644 --- a/lib/spack/spack/modules/common.py +++ b/lib/spack/spack/modules/common.py @@ -2,7 +2,6 @@ # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) - """Here we consolidate the logic for creating an abstract description of the information that module systems need. 
@@ -43,7 +42,7 @@ from llnl.util.lang import dedupe import spack.build_environment as build_environment import spack.config -import spack.environment as ev +import spack.environment import spack.error import spack.paths import spack.projections as proj @@ -698,12 +697,13 @@ class BaseContext(tengine.Context): spec = self.spec.copy() # defensive copy before setting prefix if use_view: if use_view is True: - use_view = ev.default_view_name + use_view = spack.environment.default_view_name - env = ev.active_environment() + env = spack.environment.active_environment() if not env: - raise ev.SpackEnvironmentViewError("Module generation with views " - "requires active environment") + raise spack.environment.SpackEnvironmentViewError( + "Module generation with views requires active environment" + ) view = env.views[use_view] diff --git a/lib/spack/spack/solver/asp.py b/lib/spack/spack/solver/asp.py index c091d0b89d..8bafa075db 100644 --- a/lib/spack/spack/solver/asp.py +++ b/lib/spack/spack/solver/asp.py @@ -334,7 +334,7 @@ class PyclingoDriver(object): self.control.configuration.asp.trans_ext = 'all' self.control.configuration.asp.eq = '5' self.control.configuration.configuration = 'tweety' - self.control.configuration.solve.parallel_mode = '2' + self.control.configuration.solve.parallel_mode = '1' self.control.configuration.solver.opt_strategy = "usc,one" # set up the problem -- this generates facts and rules diff --git a/lib/spack/spack/spec.py b/lib/spack/spack/spec.py index 8b13374430..a153ffe50f 100644 --- a/lib/spack/spack/spec.py +++ b/lib/spack/spack/spec.py @@ -1567,6 +1567,14 @@ class Spec(object): """ return self._cached_hash(ht.build_hash, length) + def process_hash(self, length=None): + """Hash used to store specs in environments. + + This hash includes build and test dependencies and is only used to + serialize a spec and pass it around among processes. + """ + return self._cached_hash(ht.process_hash, length) + def full_hash(self, length=None): """Hash to determine when to rebuild packages in the build pipeline. @@ -1832,6 +1840,7 @@ class Spec(object): not self._hashes_final) # lazily compute if write_full_hash: node[ht.full_hash.name] = self.full_hash() + write_build_hash = 'build' in hash.deptype and ( self._hashes_final and self._build_hash or # cached and final not self._hashes_final) # lazily compute @@ -1839,8 +1848,12 @@ class Spec(object): node[ht.build_hash.name] = self.build_hash() else: node['concrete'] = False + if hash.name == 'build_hash': node[hash.name] = self.build_hash() + elif hash.name == 'process_hash': + node[hash.name] = self.process_hash() + return node def to_yaml(self, stream=None, hash=ht.dag_hash): @@ -1974,7 +1987,8 @@ class Spec(object): # new format: elements of dependency spec are keyed. 
            for key in (ht.full_hash.name, ht.build_hash.name,
-                    ht.dag_hash.name):
+                    ht.dag_hash.name,
+                    ht.process_hash.name):
                 if key in elt:
                     dep_hash, deptypes = elt[key], elt['type']
                     hash_type = key
@@ -4430,7 +4444,7 @@ class Spec(object):
         return hash(lang.tuplify(self._cmp_iter))
 
     def __reduce__(self):
-        return _spec_from_dict, (self.to_dict(hash=ht.build_hash),)
+        return _spec_from_dict, (self.to_dict(hash=ht.process_hash),)
 
 
 def merge_abstract_anonymous_specs(*abstract_specs):
diff --git a/lib/spack/spack/test/cmd/dev_build.py b/lib/spack/spack/test/cmd/dev_build.py
index f8be2364d9..765d4dc81b 100644
--- a/lib/spack/spack/test/cmd/dev_build.py
+++ b/lib/spack/spack/test/cmd/dev_build.py
@@ -245,7 +245,7 @@ env:
     env('create', 'test', './spack.yaml')
 
     with ev.read('test'):
-        with pytest.raises(spack.spec.UnsatisfiableVersionSpecError):
+        with pytest.raises(RuntimeError):
             install()
 
 
diff --git a/lib/spack/spack/user_environment.py b/lib/spack/spack/user_environment.py
index 23fb6529d8..55f516b732 100644
--- a/lib/spack/spack/user_environment.py
+++ b/lib/spack/spack/user_environment.py
@@ -5,7 +5,7 @@
 import os
 import sys
 
-import spack.build_environment as build_env
+import spack.build_environment
 import spack.config
 import spack.util.environment as environment
 import spack.util.prefix as prefix
@@ -85,13 +85,13 @@ def environment_modifications_for_spec(spec, view=None):
     # Let the extendee/dependency modify their extensions/dependents
     # before asking for package-specific modifications
     env.extend(
-        build_env.modifications_from_dependencies(
+        spack.build_environment.modifications_from_dependencies(
             spec, context='run'
         )
     )
 
     # Package specific modifications
-    build_env.set_module_variables_for_package(spec.package)
+    spack.build_environment.set_module_variables_for_package(spec.package)
     spec.package.setup_run_environment(env)
 
     return env
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
new file mode 100644
index 0000000000..04562a1801
--- /dev/null
+++ b/lib/spack/spack/util/parallel.py
@@ -0,0 +1,129 @@
+# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
+# Spack Project Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+from __future__ import print_function
+
+import contextlib
+import multiprocessing
+import os
+import sys
+import traceback
+
+import six
+
+from .cpus import cpus_available
+
+
+class ErrorFromWorker(object):
+    """Wrapper class to report an error from a worker process"""
+    def __init__(self, exc_cls, exc, tb):
+        """Create an error object from an exception raised from
+        the worker process.
+
+        The attributes of the process error objects are all strings
+        as they are easy to send over a pipe.
+
+        Args:
+            exc: exception raised from the worker process
+        """
+        self.pid = os.getpid()
+        self.error_message = ''.join(traceback.format_exception(exc_cls, exc, tb))
+
+    def __str__(self):
+        msg = "[PID={0.pid}] {0.error_message}"
+        return msg.format(self)
+
+
+class Task(object):
+    """Wrapped task that traps every Exception and returns it as an
+    ErrorFromWorker object.
+
+    We are using a wrapper class instead of a decorator since the class
+    is pickleable, while a decorator with an inner closure is not.
+ """ + def __init__(self, func): + self.func = func + + def __call__(self, *args, **kwargs): + try: + value = self.func(*args, **kwargs) + except Exception: + value = ErrorFromWorker(*sys.exc_info()) + return value + + +def raise_if_errors(*results): + """Analyze results from worker Processes to search for ErrorFromWorker + objects. If found print all of them and raise an exception. + + Args: + *results: results from worker processes + + Raise: + RuntimeError: if ErrorFromWorker objects are in the results + """ + err_stream = six.StringIO() # sys.stderr + errors = [x for x in results if isinstance(x, ErrorFromWorker)] + if not errors: + return + + # Report the errors and then raise + for error in errors: + print(error, file=err_stream) + + print('[PARENT PROCESS]:', file=err_stream) + traceback.print_stack(file=err_stream) + error_msg = 'errors occurred in worker processes:\n{0}' + raise RuntimeError(error_msg.format(err_stream.getvalue())) + + +@contextlib.contextmanager +def pool(*args, **kwargs): + """Context manager to start and terminate a pool of processes, similar to the + default one provided in Python 3.X + + Arguments are forwarded to the multiprocessing.Pool.__init__ method. + """ + try: + p = multiprocessing.Pool(*args, **kwargs) + yield p + finally: + p.terminate() + p.join() + + +def num_processes(max_processes=None): + """Return the number of processes in a pool. + + Currently the function return the minimum between the maximum number + of processes and the cpus available. + + When a maximum number of processes is not specified return the cpus available. + + Args: + max_processes (int or None): maximum number of processes allowed + """ + max_processes or cpus_available() + return min(cpus_available(), max_processes) + + +def parallel_map(func, arguments, max_processes=None): + """Map a task object to the list of arguments, return the list of results. + + Args: + func (Task): user defined task object + arguments (list): list of arguments for the task + max_processes (int or None): maximum number of processes allowed + + Raises: + RuntimeError: if any error occurred in the worker processes + """ + task_wrapper = Task(func) + if sys.platform != 'darwin': + with pool(processes=num_processes(max_processes=max_processes)) as p: + results = p.map(task_wrapper, arguments) + else: + results = list(map(task_wrapper, arguments)) + raise_if_errors(*results) + return results -- cgit v1.2.3-60-g2f50