Diffstat (limited to 'lib')
-rw-r--r--  lib/spack/spack/fetch_strategy.py   49
-rw-r--r--  lib/spack/spack/gcs_handler.py      26
-rw-r--r--  lib/spack/spack/test/gcs_fetch.py   54
-rw-r--r--  lib/spack/spack/util/gcs.py        215
-rw-r--r--  lib/spack/spack/util/web.py         31
5 files changed, 375 insertions, 0 deletions
diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py
index 436fbf39bf..888cad7bf3 100644
--- a/lib/spack/spack/fetch_strategy.py
+++ b/lib/spack/spack/fetch_strategy.py
@@ -1407,6 +1407,55 @@ class S3FetchStrategy(URLFetchStrategy):
         raise FailedDownloadError(self.url)


+@fetcher
+class GCSFetchStrategy(URLFetchStrategy):
+    """FetchStrategy that pulls from a GCS bucket."""
+    url_attr = 'gs'
+
+    def __init__(self, *args, **kwargs):
+        try:
+            super(GCSFetchStrategy, self).__init__(*args, **kwargs)
+        except ValueError:
+            if not kwargs.get('url'):
+                raise ValueError(
+                    "GCSFetchStrategy requires a url for fetching.")
+
+    @_needs_stage
+    def fetch(self):
+        import spack.util.web as web_util
+        if self.archive_file:
+            tty.debug('Already downloaded {0}'.format(self.archive_file))
+            return
+
+        parsed_url = url_util.parse(self.url)
+        if parsed_url.scheme != 'gs':
+            raise FetchError(
+                'GCSFetchStrategy can only fetch from gs:// urls.')
+
+        tty.debug('Fetching {0}'.format(self.url))
+
+        basename = os.path.basename(parsed_url.path)
+
+        with working_dir(self.stage.path):
+            _, headers, stream = web_util.read_from_url(self.url)
+
+            with open(basename, 'wb') as f:
+                shutil.copyfileobj(stream, f)
+
+            content_type = web_util.get_header(headers, 'Content-type')
+
+        if content_type == 'text/html':
+            warn_content_type_mismatch(self.archive_file or "the archive")
+
+        if self.stage.save_filename:
+            os.rename(
+                os.path.join(self.stage.path, basename),
+                self.stage.save_filename)
+
+        if not self.archive_file:
+            raise FailedDownloadError(self.url)
+
+
 def stable_target(fetcher):
     """Returns whether the fetcher target is expected to have a stable
     checksum. This is only true if the target is a preexisting archive
diff --git a/lib/spack/spack/gcs_handler.py b/lib/spack/spack/gcs_handler.py
new file mode 100644
index 0000000000..1f120e78d8
--- /dev/null
+++ b/lib/spack/spack/gcs_handler.py
@@ -0,0 +1,26 @@
+# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
+# Spack Project Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+
+import six.moves.urllib.response as urllib_response
+
+import spack.util.url as url_util
+import spack.util.web as web_util
+
+
+def gcs_open(req, *args, **kwargs):
+    """Open a reader stream to a blob object on GCS
+    """
+    import spack.util.gcs as gcs_util
+
+    url = url_util.parse(req.get_full_url())
+    gcsblob = gcs_util.GCSBlob(url)
+
+    if not gcsblob.exists():
+        raise web_util.SpackWebError('GCS blob {0} does not exist'.format(
+            gcsblob.blob_path))
+    stream = gcsblob.get_blob_byte_stream()
+    headers = gcsblob.get_blob_headers()
+
+    return urllib_response.addinfourl(stream, headers, url)
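Together, GCSFetchStrategy and gcs_open let spack's ordinary URL machinery stream a blob out of a bucket: _urlopen (see the util/web.py hunk below) dispatches gs:// requests to gcs_open, which wraps the blob's byte stream in a urllib-style response. A rough sketch of driving the new strategy directly, assuming application-default GCS credentials, the google-cloud-storage module, and a hypothetical bucket and blob path:

    import spack.fetch_strategy as fs
    import spack.stage

    # 'gs://my-bucket/pkg/foo-1.0.tar.gz' is a hypothetical blob path
    fetcher = fs.GCSFetchStrategy(url='gs://my-bucket/pkg/foo-1.0.tar.gz')
    with spack.stage.Stage(fetcher, path='/tmp/gcs-demo'):
        fetcher.fetch()                # streams the blob into the stage
        print(fetcher.archive_file)    # local path to the downloaded tarball
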
diff --git a/lib/spack/spack/test/gcs_fetch.py b/lib/spack/spack/test/gcs_fetch.py
new file mode 100644
index 0000000000..82b0099048
--- /dev/null
+++ b/lib/spack/spack/test/gcs_fetch.py
@@ -0,0 +1,54 @@
+# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
+# Spack Project Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+
+import os
+
+import pytest
+
+import spack.config
+import spack.fetch_strategy
+import spack.stage
+
+
+@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib'])
+def test_gcsfetchstrategy_without_url(_fetch_method):
+    """Ensure constructor with no URL fails."""
+    with spack.config.override('config:url_fetch_method', _fetch_method):
+        with pytest.raises(ValueError):
+            spack.fetch_strategy.GCSFetchStrategy(None)
+
+
+@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib'])
+def test_gcsfetchstrategy_bad_url(tmpdir, _fetch_method):
+    """Ensure fetch with bad URL fails as expected."""
+    testpath = str(tmpdir)
+
+    with spack.config.override('config:url_fetch_method', _fetch_method):
+        fetcher = spack.fetch_strategy.GCSFetchStrategy(url='file:///does-not-exist')
+        assert fetcher is not None
+
+        with spack.stage.Stage(fetcher, path=testpath) as stage:
+            assert stage is not None
+            assert fetcher.archive_file is None
+            with pytest.raises(spack.fetch_strategy.FetchError):
+                fetcher.fetch()
+
+
+@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib'])
+def test_gcsfetchstrategy_downloaded(tmpdir, _fetch_method):
+    """Ensure fetch with archive file already downloaded is a noop."""
+    testpath = str(tmpdir)
+    archive = os.path.join(testpath, 'gcs.tar.gz')
+
+    with spack.config.override('config:url_fetch_method', _fetch_method):
+        class Archived_GCSFS(spack.fetch_strategy.GCSFetchStrategy):
+            @property
+            def archive_file(self):
+                return archive
+
+        url = 'gs:///{0}'.format(archive)
+        fetcher = Archived_GCSFS(url=url)
+        with spack.stage.Stage(fetcher, path=testpath):
+            fetcher.fetch()
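None of these tests touch the network: the bad-URL case fails before any GCS call is made, and Archived_GCSFS overrides archive_file so fetch() returns before opening a connection. Assuming a spack development checkout, they should be runnable in isolation with something like:

    spack unit-test lib/spack/spack/test/gcs_fetch.py
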
+ """ + if url.scheme != 'gs': + raise ValueError('Can not create GCS bucket connection with scheme {SCHEME}' + .format(SCHEME=url.scheme)) + self.url = url + self.name = self.url.netloc + if self.url.path[0] == '/': + self.prefix = self.url.path[1:] + else: + self.prefix = self.url.path + + self.client = client or gcs_client() + + self.bucket = None + tty.debug('New GCS bucket:') + tty.debug(" name: {0}".format(self.name)) + tty.debug(" prefix: {0}".format(self.prefix)) + + def exists(self): + from google.cloud.exceptions import NotFound + if not self.bucket: + try: + self.bucket = self.client.bucket(self.name) + except NotFound as ex: + tty.error("{0}, Failed check for bucket existence".format(ex)) + sys.exit(1) + return self.bucket is not None + + def create(self): + if not self.bucket: + self.bucket = self.client.create_bucket(self.name) + + def get_blob(self, blob_path): + if self.exists(): + return self.bucket.get_blob(blob_path) + return None + + def blob(self, blob_path): + if self.exists(): + return self.bucket.blob(blob_path) + return None + + def get_all_blobs(self, recursive=True, relative=True): + """Get a list of all blobs + Returns a list of all blobs within this bucket. + + Args: + relative: If true (default), print blob paths + relative to 'build_cache' directory. + If false, print absolute blob paths (useful for + destruction of bucket) + """ + tty.debug('Getting GCS blobs... Recurse {0} -- Rel: {1}'.format( + recursive, relative)) + + converter = str + if relative: + converter = self._relative_blob_name + + if self.exists(): + all_blobs = self.bucket.list_blobs(prefix=self.prefix) + blob_list = [] + + base_dirs = len(self.prefix.split('/')) + 1 + + for blob in all_blobs: + if not recursive: + num_dirs = len(blob.name.split('/')) + if num_dirs <= base_dirs: + blob_list.append(converter(blob.name)) + else: + blob_list.append(converter(blob.name)) + + return blob_list + + def _relative_blob_name(self, blob_name): + return os.path.relpath(blob_name, self.prefix) + + def destroy(self, recursive=False, **kwargs): + """Bucket destruction method + + Deletes all blobs within the bucket, and then deletes the bucket itself. + + Uses GCS Batch operations to bundle several delete operations together. 
+ """ + from google.cloud.exceptions import NotFound + tty.debug("Bucket.destroy(recursive={0})".format(recursive)) + try: + bucket_blobs = self.get_all_blobs(recursive=recursive, relative=False) + batch_size = 1000 + + num_blobs = len(bucket_blobs) + for i in range(0, num_blobs, batch_size): + with self.client.batch(): + for j in range(i, min(i + batch_size, num_blobs)): + blob = self.blob(bucket_blobs[j]) + blob.delete() + except NotFound as ex: + tty.error("{0}, Could not delete a blob in bucket {1}.".format( + ex, self.name)) + sys.exit(1) + + +class GCSBlob(object): + """GCS Blob object + + Wraps some blob methods for spack functionality + """ + def __init__(self, url, client=None): + + self.url = url + if url.scheme != 'gs': + raise ValueError('Can not create GCS blob connection with scheme: {SCHEME}' + .format(SCHEME=url.scheme)) + + self.client = client or gcs_client() + + self.bucket = GCSBucket(url) + + self.blob_path = self.url.path.lstrip('/') + + tty.debug("New GCSBlob") + tty.debug(" blob_path = {0}".format(self.blob_path)) + + if not self.bucket.exists(): + tty.warn("The bucket {0} does not exist, it will be created" + .format(self.bucket.name)) + self.bucket.create() + + def get(self): + return self.bucket.get_blob(self.blob_path) + + def exists(self): + from google.cloud.exceptions import NotFound + try: + blob = self.bucket.blob(self.blob_path) + exists = blob.exists() + except NotFound: + return False + + return exists + + def delete_blob(self): + from google.cloud.exceptions import NotFound + try: + blob = self.bucket.blob(self.blob_path) + blob.delete() + except NotFound as ex: + tty.error("{0}, Could not delete gcs blob {1}".format(ex, self.blob_path)) + + def upload_to_blob(self, local_file_path): + blob = self.bucket.blob(self.blob_path) + blob.upload_from_filename(local_file_path) + + def get_blob_byte_stream(self): + return self.bucket.get_blob(self.blob_path).open(mode='rb') + + def get_blob_headers(self): + blob = self.bucket.get_blob(self.blob_path) + + headers = { + 'Content-type': blob.content_type, + 'Content-encoding': blob.content_encoding, + 'Content-language': blob.content_language, + 'MD5Hash': blob.md5_hash + } + + return headers diff --git a/lib/spack/spack/util/web.py b/lib/spack/spack/util/web.py index 72f7abc2c4..f1b01ae310 100644 --- a/lib/spack/spack/util/web.py +++ b/lib/spack/spack/util/web.py @@ -28,6 +28,7 @@ import spack.config import spack.error import spack.url import spack.util.crypto +import spack.util.gcs as gcs_util import spack.util.s3 as s3_util import spack.util.url as url_util from spack.util.compression import ALLOWED_ARCHIVE_TYPES @@ -74,6 +75,10 @@ def uses_ssl(parsed_url): if url_util.parse(endpoint_url, scheme='https').scheme == 'https': return True + elif parsed_url.scheme == 'gs': + tty.debug("(uses_ssl) GCS Blob is https") + return True + return False @@ -195,6 +200,12 @@ def push_to_url( if not keep_original: os.remove(local_file_path) + elif remote_url.scheme == 'gs': + gcs = gcs_util.GCSBlob(remote_url) + gcs.upload_to_blob(local_file_path) + if not keep_original: + os.remove(local_file_path) + else: raise NotImplementedError( 'Unrecognized URL scheme: {SCHEME}'.format( @@ -217,6 +228,10 @@ def url_exists(url): return False raise err + elif url.scheme == 'gs': + gcs = gcs_util.GCSBlob(url) + return gcs.exists() + # otherwise, just try to "read" from the URL, and assume that *any* # non-throwing response contains the resource represented by the URL try: @@ -279,6 +294,15 @@ def remove_url(url, recursive=False): 
         s3.delete_object(Bucket=bucket, Key=url.path.lstrip('/'))
         return

+    elif url.scheme == 'gs':
+        if recursive:
+            bucket = gcs_util.GCSBucket(url)
+            bucket.destroy(recursive=recursive)
+        else:
+            blob = gcs_util.GCSBlob(url)
+            blob.delete_blob()
+        return
+
     # Don't even try for other URL schemes.


@@ -358,6 +382,10 @@ def list_url(url, recursive=False):
             key.split('/', 1)[0] for key in _iter_s3_prefix(s3, url)))

+    elif url.scheme == 'gs':
+        gcs = gcs_util.GCSBucket(url)
+        return gcs.get_all_blobs(recursive=recursive)
+

 def spider(root_urls, depth=0, concurrency=32):
     """Get web pages from root URLs.
@@ -516,6 +544,9 @@ def _urlopen(req, *args, **kwargs):
     if url_util.parse(url).scheme == 's3':
         import spack.s3_handler
         opener = spack.s3_handler.open
+    elif url_util.parse(url).scheme == 'gs':
+        import spack.gcs_handler
+        opener = spack.gcs_handler.gcs_open

     try:
         return opener(req, *args, **kwargs)
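The net effect of the util/web.py changes is that gs:// URLs behave like s3:// URLs throughout spack's URL plumbing, so a GCS bucket can back a binary mirror or buildcache. A rough end-to-end sketch, again assuming a hypothetical bucket and blob:

    import spack.util.web as web_util

    url = 'gs://my-mirror/build_cache/index.json'   # hypothetical blob
    if web_util.url_exists(url):
        _, headers, stream = web_util.read_from_url(url)
        print(web_util.get_header(headers, 'Content-type'))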