diff options
author | Doug Jacobsen <jacobsen.douglas@gmail.com> | 2021-10-21 22:22:38 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-22 06:22:38 +0200 |
commit | d1d002164729a6aa60fdf18d0248b41132b78c19 (patch) | |
tree | 784da6beddf1ea6f2f45f2ab02dd86b085db8eaf | |
parent | d024faf044357a7db41f15baf94db226443361fe (diff) | |
download | spack-d1d002164729a6aa60fdf18d0248b41132b78c19.tar.gz spack-d1d002164729a6aa60fdf18d0248b41132b78c19.tar.bz2 spack-d1d002164729a6aa60fdf18d0248b41132b78c19.tar.xz spack-d1d002164729a6aa60fdf18d0248b41132b78c19.zip |
Add GCS Bucket Mirrors (#26382)
This commit contains changes to support Google Cloud Storage
buckets as mirrors, meant for hosting Spack build-caches. This
feature is beneficial for folks that are running infrastructure on
Google Cloud Platform. On public cloud systems, resources are
ephemeral and in many cases, installing compilers, MPI flavors,
and user packages from scratch takes up considerable time.
Giving users the ability to host a Spack mirror that can store build
caches in GCS buckets offers a clean solution for reducing
application rebuilds for Google Cloud infrastructure.
Co-authored-by: Joe Schoonover <joe@fluidnumerics.com>
-rw-r--r-- | lib/spack/spack/fetch_strategy.py | 49 | ||||
-rw-r--r-- | lib/spack/spack/gcs_handler.py | 26 | ||||
-rw-r--r-- | lib/spack/spack/test/gcs_fetch.py | 54 | ||||
-rw-r--r-- | lib/spack/spack/util/gcs.py | 215 | ||||
-rw-r--r-- | lib/spack/spack/util/web.py | 31 |
5 files changed, 375 insertions, 0 deletions
diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py index 436fbf39bf..888cad7bf3 100644 --- a/lib/spack/spack/fetch_strategy.py +++ b/lib/spack/spack/fetch_strategy.py @@ -1407,6 +1407,55 @@ class S3FetchStrategy(URLFetchStrategy): raise FailedDownloadError(self.url) +@fetcher +class GCSFetchStrategy(URLFetchStrategy): + """FetchStrategy that pulls from a GCS bucket.""" + url_attr = 'gs' + + def __init__(self, *args, **kwargs): + try: + super(GCSFetchStrategy, self).__init__(*args, **kwargs) + except ValueError: + if not kwargs.get('url'): + raise ValueError( + "GCSFetchStrategy requires a url for fetching.") + + @_needs_stage + def fetch(self): + import spack.util.web as web_util + if self.archive_file: + tty.debug('Already downloaded {0}'.format(self.archive_file)) + return + + parsed_url = url_util.parse(self.url) + if parsed_url.scheme != 'gs': + raise FetchError( + 'GCSFetchStrategy can only fetch from gs:// urls.') + + tty.debug('Fetching {0}'.format(self.url)) + + basename = os.path.basename(parsed_url.path) + + with working_dir(self.stage.path): + _, headers, stream = web_util.read_from_url(self.url) + + with open(basename, 'wb') as f: + shutil.copyfileobj(stream, f) + + content_type = web_util.get_header(headers, 'Content-type') + + if content_type == 'text/html': + warn_content_type_mismatch(self.archive_file or "the archive") + + if self.stage.save_filename: + os.rename( + os.path.join(self.stage.path, basename), + self.stage.save_filename) + + if not self.archive_file: + raise FailedDownloadError(self.url) + + def stable_target(fetcher): """Returns whether the fetcher target is expected to have a stable checksum. This is only true if the target is a preexisting archive diff --git a/lib/spack/spack/gcs_handler.py b/lib/spack/spack/gcs_handler.py new file mode 100644 index 0000000000..1f120e78d8 --- /dev/null +++ b/lib/spack/spack/gcs_handler.py @@ -0,0 +1,26 @@ +# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other +# Spack Project Developers. See the top-level COPYRIGHT file for details. +# +# SPDX-License-Identifier: (Apache-2.0 OR MIT) + +import six.moves.urllib.response as urllib_response + +import spack.util.url as url_util +import spack.util.web as web_util + + +def gcs_open(req, *args, **kwargs): + """Open a reader stream to a blob object on GCS + """ + import spack.util.gcs as gcs_util + + url = url_util.parse(req.get_full_url()) + gcsblob = gcs_util.GCSBlob(url) + + if not gcsblob.exists(): + raise web_util.SpackWebError('GCS blob {0} does not exist'.format( + gcsblob.blob_path)) + stream = gcsblob.get_blob_byte_stream() + headers = gcsblob.get_blob_headers() + + return urllib_response.addinfourl(stream, headers, url) diff --git a/lib/spack/spack/test/gcs_fetch.py b/lib/spack/spack/test/gcs_fetch.py new file mode 100644 index 0000000000..82b0099048 --- /dev/null +++ b/lib/spack/spack/test/gcs_fetch.py @@ -0,0 +1,54 @@ +# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other +# Spack Project Developers. See the top-level COPYRIGHT file for details. +# +# SPDX-License-Identifier: (Apache-2.0 OR MIT) + +import os + +import pytest + +import spack.config +import spack.fetch_strategy +import spack.stage + + +@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib']) +def test_gcsfetchstrategy_without_url(_fetch_method): + """Ensure constructor with no URL fails.""" + with spack.config.override('config:url_fetch_method', _fetch_method): + with pytest.raises(ValueError): + spack.fetch_strategy.GCSFetchStrategy(None) + + +@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib']) +def test_gcsfetchstrategy_bad_url(tmpdir, _fetch_method): + """Ensure fetch with bad URL fails as expected.""" + testpath = str(tmpdir) + + with spack.config.override('config:url_fetch_method', _fetch_method): + fetcher = spack.fetch_strategy.GCSFetchStrategy(url='file:///does-not-exist') + assert fetcher is not None + + with spack.stage.Stage(fetcher, path=testpath) as stage: + assert stage is not None + assert fetcher.archive_file is None + with pytest.raises(spack.fetch_strategy.FetchError): + fetcher.fetch() + + +@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib']) +def test_gcsfetchstrategy_downloaded(tmpdir, _fetch_method): + """Ensure fetch with archive file already downloaded is a noop.""" + testpath = str(tmpdir) + archive = os.path.join(testpath, 'gcs.tar.gz') + + with spack.config.override('config:url_fetch_method', _fetch_method): + class Archived_GCSFS(spack.fetch_strategy.GCSFetchStrategy): + @property + def archive_file(self): + return archive + + url = 'gcs:///{0}'.format(archive) + fetcher = Archived_GCSFS(url=url) + with spack.stage.Stage(fetcher, path=testpath): + fetcher.fetch() diff --git a/lib/spack/spack/util/gcs.py b/lib/spack/spack/util/gcs.py new file mode 100644 index 0000000000..57fda7e59c --- /dev/null +++ b/lib/spack/spack/util/gcs.py @@ -0,0 +1,215 @@ +# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other +# Spack Project Developers. See the top-level COPYRIGHT file for details. +# +# SPDX-License-Identifier: (Apache-2.0 OR MIT) + +""" +This file contains the definition of the GCS Blob storage Class used to +integrate GCS Blob storage with spack buildcache. +""" + +import os +import sys + +import llnl.util.tty as tty + + +def gcs_client(): + """Create a GCS client + Creates an authenticated GCS client to access GCS buckets and blobs + """ + + try: + import google.auth + from google.cloud import storage + except ImportError as ex: + tty.error('{0}, google-cloud-storage python module is missing.'.format(ex) + + ' Please install to use the gs:// backend.') + sys.exit(1) + + storage_credentials, storage_project = google.auth.default() + storage_client = storage.Client(storage_project, + storage_credentials) + return storage_client + + +class GCSBucket(object): + """GCS Bucket Object + Create a wrapper object for a GCS Bucket. Provides methods to wrap spack + related tasks, such as destroy. + """ + def __init__(self, url, client=None): + """Constructor for GCSBucket objects + + Args: + url (str): The url pointing to the GCS bucket to build an object out of + client (google.cloud.storage.client.Client): A pre-defined storage + client that will be used to access the GCS bucket. + """ + if url.scheme != 'gs': + raise ValueError('Can not create GCS bucket connection with scheme {SCHEME}' + .format(SCHEME=url.scheme)) + self.url = url + self.name = self.url.netloc + if self.url.path[0] == '/': + self.prefix = self.url.path[1:] + else: + self.prefix = self.url.path + + self.client = client or gcs_client() + + self.bucket = None + tty.debug('New GCS bucket:') + tty.debug(" name: {0}".format(self.name)) + tty.debug(" prefix: {0}".format(self.prefix)) + + def exists(self): + from google.cloud.exceptions import NotFound + if not self.bucket: + try: + self.bucket = self.client.bucket(self.name) + except NotFound as ex: + tty.error("{0}, Failed check for bucket existence".format(ex)) + sys.exit(1) + return self.bucket is not None + + def create(self): + if not self.bucket: + self.bucket = self.client.create_bucket(self.name) + + def get_blob(self, blob_path): + if self.exists(): + return self.bucket.get_blob(blob_path) + return None + + def blob(self, blob_path): + if self.exists(): + return self.bucket.blob(blob_path) + return None + + def get_all_blobs(self, recursive=True, relative=True): + """Get a list of all blobs + Returns a list of all blobs within this bucket. + + Args: + relative: If true (default), print blob paths + relative to 'build_cache' directory. + If false, print absolute blob paths (useful for + destruction of bucket) + """ + tty.debug('Getting GCS blobs... Recurse {0} -- Rel: {1}'.format( + recursive, relative)) + + converter = str + if relative: + converter = self._relative_blob_name + + if self.exists(): + all_blobs = self.bucket.list_blobs(prefix=self.prefix) + blob_list = [] + + base_dirs = len(self.prefix.split('/')) + 1 + + for blob in all_blobs: + if not recursive: + num_dirs = len(blob.name.split('/')) + if num_dirs <= base_dirs: + blob_list.append(converter(blob.name)) + else: + blob_list.append(converter(blob.name)) + + return blob_list + + def _relative_blob_name(self, blob_name): + return os.path.relpath(blob_name, self.prefix) + + def destroy(self, recursive=False, **kwargs): + """Bucket destruction method + + Deletes all blobs within the bucket, and then deletes the bucket itself. + + Uses GCS Batch operations to bundle several delete operations together. + """ + from google.cloud.exceptions import NotFound + tty.debug("Bucket.destroy(recursive={0})".format(recursive)) + try: + bucket_blobs = self.get_all_blobs(recursive=recursive, relative=False) + batch_size = 1000 + + num_blobs = len(bucket_blobs) + for i in range(0, num_blobs, batch_size): + with self.client.batch(): + for j in range(i, min(i + batch_size, num_blobs)): + blob = self.blob(bucket_blobs[j]) + blob.delete() + except NotFound as ex: + tty.error("{0}, Could not delete a blob in bucket {1}.".format( + ex, self.name)) + sys.exit(1) + + +class GCSBlob(object): + """GCS Blob object + + Wraps some blob methods for spack functionality + """ + def __init__(self, url, client=None): + + self.url = url + if url.scheme != 'gs': + raise ValueError('Can not create GCS blob connection with scheme: {SCHEME}' + .format(SCHEME=url.scheme)) + + self.client = client or gcs_client() + + self.bucket = GCSBucket(url) + + self.blob_path = self.url.path.lstrip('/') + + tty.debug("New GCSBlob") + tty.debug(" blob_path = {0}".format(self.blob_path)) + + if not self.bucket.exists(): + tty.warn("The bucket {0} does not exist, it will be created" + .format(self.bucket.name)) + self.bucket.create() + + def get(self): + return self.bucket.get_blob(self.blob_path) + + def exists(self): + from google.cloud.exceptions import NotFound + try: + blob = self.bucket.blob(self.blob_path) + exists = blob.exists() + except NotFound: + return False + + return exists + + def delete_blob(self): + from google.cloud.exceptions import NotFound + try: + blob = self.bucket.blob(self.blob_path) + blob.delete() + except NotFound as ex: + tty.error("{0}, Could not delete gcs blob {1}".format(ex, self.blob_path)) + + def upload_to_blob(self, local_file_path): + blob = self.bucket.blob(self.blob_path) + blob.upload_from_filename(local_file_path) + + def get_blob_byte_stream(self): + return self.bucket.get_blob(self.blob_path).open(mode='rb') + + def get_blob_headers(self): + blob = self.bucket.get_blob(self.blob_path) + + headers = { + 'Content-type': blob.content_type, + 'Content-encoding': blob.content_encoding, + 'Content-language': blob.content_language, + 'MD5Hash': blob.md5_hash + } + + return headers diff --git a/lib/spack/spack/util/web.py b/lib/spack/spack/util/web.py index 72f7abc2c4..f1b01ae310 100644 --- a/lib/spack/spack/util/web.py +++ b/lib/spack/spack/util/web.py @@ -28,6 +28,7 @@ import spack.config import spack.error import spack.url import spack.util.crypto +import spack.util.gcs as gcs_util import spack.util.s3 as s3_util import spack.util.url as url_util from spack.util.compression import ALLOWED_ARCHIVE_TYPES @@ -74,6 +75,10 @@ def uses_ssl(parsed_url): if url_util.parse(endpoint_url, scheme='https').scheme == 'https': return True + elif parsed_url.scheme == 'gs': + tty.debug("(uses_ssl) GCS Blob is https") + return True + return False @@ -195,6 +200,12 @@ def push_to_url( if not keep_original: os.remove(local_file_path) + elif remote_url.scheme == 'gs': + gcs = gcs_util.GCSBlob(remote_url) + gcs.upload_to_blob(local_file_path) + if not keep_original: + os.remove(local_file_path) + else: raise NotImplementedError( 'Unrecognized URL scheme: {SCHEME}'.format( @@ -217,6 +228,10 @@ def url_exists(url): return False raise err + elif url.scheme == 'gs': + gcs = gcs_util.GCSBlob(url) + return gcs.exists() + # otherwise, just try to "read" from the URL, and assume that *any* # non-throwing response contains the resource represented by the URL try: @@ -279,6 +294,15 @@ def remove_url(url, recursive=False): s3.delete_object(Bucket=bucket, Key=url.path.lstrip('/')) return + elif url.scheme == 'gs': + if recursive: + bucket = gcs_util.GCSBucket(url) + bucket.destroy(recursive=recursive) + else: + blob = gcs_util.GCSBlob(url) + blob.delete_blob() + return + # Don't even try for other URL schemes. @@ -358,6 +382,10 @@ def list_url(url, recursive=False): key.split('/', 1)[0] for key in _iter_s3_prefix(s3, url))) + elif url.scheme == 'gs': + gcs = gcs_util.GCSBucket(url) + return gcs.get_all_blobs(recursive=recursive) + def spider(root_urls, depth=0, concurrency=32): """Get web pages from root URLs. @@ -516,6 +544,9 @@ def _urlopen(req, *args, **kwargs): if url_util.parse(url).scheme == 's3': import spack.s3_handler opener = spack.s3_handler.open + elif url_util.parse(url).scheme == 'gs': + import spack.gcs_handler + opener = spack.gcs_handler.gcs_open try: return opener(req, *args, **kwargs) |