From fe5d7881f5a5ec72e2ffbeccc647968dee44214e Mon Sep 17 00:00:00 2001 From: Harmen Stoppels Date: Thu, 19 Sep 2024 10:34:23 +0200 Subject: avoid multiprocessing in tests (#46439) - silences a few pytest warnings related to forking in xdist - speeds up a few tests / avoids possible oversubscription in xdist --- lib/spack/spack/detection/path.py | 3 ++- lib/spack/spack/stage.py | 4 ++-- lib/spack/spack/test/conftest.py | 8 +++++--- lib/spack/spack/util/parallel.py | 19 ++++++++++++------- lib/spack/spack/util/web.py | 4 ++-- 5 files changed, 23 insertions(+), 15 deletions(-) (limited to 'lib') diff --git a/lib/spack/spack/detection/path.py b/lib/spack/spack/detection/path.py index 3588d96115..7316d9124a 100644 --- a/lib/spack/spack/detection/path.py +++ b/lib/spack/spack/detection/path.py @@ -23,6 +23,7 @@ import spack.util.elf as elf_utils import spack.util.environment import spack.util.environment as environment import spack.util.ld_so_conf +import spack.util.parallel from .common import ( WindowsCompilerExternalPaths, @@ -407,7 +408,7 @@ def by_path( result = collections.defaultdict(list) repository = spack.repo.PATH.ensure_unwrapped() - with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: + with spack.util.parallel.make_concurrent_executor(max_workers, require_fork=False) as executor: for pkg in packages_to_search: executable_future = executor.submit( executables_finder.find, diff --git a/lib/spack/spack/stage.py b/lib/spack/spack/stage.py index 776b3ce6b9..ee3b4e95e5 100644 --- a/lib/spack/spack/stage.py +++ b/lib/spack/spack/stage.py @@ -2,7 +2,6 @@ # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) -import concurrent.futures import errno import getpass import glob @@ -40,6 +39,7 @@ import spack.resource import spack.spec import spack.util.crypto import spack.util.lock +import spack.util.parallel import spack.util.path as sup import spack.util.pattern as pattern import spack.util.url as url_util @@ -1132,7 +1132,7 @@ def get_checksums_for_versions( if checksum is not None: version_hashes[version] = checksum - with concurrent.futures.ProcessPoolExecutor(max_workers=concurrency) as executor: + with spack.util.parallel.make_concurrent_executor(concurrency, require_fork=False) as executor: results = [] for url, version in search_arguments: future = executor.submit(_fetch_and_checksum, url, fetch_options, keep_stage) diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py index 607844265f..2726061cb3 100644 --- a/lib/spack/spack/test/conftest.py +++ b/lib/spack/spack/test/conftest.py @@ -1980,12 +1980,14 @@ def pytest_runtest_setup(item): pytest.skip(*not_on_windows_marker.args) +def _sequential_executor(*args, **kwargs): + return spack.util.parallel.SequentialExecutor() + + @pytest.fixture(autouse=True) def disable_parallel_buildcache_push(monkeypatch): """Disable process pools in tests.""" - monkeypatch.setattr( - spack.util.parallel, "make_concurrent_executor", spack.util.parallel.SequentialExecutor - ) + monkeypatch.setattr(spack.util.parallel, "make_concurrent_executor", _sequential_executor) def _root_path(x, y, *, path): diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py index 9bbdf5dd7a..b16adb8858 100644 --- a/lib/spack/spack/util/parallel.py +++ b/lib/spack/spack/util/parallel.py @@ -9,7 +9,7 @@ import sys import traceback from typing import Optional -from spack.config import determine_number_of_jobs +import spack.config class ErrorFromWorker: @@ -98,9 +98,14 @@ class SequentialExecutor(concurrent.futures.Executor): return future -def make_concurrent_executor() -> concurrent.futures.Executor: - """Can't use threading because it's unsafe, and can't use spawned processes because of globals. - That leaves only forking.""" - if multiprocessing.get_start_method() == "fork": - return concurrent.futures.ProcessPoolExecutor(determine_number_of_jobs(parallel=True)) - return SequentialExecutor() +def make_concurrent_executor( + jobs: Optional[int] = None, *, require_fork: bool = True +) -> concurrent.futures.Executor: + """Create a concurrent executor. If require_fork is True, then the executor is sequential + if the platform does not enable forking as the default start method. Effectively + require_fork=True makes the executor sequential in the current process on Windows, macOS, and + Linux from Python 3.14+ (which changes defaults)""" + if require_fork and multiprocessing.get_start_method() != "fork": + return SequentialExecutor() + jobs = jobs or spack.config.determine_number_of_jobs(parallel=True) + return concurrent.futures.ProcessPoolExecutor(jobs) diff --git a/lib/spack/spack/util/web.py b/lib/spack/spack/util/web.py index 9a0f1d6e4b..892b64d333 100644 --- a/lib/spack/spack/util/web.py +++ b/lib/spack/spack/util/web.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: (Apache-2.0 OR MIT) import codecs -import concurrent.futures import email.message import errno import json @@ -30,6 +29,7 @@ from llnl.util.filesystem import mkdirp, rename, working_dir import spack.config import spack.error import spack.util.executable +import spack.util.parallel import spack.util.path import spack.util.url as url_util @@ -641,7 +641,7 @@ def spider( root = urllib.parse.urlparse(root_str) spider_args.append((root, go_deeper, _visited)) - with concurrent.futures.ProcessPoolExecutor(max_workers=concurrency) as tp: + with spack.util.parallel.make_concurrent_executor(concurrency, require_fork=False) as tp: while current_depth <= depth: tty.debug( f"SPIDER: [depth={current_depth}, max_depth={depth}, urls={len(spider_args)}]" -- cgit v1.2.3-70-g09d2