summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/spack/spack/environment/environment.py6
-rw-r--r--lib/spack/spack/util/parallel.py9
2 files changed, 12 insertions, 3 deletions
diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py
index 3fd75f3d70..85c10e366b 100644
--- a/lib/spack/spack/environment/environment.py
+++ b/lib/spack/spack/environment/environment.py
@@ -1525,7 +1525,11 @@ class Environment:
batch = []
for j, (i, concrete, duration) in enumerate(
spack.util.parallel.imap_unordered(
- _concretize_task, args, processes=num_procs, debug=tty.is_debug()
+ _concretize_task,
+ args,
+ processes=num_procs,
+ debug=tty.is_debug(),
+ maxtaskperchild=1,
)
):
batch.append((i, concrete))
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
index 683835641a..c8e6ef7907 100644
--- a/lib/spack/spack/util/parallel.py
+++ b/lib/spack/spack/util/parallel.py
@@ -6,6 +6,7 @@ import multiprocessing
import os
import sys
import traceback
+from typing import Optional
class ErrorFromWorker:
@@ -53,7 +54,9 @@ class Task:
return value
-def imap_unordered(f, list_of_args, *, processes: int, debug=False):
+def imap_unordered(
+ f, list_of_args, *, processes: int, maxtaskperchild: Optional[int] = None, debug=False
+):
"""Wrapper around multiprocessing.Pool.imap_unordered.
Args:
@@ -62,6 +65,8 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False):
processes: maximum number of processes allowed
debug: if False, raise an exception containing just the error messages
from workers, if True an exception with complete stacktraces
+ maxtaskperchild: number of tasks to be executed by a child before being
+ killed and substituted
Raises:
RuntimeError: if any error occurred in the worker processes
@@ -70,7 +75,7 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False):
yield from map(f, list_of_args)
return
- with multiprocessing.Pool(processes) as p:
+ with multiprocessing.Pool(processes, maxtasksperchild=maxtaskperchild) as p:
for result in p.imap_unordered(Task(f), list_of_args):
if isinstance(result, ErrorFromWorker):
raise RuntimeError(result.stacktrace if debug else str(result))