From 27a0425e5d432647d549d4f671f8556b097bfa4e Mon Sep 17 00:00:00 2001 From: Harmen Stoppels Date: Fri, 20 Oct 2023 17:09:19 +0200 Subject: concretize separately: show concretization time per spec as they concretize when verbose (#40634) --- lib/spack/spack/environment/environment.py | 51 ++++++++++-------- lib/spack/spack/test/cmd/dev_build.py | 3 +- lib/spack/spack/util/parallel.py | 87 ++++++------------------------ 3 files changed, 45 insertions(+), 96 deletions(-) (limited to 'lib') diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py index 62dda31034..51ea453c39 100644 --- a/lib/spack/spack/environment/environment.py +++ b/lib/spack/spack/environment/environment.py @@ -1480,11 +1480,12 @@ class Environment: self._add_concrete_spec(s, concrete, new=False) # Concretize any new user specs that we haven't concretized yet - arguments, root_specs = [], [] + args, root_specs, i = [], [], 0 for uspec, uspec_constraints in zip(self.user_specs, self.user_specs.specs_as_constraints): if uspec not in old_concretized_user_specs: root_specs.append(uspec) - arguments.append((uspec_constraints, tests)) + args.append((i, uspec_constraints, tests)) + i += 1 # Ensure we don't try to bootstrap clingo in parallel if spack.config.get("config:concretizer", "clingo") == "clingo": @@ -1503,34 +1504,36 @@ class Environment: _ = spack.compilers.get_compiler_config() # Early return if there is nothing to do - if len(arguments) == 0: + if len(args) == 0: return [] # Solve the environment in parallel on Linux start = time.time() - max_processes = min( - len(arguments), # Number of specs - spack.util.cpus.determine_number_of_jobs(parallel=True), - ) + num_procs = min(len(args), spack.util.cpus.determine_number_of_jobs(parallel=True)) - # TODO: revisit this print as soon as darwin is parallel too + # TODO: support parallel concretization on macOS and Windows msg = "Starting concretization" - if sys.platform != "darwin": - pool_size = spack.util.parallel.num_processes(max_processes=max_processes) - if pool_size > 1: - msg = msg + " pool with {0} processes".format(pool_size) + if sys.platform not in ("darwin", "win32") and num_procs > 1: + msg += f" pool with {num_procs} processes" tty.msg(msg) - concretized_root_specs = spack.util.parallel.parallel_map( - _concretize_task, arguments, max_processes=max_processes, debug=tty.is_debug() - ) + batch = [] + for i, concrete, duration in spack.util.parallel.imap_unordered( + _concretize_task, args, processes=num_procs, debug=tty.is_debug() + ): + batch.append((i, concrete)) + tty.verbose(f"[{duration:7.2f}s] {root_specs[i]}") + sys.stdout.flush() + + # Add specs in original order + batch.sort(key=lambda x: x[0]) + by_hash = {} # for attaching information on test dependencies + for root, (_, concrete) in zip(root_specs, batch): + self._add_concrete_spec(root, concrete) + by_hash[concrete.dag_hash()] = concrete finish = time.time() - tty.msg("Environment concretized in %.2f seconds." % (finish - start)) - by_hash = {} - for abstract, concrete in zip(root_specs, concretized_root_specs): - self._add_concrete_spec(abstract, concrete) - by_hash[concrete.dag_hash()] = concrete + tty.msg(f"Environment concretized in {finish - start:.2f} seconds") # Unify the specs objects, so we get correct references to all parents self._read_lockfile_dict(self._to_lockfile_dict()) @@ -2392,10 +2395,12 @@ def _concretize_from_constraints(spec_constraints, tests=False): invalid_constraints.extend(inv_variant_constraints) -def _concretize_task(packed_arguments): - spec_constraints, tests = packed_arguments +def _concretize_task(packed_arguments) -> Tuple[int, Spec, float]: + index, spec_constraints, tests = packed_arguments with tty.SuppressOutput(msg_enabled=False): - return _concretize_from_constraints(spec_constraints, tests) + start = time.time() + spec = _concretize_from_constraints(spec_constraints, tests) + return index, spec, time.time() - start def make_repo_path(root): diff --git a/lib/spack/spack/test/cmd/dev_build.py b/lib/spack/spack/test/cmd/dev_build.py index 71ab195b64..c5a7b5c3bb 100644 --- a/lib/spack/spack/test/cmd/dev_build.py +++ b/lib/spack/spack/test/cmd/dev_build.py @@ -11,6 +11,7 @@ import llnl.util.filesystem as fs import spack.build_environment import spack.environment as ev +import spack.error import spack.spec import spack.store from spack.main import SpackCommand @@ -237,7 +238,7 @@ spack: env("create", "test", "./spack.yaml") with ev.read("test"): - with pytest.raises(RuntimeError): + with pytest.raises((RuntimeError, spack.error.UnsatisfiableSpecError)): install() diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py index 06e9ed5225..683835641a 100644 --- a/lib/spack/spack/util/parallel.py +++ b/lib/spack/spack/util/parallel.py @@ -2,14 +2,11 @@ # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) -import contextlib import multiprocessing import os import sys import traceback -from .cpus import cpus_available - class ErrorFromWorker: """Wrapper class to report an error from a worker process""" @@ -56,79 +53,25 @@ class Task: return value -def raise_if_errors(*results, **kwargs): - """Analyze results from worker Processes to search for ErrorFromWorker - objects. If found print all of them and raise an exception. - - Args: - *results: results from worker processes - debug: if True show complete stacktraces - - Raise: - RuntimeError: if ErrorFromWorker objects are in the results - """ - debug = kwargs.get("debug", False) # This can be a keyword only arg in Python 3 - errors = [x for x in results if isinstance(x, ErrorFromWorker)] - if not errors: - return - - msg = "\n".join([error.stacktrace if debug else str(error) for error in errors]) - - error_fmt = "{0}" - if len(errors) > 1 and not debug: - error_fmt = "errors occurred during concretization of the environment:\n{0}" - - raise RuntimeError(error_fmt.format(msg)) - - -@contextlib.contextmanager -def pool(*args, **kwargs): - """Context manager to start and terminate a pool of processes, similar to the - default one provided in Python 3.X - - Arguments are forwarded to the multiprocessing.Pool.__init__ method. - """ - try: - p = multiprocessing.Pool(*args, **kwargs) - yield p - finally: - p.terminate() - p.join() - - -def num_processes(max_processes=None): - """Return the number of processes in a pool. - - Currently the function return the minimum between the maximum number - of processes and the cpus available. - - When a maximum number of processes is not specified return the cpus available. - - Args: - max_processes (int or None): maximum number of processes allowed - """ - max_processes or cpus_available() - return min(cpus_available(), max_processes) - - -def parallel_map(func, arguments, max_processes=None, debug=False): - """Map a task object to the list of arguments, return the list of results. +def imap_unordered(f, list_of_args, *, processes: int, debug=False): + """Wrapper around multiprocessing.Pool.imap_unordered. Args: - func (Task): user defined task object - arguments (list): list of arguments for the task - max_processes (int or None): maximum number of processes allowed - debug (bool): if False, raise an exception containing just the error messages + f: function to apply + list_of_args: list of tuples of args for the task + processes: maximum number of processes allowed + debug: if False, raise an exception containing just the error messages from workers, if True an exception with complete stacktraces Raises: RuntimeError: if any error occurred in the worker processes """ - task_wrapper = Task(func) - if sys.platform != "darwin" and sys.platform != "win32": - with pool(processes=num_processes(max_processes=max_processes)) as p: - results = p.map(task_wrapper, arguments) - else: - results = list(map(task_wrapper, arguments)) - raise_if_errors(*results, debug=debug) - return results + if sys.platform in ("darwin", "win32") or len(list_of_args) == 1: + yield from map(f, list_of_args) + return + + with multiprocessing.Pool(processes) as p: + for result in p.imap_unordered(Task(f), list_of_args): + if isinstance(result, ErrorFromWorker): + raise RuntimeError(result.stacktrace if debug else str(result)) + yield result -- cgit v1.2.3-60-g2f50