author     Harmen Stoppels <me@harmenstoppels.nl>    2024-08-13 08:12:48 +0200
committer  GitHub <noreply@github.com>               2024-08-13 08:12:48 +0200
commit     a66586d749197841bd74e289802126f2359287a8 (patch)
tree       89557db34408e4f19d7de725dd89d4276c702fb0 /lib
parent     6b73f00310db34c054878c51d8ccf5092bc2e607 (diff)
download   spack-a66586d749197841bd74e289802126f2359287a8.tar.gz
           spack-a66586d749197841bd74e289802126f2359287a8.tar.bz2
           spack-a66586d749197841bd74e289802126f2359287a8.tar.xz
           spack-a66586d749197841bd74e289802126f2359287a8.zip
spack buildcache push: best effort (#45631)
"spack buildcache push" for partially installed environments pushes all it
can by default, and only dumps errors towards the end.
If --fail-fast is provided, error out before pushing anything if any
of the packages is uninstalled
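
For illustration, the fail-fast check amounts to partitioning the selected
specs by installation state before anything is uploaded. The sketch below is
hypothetical standalone code, not the Spack implementation (which uses
llnl.util.lang.stable_partition inside push_fn in
lib/spack/spack/cmd/buildcache.py); FakeSpec and select_pushable are made-up
names:

    from dataclasses import dataclass
    from typing import List, Tuple


    @dataclass(frozen=True)
    class FakeSpec:
        """Hypothetical stand-in for a spec; only the fields this sketch needs."""

        name: str
        installed: bool


    def select_pushable(specs: List[FakeSpec], fail_fast: bool):
        """Partition specs into pushable ones and recorded failures."""
        installed = [s for s in specs if s.installed]
        missing = [s for s in specs if not s.installed]

        if missing and fail_fast:
            # --fail-fast: abort before pushing anything at all.
            names = ", ".join(s.name for s in missing)
            raise RuntimeError(f"Cannot push non-installed packages: {names}")

        # Default: push what we can, report the missing ones in the final summary.
        failures: List[Tuple[FakeSpec, str]] = [(s, "package not installed") for s in missing]
        return installed, failures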
OCI build caches using parallel push now use futures to ensure that
pushing proceeds in best-effort style.
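
The best-effort behavior follows the standard concurrent.futures pattern that
the diff below introduces in _push_oci: submit every upload, wait on all
futures, then inspect each future's exception instead of letting the first
failure abort the run. A minimal sketch of that pattern; push_all and
push_one are illustrative names, not Spack APIs:

    import concurrent.futures
    from typing import Callable, Iterable


    def push_all(executor: concurrent.futures.Executor, items: Iterable, push_one: Callable):
        """Submit one upload per item and collect failures instead of raising early."""
        items = list(items)
        futures = [executor.submit(push_one, item) for item in items]
        concurrent.futures.wait(futures)

        succeeded, failed = [], []
        for item, future in zip(items, futures):
            error = future.exception()  # non-blocking: every future is done by now
            if error is None:
                succeeded.append((item, future.result()))
            else:
                failed.append((item, error))  # reported together at the very end
        return succeeded, failed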
Diffstat (limited to 'lib')
-rw-r--r--  lib/spack/docs/conf.py                         |   1
-rw-r--r--  lib/spack/spack/binary_distribution.py         |  56
-rw-r--r--  lib/spack/spack/cmd/buildcache.py              | 278
-rw-r--r--  lib/spack/spack/test/cmd/buildcache.py         |  53
-rw-r--r--  lib/spack/spack/test/conftest.py               |   7
-rw-r--r--  lib/spack/spack/test/oci/integration_test.py   |  75
-rw-r--r--  lib/spack/spack/util/parallel.py               |  24
7 files changed, 323 insertions, 171 deletions
diff --git a/lib/spack/docs/conf.py b/lib/spack/docs/conf.py
index e0542640a2..9819d6f4d3 100644
--- a/lib/spack/docs/conf.py
+++ b/lib/spack/docs/conf.py
@@ -206,6 +206,7 @@ nitpick_ignore = [
     ("py:class", "six.moves.urllib.parse.ParseResult"),
     ("py:class", "TextIO"),
     ("py:class", "hashlib._Hash"),
+    ("py:class", "concurrent.futures._base.Executor"),
     # Spack classes that are private and we don't want to expose
     ("py:class", "spack.provider_index._IndexBase"),
     ("py:class", "spack.repo._PrependFileLoader"),
diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py
index 6108e69548..26b6396f26 100644
--- a/lib/spack/spack/binary_distribution.py
+++ b/lib/spack/spack/binary_distribution.py
@@ -22,7 +22,7 @@ import urllib.parse
 import urllib.request
 import warnings
 from contextlib import closing
-from typing import Dict, Iterable, List, NamedTuple, Optional, Set, Tuple
+from typing import Dict, Iterable, NamedTuple, Optional, Set, Tuple

 import llnl.util.filesystem as fsys
 import llnl.util.lang
@@ -46,7 +46,6 @@ import spack.relocate as relocate
 import spack.repo
 import spack.stage
 import spack.store
-import spack.traverse as traverse
 import spack.util.archive
 import spack.util.crypto
 import spack.util.file_cache as file_cache
@@ -1201,59 +1200,6 @@ def _build_tarball_in_stage_dir(spec: Spec, out_url: str, stage_dir: str, option
     generate_package_index(url_util.join(out_url, os.path.relpath(cache_prefix, stage_dir)))


-class NotInstalledError(spack.error.SpackError):
-    """Raised when a spec is not installed but picked to be packaged."""
-
-    def __init__(self, specs: List[Spec]):
-        super().__init__(
-            "Cannot push non-installed packages",
-            ", ".join(s.cformat("{name}{@version}{/hash:7}") for s in specs),
-        )
-
-
-def specs_to_be_packaged(
-    specs: List[Spec], root: bool = True, dependencies: bool = True
-) -> List[Spec]:
-    """Return the list of nodes to be packaged, given a list of specs.
-    Raises NotInstalledError if a spec is not installed but picked to be packaged.
-
-    Args:
-        specs: list of root specs to be processed
-        root: include the root of each spec in the nodes
-        dependencies: include the dependencies of each
-            spec in the nodes
-    """
-
-    if not root and not dependencies:
-        return []
-
-    # Filter packageable roots
-    with spack.store.STORE.db.read_transaction():
-        if root:
-            # Error on uninstalled roots, when roots are requested
-            uninstalled_roots = list(s for s in specs if not s.installed)
-            if uninstalled_roots:
-                raise NotInstalledError(uninstalled_roots)
-            roots = specs
-        else:
-            roots = []
-
-        if dependencies:
-            # Error on uninstalled deps, when deps are requested
-            deps = list(
-                traverse.traverse_nodes(
-                    specs, deptype="all", order="breadth", root=False, key=traverse.by_dag_hash
-                )
-            )
-            uninstalled_deps = list(s for s in deps if not s.installed)
-            if uninstalled_deps:
-                raise NotInstalledError(uninstalled_deps)
-        else:
-            deps = []
-
-    return [s for s in itertools.chain(roots, deps) if not s.external]
-
-
 def try_verify(specfile_path):
     """Utility function to attempt to verify a local file.  Assumes the
     file is a clearsigned signature file.
diff --git a/lib/spack/spack/cmd/buildcache.py b/lib/spack/spack/cmd/buildcache.py
index a2dace6003..78d8e7a00c 100644
--- a/lib/spack/spack/cmd/buildcache.py
+++ b/lib/spack/spack/cmd/buildcache.py
@@ -3,25 +3,25 @@
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
 import argparse
+import concurrent.futures
 import copy
 import glob
 import hashlib
 import json
-import multiprocessing
-import multiprocessing.pool
 import os
 import shutil
 import sys
 import tempfile
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Dict, List, Optional, Tuple

 import llnl.util.tty as tty
 from llnl.string import plural
-from llnl.util.lang import elide_list
+from llnl.util.lang import elide_list, stable_partition

 import spack.binary_distribution as bindist
 import spack.cmd
 import spack.config
+import spack.deptypes as dt
 import spack.environment as ev
 import spack.error
 import spack.hash_types as ht
@@ -35,10 +35,10 @@ import spack.stage
 import spack.store
 import spack.user_environment
 import spack.util.crypto
+import spack.util.parallel
 import spack.util.url as url_util
 import spack.util.web as web_util
 from spack import traverse
-from spack.build_environment import determine_number_of_jobs
 from spack.cmd import display_specs
 from spack.cmd.common import arguments
 from spack.oci.image import (
@@ -112,6 +112,17 @@ def setup_parser(subparser: argparse.ArgumentParser):
         "Alternatively, one can decide to build a cache for only the package or only the "
         "dependencies",
     )
+    with_or_without_build_deps = push.add_mutually_exclusive_group()
+    with_or_without_build_deps.add_argument(
+        "--with-build-dependencies",
+        action="store_true",
+        help="include build dependencies in the buildcache",
+    )
+    with_or_without_build_deps.add_argument(
+        "--without-build-dependencies",
+        action="store_true",
+        help="exclude build dependencies from the buildcache",
+    )
     push.add_argument(
         "--fail-fast",
         action="store_true",
@@ -336,32 +347,6 @@ def _progress(i: int, total: int):
     return ""


-class NoPool:
-    def map(self, func, args):
-        return [func(a) for a in args]
-
-    def starmap(self, func, args):
-        return [func(*a) for a in args]
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args):
-        pass
-
-
-MaybePool = Union[multiprocessing.pool.Pool, NoPool]
-
-
-def _make_pool() -> MaybePool:
-    """Can't use threading because it's unsafe, and can't use spawned processes because of globals.
-    That leaves only forking"""
-    if multiprocessing.get_start_method() == "fork":
-        return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True))
-    else:
-        return NoPool()
-
-
 def _skip_no_redistribute_for_public(specs):
     remaining_specs = list()
     removed_specs = list()
@@ -381,6 +366,47 @@ def _skip_no_redistribute_for_public(specs):
     return remaining_specs


+class PackagesAreNotInstalledError(spack.error.SpackError):
+    """Raised when a list of specs is not installed but picked to be packaged."""
+
+    def __init__(self, specs: List[Spec]):
+        super().__init__(
+            "Cannot push non-installed packages",
+            ", ".join(elide_list(list(_format_spec(s) for s in specs), 5)),
+        )
+
+
+class PackageNotInstalledError(spack.error.SpackError):
+    """Raised when a spec is not installed but picked to be packaged."""
+
+
+class MissingLayerError(spack.error.SpackError):
+    """Raised when a required layer for a dependency is missing in an OCI registry."""
+
+
+def _specs_to_be_packaged(
+    requested: List[Spec], things_to_install: str, build_deps: bool
+) -> List[Spec]:
+    """Collect all non-external specs, with or without roots and dependencies"""
+    if "dependencies" not in things_to_install:
+        deptype = dt.NONE
+    elif build_deps:
+        deptype = dt.ALL
+    else:
+        deptype = dt.RUN | dt.LINK | dt.TEST
+    return [
+        s
+        for s in traverse.traverse_nodes(
+            requested,
+            root="package" in things_to_install,
+            deptype=deptype,
+            order="breadth",
+            key=traverse.by_dag_hash,
+        )
+        if not s.external
+    ]
+
+
 def push_fn(args):
     """create a binary package and push it to a mirror"""
     if args.spec_file:
@@ -420,40 +446,52 @@ def push_fn(args):
             "Use --unsigned to silence this warning."
         )

-    # This is a list of installed, non-external specs.
-    specs = bindist.specs_to_be_packaged(
+    specs = _specs_to_be_packaged(
         roots,
-        root="package" in args.things_to_install,
-        dependencies="dependencies" in args.things_to_install,
+        things_to_install=args.things_to_install,
+        build_deps=args.with_build_dependencies or not args.without_build_dependencies,
     )
+
     if not args.private:
         specs = _skip_no_redistribute_for_public(specs)

-    # When pushing multiple specs, print the url once ahead of time, as well as how
-    # many specs are being pushed.
     if len(specs) > 1:
         tty.info(f"Selected {len(specs)} specs to push to {push_url}")

-    failed = []
+    # Pushing not installed specs is an error. Either fail fast or populate the error list and
+    # push installed package in best effort mode.
+    failed: List[Tuple[Spec, BaseException]] = []
+    with spack.store.STORE.db.read_transaction():
+        if any(not s.installed for s in specs):
+            specs, not_installed = stable_partition(specs, lambda s: s.installed)
+            if args.fail_fast:
+                raise PackagesAreNotInstalledError(not_installed)
+            else:
+                failed.extend(
+                    (s, PackageNotInstalledError("package not installed")) for s in not_installed
+                )

-    # TODO: unify this logic in the future.
+    # TODO: move into bindist.push_or_raise
     if target_image:
         base_image = ImageReference.from_string(args.base_image) if args.base_image else None
         with tempfile.TemporaryDirectory(
             dir=spack.stage.get_stage_root()
-        ) as tmpdir, _make_pool() as pool:
-            skipped, base_images, checksums = _push_oci(
+        ) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
+            skipped, base_images, checksums, upload_errors = _push_oci(
                 target_image=target_image,
                 base_image=base_image,
                 installed_specs_with_deps=specs,
                 force=args.force,
                 tmpdir=tmpdir,
-                pool=pool,
+                executor=executor,
             )

+            if upload_errors:
+                failed.extend(upload_errors)
+
             # Apart from creating manifests for each individual spec, we allow users to create a
             # separate image tag for all root specs and their runtime dependencies.
-            if args.tag:
+            elif args.tag:
                 tagged_image = target_image.with_tag(args.tag)
                 # _push_oci may not populate base_images if binaries were already in the registry
                 for spec in roots:
@@ -496,7 +534,7 @@
                     e, (bindist.PickKeyException, bindist.NoKeyException)
                 ):
                     raise
-                failed.append((_format_spec(spec), e))
+                failed.append((spec, e))

     if skipped:
         if len(specs) == 1:
@@ -519,7 +557,13 @@
         raise spack.error.SpackError(
             f"The following {len(failed)} errors occurred while pushing specs to the buildcache",
             "\n".join(
-                elide_list([f"  {spec}: {e.__class__.__name__}: {e}" for spec, e in failed], 5)
+                elide_list(
+                    [
+                        f"  {_format_spec(spec)}: {e.__class__.__name__}: {e}"
+                        for spec, e in failed
+                    ],
+                    5,
+                )
             ),
         )
@@ -529,8 +573,8 @@
     if target_image and len(skipped) < len(specs) and args.update_index:
         with tempfile.TemporaryDirectory(
             dir=spack.stage.get_stage_root()
-        ) as tmpdir, _make_pool() as pool:
-            _update_index_oci(target_image, tmpdir, pool)
+        ) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
+            _update_index_oci(target_image, tmpdir, executor)


 def _get_spack_binary_blob(image_ref: ImageReference) -> Optional[spack.oci.oci.Blob]:
@@ -604,17 +648,12 @@ def _put_manifest(
 ):
     architecture = _archspec_to_gooarch(specs[0])

-    dependencies = list(
-        reversed(
-            list(
-                s
-                for s in traverse.traverse_nodes(
-                    specs, order="topo", deptype=("link", "run"), root=True
-                )
-                if not s.external
-            )
-        )
-    )
+    expected_blobs: List[Spec] = [
+        s
+        for s in traverse.traverse_nodes(specs, order="topo", deptype=("link", "run"), root=True)
+        if not s.external
+    ]
+    expected_blobs.reverse()

     base_manifest, base_config = base_images[architecture]
     env = _retrieve_env_dict_from_config(base_config)
@@ -633,9 +672,16 @@
     # Create an oci.image.config file
     config = copy.deepcopy(base_config)

-    # Add the diff ids of the dependencies
-    for s in dependencies:
-        config["rootfs"]["diff_ids"].append(str(checksums[s.dag_hash()].uncompressed_digest))
+    # Add the diff ids of the blobs
+    for s in expected_blobs:
+        # If a layer for a dependency has gone missing (due to removed manifest in the registry, a
+        # failed push, or a local forced uninstall), we cannot create a runnable container image.
+        # If an OCI registry is only used for storage, this is not a hard error, but for now we
+        # raise an exception unconditionally, until someone requests a more lenient behavior.
+        checksum = checksums.get(s.dag_hash())
+        if not checksum:
+            raise MissingLayerError(f"missing layer for {_format_spec(s)}")
+        config["rootfs"]["diff_ids"].append(str(checksum.uncompressed_digest))

     # Set the environment variables
     config["config"]["Env"] = [f"{k}={v}" for k, v in env.items()]
@@ -679,7 +725,7 @@
                     "digest": str(checksums[s.dag_hash()].compressed_digest),
                     "size": checksums[s.dag_hash()].size,
                 }
-                for s in dependencies
+                for s in expected_blobs
             ),
         ],
     }
@@ -724,9 +770,14 @@ def _push_oci(
     base_image: Optional[ImageReference],
     installed_specs_with_deps: List[Spec],
     tmpdir: str,
-    pool: MaybePool,
+    executor: concurrent.futures.Executor,
     force: bool = False,
-) -> Tuple[List[str], Dict[str, Tuple[dict, dict]], Dict[str, spack.oci.oci.Blob]]:
+) -> Tuple[
+    List[str],
+    Dict[str, Tuple[dict, dict]],
+    Dict[str, spack.oci.oci.Blob],
+    List[Tuple[Spec, BaseException]],
+]:
     """Push specs to an OCI registry

     Args:
@@ -739,7 +790,8 @@
     Returns:
         A tuple consisting of the list of skipped specs already in the build cache,
         a dictionary mapping architectures to base image manifests and configs,
-        and a dictionary mapping each spec's dag hash to a blob.
+        a dictionary mapping each spec's dag hash to a blob,
+        and a list of tuples of specs with errors of failed uploads.
     """

     # Reverse the order
@@ -756,39 +808,50 @@
     if not force:
         tty.info("Checking for existing specs in the buildcache")
-        to_be_uploaded = []
+        blobs_to_upload = []
         tags_to_check = (target_image.with_tag(default_tag(s)) for s in installed_specs_with_deps)
-        available_blobs = pool.map(_get_spack_binary_blob, tags_to_check)
+        available_blobs = executor.map(_get_spack_binary_blob, tags_to_check)

         for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs):
             if maybe_blob is not None:
                 checksums[spec.dag_hash()] = maybe_blob
                 skipped.append(_format_spec(spec))
             else:
-                to_be_uploaded.append(spec)
+                blobs_to_upload.append(spec)
     else:
-        to_be_uploaded = installed_specs_with_deps
+        blobs_to_upload = installed_specs_with_deps

-    if not to_be_uploaded:
-        return skipped, base_images, checksums
+    if not blobs_to_upload:
+        return skipped, base_images, checksums, []

     tty.info(
-        f"{len(to_be_uploaded)} specs need to be pushed to "
+        f"{len(blobs_to_upload)} specs need to be pushed to "
         f"{target_image.domain}/{target_image.name}"
     )

     # Upload blobs
-    new_blobs = pool.starmap(
-        _push_single_spack_binary_blob, ((target_image, spec, tmpdir) for spec in to_be_uploaded)
-    )
-
-    # And update the spec to blob mapping
-    for spec, blob in zip(to_be_uploaded, new_blobs):
-        checksums[spec.dag_hash()] = blob
+    blob_futures = [
+        executor.submit(_push_single_spack_binary_blob, target_image, spec, tmpdir)
+        for spec in blobs_to_upload
+    ]
+
+    concurrent.futures.wait(blob_futures)
+
+    manifests_to_upload: List[Spec] = []
+    errors: List[Tuple[Spec, BaseException]] = []
+
+    # And update the spec to blob mapping for successful uploads
+    for spec, blob_future in zip(blobs_to_upload, blob_futures):
+        error = blob_future.exception()
+        if error is None:
+            manifests_to_upload.append(spec)
+            checksums[spec.dag_hash()] = blob_future.result()
+        else:
+            errors.append((spec, error))

     # Copy base images if necessary
-    for spec in to_be_uploaded:
+    for spec in manifests_to_upload:
         _update_base_images(
             base_image=base_image,
             target_image=target_image,
@@ -807,30 +870,35 @@
     # Upload manifests
     tty.info("Uploading manifests")
-    pool.starmap(
-        _put_manifest,
-        (
-            (
-                base_images,
-                checksums,
-                target_image.with_tag(default_tag(spec)),
-                tmpdir,
-                extra_config(spec),
-                {"org.opencontainers.image.description": spec.format()},
-                spec,
-            )
-            for spec in to_be_uploaded
-        ),
-    )
+    manifest_futures = [
+        executor.submit(
+            _put_manifest,
+            base_images,
+            checksums,
+            target_image.with_tag(default_tag(spec)),
+            tmpdir,
+            extra_config(spec),
+            {"org.opencontainers.image.description": spec.format()},
+            spec,
+        )
+        for spec in manifests_to_upload
+    ]
+
+    concurrent.futures.wait(manifest_futures)

     # Print the image names of the top-level specs
-    for spec in to_be_uploaded:
-        tty.info(f"Pushed {_format_spec(spec)} to {target_image.with_tag(default_tag(spec))}")
+    for spec, manifest_future in zip(manifests_to_upload, manifest_futures):
+        error = manifest_future.exception()
+        if error is None:
+            tty.info(f"Pushed {_format_spec(spec)} to {target_image.with_tag(default_tag(spec))}")
+        else:
+            errors.append((spec, error))

-    return skipped, base_images, checksums
+    return skipped, base_images, checksums, errors


-def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
+def _config_from_tag(image_ref_and_tag: Tuple[ImageReference, str]) -> Optional[dict]:
+    image_ref, tag = image_ref_and_tag
     # Don't allow recursion here, since Spack itself always uploads
     # vnd.oci.image.manifest.v1+json, not vnd.oci.image.index.v1+json
     _, config = get_manifest_and_config_with_retry(image_ref.with_tag(tag), tag, recurse=0)
@@ -840,13 +908,13 @@
     return config if "spec" in config else None


-def _update_index_oci(image_ref: ImageReference, tmpdir: str, pool: MaybePool) -> None:
+def _update_index_oci(
+    image_ref: ImageReference, tmpdir: str, pool: concurrent.futures.Executor
+) -> None:
     tags = list_tags(image_ref)

     # Fetch all image config files in parallel
-    spec_dicts = pool.starmap(
-        _config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag))
-    )
+    spec_dicts = pool.map(_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag)))

     # Populate the database
     db_root_dir = os.path.join(tmpdir, "db_root")
@@ -1182,8 +1250,8 @@ def update_index(mirror: spack.mirror.Mirror, update_keys=False):
     if image_ref:
         with tempfile.TemporaryDirectory(
             dir=spack.stage.get_stage_root()
-        ) as tmpdir, _make_pool() as pool:
-            _update_index_oci(image_ref, tmpdir, pool)
+        ) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
+            _update_index_oci(image_ref, tmpdir, executor)
         return

     # Otherwise, assume a normal mirror.
diff --git a/lib/spack/spack/test/cmd/buildcache.py b/lib/spack/spack/test/cmd/buildcache.py
index b634e37116..eee8c160f1 100644
--- a/lib/spack/spack/test/cmd/buildcache.py
+++ b/lib/spack/spack/test/cmd/buildcache.py
@@ -12,7 +12,9 @@ import pytest

 import spack.binary_distribution
 import spack.cmd.buildcache
+import spack.deptypes
 import spack.environment as ev
+import spack.error
 import spack.main
 import spack.spec
 import spack.util.url
@@ -443,3 +445,54 @@ def test_skip_no_redistribute(mock_packages, config):
     filtered = spack.cmd.buildcache._skip_no_redistribute_for_public(specs)
     assert not any(s.name == "no-redistribute" for s in filtered)
     assert any(s.name == "no-redistribute-dependent" for s in filtered)
+
+
+def test_best_effort_vs_fail_fast_when_dep_not_installed(tmp_path, mutable_database):
+    """When --fail-fast is passed, the push command should fail if it immediately finds an
+    uninstalled dependency.
+    Otherwise, failure to push one dependency shouldn't prevent the others from being pushed."""
+
+    mirror("add", "--unsigned", "my-mirror", str(tmp_path))
+
+    # Uninstall mpich so that its dependent mpileaks can't be pushed
+    for s in mutable_database.query_local("mpich"):
+        s.package.do_uninstall(force=True)
+
+    with pytest.raises(spack.cmd.buildcache.PackagesAreNotInstalledError, match="mpich"):
+        buildcache("push", "--update-index", "--fail-fast", "my-mirror", "mpileaks^mpich")
+
+    # nothing should be pushed due to --fail-fast.
+    assert not os.listdir(tmp_path)
+    assert not spack.binary_distribution.update_cache_and_get_specs()
+
+    with pytest.raises(spack.cmd.buildcache.PackageNotInstalledError):
+        buildcache("push", "--update-index", "my-mirror", "mpileaks^mpich")
+
+    specs = spack.binary_distribution.update_cache_and_get_specs()
+
+    # everything but mpich should be pushed
+    mpileaks = mutable_database.query_local("mpileaks^mpich")[0]
+    assert set(specs) == {s for s in mpileaks.traverse() if s.name != "mpich"}
+
+
+def test_push_without_build_deps(tmp_path, temporary_store, mock_packages, mutable_config):
+    """Spack should not error when build deps are uninstalled and --without-build-dependencies is
+    passed."""
+
+    mirror("add", "--unsigned", "my-mirror", str(tmp_path))
+
+    s = spack.spec.Spec("dtrun3").concretized()
+    s.package.do_install(fake=True)
+    s["dtbuild3"].package.do_uninstall()
+
+    # fails when build deps are required
+    with pytest.raises(spack.error.SpackError, match="package not installed"):
+        buildcache(
+            "push", "--update-index", "--with-build-dependencies", "my-mirror", f"/{s.dag_hash()}"
+        )
+
+    # succeeds when build deps are not required
+    buildcache(
+        "push", "--update-index", "--without-build-dependencies", "my-mirror", f"/{s.dag_hash()}"
+    )
+    assert spack.binary_distribution.update_cache_and_get_specs() == [s]
diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py
index 00d7980a55..8a1f887fbe 100644
--- a/lib/spack/spack/test/conftest.py
+++ b/lib/spack/spack/test/conftest.py
@@ -56,6 +56,7 @@ import spack.test.cray_manifest
 import spack.util.executable
 import spack.util.git
 import spack.util.gpg
+import spack.util.parallel
 import spack.util.spack_yaml as syaml
 import spack.util.url as url_util
 import spack.version
@@ -1961,10 +1962,12 @@ def pytest_runtest_setup(item):
         pytest.skip(*not_on_windows_marker.args)


-@pytest.fixture(scope="function")
+@pytest.fixture(autouse=True)
 def disable_parallel_buildcache_push(monkeypatch):
     """Disable process pools in tests."""
-    monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", spack.cmd.buildcache.NoPool)
+    monkeypatch.setattr(
+        spack.util.parallel, "make_concurrent_executor", spack.util.parallel.SequentialExecutor
+    )


 def _root_path(x, y, *, path):
diff --git a/lib/spack/spack/test/oci/integration_test.py b/lib/spack/spack/test/oci/integration_test.py
index c4e2636619..cefd95f92d 100644
--- a/lib/spack/spack/test/oci/integration_test.py
+++ b/lib/spack/spack/test/oci/integration_test.py
@@ -10,12 +10,19 @@ import hashlib
 import json
 import os
 import pathlib
+import re
 from contextlib import contextmanager

+import pytest
+
+import spack.cmd.buildcache
+import spack.database
 import spack.environment as ev
+import spack.error
 import spack.oci.opener
+import spack.spec
 from spack.main import SpackCommand
-from spack.oci.image import Digest, ImageReference, default_config, default_manifest
+from spack.oci.image import Digest, ImageReference, default_config, default_manifest, default_tag
 from spack.oci.oci import blob_exists, get_manifest_and_config, upload_blob, upload_manifest
 from spack.test.oci.mock_registry import DummyServer, InMemoryOCIRegistry, create_opener
 from spack.util.archive import gzip_compressed_tarfile
@@ -34,7 +41,7 @@ def oci_servers(*servers: DummyServer):
     spack.oci.opener.urlopen = old_opener


-def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_push):
+def test_buildcache_push_command(mutable_database):
     with oci_servers(InMemoryOCIRegistry("example.com")):
         mirror("add", "oci-test", "oci://example.com/image")

@@ -57,9 +64,7 @@ def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_p
     assert os.path.exists(os.path.join(spec.prefix, "bin", "mpileaks"))


-def test_buildcache_tag(
-    install_mockery, mock_fetch, mutable_mock_env_path, disable_parallel_buildcache_push
-):
+def test_buildcache_tag(install_mockery, mock_fetch, mutable_mock_env_path):
     """Tests whether we can create an OCI image from a full environment with multiple roots."""
     env("create", "test")
     with ev.read("test"):
@@ -97,9 +102,7 @@ def test_buildcache_tag(
     assert len(manifest["layers"]) == 1


-def test_buildcache_push_with_base_image_command(
-    mutable_database, tmpdir, disable_parallel_buildcache_push
-):
+def test_buildcache_push_with_base_image_command(mutable_database, tmpdir):
     """Test that we can push a package with a base image to an OCI registry.

     This test is a bit involved, cause we have to create a small base image."""
@@ -200,7 +203,7 @@ def test_buildcache_push_with_base_image_command(


 def test_uploading_with_base_image_in_docker_image_manifest_v2_format(
-    tmp_path: pathlib.Path, mutable_database, disable_parallel_buildcache_push
+    tmp_path: pathlib.Path, mutable_database
 ):
     """If the base image uses an old manifest schema, Spack should also use that.

     That is necessary for container images to work with Apptainer, which is rather strict about
@@ -286,3 +289,57 @@
     for layer in m["layers"]:
         assert layer["mediaType"] == "application/vnd.docker.image.rootfs.diff.tar.gzip"
     assert "annotations" not in m
+
+
+def test_best_effort_upload(mutable_database: spack.database.Database, monkeypatch):
+    """Failure to upload a blob or manifest should not prevent others from being uploaded"""
+
+    _push_blob = spack.cmd.buildcache._push_single_spack_binary_blob
+    _push_manifest = spack.cmd.buildcache._put_manifest
+
+    def push_blob(image_ref, spec, tmpdir):
+        # fail to upload the blob of mpich
+        if spec.name == "mpich":
+            raise Exception("Blob Server Error")
+        return _push_blob(image_ref, spec, tmpdir)
+
+    def put_manifest(base_images, checksums, image_ref, tmpdir, extra_config, annotations, *specs):
+        # fail to upload the manifest of libdwarf
+        if "libdwarf" in (s.name for s in specs):
+            raise Exception("Manifest Server Error")
+        return _push_manifest(
+            base_images, checksums, image_ref, tmpdir, extra_config, annotations, *specs
+        )
+
+    monkeypatch.setattr(spack.cmd.buildcache, "_push_single_spack_binary_blob", push_blob)
+    monkeypatch.setattr(spack.cmd.buildcache, "_put_manifest", put_manifest)
+
+    registry = InMemoryOCIRegistry("example.com")
+    with oci_servers(registry):
+        mirror("add", "oci-test", "oci://example.com/image")
+
+        with pytest.raises(spack.error.SpackError, match="The following 4 errors occurred") as e:
+            buildcache("push", "--update-index", "oci-test", "mpileaks^mpich")
+
+        error = str(e.value)
+
+        # mpich's blob failed to upload
+        assert re.search("mpich.+: Exception: Blob Server Error", error)
+
+        # libdwarf's manifest failed to upload
+        assert re.search("libdwarf.+: Exception: Manifest Server Error", error)
+
+        # since there is no blob for mpich, runtime dependents cannot refer to it in their
+        # manifests, which is a transitive error.
+        assert re.search("callpath.+: MissingLayerError: missing layer for mpich", error)
+        assert re.search("mpileaks.+: MissingLayerError: missing layer for mpich", error)
+
+        mpileaks: spack.spec.Spec = mutable_database.query_local("mpileaks^mpich")[0]
+
+        # ensure that packages not affected by errors were uploaded still.
+        uploaded_tags = {tag for _, tag in registry.manifests.keys()}
+        failures = {"mpich", "libdwarf", "callpath", "mpileaks"}
+        expected_tags = {default_tag(s) for s in mpileaks.traverse() if s.name not in failures}
+
+        assert expected_tags
+        assert uploaded_tags == expected_tags
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
index 9c0ff1ab8e..28c55b7d1e 100644
--- a/lib/spack/spack/util/parallel.py
+++ b/lib/spack/spack/util/parallel.py
@@ -2,12 +2,15 @@
 # Spack Project Developers. See the top-level COPYRIGHT file for details.
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
+import concurrent.futures
 import multiprocessing
 import os
 import sys
 import traceback
 from typing import Optional

+from spack.util.cpus import determine_number_of_jobs
+

 class ErrorFromWorker:
     """Wrapper class to report an error from a worker process"""
@@ -80,3 +83,24 @@
         if isinstance(result, ErrorFromWorker):
             raise RuntimeError(result.stacktrace if debug else str(result))
         yield result
+
+
+class SequentialExecutor(concurrent.futures.Executor):
+    """Executor that runs tasks sequentially in the current thread."""
+
+    def submit(self, fn, *args, **kwargs):
+        """Submit a function to be executed."""
+        future = concurrent.futures.Future()
+        try:
+            future.set_result(fn(*args, **kwargs))
+        except Exception as e:
+            future.set_exception(e)
+        return future
+
+
+def make_concurrent_executor() -> concurrent.futures.Executor:
+    """Can't use threading because it's unsafe, and can't use spawned processes because of globals.
+    That leaves only forking."""
+    if multiprocessing.get_start_method() == "fork":
+        return concurrent.futures.ProcessPoolExecutor(determine_number_of_jobs(parallel=True))
+    return SequentialExecutor()
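
For context, make_concurrent_executor returns an ordinary
concurrent.futures.Executor, so calling code is identical whether uploads run
in forked worker processes or sequentially. A small usage sketch, assuming the
spack.util.parallel module above is importable and with square as an
illustrative task:

    import concurrent.futures

    import spack.util.parallel


    def square(n: int) -> int:
        return n * n


    # ProcessPoolExecutor on fork-capable platforms, SequentialExecutor otherwise.
    with spack.util.parallel.make_concurrent_executor() as executor:
        futures = [executor.submit(square, n) for n in range(4)]
        concurrent.futures.wait(futures)
        # SequentialExecutor sets the result (or exception) at submit time,
        # so exception() and result() never block here.
        assert [f.result() for f in futures] == [0, 1, 4, 9]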