summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/spack/environment/environment.py51
-rw-r--r--lib/spack/spack/test/cmd/dev_build.py3
-rw-r--r--lib/spack/spack/util/parallel.py87
3 files changed, 45 insertions, 96 deletions
diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py
index 62dda31034..51ea453c39 100644
--- a/lib/spack/spack/environment/environment.py
+++ b/lib/spack/spack/environment/environment.py
@@ -1480,11 +1480,12 @@ class Environment:
self._add_concrete_spec(s, concrete, new=False)
# Concretize any new user specs that we haven't concretized yet
- arguments, root_specs = [], []
+ args, root_specs, i = [], [], 0
for uspec, uspec_constraints in zip(self.user_specs, self.user_specs.specs_as_constraints):
if uspec not in old_concretized_user_specs:
root_specs.append(uspec)
- arguments.append((uspec_constraints, tests))
+ args.append((i, uspec_constraints, tests))
+ i += 1
# Ensure we don't try to bootstrap clingo in parallel
if spack.config.get("config:concretizer", "clingo") == "clingo":
@@ -1503,34 +1504,36 @@ class Environment:
_ = spack.compilers.get_compiler_config()
# Early return if there is nothing to do
- if len(arguments) == 0:
+ if len(args) == 0:
return []
# Solve the environment in parallel on Linux
start = time.time()
- max_processes = min(
- len(arguments), # Number of specs
- spack.util.cpus.determine_number_of_jobs(parallel=True),
- )
+ num_procs = min(len(args), spack.util.cpus.determine_number_of_jobs(parallel=True))
- # TODO: revisit this print as soon as darwin is parallel too
+ # TODO: support parallel concretization on macOS and Windows
msg = "Starting concretization"
- if sys.platform != "darwin":
- pool_size = spack.util.parallel.num_processes(max_processes=max_processes)
- if pool_size > 1:
- msg = msg + " pool with {0} processes".format(pool_size)
+ if sys.platform not in ("darwin", "win32") and num_procs > 1:
+ msg += f" pool with {num_procs} processes"
tty.msg(msg)
- concretized_root_specs = spack.util.parallel.parallel_map(
- _concretize_task, arguments, max_processes=max_processes, debug=tty.is_debug()
- )
+ batch = []
+ for i, concrete, duration in spack.util.parallel.imap_unordered(
+ _concretize_task, args, processes=num_procs, debug=tty.is_debug()
+ ):
+ batch.append((i, concrete))
+ tty.verbose(f"[{duration:7.2f}s] {root_specs[i]}")
+ sys.stdout.flush()
+
+ # Add specs in original order
+ batch.sort(key=lambda x: x[0])
+ by_hash = {} # for attaching information on test dependencies
+ for root, (_, concrete) in zip(root_specs, batch):
+ self._add_concrete_spec(root, concrete)
+ by_hash[concrete.dag_hash()] = concrete
finish = time.time()
- tty.msg("Environment concretized in %.2f seconds." % (finish - start))
- by_hash = {}
- for abstract, concrete in zip(root_specs, concretized_root_specs):
- self._add_concrete_spec(abstract, concrete)
- by_hash[concrete.dag_hash()] = concrete
+ tty.msg(f"Environment concretized in {finish - start:.2f} seconds")
# Unify the specs objects, so we get correct references to all parents
self._read_lockfile_dict(self._to_lockfile_dict())
@@ -2392,10 +2395,12 @@ def _concretize_from_constraints(spec_constraints, tests=False):
invalid_constraints.extend(inv_variant_constraints)
-def _concretize_task(packed_arguments):
- spec_constraints, tests = packed_arguments
+def _concretize_task(packed_arguments) -> Tuple[int, Spec, float]:
+ index, spec_constraints, tests = packed_arguments
with tty.SuppressOutput(msg_enabled=False):
- return _concretize_from_constraints(spec_constraints, tests)
+ start = time.time()
+ spec = _concretize_from_constraints(spec_constraints, tests)
+ return index, spec, time.time() - start
def make_repo_path(root):
diff --git a/lib/spack/spack/test/cmd/dev_build.py b/lib/spack/spack/test/cmd/dev_build.py
index 71ab195b64..c5a7b5c3bb 100644
--- a/lib/spack/spack/test/cmd/dev_build.py
+++ b/lib/spack/spack/test/cmd/dev_build.py
@@ -11,6 +11,7 @@ import llnl.util.filesystem as fs
import spack.build_environment
import spack.environment as ev
+import spack.error
import spack.spec
import spack.store
from spack.main import SpackCommand
@@ -237,7 +238,7 @@ spack:
env("create", "test", "./spack.yaml")
with ev.read("test"):
- with pytest.raises(RuntimeError):
+ with pytest.raises((RuntimeError, spack.error.UnsatisfiableSpecError)):
install()
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
index 06e9ed5225..683835641a 100644
--- a/lib/spack/spack/util/parallel.py
+++ b/lib/spack/spack/util/parallel.py
@@ -2,14 +2,11 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
-import contextlib
import multiprocessing
import os
import sys
import traceback
-from .cpus import cpus_available
-
class ErrorFromWorker:
"""Wrapper class to report an error from a worker process"""
@@ -56,79 +53,25 @@ class Task:
return value
-def raise_if_errors(*results, **kwargs):
- """Analyze results from worker Processes to search for ErrorFromWorker
- objects. If found print all of them and raise an exception.
-
- Args:
- *results: results from worker processes
- debug: if True show complete stacktraces
-
- Raise:
- RuntimeError: if ErrorFromWorker objects are in the results
- """
- debug = kwargs.get("debug", False) # This can be a keyword only arg in Python 3
- errors = [x for x in results if isinstance(x, ErrorFromWorker)]
- if not errors:
- return
-
- msg = "\n".join([error.stacktrace if debug else str(error) for error in errors])
-
- error_fmt = "{0}"
- if len(errors) > 1 and not debug:
- error_fmt = "errors occurred during concretization of the environment:\n{0}"
-
- raise RuntimeError(error_fmt.format(msg))
-
-
-@contextlib.contextmanager
-def pool(*args, **kwargs):
- """Context manager to start and terminate a pool of processes, similar to the
- default one provided in Python 3.X
-
- Arguments are forwarded to the multiprocessing.Pool.__init__ method.
- """
- try:
- p = multiprocessing.Pool(*args, **kwargs)
- yield p
- finally:
- p.terminate()
- p.join()
-
-
-def num_processes(max_processes=None):
- """Return the number of processes in a pool.
-
- Currently the function return the minimum between the maximum number
- of processes and the cpus available.
-
- When a maximum number of processes is not specified return the cpus available.
-
- Args:
- max_processes (int or None): maximum number of processes allowed
- """
- max_processes or cpus_available()
- return min(cpus_available(), max_processes)
-
-
-def parallel_map(func, arguments, max_processes=None, debug=False):
- """Map a task object to the list of arguments, return the list of results.
+def imap_unordered(f, list_of_args, *, processes: int, debug=False):
+ """Wrapper around multiprocessing.Pool.imap_unordered.
Args:
- func (Task): user defined task object
- arguments (list): list of arguments for the task
- max_processes (int or None): maximum number of processes allowed
- debug (bool): if False, raise an exception containing just the error messages
+ f: function to apply
+ list_of_args: list of tuples of args for the task
+ processes: maximum number of processes allowed
+ debug: if False, raise an exception containing just the error messages
from workers, if True an exception with complete stacktraces
Raises:
RuntimeError: if any error occurred in the worker processes
"""
- task_wrapper = Task(func)
- if sys.platform != "darwin" and sys.platform != "win32":
- with pool(processes=num_processes(max_processes=max_processes)) as p:
- results = p.map(task_wrapper, arguments)
- else:
- results = list(map(task_wrapper, arguments))
- raise_if_errors(*results, debug=debug)
- return results
+ if sys.platform in ("darwin", "win32") or len(list_of_args) == 1:
+ yield from map(f, list_of_args)
+ return
+
+ with multiprocessing.Pool(processes) as p:
+ for result in p.imap_unordered(Task(f), list_of_args):
+ if isinstance(result, ErrorFromWorker):
+ raise RuntimeError(result.stacktrace if debug else str(result))
+ yield result