diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/spack/spack/environment/environment.py | 6 | ||||
-rw-r--r-- | lib/spack/spack/util/parallel.py | 9 |
2 files changed, 12 insertions, 3 deletions
diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py index 3fd75f3d70..85c10e366b 100644 --- a/lib/spack/spack/environment/environment.py +++ b/lib/spack/spack/environment/environment.py @@ -1525,7 +1525,11 @@ class Environment: batch = [] for j, (i, concrete, duration) in enumerate( spack.util.parallel.imap_unordered( - _concretize_task, args, processes=num_procs, debug=tty.is_debug() + _concretize_task, + args, + processes=num_procs, + debug=tty.is_debug(), + maxtaskperchild=1, ) ): batch.append((i, concrete)) diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py index 683835641a..c8e6ef7907 100644 --- a/lib/spack/spack/util/parallel.py +++ b/lib/spack/spack/util/parallel.py @@ -6,6 +6,7 @@ import multiprocessing import os import sys import traceback +from typing import Optional class ErrorFromWorker: @@ -53,7 +54,9 @@ class Task: return value -def imap_unordered(f, list_of_args, *, processes: int, debug=False): +def imap_unordered( + f, list_of_args, *, processes: int, maxtaskperchild: Optional[int] = None, debug=False +): """Wrapper around multiprocessing.Pool.imap_unordered. Args: @@ -62,6 +65,8 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False): processes: maximum number of processes allowed debug: if False, raise an exception containing just the error messages from workers, if True an exception with complete stacktraces + maxtaskperchild: number of tasks to be executed by a child before being + killed and substituted Raises: RuntimeError: if any error occurred in the worker processes @@ -70,7 +75,7 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False): yield from map(f, list_of_args) return - with multiprocessing.Pool(processes) as p: + with multiprocessing.Pool(processes, maxtasksperchild=maxtaskperchild) as p: for result in p.imap_unordered(Task(f), list_of_args): if isinstance(result, ErrorFromWorker): raise RuntimeError(result.stacktrace if debug else str(result)) |