diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/spack/llnl/util/filesystem.py | 65 | ||||
-rw-r--r-- | lib/spack/spack/binary_distribution.py | 3 | ||||
-rw-r--r-- | lib/spack/spack/fetch_strategy.py | 4 | ||||
-rw-r--r-- | lib/spack/spack/relocate.py | 31 | ||||
-rw-r--r-- | lib/spack/spack/test/data/compression/Foo.cxx | 0 | ||||
-rw-r--r-- | lib/spack/spack/test/util/compression.py | 32 | ||||
-rw-r--r-- | lib/spack/spack/url.py | 9 | ||||
-rw-r--r-- | lib/spack/spack/util/compression.py | 508 | ||||
-rw-r--r-- | lib/spack/spack/util/path.py | 9 |
9 files changed, 524 insertions, 137 deletions
diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py index a5da826217..ad91e7c876 100644 --- a/lib/spack/llnl/util/filesystem.py +++ b/lib/spack/llnl/util/filesystem.py @@ -24,7 +24,7 @@ from llnl.util.compat import Sequence from llnl.util.lang import dedupe, memoized from llnl.util.symlink import islink, symlink -from spack.util.executable import Executable +from spack.util.executable import CommandNotFoundError, Executable, which from spack.util.path import path_to_os_path, system_path_filter is_windows = _platform == "win32" @@ -113,6 +113,69 @@ def path_contains_subdirectory(path, root): return norm_path.startswith(norm_root) +@memoized +def file_command(*args): + """Creates entry point to `file` system command with provided arguments""" + try: + file_cmd = which("file", required=True) + except CommandNotFoundError as e: + if is_windows: + raise CommandNotFoundError("`file` utility is not available on Windows") + else: + raise e + for arg in args: + file_cmd.add_default_arg(arg) + return file_cmd + + +@memoized +def _get_mime_type(): + """Generate method to call `file` system command to aquire mime type + for a specified path + """ + return file_command("-b", "-h", "--mime-type") + + +@memoized +def _get_mime_type_compressed(): + """Same as _get_mime_type but attempts to check for + compression first + """ + mime_uncompressed = _get_mime_type() + mime_uncompressed.add_default_arg("-Z") + return mime_uncompressed + + +def mime_type(filename): + """Returns the mime type and subtype of a file. + + Args: + filename: file to be analyzed + + Returns: + Tuple containing the MIME type and subtype + """ + output = _get_mime_type()(filename, output=str, error=str).strip() + tty.debug("==> " + output) + type, _, subtype = output.partition("/") + return type, subtype + + +def compressed_mime_type(filename): + """Same as mime_type but checks for type that has been compressed + + Args: + filename (str): file to be analyzed + + Returns: + Tuple containing the MIME type and subtype + """ + output = _get_mime_type_compressed()(filename, output=str, error=str).strip() + tty.debug("==> " + output) + type, _, subtype = output.partition("/") + return type, subtype + + #: This generates the library filenames that may appear on any OS. library_extensions = ["a", "la", "so", "tbd", "dylib"] diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py index e51d7d4842..c329287de8 100644 --- a/lib/spack/spack/binary_distribution.py +++ b/lib/spack/spack/binary_distribution.py @@ -19,6 +19,7 @@ from contextlib import closing import ruamel.yaml as yaml from six.moves.urllib.error import HTTPError, URLError +import llnl.util.filesystem as fsys import llnl.util.lang import llnl.util.tty as tty from llnl.util.filesystem import mkdirp @@ -653,7 +654,7 @@ def get_buildfile_manifest(spec): for filename in files: path_name = os.path.join(root, filename) - m_type, m_subtype = relocate.mime_type(path_name) + m_type, m_subtype = fsys.mime_type(path_name) rel_path_name = os.path.relpath(path_name, spec.prefix) added = False diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py index 5ed46c3278..ea85c6a682 100644 --- a/lib/spack/spack/fetch_strategy.py +++ b/lib/spack/spack/fetch_strategy.py @@ -54,7 +54,7 @@ import spack.util.pattern as pattern import spack.util.url as url_util import spack.util.web as web_util import spack.version -from spack.util.compression import decompressor_for, extension +from spack.util.compression import decompressor_for, extension_from_path from spack.util.executable import CommandNotFoundError, which from spack.util.string import comma_and, quote @@ -613,7 +613,7 @@ class VCSFetchStrategy(FetchStrategy): @_needs_stage def archive(self, destination, **kwargs): - assert extension(destination) == "tar.gz" + assert extension_from_path(destination) == "tar.gz" assert self.stage.source_path.startswith(self.stage.path) tar = which("tar", required=True) diff --git a/lib/spack/spack/relocate.py b/lib/spack/spack/relocate.py index 8212093a12..3ef332c204 100644 --- a/lib/spack/spack/relocate.py +++ b/lib/spack/spack/relocate.py @@ -11,6 +11,7 @@ import shutil import macholib.mach_o import macholib.MachO +import llnl.util.filesystem as fs import llnl.util.lang import llnl.util.tty as tty from llnl.util.lang import memoized @@ -887,7 +888,7 @@ def file_is_relocatable(filename, paths_to_relocate=None): # Remove the RPATHS from the strings in the executable set_of_strings = set(strings(filename, output=str).split()) - m_type, m_subtype = mime_type(filename) + m_type, m_subtype = fs.mime_type(filename) if m_type == "application": tty.debug("{0},{1}".format(m_type, m_subtype), level=2) @@ -923,7 +924,7 @@ def is_binary(filename): Returns: True or False """ - m_type, _ = mime_type(filename) + m_type, _ = fs.mime_type(filename) msg = "[{0}] -> ".format(filename) if m_type == "application": @@ -934,30 +935,6 @@ def is_binary(filename): return False -@llnl.util.lang.memoized -def _get_mime_type(): - file_cmd = executable.which("file") - for arg in ["-b", "-h", "--mime-type"]: - file_cmd.add_default_arg(arg) - return file_cmd - - -@llnl.util.lang.memoized -def mime_type(filename): - """Returns the mime type and subtype of a file. - - Args: - filename: file to be analyzed - - Returns: - Tuple containing the MIME type and subtype - """ - output = _get_mime_type()(filename, output=str, error=str).strip() - tty.debug("==> " + output, level=2) - type, _, subtype = output.partition("/") - return type, subtype - - # Memoize this due to repeated calls to libraries in the same directory. @llnl.util.lang.memoized def _exists_dir(dirname): @@ -975,7 +952,7 @@ def fixup_macos_rpath(root, filename): True if fixups were applied, else False """ abspath = os.path.join(root, filename) - if mime_type(abspath) != ("application", "x-mach-binary"): + if fs.mime_type(abspath) != ("application", "x-mach-binary"): return False # Get Mach-O header commands diff --git a/lib/spack/spack/test/data/compression/Foo.cxx b/lib/spack/spack/test/data/compression/Foo.cxx new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/lib/spack/spack/test/data/compression/Foo.cxx diff --git a/lib/spack/spack/test/util/compression.py b/lib/spack/spack/test/util/compression.py index 13d1a44a73..907b1946b8 100644 --- a/lib/spack/spack/test/util/compression.py +++ b/lib/spack/spack/test/util/compression.py @@ -22,6 +22,9 @@ ext_archive = {} for ext in scomp.ALLOWED_ARCHIVE_TYPES if "TAR" not in ext ] +# Spack does not use Python native handling for tarballs or zip +# Don't test tarballs or zip in native test +native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key] def support_stub(): @@ -30,10 +33,9 @@ def support_stub(): @pytest.fixture def compr_support_check(monkeypatch): - monkeypatch.setattr(scomp, "lzma_support", support_stub) - monkeypatch.setattr(scomp, "tar_support", support_stub) - monkeypatch.setattr(scomp, "gzip_support", support_stub) - monkeypatch.setattr(scomp, "bz2_support", support_stub) + monkeypatch.setattr(scomp, "is_lzma_supported", support_stub) + monkeypatch.setattr(scomp, "is_gzip_supported", support_stub) + monkeypatch.setattr(scomp, "is_bz2_supported", support_stub) @pytest.fixture @@ -46,10 +48,9 @@ def archive_file(tmpdir_factory, request): return os.path.join(str(tmpdir), "Foo.%s" % extension) -@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True) +@pytest.mark.parametrize("archive_file", native_archive_list, indirect=True) def test_native_unpacking(tmpdir_factory, archive_file): - extension = scomp.extension(archive_file) - util = scomp.decompressor_for(archive_file, extension) + util = scomp.decompressor_for(archive_file) tmpdir = tmpdir_factory.mktemp("comp_test") with working_dir(str(tmpdir)): assert not os.listdir(os.getcwd()) @@ -63,9 +64,8 @@ def test_native_unpacking(tmpdir_factory, archive_file): @pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True) def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check): - extension = scomp.extension(archive_file) # actually run test - util = scomp.decompressor_for(archive_file, extension) + util = scomp.decompressor_for(archive_file) tmpdir = tmpdir_factory.mktemp("system_comp_test") with working_dir(str(tmpdir)): assert not os.listdir(os.getcwd()) @@ -78,23 +78,25 @@ def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check): def test_unallowed_extension(): - bad_ext_archive = "Foo.py" + # use a cxx file as python files included for the test + # are picked up by the linter and break style checks + bad_ext_archive = "Foo.cxx" with pytest.raises(CommandNotFoundError): - scomp.decompressor_for(bad_ext_archive, "py") + scomp.decompressor_for(bad_ext_archive) @pytest.mark.parametrize("archive", ext_archive.values()) def test_get_extension(archive): - ext = scomp.extension(archive) + ext = scomp.extension_from_path(archive) assert ext_archive[ext] == archive def test_get_bad_extension(): - archive = "Foo.py" - ext = scomp.extension(archive) + archive = "Foo.cxx" + ext = scomp.extension_from_path(archive) assert ext is None @pytest.mark.parametrize("path", ext_archive.values()) -def test_allowed_archvie(path): +def test_allowed_archive(path): assert scomp.allowed_archive(path) diff --git a/lib/spack/spack/url.py b/lib/spack/spack/url.py index 00c7d68063..08eef72e93 100644 --- a/lib/spack/spack/url.py +++ b/lib/spack/spack/url.py @@ -36,6 +36,7 @@ from llnl.util.tty.color import cescape, colorize import spack.error import spack.util.compression as comp +import spack.util.path as spath import spack.version @@ -366,17 +367,15 @@ def split_url_extension(path): # Strip off sourceforge download suffix. # e.g. https://sourceforge.net/projects/glew/files/glew/2.0.0/glew-2.0.0.tgz/download - match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path) - if match: - prefix, suffix = match.groups() + prefix, suffix = spath.find_sourceforge_suffix(path) - ext = comp.extension(prefix) + ext = comp.extension_from_path(prefix) if ext is not None: prefix = comp.strip_extension(prefix) else: prefix, suf = strip_query_and_fragment(prefix) - ext = comp.extension(prefix) + ext = comp.extension_from_path(prefix) prefix = comp.strip_extension(prefix) suffix = suf + suffix if ext is None: diff --git a/lib/spack/spack/util/compression.py b/lib/spack/spack/util/compression.py index d9c1f5bd18..2411daa6ad 100644 --- a/lib/spack/spack/util/compression.py +++ b/lib/spack/spack/util/compression.py @@ -3,61 +3,67 @@ # # SPDX-License-Identifier: (Apache-2.0 OR MIT) +import inspect +import io import os import re import shutil import sys from itertools import product +from llnl.util import tty + +import spack.util.path as spath from spack.util.executable import CommandNotFoundError, which # Supported archive extensions. PRE_EXTS = ["tar", "TAR"] EXTS = ["gz", "bz2", "xz", "Z"] -NOTAR_EXTS = ["zip", "tgz", "tbz", "tbz2", "txz"] +NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"] # Add PRE_EXTS and EXTS last so that .tar.gz is matched *before* .tar or .gz ALLOWED_ARCHIVE_TYPES = ( [".".join(ext) for ext in product(PRE_EXTS, EXTS)] + PRE_EXTS + EXTS + NOTAR_EXTS ) +ALLOWED_SINGLE_EXT_ARCHIVE_TYPES = PRE_EXTS + EXTS + NOTAR_EXTS + is_windows = sys.platform == "win32" +try: + import bz2 # noqa -def bz2_support(): - try: - import bz2 # noqa: F401 + _bz2_support = True +except ImportError: + _bz2_support = False - return True - except ImportError: - return False +try: + import gzip # noqa -def gzip_support(): - try: - import gzip # noqa: F401 + _gzip_support = True +except ImportError: + _gzip_support = False - return True - except ImportError: - return False +try: + import lzma # noqa # novermin -def lzma_support(): - try: - import lzma # noqa: F401 # novm + _lzma_support = True +except ImportError: + _lzma_support = False - return True - except ImportError: - return False +def is_lzma_supported(): + return _lzma_support -def tar_support(): - try: - import tarfile # noqa: F401 - return True - except ImportError: - return False +def is_gzip_supported(): + return _gzip_support + + +def is_bz2_supported(): + return _bz2_support def allowed_archive(path): @@ -75,8 +81,7 @@ def _untar(archive_file): archive_file (str): absolute path to the archive to be extracted. Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz). """ - _, ext = os.path.splitext(archive_file) - outfile = os.path.basename(archive_file.strip(ext)) + outfile = os.path.basename(strip_extension(archive_file, "tar")) tar = which("tar", required=True) tar.add_default_arg("-oxf") @@ -91,15 +96,12 @@ def _bunzip2(archive_file): Args: archive_file (str): absolute path to the bz2 archive to be decompressed """ - _, ext = os.path.splitext(archive_file) compressed_file_name = os.path.basename(archive_file) - decompressed_file = os.path.basename(archive_file.strip(ext)) + decompressed_file = os.path.basename(strip_extension(archive_file, "bz2")) working_dir = os.getcwd() archive_out = os.path.join(working_dir, decompressed_file) copy_path = os.path.join(working_dir, compressed_file_name) - if bz2_support(): - import bz2 - + if is_bz2_supported(): f_bz = bz2.BZ2File(archive_file, mode="rb") with open(archive_out, "wb") as ar: shutil.copyfileobj(f_bz, ar) @@ -121,13 +123,10 @@ def _gunzip(archive_file): Args: archive_file (str): absolute path of the file to be decompressed """ - _, ext = os.path.splitext(archive_file) - decompressed_file = os.path.basename(archive_file.strip(ext)) + decompressed_file = os.path.basename(strip_extension(archive_file, "gz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) - if gzip_support(): - import gzip - + if is_gzip_supported(): f_in = gzip.open(archive_file, "rb") with open(destination_abspath, "wb") as f_out: shutil.copyfileobj(f_in, f_out) @@ -138,8 +137,7 @@ def _gunzip(archive_file): def _system_gunzip(archive_file): - _, ext = os.path.splitext(archive_file) - decompressed_file = os.path.basename(archive_file.strip(ext)) + decompressed_file = os.path.basename(strip_extension(archive_file, "gz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) compressed_file = os.path.basename(archive_file) @@ -159,17 +157,16 @@ def _unzip(archive_file): Args: archive_file (str): absolute path of the file to be decompressed """ - - destination_abspath = os.getcwd() - exe = "unzip" - arg = "-q" + extracted_file = os.path.basename(strip_extension(archive_file, "zip")) if is_windows: - exe = "tar" - arg = "-xf" - unzip = which(exe, required=True) - unzip.add_default_arg(arg) - unzip(archive_file) - return destination_abspath + return _untar(archive_file) + else: + exe = "unzip" + arg = "-q" + unzip = which(exe, required=True) + unzip.add_default_arg(arg) + unzip(archive_file) + return extracted_file def _unZ(archive_file): @@ -185,11 +182,8 @@ def _lzma_decomp(archive_file): lzma module, but fall back on command line xz tooling to find available Python support. This is the xz command on Unix and 7z on Windows""" - if lzma_support(): - import lzma # novermin - - _, ext = os.path.splitext(archive_file) - decompressed_file = os.path.basename(archive_file.strip(ext)) + if is_lzma_supported(): + decompressed_file = os.path.basename(strip_extension(archive_file, "xz")) archive_out = os.path.join(os.getcwd(), decompressed_file) with open(archive_out, "wb") as ar: with lzma.open(archive_file) as lar: @@ -201,14 +195,41 @@ def _lzma_decomp(archive_file): return _xz(archive_file) +def _win_compressed_tarball_handler(archive_file): + """Decompress and extract compressed tarballs on Windows. + This method uses 7zip in conjunction with the tar utility + to perform decompression and extraction in a two step process + first using 7zip to decompress, and tar to extract. + + The motivation for this method is the inability of 7zip + to directly decompress and extract compressed archives + in a single shot without undocumented workarounds, and + the Windows tar utility's lack of access to the xz tool (unsupported on Windows) + """ + # perform intermediate extraction step + # record name of new archive so we can extract + # and later clean up + decomped_tarball = _7zip(archive_file) + # 7zip is able to one shot extract compressed archives + # that have been named .txz. If that is the case, there will + # be no intermediate archvie to extract. + if check_extension(decomped_tarball, "tar"): + # run tar on newly decomped archive + outfile = _untar(decomped_tarball) + # clean intermediate archive to mimic end result + # produced by one shot decomp/extraction + os.remove(decomped_tarball) + return outfile + return decomped_tarball + + def _xz(archive_file): """Decompress lzma compressed .xz files via xz command line tool. Available only on Unix """ if is_windows: raise RuntimeError("XZ tool unavailable on Windows") - _, ext = os.path.splitext(archive_file) - decompressed_file = os.path.basename(archive_file.strip(ext)) + decompressed_file = os.path.basename(strip_extension(archive_file, "xz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) compressed_file = os.path.basename(archive_file) @@ -234,84 +255,399 @@ def _7zip(archive_file): Args: archive_file (str): absolute path of file to be unarchived """ - _, ext = os.path.splitext(archive_file) - outfile = os.path.basename(archive_file.strip(ext)) + outfile = os.path.basename(strip_last_extension(archive_file)) _7z = which("7z") if not _7z: raise CommandNotFoundError( "7z unavailable,\ unable to extract %s files. 7z can be installed via Spack" - % ext + % extension_from_path(archive_file) ) _7z.add_default_arg("e") _7z(archive_file) return outfile -def decompressor_for(path, ext): +def decompressor_for(path, extension=None): """Returns a function pointer to appropriate decompression algorithm based on extension type. Args: path (str): path of the archive file requiring decompression - ext (str): Extension of archive file """ - if not allowed_archive(ext): + if not extension: + extension = extension_from_file(path, decompress=True) + + if not allowed_archive(extension): raise CommandNotFoundError( "Cannot extract archive, \ unrecognized file extension: '%s'" - % ext + % extension ) - if re.match(r"\.?zip$", ext) or path.endswith(".zip"): + if re.match(r"\.?zip$", extension) or path.endswith(".zip"): return _unzip - if re.match(r"gz", ext): + if re.match(r"gz", extension): return _gunzip - if re.match(r"bz2", ext): + if re.match(r"bz2", extension): return _bunzip2 # Python does not have native support # of any kind for .Z files. In these cases, # we rely on external tools such as tar, # 7z, or uncompressZ - if re.match(r"Z$", ext): + if re.match(r"Z$", extension): return _unZ # Python and platform may not have support for lzma # compression. If no lzma support, use tools available on systems # 7zip on Windows and the xz tool on Unix systems. - if re.match(r"xz", ext): + if re.match(r"xz", extension): return _lzma_decomp - if ("xz" in ext or "Z" in ext) and is_windows: - return _7zip + # Catch tar.xz/tar.Z files here for Windows + # as the tar utility on Windows cannot handle such + # compression types directly + if ("xz" in extension or "Z" in extension) and is_windows: + return _win_compressed_tarball_handler return _untar -def strip_extension(path): - """Get the part of a path that does not include its compressed - type extension.""" - for type in ALLOWED_ARCHIVE_TYPES: - suffix = r"\.%s$" % type - if re.search(suffix, path): - return re.sub(suffix, "", path) - return path +class FileTypeInterface: + """ + Base interface class for describing and querying file type information. + FileType describes information about a single file type + such as extension, and byte header properties, and provides an interface + to check a given file against said type based on magic number. + + This class should be subclassed each time a new type is to be + described. + + Note: This class should not be used directly as it does not define any specific + file. Attempts to directly use this class will fail, as it does not define + a magic number or extension string. + + Subclasses should each describe a different + type of file. In order to do so, they must define + the extension string, magic number, and header offset (if non zero). + If a class has multiple magic numbers, it will need to + override the method describin that file types magic numbers and + the method that checks a types magic numbers against a given file's. + """ + + OFFSET = 0 + compressed = False + + @staticmethod + def name(): + raise NotImplementedError + + @classmethod + def magic_number(cls): + """Return a list of all potential magic numbers for a filetype""" + return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")] + + @classmethod + def header_size(cls): + """Return size of largest magic number associated with file type""" + return max([len(x) for x in cls.magic_number()]) + + @classmethod + def _bytes_check(cls, magic_bytes): + for magic in cls.magic_number(): + if magic_bytes.startswith(magic): + return True + return False + + @classmethod + def is_file_of_type(cls, iostream): + """Query byte stream for appropriate magic number + + Args: + iostream: file byte stream + + Returns: + Bool denoting whether file is of class file type + based on magic number + """ + if not iostream: + return False + # move to location of magic bytes + iostream.seek(cls.OFFSET) + magic_bytes = iostream.read(cls.header_size()) + # return to beginning of file + iostream.seek(0) + if cls._bytes_check(magic_bytes): + return True + return False + +class CompressedFileTypeInterface(FileTypeInterface): + """Interface class for FileTypes that include compression information""" + + compressed = True -def extension(path): - """Get the archive extension for a path.""" + @staticmethod + def decomp_in_memory(stream): + """This method decompresses and loads the first 200 or so bytes of a compressed file + to check for compressed archives. This does not decompress the entire file and should + not be used for direct expansion of archives/compressed files + """ + raise NotImplementedError("Implementation by compression subclass required") + + +class BZipFileType(CompressedFileTypeInterface): + _MAGIC_NUMBER = b"\x42\x5a\x68" + extension = "bz2" + + @staticmethod + def name(): + return "bzip2 compressed data" + + @staticmethod + def decomp_in_memory(stream): + if is_bz2_supported(): + # checking for underlying archive, only decomp as many bytes + # as is absolutely neccesary for largest archive header (tar) + comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size()) + return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream)) + return None + + +class ZCompressedFileType(CompressedFileTypeInterface): + _MAGIC_NUMBER_LZW = b"\x1f\x9d" + _MAGIC_NUMBER_LZH = b"\x1f\xa0" + extension = "Z" + + @staticmethod + def name(): + return "compress'd data" + + @staticmethod + def decomp_in_memory(stream): + # python has no method of decompressing `.Z` files in memory + return None + + +class GZipFileType(CompressedFileTypeInterface): + _MAGIC_NUMBER = b"\x1f\x8b\x08" + extension = "gz" + + @staticmethod + def name(): + return "gzip compressed data" + + @staticmethod + def decomp_in_memory(stream): + if is_gzip_supported(): + # checking for underlying archive, only decomp as many bytes + # as is absolutely neccesary for largest archive header (tar) + return io.BytesIO( + initial_bytes=gzip.GzipFile(fileobj=stream).read( + TarFileType.OFFSET + TarFileType.header_size() + ) + ) + return None + + +class LzmaFileType(CompressedFileTypeInterface): + _MAGIC_NUMBER = b"\xfd7zXZ" + extension = "xz" + + @staticmethod + def name(): + return "xz compressed data" + + @staticmethod + def decomp_in_memory(stream): + if is_lzma_supported(): + # checking for underlying archive, only decomp as many bytes + # as is absolutely neccesary for largest archive header (tar) + max_size = TarFileType.OFFSET + TarFileType.header_size() + return io.BytesIO( + initial_bytes=lzma.LZMADecompressor().decompress( + stream.read(max_size), max_length=max_size + ) + ) + return None + + +class TarFileType(FileTypeInterface): + OFFSET = 257 + _MAGIC_NUMBER_GNU = b"ustar \0" + _MAGIC_NUMBER_POSIX = b"ustar\x0000" + extension = "tar" + + @staticmethod + def name(): + return "tar archive" + + +class ZipFleType(FileTypeInterface): + _MAGIC_NUMBER = b"PK\003\004" + extension = "zip" + + @staticmethod + def name(): + return "Zip archive data" + + +# collection of valid Spack recognized archive and compression +# file type identifier classes. +VALID_FILETYPES = [ + BZipFileType, + ZCompressedFileType, + GZipFileType, + LzmaFileType, + TarFileType, + ZipFleType, +] + + +def extension_from_stream(stream, decompress=False): + """Return extension represented by stream corresponding to archive file + If stream does not represent an archive type recongized by Spack + (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None + + Extension type is derived by searching for identifying bytes + in file stream. + + Args: + stream : stream representing a file on system + decompress (bool) : if True, compressed files are checked + for archive types beneath compression i.e. tar.gz + default is False, otherwise, return top level type i.e. gz + + Return: + A string represting corresponding archive extension + or None as relevant. + + """ + for arc_type in VALID_FILETYPES: + if arc_type.is_file_of_type(stream): + suffix_ext = arc_type.extension + prefix_ext = "" + if arc_type.compressed and decompress: + # stream represents compressed file + # get decompressed stream (if possible) + decomp_stream = arc_type.decomp_in_memory(stream) + prefix_ext = extension_from_stream(decomp_stream, decompress=decompress) + if not prefix_ext: + # We were unable to decompress or unable to derive + # a nested extension from decompressed file. + # Try to use filename parsing to check for + # potential nested extensions if there are any + tty.debug( + "Cannot derive file extension from magic number;" + " falling back to regex path parsing." + ) + return extension_from_path(stream.name) + resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext]) + tty.debug("File extension %s successfully derived by magic number." % resultant_ext) + return resultant_ext + return None + + +def extension_from_file(file, decompress=False): + """Return extension from archive file path + Extension is derived based on magic number parsing similar + to the `file` utility. Attempts to return abbreviated file extensions + whenever a file has an abbreviated extension such as `.tgz` or `.txz`. + This distinction in abbreivated extension names is accomplished + by string parsing. + + Args: + file (os.PathLike): path descibing file on system for which ext + will be determined. + decompress (bool): If True, method will peek into compressed + files to check for archive file types. default is False. + If false, method will be unable to distinguish `.tar.gz` from `.gz` + or similar. + Return: + Spack recognized archive file extension as determined by file's magic number and + file name. If file is not on system or is of an type not recognized by Spack as + an archive or compression type, None is returned. + """ + if os.path.exists(file): + with open(file, "rb") as f: + ext = extension_from_stream(f, decompress) + # based on magic number, file is compressed + # tar archive. Check to see if file is abbreviated as + # t[xz|gz|bz2|bz] + if ext and ext.startswith("tar."): + suf = ext.split(".")[1] + abbr = "t" + suf + if check_extension(file, abbr): + return abbr + if not ext: + # If unable to parse extension from stream, + # attempt to fall back to string parsing + ext = extension_from_path(file) + return ext + return None + + +def extension_from_path(path): + """Get the allowed archive extension for a path. + If path does not include a valid archive extension + (see`spack.util.compression.ALLOWED_ARCHIVE_TYPES`) return None + """ if path is None: raise ValueError("Can't call extension() on None") - # Strip sourceforge suffix. - if re.search(r"((?:sourceforge.net|sf.net)/.*)/download$", path): - path = os.path.dirname(path) - for t in ALLOWED_ARCHIVE_TYPES: - suffix = r"\.%s$" % t - if re.search(suffix, path): + if check_extension(path, t): return t return None + + +def strip_last_extension(path): + """Strips last supported archive extension from path""" + if path: + for ext in ALLOWED_SINGLE_EXT_ARCHIVE_TYPES: + mod_path = check_and_remove_ext(path, ext) + if mod_path != path: + return mod_path + return path + + +def strip_extension(path, ext=None): + """Get the part of a path that does not include its compressed + type extension.""" + if ext: + return check_and_remove_ext(path, ext) + for t in ALLOWED_ARCHIVE_TYPES: + mod_path = check_and_remove_ext(path, t) + if mod_path != path: + return mod_path + return path + + +def check_extension(path, ext): + """Check if extension is present in path""" + # Strip sourceforge suffix. + prefix, _ = spath.find_sourceforge_suffix(path) + if not ext.startswith(r"\."): + ext = r"\.%s$" % ext + if re.search(ext, prefix): + return True + return False + + +def reg_remove_ext(path, ext): + """Regex remove ext from path""" + if path and ext: + suffix = r"\.%s$" % ext + return re.sub(suffix, "", path) + return path + + +def check_and_remove_ext(path, ext): + """If given extension is present in path, remove and return, + otherwise just return path""" + if check_extension(path, ext): + return reg_remove_ext(path, ext) + return path diff --git a/lib/spack/spack/util/path.py b/lib/spack/spack/util/path.py index 981a6b672d..fe45541321 100644 --- a/lib/spack/spack/util/path.py +++ b/lib/spack/spack/util/path.py @@ -71,6 +71,15 @@ def win_exe_ext(): return ".exe" +def find_sourceforge_suffix(path): + """find and match sourceforge filepath components + Return match object""" + match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path) + if match: + return match.groups() + return path, "" + + def path_to_os_path(*pths): """ Takes an arbitrary number of positional parameters |