author    Scott Wittenburg <scott.wittenburg@kitware.com>  2022-11-07 13:31:14 -0700
committer GitHub <noreply@github.com>  2022-11-07 21:31:14 +0100
commit    28a77c2821d62a2800a63921f116fcf45a8f7342 (patch)
tree      204a6842e65e37421597f1d4c3a59dcc543e9cd3
parent    01a578851707281b8e1da736086b41e98bfbe5f4 (diff)
binary_distribution: Speed up buildcache update-index (#32796)
This change uses the aws cli, if available, to retrieve spec files from the mirror into a local temp directory, then parallelizes reading those files from disk using multiprocessing.pool.ThreadPool. If the aws cli is not available, a ThreadPool is used to fetch and read the spec files from the mirror directly. Using the aws cli yields a ~16x speedup when recreating the binary mirror index, while parallelizing only the fetching and reading yields a ~3x speedup.
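
Before the patch itself, a minimal standalone sketch of the two paths this change introduces. This is a sketch only: the helper names are illustrative, and it shells out to aws via subprocess rather than spack.util.executable; the real helpers appear in lib/spack/spack/binary_distribution.py below.

    import multiprocessing.pool
    import subprocess
    import tempfile
    from urllib.request import urlopen

    def sync_specs_with_aws_cli(cache_prefix):
        # Fast path: let `aws s3 sync` download every spec file under the
        # prefix into a local temp directory. The cli parallelizes its own
        # requests, which is where most of the ~16x speedup comes from.
        tmpspecsdir = tempfile.mkdtemp()
        subprocess.run(
            [
                "aws", "s3", "sync",
                "--exclude", "*",                # skip everything ...
                "--include", "*.spec.json.sig",  # ... except spec files
                "--include", "*.spec.json",
                "--include", "*.spec.yaml",
                cache_prefix,
                tmpspecsdir,
            ],
            check=True,
        )
        return tmpspecsdir

    def read_specs_concurrently(locations, read_one, concurrency=32):
        # Shared step: reading (or, on the fallback path, fetching) many
        # small files is I/O bound, so threads suffice; no processes needed.
        pool = multiprocessing.pool.ThreadPool(processes=concurrency)
        try:
            return pool.map(read_one, locations)
        finally:
            pool.terminate()
            pool.join()

    def read_local_file(path):
        # Reader for the fast path: spec files already synced to local disk.
        with open(path) as f:
            return f.read()

    def fetch_remote_file(url):
        # Reader for the fallback path: fetch each spec file over the network.
        with urlopen(url) as response:
            return response.read().decode("utf-8")

The fallback path reuses the same thread pool with fetch_remote_file instead of read_local_file, which is why it still yields a ~3x speedup even without the aws cli.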
-rw-r--r--  lib/spack/spack/binary_distribution.py | 204
1 file changed, 168 insertions(+), 36 deletions(-)
diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py
index 3967afdfd8..1e9843c473 100644
--- a/lib/spack/spack/binary_distribution.py
+++ b/lib/spack/spack/binary_distribution.py
@@ -7,6 +7,7 @@ import codecs
import collections
import hashlib
import json
+import multiprocessing.pool
import os
import shutil
import sys
@@ -45,6 +46,7 @@ from spack.caches import misc_cache_location
from spack.relocate import utf8_paths_to_single_binary_regex
from spack.spec import Spec
from spack.stage import Stage
+from spack.util.executable import which
_build_cache_relative_path = "build_cache"
_build_cache_keys_relative_path = "_pgp"
@@ -72,6 +74,10 @@ class FetchCacheError(Exception):
super(FetchCacheError, self).__init__(self.message)
+class ListMirrorSpecsError(spack.error.SpackError):
+ """Raised when unable to retrieve list of specs from the mirror"""
+
+
class BinaryCacheIndex(object):
"""
The BinaryCacheIndex tracks what specs are available on (usually remote)
@@ -881,37 +887,52 @@ def sign_specfile(key, force, specfile_path):
spack.util.gpg.sign(key, specfile_path, signed_specfile_path, clearsign=True)
-def _fetch_spec_from_mirror(spec_url):
- s = None
- tty.debug("fetching {0}".format(spec_url))
- _, _, spec_file = web_util.read_from_url(spec_url)
- spec_file_contents = codecs.getreader("utf-8")(spec_file).read()
- # Need full spec.json name or this gets confused with index.json.
- if spec_url.endswith(".json.sig"):
- specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
- s = Spec.from_dict(specfile_json)
- elif spec_url.endswith(".json"):
- s = Spec.from_json(spec_file_contents)
- elif spec_url.endswith(".yaml"):
- s = Spec.from_yaml(spec_file_contents)
- return s
-
-
-def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir):
- for file_path in file_list:
- try:
- s = _fetch_spec_from_mirror(url_util.join(cache_prefix, file_path))
- except (URLError, web_util.SpackWebError) as url_err:
- tty.error("Error reading specfile: {0}".format(file_path))
- tty.error(url_err)
+def _read_specs_and_push_index(file_list, read_method, cache_prefix, db, temp_dir, concurrency):
+ """Read all the specs listed in the provided list, using thread given thread parallelism,
+ generate the index, and push it to the mirror.
+
+ Args:
+ file_list (list(str)): List of urls or file paths pointing at spec files to read
+ read_method: A function taking a single argument, either a url or a file path,
+ which reads the spec file at that location and returns its contents.
+ cache_prefix (str): prefix of the build cache on s3 where index should be pushed.
+ db: A spack database used for adding specs and then writing the index.
+ temp_dir (str): Location to write index.json and hash for pushing
+ concurrency (int): Number of parallel threads to use when fetching
- if s:
- db.add(s, None)
- db.mark(s, "in_buildcache", True)
+ Return:
+ None
+ """
+
+ def _fetch_spec_from_mirror(spec_url):
+ spec_file_contents = read_method(spec_url)
+
+ if spec_file_contents:
+ # Need full spec.json name or this gets confused with index.json.
+ if spec_url.endswith(".json.sig"):
+ specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
+ return Spec.from_dict(specfile_json)
+ if spec_url.endswith(".json"):
+ return Spec.from_json(spec_file_contents)
+ if spec_url.endswith(".yaml"):
+ return Spec.from_yaml(spec_file_contents)
+
+ tp = multiprocessing.pool.ThreadPool(processes=concurrency)
+ try:
+ fetched_specs = tp.map(
+ llnl.util.lang.star(_fetch_spec_from_mirror), [(f,) for f in file_list]
+ )
+ finally:
+ tp.terminate()
+ tp.join()
+
+ for fetched_spec in filter(None, fetched_specs):  # skip specs that failed to read
+ db.add(fetched_spec, None)
+ db.mark(fetched_spec, "in_buildcache", True)
# Now generate the index, compute its hash, and push the two files to
# the mirror.
- index_json_path = os.path.join(db_root_dir, "index.json")
+ index_json_path = os.path.join(temp_dir, "index.json")
with open(index_json_path, "w") as f:
db._write_to_file(f)
@@ -921,7 +942,7 @@ def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir):
index_hash = compute_hash(index_string)
# Write the hash out to a local file
- index_hash_path = os.path.join(db_root_dir, "index.json.hash")
+ index_hash_path = os.path.join(temp_dir, "index.json.hash")
with open(index_hash_path, "w") as f:
f.write(index_hash)
@@ -942,31 +963,142 @@ def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir):
)
-def generate_package_index(cache_prefix):
- """Create the build cache index page.
+def _specs_from_cache_aws_cli(cache_prefix):
+ """Use aws cli to sync all the specs into a local temporary directory.
- Creates (or replaces) the "index.json" page at the location given in
- cache_prefix. This page contains a link for each binary package (.yaml or
- .json) under cache_prefix.
+ Args:
+ cache_prefix (str): prefix of the build cache on s3
+
+ Return:
+ List of the local file paths and a function that can read each one from the file system.
+ """
+ read_fn = None
+ file_list = None
+ aws = which("aws")
+
+ def file_read_method(file_path):
+ with open(file_path) as fd:
+ return fd.read()
+
+ tmpspecsdir = tempfile.mkdtemp()
+ sync_command_args = [
+ "s3",
+ "sync",
+ "--exclude",
+ "*",
+ "--include",
+ "*.spec.json.sig",
+ "--include",
+ "*.spec.json",
+ "--include",
+ "*.spec.yaml",
+ cache_prefix,
+ tmpspecsdir,
+ ]
+
+ try:
+ tty.debug(
+ "Using aws s3 sync to download specs from {0} to {1}".format(cache_prefix, tmpspecsdir)
+ )
+ aws(*sync_command_args, output=os.devnull, error=os.devnull)
+ file_list = fsys.find(tmpspecsdir, ["*.spec.json.sig", "*.spec.json", "*.spec.yaml"])
+ read_fn = file_read_method
+ except Exception:
+ tty.warn("Failed to use aws s3 sync to retrieve specs, falling back to parallel fetch")
+ shutil.rmtree(tmpspecsdir)
+
+ return file_list, read_fn
+
+
+def _specs_from_cache_fallback(cache_prefix):
+ """Use spack.util.web module to get a list of all the specs at the remote url.
+
+ Args:
+ cache_prefix (str): Base url of mirror (location of spec files)
+
+ Return:
+ The list of complete spec file urls and a function that can read each one from its
+ remote location (also using the spack.util.web module).
"""
+ read_fn = None
+ file_list = None
+
+ def url_read_method(url):
+ contents = None
+ try:
+ _, _, spec_file = web_util.read_from_url(url)
+ contents = codecs.getreader("utf-8")(spec_file).read()
+ except (URLError, web_util.SpackWebError) as url_err:
+ tty.error("Error reading specfile: {0}".format(url))
+ tty.error(url_err)
+ return contents
+
try:
file_list = [
- entry
+ url_util.join(cache_prefix, entry)
for entry in web_util.list_url(cache_prefix)
if entry.endswith(".yaml")
or entry.endswith("spec.json")
or entry.endswith("spec.json.sig")
]
+ read_fn = url_read_method
except KeyError as inst:
msg = "No packages at {0}: {1}".format(cache_prefix, inst)
tty.warn(msg)
- return
except Exception as err:
# If we got some kind of S3 (access denied or other connection
# error), the first non boto-specific class in the exception
# hierarchy is Exception. Just print a warning and return
msg = "Encountered problem listing packages at {0}: {1}".format(cache_prefix, err)
tty.warn(msg)
+
+ return file_list, read_fn
+
+
+def _spec_files_from_cache(cache_prefix):
+ """Get a list of all the spec files in the mirror and a function to
+ read them.
+
+ Args:
+ cache_prefix (str): Base url of mirror (location of spec files)
+
+ Return:
+ A tuple where the first item is a list of absolute file paths or
+ urls pointing to the specs that should be read from the mirror,
+ and the second item is a function taking a url or file path and
+ returning the spec read from that location.
+ """
+ callbacks = []
+ if cache_prefix.startswith("s3"):
+ callbacks.append(_specs_from_cache_aws_cli)
+
+ callbacks.append(_specs_from_cache_fallback)
+
+ for specs_from_cache_fn in callbacks:
+ file_list, read_fn = specs_from_cache_fn(cache_prefix)
+ if file_list:
+ return file_list, read_fn
+
+ raise ListMirrorSpecsError("Failed to get list of specs from {0}".format(cache_prefix))
+
+
+def generate_package_index(cache_prefix, concurrency=32):
+ """Create or replace the build cache index on the given mirror. The
+ buildcache index contains an entry for each binary package under the
+ cache_prefix.
+
+ Args:
+ cache_prefix (str): Base url of the binary mirror.
+ concurrency (int): The desired threading concurrency to use when
+ fetching the spec files from the mirror.
+
+ Return:
+ None
+ """
+ try:
+ file_list, read_fn = _spec_files_from_cache(cache_prefix)
+ except ListMirrorSpecsError as err:
+ tty.error("Unabled to generate package index, {0}".format(err))
return
if any(x.endswith(".yaml") for x in file_list):
@@ -989,7 +1121,7 @@ def generate_package_index(cache_prefix):
)
try:
- _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir)
+ _read_specs_and_push_index(file_list, read_fn, cache_prefix, db, db_root_dir, concurrency)
except Exception as err:
msg = "Encountered problem pushing package index to {0}: {1}".format(cache_prefix, err)
tty.warn(msg)
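
For completeness, a hypothetical usage sketch of the updated entry point. The mirror url is a placeholder; "build_cache" matches _build_cache_relative_path above, and 32 is the default concurrency added by this change. This is the path exercised by spack buildcache update-index.

    import spack.binary_distribution as bindist

    # Rebuild and push the buildcache index for an s3 mirror, fetching and
    # reading spec files with up to 32 threads.
    bindist.generate_package_index("s3://my-mirror/build_cache", concurrency=32)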