summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/spack/environment/environment.py3
-rw-r--r--lib/spack/spack/util/parallel.py39
2 files changed, 25 insertions, 17 deletions
diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py
index f08113bfc0..5304f2b135 100644
--- a/lib/spack/spack/environment/environment.py
+++ b/lib/spack/spack/environment/environment.py
@@ -1161,7 +1161,8 @@ class Environment(object):
tty.msg(msg)
concretized_root_specs = spack.util.parallel.parallel_map(
- _concretize_task, arguments, max_processes=max_processes
+ _concretize_task, arguments, max_processes=max_processes,
+ debug=tty.is_debug()
)
finish = time.time()
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
index 04562a1801..b2607f8b0e 100644
--- a/lib/spack/spack/util/parallel.py
+++ b/lib/spack/spack/util/parallel.py
@@ -10,8 +10,6 @@ import os
import sys
import traceback
-import six
-
from .cpus import cpus_available
@@ -28,12 +26,17 @@ class ErrorFromWorker(object):
exc: exception raised from the worker process
"""
self.pid = os.getpid()
- self.error_message = ''.join(traceback.format_exception(exc_cls, exc, tb))
+ self.error_message = str(exc)
+ self.stacktrace_message = ''.join(traceback.format_exception(exc_cls, exc, tb))
- def __str__(self):
- msg = "[PID={0.pid}] {0.error_message}"
+ @property
+ def stacktrace(self):
+ msg = "[PID={0.pid}] {0.stacktrace_message}"
return msg.format(self)
+ def __str__(self):
+ return self.error_message
+
class Task(object):
"""Wrapped task that trap every Exception and return it as an
@@ -53,29 +56,31 @@ class Task(object):
return value
-def raise_if_errors(*results):
+def raise_if_errors(*results, **kwargs):
"""Analyze results from worker Processes to search for ErrorFromWorker
objects. If found print all of them and raise an exception.
Args:
*results: results from worker processes
+ debug: if True show complete stacktraces
Raise:
RuntimeError: if ErrorFromWorker objects are in the results
"""
- err_stream = six.StringIO() # sys.stderr
+ debug = kwargs.get('debug', False) # This can be a keyword only arg in Python 3
errors = [x for x in results if isinstance(x, ErrorFromWorker)]
if not errors:
return
- # Report the errors and then raise
- for error in errors:
- print(error, file=err_stream)
+ msg = '\n'.join([
+ error.stacktrace if debug else str(error) for error in errors
+ ])
+
+ error_fmt = '{0}'
+ if len(errors) > 1 and not debug:
+ error_fmt = 'errors occurred during concretization of the environment:\n{0}'
- print('[PARENT PROCESS]:', file=err_stream)
- traceback.print_stack(file=err_stream)
- error_msg = 'errors occurred in worker processes:\n{0}'
- raise RuntimeError(error_msg.format(err_stream.getvalue()))
+ raise RuntimeError(error_fmt.format(msg))
@contextlib.contextmanager
@@ -108,13 +113,15 @@ def num_processes(max_processes=None):
return min(cpus_available(), max_processes)
-def parallel_map(func, arguments, max_processes=None):
+def parallel_map(func, arguments, max_processes=None, debug=False):
"""Map a task object to the list of arguments, return the list of results.
Args:
func (Task): user defined task object
arguments (list): list of arguments for the task
max_processes (int or None): maximum number of processes allowed
+ debug (bool): if False, raise an exception containing just the error messages
+ from workers, if True an exception with complete stacktraces
Raises:
RuntimeError: if any error occurred in the worker processes
@@ -125,5 +132,5 @@ def parallel_map(func, arguments, max_processes=None):
results = p.map(task_wrapper, arguments)
else:
results = list(map(task_wrapper, arguments))
- raise_if_errors(*results)
+ raise_if_errors(*results, debug=debug)
return results