summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorHarmen Stoppels <me@harmenstoppels.nl>2024-08-13 08:12:48 +0200
committerGitHub <noreply@github.com>2024-08-13 08:12:48 +0200
commita66586d749197841bd74e289802126f2359287a8 (patch)
tree89557db34408e4f19d7de725dd89d4276c702fb0 /lib
parent6b73f00310db34c054878c51d8ccf5092bc2e607 (diff)
downloadspack-a66586d749197841bd74e289802126f2359287a8.tar.gz
spack-a66586d749197841bd74e289802126f2359287a8.tar.bz2
spack-a66586d749197841bd74e289802126f2359287a8.tar.xz
spack-a66586d749197841bd74e289802126f2359287a8.zip
spack buildcache push: best effort (#45631)
"spack buildcache push" for partially installed environments pushes all it can by default, and only dumps errors towards the end. If --fail-fast is provided, error out before pushing anything if any of the packages is uninstalled oci build caches using parallel push now use futures to ensure pushing goes in best-effort style.
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/docs/conf.py1
-rw-r--r--lib/spack/spack/binary_distribution.py56
-rw-r--r--lib/spack/spack/cmd/buildcache.py278
-rw-r--r--lib/spack/spack/test/cmd/buildcache.py53
-rw-r--r--lib/spack/spack/test/conftest.py7
-rw-r--r--lib/spack/spack/test/oci/integration_test.py75
-rw-r--r--lib/spack/spack/util/parallel.py24
7 files changed, 323 insertions, 171 deletions
diff --git a/lib/spack/docs/conf.py b/lib/spack/docs/conf.py
index e0542640a2..9819d6f4d3 100644
--- a/lib/spack/docs/conf.py
+++ b/lib/spack/docs/conf.py
@@ -206,6 +206,7 @@ nitpick_ignore = [
("py:class", "six.moves.urllib.parse.ParseResult"),
("py:class", "TextIO"),
("py:class", "hashlib._Hash"),
+ ("py:class", "concurrent.futures._base.Executor"),
# Spack classes that are private and we don't want to expose
("py:class", "spack.provider_index._IndexBase"),
("py:class", "spack.repo._PrependFileLoader"),
diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py
index 6108e69548..26b6396f26 100644
--- a/lib/spack/spack/binary_distribution.py
+++ b/lib/spack/spack/binary_distribution.py
@@ -22,7 +22,7 @@ import urllib.parse
import urllib.request
import warnings
from contextlib import closing
-from typing import Dict, Iterable, List, NamedTuple, Optional, Set, Tuple
+from typing import Dict, Iterable, NamedTuple, Optional, Set, Tuple
import llnl.util.filesystem as fsys
import llnl.util.lang
@@ -46,7 +46,6 @@ import spack.relocate as relocate
import spack.repo
import spack.stage
import spack.store
-import spack.traverse as traverse
import spack.util.archive
import spack.util.crypto
import spack.util.file_cache as file_cache
@@ -1201,59 +1200,6 @@ def _build_tarball_in_stage_dir(spec: Spec, out_url: str, stage_dir: str, option
generate_package_index(url_util.join(out_url, os.path.relpath(cache_prefix, stage_dir)))
-class NotInstalledError(spack.error.SpackError):
- """Raised when a spec is not installed but picked to be packaged."""
-
- def __init__(self, specs: List[Spec]):
- super().__init__(
- "Cannot push non-installed packages",
- ", ".join(s.cformat("{name}{@version}{/hash:7}") for s in specs),
- )
-
-
-def specs_to_be_packaged(
- specs: List[Spec], root: bool = True, dependencies: bool = True
-) -> List[Spec]:
- """Return the list of nodes to be packaged, given a list of specs.
- Raises NotInstalledError if a spec is not installed but picked to be packaged.
-
- Args:
- specs: list of root specs to be processed
- root: include the root of each spec in the nodes
- dependencies: include the dependencies of each
- spec in the nodes
- """
-
- if not root and not dependencies:
- return []
-
- # Filter packageable roots
- with spack.store.STORE.db.read_transaction():
- if root:
- # Error on uninstalled roots, when roots are requested
- uninstalled_roots = list(s for s in specs if not s.installed)
- if uninstalled_roots:
- raise NotInstalledError(uninstalled_roots)
- roots = specs
- else:
- roots = []
-
- if dependencies:
- # Error on uninstalled deps, when deps are requested
- deps = list(
- traverse.traverse_nodes(
- specs, deptype="all", order="breadth", root=False, key=traverse.by_dag_hash
- )
- )
- uninstalled_deps = list(s for s in deps if not s.installed)
- if uninstalled_deps:
- raise NotInstalledError(uninstalled_deps)
- else:
- deps = []
-
- return [s for s in itertools.chain(roots, deps) if not s.external]
-
-
def try_verify(specfile_path):
"""Utility function to attempt to verify a local file. Assumes the
file is a clearsigned signature file.
diff --git a/lib/spack/spack/cmd/buildcache.py b/lib/spack/spack/cmd/buildcache.py
index a2dace6003..78d8e7a00c 100644
--- a/lib/spack/spack/cmd/buildcache.py
+++ b/lib/spack/spack/cmd/buildcache.py
@@ -3,25 +3,25 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import argparse
+import concurrent.futures
import copy
import glob
import hashlib
import json
-import multiprocessing
-import multiprocessing.pool
import os
import shutil
import sys
import tempfile
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Dict, List, Optional, Tuple
import llnl.util.tty as tty
from llnl.string import plural
-from llnl.util.lang import elide_list
+from llnl.util.lang import elide_list, stable_partition
import spack.binary_distribution as bindist
import spack.cmd
import spack.config
+import spack.deptypes as dt
import spack.environment as ev
import spack.error
import spack.hash_types as ht
@@ -35,10 +35,10 @@ import spack.stage
import spack.store
import spack.user_environment
import spack.util.crypto
+import spack.util.parallel
import spack.util.url as url_util
import spack.util.web as web_util
from spack import traverse
-from spack.build_environment import determine_number_of_jobs
from spack.cmd import display_specs
from spack.cmd.common import arguments
from spack.oci.image import (
@@ -112,6 +112,17 @@ def setup_parser(subparser: argparse.ArgumentParser):
"Alternatively, one can decide to build a cache for only the package or only the "
"dependencies",
)
+ with_or_without_build_deps = push.add_mutually_exclusive_group()
+ with_or_without_build_deps.add_argument(
+ "--with-build-dependencies",
+ action="store_true",
+ help="include build dependencies in the buildcache",
+ )
+ with_or_without_build_deps.add_argument(
+ "--without-build-dependencies",
+ action="store_true",
+ help="exclude build dependencies from the buildcache",
+ )
push.add_argument(
"--fail-fast",
action="store_true",
@@ -336,32 +347,6 @@ def _progress(i: int, total: int):
return ""
-class NoPool:
- def map(self, func, args):
- return [func(a) for a in args]
-
- def starmap(self, func, args):
- return [func(*a) for a in args]
-
- def __enter__(self):
- return self
-
- def __exit__(self, *args):
- pass
-
-
-MaybePool = Union[multiprocessing.pool.Pool, NoPool]
-
-
-def _make_pool() -> MaybePool:
- """Can't use threading because it's unsafe, and can't use spawned processes because of globals.
- That leaves only forking"""
- if multiprocessing.get_start_method() == "fork":
- return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True))
- else:
- return NoPool()
-
-
def _skip_no_redistribute_for_public(specs):
remaining_specs = list()
removed_specs = list()
@@ -381,6 +366,47 @@ def _skip_no_redistribute_for_public(specs):
return remaining_specs
+class PackagesAreNotInstalledError(spack.error.SpackError):
+ """Raised when a list of specs is not installed but picked to be packaged."""
+
+ def __init__(self, specs: List[Spec]):
+ super().__init__(
+ "Cannot push non-installed packages",
+ ", ".join(elide_list(list(_format_spec(s) for s in specs), 5)),
+ )
+
+
+class PackageNotInstalledError(spack.error.SpackError):
+ """Raised when a spec is not installed but picked to be packaged."""
+
+
+class MissingLayerError(spack.error.SpackError):
+ """Raised when a required layer for a dependency is missing in an OCI registry."""
+
+
+def _specs_to_be_packaged(
+ requested: List[Spec], things_to_install: str, build_deps: bool
+) -> List[Spec]:
+ """Collect all non-external with or without roots and dependencies"""
+ if "dependencies" not in things_to_install:
+ deptype = dt.NONE
+ elif build_deps:
+ deptype = dt.ALL
+ else:
+ deptype = dt.RUN | dt.LINK | dt.TEST
+ return [
+ s
+ for s in traverse.traverse_nodes(
+ requested,
+ root="package" in things_to_install,
+ deptype=deptype,
+ order="breadth",
+ key=traverse.by_dag_hash,
+ )
+ if not s.external
+ ]
+
+
def push_fn(args):
"""create a binary package and push it to a mirror"""
if args.spec_file:
@@ -420,40 +446,52 @@ def push_fn(args):
"Use --unsigned to silence this warning."
)
- # This is a list of installed, non-external specs.
- specs = bindist.specs_to_be_packaged(
+ specs = _specs_to_be_packaged(
roots,
- root="package" in args.things_to_install,
- dependencies="dependencies" in args.things_to_install,
+ things_to_install=args.things_to_install,
+ build_deps=args.with_build_dependencies or not args.without_build_dependencies,
)
+
if not args.private:
specs = _skip_no_redistribute_for_public(specs)
- # When pushing multiple specs, print the url once ahead of time, as well as how
- # many specs are being pushed.
if len(specs) > 1:
tty.info(f"Selected {len(specs)} specs to push to {push_url}")
- failed = []
+    # Pushing non-installed specs is an error. Either fail fast or populate the error list and
+    # push installed packages in best-effort mode.
+ failed: List[Tuple[Spec, BaseException]] = []
+ with spack.store.STORE.db.read_transaction():
+ if any(not s.installed for s in specs):
+ specs, not_installed = stable_partition(specs, lambda s: s.installed)
+ if args.fail_fast:
+ raise PackagesAreNotInstalledError(not_installed)
+ else:
+ failed.extend(
+ (s, PackageNotInstalledError("package not installed")) for s in not_installed
+ )
- # TODO: unify this logic in the future.
+ # TODO: move into bindist.push_or_raise
if target_image:
base_image = ImageReference.from_string(args.base_image) if args.base_image else None
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
- ) as tmpdir, _make_pool() as pool:
- skipped, base_images, checksums = _push_oci(
+ ) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
+ skipped, base_images, checksums, upload_errors = _push_oci(
target_image=target_image,
base_image=base_image,
installed_specs_with_deps=specs,
force=args.force,
tmpdir=tmpdir,
- pool=pool,
+ executor=executor,
)
+ if upload_errors:
+ failed.extend(upload_errors)
+
# Apart from creating manifests for each individual spec, we allow users to create a
# separate image tag for all root specs and their runtime dependencies.
- if args.tag:
+ elif args.tag:
tagged_image = target_image.with_tag(args.tag)
# _push_oci may not populate base_images if binaries were already in the registry
for spec in roots:
@@ -496,7 +534,7 @@ def push_fn(args):
e, (bindist.PickKeyException, bindist.NoKeyException)
):
raise
- failed.append((_format_spec(spec), e))
+ failed.append((spec, e))
if skipped:
if len(specs) == 1:
@@ -519,7 +557,13 @@ def push_fn(args):
raise spack.error.SpackError(
f"The following {len(failed)} errors occurred while pushing specs to the buildcache",
"\n".join(
- elide_list([f" {spec}: {e.__class__.__name__}: {e}" for spec, e in failed], 5)
+ elide_list(
+ [
+ f" {_format_spec(spec)}: {e.__class__.__name__}: {e}"
+ for spec, e in failed
+ ],
+ 5,
+ )
),
)
@@ -529,8 +573,8 @@ def push_fn(args):
if target_image and len(skipped) < len(specs) and args.update_index:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
- ) as tmpdir, _make_pool() as pool:
- _update_index_oci(target_image, tmpdir, pool)
+ ) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
+ _update_index_oci(target_image, tmpdir, executor)
def _get_spack_binary_blob(image_ref: ImageReference) -> Optional[spack.oci.oci.Blob]:
@@ -604,17 +648,12 @@ def _put_manifest(
):
architecture = _archspec_to_gooarch(specs[0])
- dependencies = list(
- reversed(
- list(
- s
- for s in traverse.traverse_nodes(
- specs, order="topo", deptype=("link", "run"), root=True
- )
- if not s.external
- )
- )
- )
+ expected_blobs: List[Spec] = [
+ s
+ for s in traverse.traverse_nodes(specs, order="topo", deptype=("link", "run"), root=True)
+ if not s.external
+ ]
+ expected_blobs.reverse()
base_manifest, base_config = base_images[architecture]
env = _retrieve_env_dict_from_config(base_config)
@@ -633,9 +672,16 @@ def _put_manifest(
# Create an oci.image.config file
config = copy.deepcopy(base_config)
- # Add the diff ids of the dependencies
- for s in dependencies:
- config["rootfs"]["diff_ids"].append(str(checksums[s.dag_hash()].uncompressed_digest))
+ # Add the diff ids of the blobs
+ for s in expected_blobs:
+ # If a layer for a dependency has gone missing (due to removed manifest in the registry, a
+ # failed push, or a local forced uninstall), we cannot create a runnable container image.
+ # If an OCI registry is only used for storage, this is not a hard error, but for now we
+ # raise an exception unconditionally, until someone requests a more lenient behavior.
+ checksum = checksums.get(s.dag_hash())
+ if not checksum:
+ raise MissingLayerError(f"missing layer for {_format_spec(s)}")
+ config["rootfs"]["diff_ids"].append(str(checksum.uncompressed_digest))
# Set the environment variables
config["config"]["Env"] = [f"{k}={v}" for k, v in env.items()]
@@ -679,7 +725,7 @@ def _put_manifest(
"digest": str(checksums[s.dag_hash()].compressed_digest),
"size": checksums[s.dag_hash()].size,
}
- for s in dependencies
+ for s in expected_blobs
),
],
}
@@ -724,9 +770,14 @@ def _push_oci(
base_image: Optional[ImageReference],
installed_specs_with_deps: List[Spec],
tmpdir: str,
- pool: MaybePool,
+ executor: concurrent.futures.Executor,
force: bool = False,
-) -> Tuple[List[str], Dict[str, Tuple[dict, dict]], Dict[str, spack.oci.oci.Blob]]:
+) -> Tuple[
+ List[str],
+ Dict[str, Tuple[dict, dict]],
+ Dict[str, spack.oci.oci.Blob],
+ List[Tuple[Spec, BaseException]],
+]:
"""Push specs to an OCI registry
Args:
@@ -739,7 +790,8 @@ def _push_oci(
Returns:
A tuple consisting of the list of skipped specs already in the build cache,
a dictionary mapping architectures to base image manifests and configs,
- and a dictionary mapping each spec's dag hash to a blob.
+ a dictionary mapping each spec's dag hash to a blob,
+ and a list of tuples of specs with errors of failed uploads.
"""
# Reverse the order
@@ -756,39 +808,50 @@ def _push_oci(
if not force:
tty.info("Checking for existing specs in the buildcache")
- to_be_uploaded = []
+ blobs_to_upload = []
tags_to_check = (target_image.with_tag(default_tag(s)) for s in installed_specs_with_deps)
- available_blobs = pool.map(_get_spack_binary_blob, tags_to_check)
+ available_blobs = executor.map(_get_spack_binary_blob, tags_to_check)
for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs):
if maybe_blob is not None:
checksums[spec.dag_hash()] = maybe_blob
skipped.append(_format_spec(spec))
else:
- to_be_uploaded.append(spec)
+ blobs_to_upload.append(spec)
else:
- to_be_uploaded = installed_specs_with_deps
+ blobs_to_upload = installed_specs_with_deps
- if not to_be_uploaded:
- return skipped, base_images, checksums
+ if not blobs_to_upload:
+ return skipped, base_images, checksums, []
tty.info(
- f"{len(to_be_uploaded)} specs need to be pushed to "
+ f"{len(blobs_to_upload)} specs need to be pushed to "
f"{target_image.domain}/{target_image.name}"
)
# Upload blobs
- new_blobs = pool.starmap(
- _push_single_spack_binary_blob, ((target_image, spec, tmpdir) for spec in to_be_uploaded)
- )
-
- # And update the spec to blob mapping
- for spec, blob in zip(to_be_uploaded, new_blobs):
- checksums[spec.dag_hash()] = blob
+ blob_futures = [
+ executor.submit(_push_single_spack_binary_blob, target_image, spec, tmpdir)
+ for spec in blobs_to_upload
+ ]
+
+ concurrent.futures.wait(blob_futures)
+
+ manifests_to_upload: List[Spec] = []
+ errors: List[Tuple[Spec, BaseException]] = []
+
+ # And update the spec to blob mapping for successful uploads
+ for spec, blob_future in zip(blobs_to_upload, blob_futures):
+ error = blob_future.exception()
+ if error is None:
+ manifests_to_upload.append(spec)
+ checksums[spec.dag_hash()] = blob_future.result()
+ else:
+ errors.append((spec, error))
# Copy base images if necessary
- for spec in to_be_uploaded:
+ for spec in manifests_to_upload:
_update_base_images(
base_image=base_image,
target_image=target_image,
@@ -807,30 +870,35 @@ def _push_oci(
# Upload manifests
tty.info("Uploading manifests")
- pool.starmap(
- _put_manifest,
- (
- (
- base_images,
- checksums,
- target_image.with_tag(default_tag(spec)),
- tmpdir,
- extra_config(spec),
- {"org.opencontainers.image.description": spec.format()},
- spec,
- )
- for spec in to_be_uploaded
- ),
- )
+ manifest_futures = [
+ executor.submit(
+ _put_manifest,
+ base_images,
+ checksums,
+ target_image.with_tag(default_tag(spec)),
+ tmpdir,
+ extra_config(spec),
+ {"org.opencontainers.image.description": spec.format()},
+ spec,
+ )
+ for spec in manifests_to_upload
+ ]
+
+ concurrent.futures.wait(manifest_futures)
# Print the image names of the top-level specs
- for spec in to_be_uploaded:
- tty.info(f"Pushed {_format_spec(spec)} to {target_image.with_tag(default_tag(spec))}")
+ for spec, manifest_future in zip(manifests_to_upload, manifest_futures):
+ error = manifest_future.exception()
+ if error is None:
+ tty.info(f"Pushed {_format_spec(spec)} to {target_image.with_tag(default_tag(spec))}")
+ else:
+ errors.append((spec, error))
- return skipped, base_images, checksums
+ return skipped, base_images, checksums, errors
-def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
+def _config_from_tag(image_ref_and_tag: Tuple[ImageReference, str]) -> Optional[dict]:
+ image_ref, tag = image_ref_and_tag
# Don't allow recursion here, since Spack itself always uploads
# vnd.oci.image.manifest.v1+json, not vnd.oci.image.index.v1+json
_, config = get_manifest_and_config_with_retry(image_ref.with_tag(tag), tag, recurse=0)
@@ -840,13 +908,13 @@ def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
return config if "spec" in config else None
-def _update_index_oci(image_ref: ImageReference, tmpdir: str, pool: MaybePool) -> None:
+def _update_index_oci(
+ image_ref: ImageReference, tmpdir: str, pool: concurrent.futures.Executor
+) -> None:
tags = list_tags(image_ref)
# Fetch all image config files in parallel
- spec_dicts = pool.starmap(
- _config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag))
- )
+ spec_dicts = pool.map(_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag)))
# Populate the database
db_root_dir = os.path.join(tmpdir, "db_root")
@@ -1182,8 +1250,8 @@ def update_index(mirror: spack.mirror.Mirror, update_keys=False):
if image_ref:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
- ) as tmpdir, _make_pool() as pool:
- _update_index_oci(image_ref, tmpdir, pool)
+ ) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
+ _update_index_oci(image_ref, tmpdir, executor)
return
# Otherwise, assume a normal mirror.
diff --git a/lib/spack/spack/test/cmd/buildcache.py b/lib/spack/spack/test/cmd/buildcache.py
index b634e37116..eee8c160f1 100644
--- a/lib/spack/spack/test/cmd/buildcache.py
+++ b/lib/spack/spack/test/cmd/buildcache.py
@@ -12,7 +12,9 @@ import pytest
import spack.binary_distribution
import spack.cmd.buildcache
+import spack.deptypes
import spack.environment as ev
+import spack.error
import spack.main
import spack.spec
import spack.util.url
@@ -443,3 +445,54 @@ def test_skip_no_redistribute(mock_packages, config):
filtered = spack.cmd.buildcache._skip_no_redistribute_for_public(specs)
assert not any(s.name == "no-redistribute" for s in filtered)
assert any(s.name == "no-redistribute-dependent" for s in filtered)
+
+
+def test_best_effort_vs_fail_fast_when_dep_not_installed(tmp_path, mutable_database):
+ """When --fail-fast is passed, the push command should fail if it immediately finds an
+ uninstalled dependency. Otherwise, failure to push one dependency shouldn't prevent the
+ others from being pushed."""
+
+ mirror("add", "--unsigned", "my-mirror", str(tmp_path))
+
+ # Uninstall mpich so that its dependent mpileaks can't be pushed
+ for s in mutable_database.query_local("mpich"):
+ s.package.do_uninstall(force=True)
+
+ with pytest.raises(spack.cmd.buildcache.PackagesAreNotInstalledError, match="mpich"):
+ buildcache("push", "--update-index", "--fail-fast", "my-mirror", "mpileaks^mpich")
+
+ # nothing should be pushed due to --fail-fast.
+ assert not os.listdir(tmp_path)
+ assert not spack.binary_distribution.update_cache_and_get_specs()
+
+ with pytest.raises(spack.cmd.buildcache.PackageNotInstalledError):
+ buildcache("push", "--update-index", "my-mirror", "mpileaks^mpich")
+
+ specs = spack.binary_distribution.update_cache_and_get_specs()
+
+ # everything but mpich should be pushed
+ mpileaks = mutable_database.query_local("mpileaks^mpich")[0]
+ assert set(specs) == {s for s in mpileaks.traverse() if s.name != "mpich"}
+
+
+def test_push_without_build_deps(tmp_path, temporary_store, mock_packages, mutable_config):
+ """Spack should not error when build deps are uninstalled and --without-build-dependenies is
+ passed."""
+
+ mirror("add", "--unsigned", "my-mirror", str(tmp_path))
+
+ s = spack.spec.Spec("dtrun3").concretized()
+ s.package.do_install(fake=True)
+ s["dtbuild3"].package.do_uninstall()
+
+ # fails when build deps are required
+ with pytest.raises(spack.error.SpackError, match="package not installed"):
+ buildcache(
+ "push", "--update-index", "--with-build-dependencies", "my-mirror", f"/{s.dag_hash()}"
+ )
+
+ # succeeds when build deps are not required
+ buildcache(
+ "push", "--update-index", "--without-build-dependencies", "my-mirror", f"/{s.dag_hash()}"
+ )
+ assert spack.binary_distribution.update_cache_and_get_specs() == [s]
diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py
index 00d7980a55..8a1f887fbe 100644
--- a/lib/spack/spack/test/conftest.py
+++ b/lib/spack/spack/test/conftest.py
@@ -56,6 +56,7 @@ import spack.test.cray_manifest
import spack.util.executable
import spack.util.git
import spack.util.gpg
+import spack.util.parallel
import spack.util.spack_yaml as syaml
import spack.util.url as url_util
import spack.version
@@ -1961,10 +1962,12 @@ def pytest_runtest_setup(item):
pytest.skip(*not_on_windows_marker.args)
-@pytest.fixture(scope="function")
+@pytest.fixture(autouse=True)
def disable_parallel_buildcache_push(monkeypatch):
"""Disable process pools in tests."""
- monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", spack.cmd.buildcache.NoPool)
+ monkeypatch.setattr(
+ spack.util.parallel, "make_concurrent_executor", spack.util.parallel.SequentialExecutor
+ )
def _root_path(x, y, *, path):
diff --git a/lib/spack/spack/test/oci/integration_test.py b/lib/spack/spack/test/oci/integration_test.py
index c4e2636619..cefd95f92d 100644
--- a/lib/spack/spack/test/oci/integration_test.py
+++ b/lib/spack/spack/test/oci/integration_test.py
@@ -10,12 +10,19 @@ import hashlib
import json
import os
import pathlib
+import re
from contextlib import contextmanager
+import pytest
+
+import spack.cmd.buildcache
+import spack.database
import spack.environment as ev
+import spack.error
import spack.oci.opener
+import spack.spec
from spack.main import SpackCommand
-from spack.oci.image import Digest, ImageReference, default_config, default_manifest
+from spack.oci.image import Digest, ImageReference, default_config, default_manifest, default_tag
from spack.oci.oci import blob_exists, get_manifest_and_config, upload_blob, upload_manifest
from spack.test.oci.mock_registry import DummyServer, InMemoryOCIRegistry, create_opener
from spack.util.archive import gzip_compressed_tarfile
@@ -34,7 +41,7 @@ def oci_servers(*servers: DummyServer):
spack.oci.opener.urlopen = old_opener
-def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_push):
+def test_buildcache_push_command(mutable_database):
with oci_servers(InMemoryOCIRegistry("example.com")):
mirror("add", "oci-test", "oci://example.com/image")
@@ -57,9 +64,7 @@ def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_p
assert os.path.exists(os.path.join(spec.prefix, "bin", "mpileaks"))
-def test_buildcache_tag(
- install_mockery, mock_fetch, mutable_mock_env_path, disable_parallel_buildcache_push
-):
+def test_buildcache_tag(install_mockery, mock_fetch, mutable_mock_env_path):
"""Tests whether we can create an OCI image from a full environment with multiple roots."""
env("create", "test")
with ev.read("test"):
@@ -97,9 +102,7 @@ def test_buildcache_tag(
assert len(manifest["layers"]) == 1
-def test_buildcache_push_with_base_image_command(
- mutable_database, tmpdir, disable_parallel_buildcache_push
-):
+def test_buildcache_push_with_base_image_command(mutable_database, tmpdir):
"""Test that we can push a package with a base image to an OCI registry.
This test is a bit involved, cause we have to create a small base image."""
@@ -200,7 +203,7 @@ def test_buildcache_push_with_base_image_command(
def test_uploading_with_base_image_in_docker_image_manifest_v2_format(
- tmp_path: pathlib.Path, mutable_database, disable_parallel_buildcache_push
+ tmp_path: pathlib.Path, mutable_database
):
"""If the base image uses an old manifest schema, Spack should also use that.
That is necessary for container images to work with Apptainer, which is rather strict about
@@ -286,3 +289,57 @@ def test_uploading_with_base_image_in_docker_image_manifest_v2_format(
for layer in m["layers"]:
assert layer["mediaType"] == "application/vnd.docker.image.rootfs.diff.tar.gzip"
assert "annotations" not in m
+
+
+def test_best_effort_upload(mutable_database: spack.database.Database, monkeypatch):
+ """Failure to upload a blob or manifest should not prevent others from being uploaded"""
+
+ _push_blob = spack.cmd.buildcache._push_single_spack_binary_blob
+ _push_manifest = spack.cmd.buildcache._put_manifest
+
+ def push_blob(image_ref, spec, tmpdir):
+ # fail to upload the blob of mpich
+ if spec.name == "mpich":
+ raise Exception("Blob Server Error")
+ return _push_blob(image_ref, spec, tmpdir)
+
+ def put_manifest(base_images, checksums, image_ref, tmpdir, extra_config, annotations, *specs):
+ # fail to upload the manifest of libdwarf
+ if "libdwarf" in (s.name for s in specs):
+ raise Exception("Manifest Server Error")
+ return _push_manifest(
+ base_images, checksums, image_ref, tmpdir, extra_config, annotations, *specs
+ )
+
+ monkeypatch.setattr(spack.cmd.buildcache, "_push_single_spack_binary_blob", push_blob)
+ monkeypatch.setattr(spack.cmd.buildcache, "_put_manifest", put_manifest)
+
+ registry = InMemoryOCIRegistry("example.com")
+ with oci_servers(registry):
+ mirror("add", "oci-test", "oci://example.com/image")
+
+ with pytest.raises(spack.error.SpackError, match="The following 4 errors occurred") as e:
+ buildcache("push", "--update-index", "oci-test", "mpileaks^mpich")
+
+ error = str(e.value)
+
+ # mpich's blob failed to upload
+ assert re.search("mpich.+: Exception: Blob Server Error", error)
+
+ # libdwarf's manifest failed to upload
+ assert re.search("libdwarf.+: Exception: Manifest Server Error", error)
+
+ # since there is no blob for mpich, runtime dependents cannot refer to it in their
+ # manifests, which is a transitive error.
+ assert re.search("callpath.+: MissingLayerError: missing layer for mpich", error)
+ assert re.search("mpileaks.+: MissingLayerError: missing layer for mpich", error)
+
+ mpileaks: spack.spec.Spec = mutable_database.query_local("mpileaks^mpich")[0]
+
+ # ensure that packages not affected by errors were uploaded still.
+ uploaded_tags = {tag for _, tag in registry.manifests.keys()}
+ failures = {"mpich", "libdwarf", "callpath", "mpileaks"}
+ expected_tags = {default_tag(s) for s in mpileaks.traverse() if s.name not in failures}
+
+ assert expected_tags
+ assert uploaded_tags == expected_tags
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
index 9c0ff1ab8e..28c55b7d1e 100644
--- a/lib/spack/spack/util/parallel.py
+++ b/lib/spack/spack/util/parallel.py
@@ -2,12 +2,15 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+import concurrent.futures
import multiprocessing
import os
import sys
import traceback
from typing import Optional
+from spack.util.cpus import determine_number_of_jobs
+
class ErrorFromWorker:
"""Wrapper class to report an error from a worker process"""
@@ -80,3 +83,24 @@ def imap_unordered(
if isinstance(result, ErrorFromWorker):
raise RuntimeError(result.stacktrace if debug else str(result))
yield result
+
+
+class SequentialExecutor(concurrent.futures.Executor):
+ """Executor that runs tasks sequentially in the current thread."""
+
+ def submit(self, fn, *args, **kwargs):
+ """Submit a function to be executed."""
+ future = concurrent.futures.Future()
+ try:
+ future.set_result(fn(*args, **kwargs))
+ except Exception as e:
+ future.set_exception(e)
+ return future
+
+
+def make_concurrent_executor() -> concurrent.futures.Executor:
+ """Can't use threading because it's unsafe, and can't use spawned processes because of globals.
+ That leaves only forking."""
+ if multiprocessing.get_start_method() == "fork":
+ return concurrent.futures.ProcessPoolExecutor(determine_number_of_jobs(parallel=True))
+ return SequentialExecutor()