diff options
-rw-r--r-- | lib/spack/spack/binary_distribution.py | 204 |
1 files changed, 168 insertions, 36 deletions
diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py index 3967afdfd8..1e9843c473 100644 --- a/lib/spack/spack/binary_distribution.py +++ b/lib/spack/spack/binary_distribution.py @@ -7,6 +7,7 @@ import codecs import collections import hashlib import json +import multiprocessing.pool import os import shutil import sys @@ -45,6 +46,7 @@ from spack.caches import misc_cache_location from spack.relocate import utf8_paths_to_single_binary_regex from spack.spec import Spec from spack.stage import Stage +from spack.util.executable import which _build_cache_relative_path = "build_cache" _build_cache_keys_relative_path = "_pgp" @@ -72,6 +74,10 @@ class FetchCacheError(Exception): super(FetchCacheError, self).__init__(self.message) +class ListMirrorSpecsError(spack.error.SpackError): + """Raised when unable to retrieve list of specs from the mirror""" + + class BinaryCacheIndex(object): """ The BinaryCacheIndex tracks what specs are available on (usually remote) @@ -881,37 +887,52 @@ def sign_specfile(key, force, specfile_path): spack.util.gpg.sign(key, specfile_path, signed_specfile_path, clearsign=True) -def _fetch_spec_from_mirror(spec_url): - s = None - tty.debug("fetching {0}".format(spec_url)) - _, _, spec_file = web_util.read_from_url(spec_url) - spec_file_contents = codecs.getreader("utf-8")(spec_file).read() - # Need full spec.json name or this gets confused with index.json. - if spec_url.endswith(".json.sig"): - specfile_json = Spec.extract_json_from_clearsig(spec_file_contents) - s = Spec.from_dict(specfile_json) - elif spec_url.endswith(".json"): - s = Spec.from_json(spec_file_contents) - elif spec_url.endswith(".yaml"): - s = Spec.from_yaml(spec_file_contents) - return s - - -def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir): - for file_path in file_list: - try: - s = _fetch_spec_from_mirror(url_util.join(cache_prefix, file_path)) - except (URLError, web_util.SpackWebError) as url_err: - tty.error("Error reading specfile: {0}".format(file_path)) - tty.error(url_err) +def _read_specs_and_push_index(file_list, read_method, cache_prefix, db, temp_dir, concurrency): + """Read all the specs listed in the provided list, using thread given thread parallelism, + generate the index, and push it to the mirror. + + Args: + file_list (list(str)): List of urls or file paths pointing at spec files to read + read_method: A function taking a single argument, either a url or a file path, + and which reads the spec file at that location, and returns the spec. + cache_prefix (str): prefix of the build cache on s3 where index should be pushed. + db: A spack database used for adding specs and then writing the index. + temp_dir (str): Location to write index.json and hash for pushing + concurrency (int): Number of parallel processes to use when fetching - if s: - db.add(s, None) - db.mark(s, "in_buildcache", True) + Return: + None + """ + + def _fetch_spec_from_mirror(spec_url): + spec_file_contents = read_method(spec_url) + + if spec_file_contents: + # Need full spec.json name or this gets confused with index.json. + if spec_url.endswith(".json.sig"): + specfile_json = Spec.extract_json_from_clearsig(spec_file_contents) + return Spec.from_dict(specfile_json) + if spec_url.endswith(".json"): + return Spec.from_json(spec_file_contents) + if spec_url.endswith(".yaml"): + return Spec.from_yaml(spec_file_contents) + + tp = multiprocessing.pool.ThreadPool(processes=concurrency) + try: + fetched_specs = tp.map( + llnl.util.lang.star(_fetch_spec_from_mirror), [(f,) for f in file_list] + ) + finally: + tp.terminate() + tp.join() + + for fetched_spec in fetched_specs: + db.add(fetched_spec, None) + db.mark(fetched_spec, "in_buildcache", True) # Now generate the index, compute its hash, and push the two files to # the mirror. - index_json_path = os.path.join(db_root_dir, "index.json") + index_json_path = os.path.join(temp_dir, "index.json") with open(index_json_path, "w") as f: db._write_to_file(f) @@ -921,7 +942,7 @@ def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir): index_hash = compute_hash(index_string) # Write the hash out to a local file - index_hash_path = os.path.join(db_root_dir, "index.json.hash") + index_hash_path = os.path.join(temp_dir, "index.json.hash") with open(index_hash_path, "w") as f: f.write(index_hash) @@ -942,31 +963,142 @@ def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir): ) -def generate_package_index(cache_prefix): - """Create the build cache index page. +def _specs_from_cache_aws_cli(cache_prefix): + """Use aws cli to sync all the specs into a local temporary directory. - Creates (or replaces) the "index.json" page at the location given in - cache_prefix. This page contains a link for each binary package (.yaml or - .json) under cache_prefix. + Args: + cache_prefix (str): prefix of the build cache on s3 + + Return: + List of the local file paths and a function that can read each one from the file system. + """ + read_fn = None + file_list = None + aws = which("aws") + + def file_read_method(file_path): + with open(file_path) as fd: + return fd.read() + + tmpspecsdir = tempfile.mkdtemp() + sync_command_args = [ + "s3", + "sync", + "--exclude", + "*", + "--include", + "*.spec.json.sig", + "--include", + "*.spec.json", + "--include", + "*.spec.yaml", + cache_prefix, + tmpspecsdir, + ] + + try: + tty.debug( + "Using aws s3 sync to download specs from {0} to {1}".format(cache_prefix, tmpspecsdir) + ) + aws(*sync_command_args, output=os.devnull, error=os.devnull) + file_list = fsys.find(tmpspecsdir, ["*.spec.json.sig", "*.spec.json", "*.spec.yaml"]) + read_fn = file_read_method + except Exception: + tty.warn("Failed to use aws s3 sync to retrieve specs, falling back to parallel fetch") + shutil.rmtree(tmpspecsdir) + + return file_list, read_fn + + +def _specs_from_cache_fallback(cache_prefix): + """Use spack.util.web module to get a list of all the specs at the remote url. + + Args: + cache_prefix (str): Base url of mirror (location of spec files) + + Return: + The list of complete spec file urls and a function that can read each one from its + remote location (also using the spack.util.web module). """ + read_fn = None + file_list = None + + def url_read_method(url): + contents = None + try: + _, _, spec_file = web_util.read_from_url(url) + contents = codecs.getreader("utf-8")(spec_file).read() + except (URLError, web_util.SpackWebError) as url_err: + tty.error("Error reading specfile: {0}".format(url)) + tty.error(url_err) + return contents + try: file_list = [ - entry + url_util.join(cache_prefix, entry) for entry in web_util.list_url(cache_prefix) if entry.endswith(".yaml") or entry.endswith("spec.json") or entry.endswith("spec.json.sig") ] + read_fn = url_read_method except KeyError as inst: msg = "No packages at {0}: {1}".format(cache_prefix, inst) tty.warn(msg) - return except Exception as err: # If we got some kind of S3 (access denied or other connection # error), the first non boto-specific class in the exception # hierarchy is Exception. Just print a warning and return msg = "Encountered problem listing packages at {0}: {1}".format(cache_prefix, err) tty.warn(msg) + + return file_list, read_fn + + +def _spec_files_from_cache(cache_prefix): + """Get a list of all the spec files in the mirror and a function to + read them. + + Args: + cache_prefix (str): Base url of mirror (location of spec files) + + Return: + A tuple where the first item is a list of absolute file paths or + urls pointing to the specs that should be read from the mirror, + and the second item is a function taking a url or file path and + returning the spec read from that location. + """ + callbacks = [] + if cache_prefix.startswith("s3"): + callbacks.append(_specs_from_cache_aws_cli) + + callbacks.append(_specs_from_cache_fallback) + + for specs_from_cache_fn in callbacks: + file_list, read_fn = specs_from_cache_fn(cache_prefix) + if file_list: + return file_list, read_fn + + raise ListMirrorSpecsError("Failed to get list of specs from {0}".format(cache_prefix)) + + +def generate_package_index(cache_prefix, concurrency=32): + """Create or replace the build cache index on the given mirror. The + buildcache index contains an entry for each binary package under the + cache_prefix. + + Args: + cache_prefix(str): Base url of binary mirror. + concurrency: (int): The desired threading concurrency to use when + fetching the spec files from the mirror. + + Return: + None + """ + try: + file_list, read_fn = _spec_files_from_cache(cache_prefix) + except ListMirrorSpecsError as err: + tty.error("Unabled to generate package index, {0}".format(err)) return if any(x.endswith(".yaml") for x in file_list): @@ -989,7 +1121,7 @@ def generate_package_index(cache_prefix): ) try: - _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir) + _read_specs_and_push_index(file_list, read_fn, cache_prefix, db, db_root_dir, concurrency) except Exception as err: msg = "Encountered problem pushing package index to {0}: {1}".format(cache_prefix, err) tty.warn(msg) |