summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn W. Parent <45471568+johnwparent@users.noreply.github.com>2022-09-26 03:01:42 -0400
committerGitHub <noreply@github.com>2022-09-26 00:01:42 -0700
commit30f6fd8dc060a9b120e5c4ce94f80a60e7bf7e51 (patch)
tree8d9b04d76ae33968f9942d16904d9883da1f82a8
parenta5ea566bdfe066f5f41e9e19c636d32f4ed41810 (diff)
downloadspack-30f6fd8dc060a9b120e5c4ce94f80a60e7bf7e51.tar.gz
spack-30f6fd8dc060a9b120e5c4ce94f80a60e7bf7e51.tar.bz2
spack-30f6fd8dc060a9b120e5c4ce94f80a60e7bf7e51.tar.xz
spack-30f6fd8dc060a9b120e5c4ce94f80a60e7bf7e51.zip
Fetching/decompressing: use magic numbers (#31589)
Spack currently depends on parsing filenames of downloaded files to determine what type of archive they are and how to decompress them. This commit adds a preliminary check based on magic numbers to determine archive type (but falls back on name parsing if the extension type cannot be determined). As part of this work, this commit also enables decompression of .tar.xz-compressed archives on Windows.
-rw-r--r--lib/spack/llnl/util/filesystem.py65
-rw-r--r--lib/spack/spack/binary_distribution.py3
-rw-r--r--lib/spack/spack/fetch_strategy.py4
-rw-r--r--lib/spack/spack/relocate.py31
-rw-r--r--lib/spack/spack/test/data/compression/Foo.cxx0
-rw-r--r--lib/spack/spack/test/util/compression.py32
-rw-r--r--lib/spack/spack/url.py9
-rw-r--r--lib/spack/spack/util/compression.py508
-rw-r--r--lib/spack/spack/util/path.py9
9 files changed, 524 insertions, 137 deletions
diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py
index a5da826217..ad91e7c876 100644
--- a/lib/spack/llnl/util/filesystem.py
+++ b/lib/spack/llnl/util/filesystem.py
@@ -24,7 +24,7 @@ from llnl.util.compat import Sequence
from llnl.util.lang import dedupe, memoized
from llnl.util.symlink import islink, symlink
-from spack.util.executable import Executable
+from spack.util.executable import CommandNotFoundError, Executable, which
from spack.util.path import path_to_os_path, system_path_filter
is_windows = _platform == "win32"
@@ -113,6 +113,69 @@ def path_contains_subdirectory(path, root):
return norm_path.startswith(norm_root)
+@memoized
+def file_command(*args):
+ """Creates entry point to `file` system command with provided arguments"""
+ try:
+ file_cmd = which("file", required=True)
+ except CommandNotFoundError as e:
+ if is_windows:
+ raise CommandNotFoundError("`file` utility is not available on Windows")
+ else:
+ raise e
+ for arg in args:
+ file_cmd.add_default_arg(arg)
+ return file_cmd
+
+
+@memoized
+def _get_mime_type():
+ """Generate method to call `file` system command to aquire mime type
+ for a specified path
+ """
+ return file_command("-b", "-h", "--mime-type")
+
+
+@memoized
+def _get_mime_type_compressed():
+ """Same as _get_mime_type but attempts to check for
+ compression first
+ """
+ mime_uncompressed = _get_mime_type()
+ mime_uncompressed.add_default_arg("-Z")
+ return mime_uncompressed
+
+
+def mime_type(filename):
+ """Returns the mime type and subtype of a file.
+
+ Args:
+ filename: file to be analyzed
+
+ Returns:
+ Tuple containing the MIME type and subtype
+ """
+ output = _get_mime_type()(filename, output=str, error=str).strip()
+ tty.debug("==> " + output)
+ type, _, subtype = output.partition("/")
+ return type, subtype
+
+
+def compressed_mime_type(filename):
+ """Same as mime_type but checks for type that has been compressed
+
+ Args:
+ filename (str): file to be analyzed
+
+ Returns:
+ Tuple containing the MIME type and subtype
+ """
+ output = _get_mime_type_compressed()(filename, output=str, error=str).strip()
+ tty.debug("==> " + output)
+ type, _, subtype = output.partition("/")
+ return type, subtype
+
+
#: This generates the library filenames that may appear on any OS.
library_extensions = ["a", "la", "so", "tbd", "dylib"]
diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py
index e51d7d4842..c329287de8 100644
--- a/lib/spack/spack/binary_distribution.py
+++ b/lib/spack/spack/binary_distribution.py
@@ -19,6 +19,7 @@ from contextlib import closing
import ruamel.yaml as yaml
from six.moves.urllib.error import HTTPError, URLError
+import llnl.util.filesystem as fsys
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.filesystem import mkdirp
@@ -653,7 +654,7 @@ def get_buildfile_manifest(spec):
for filename in files:
path_name = os.path.join(root, filename)
- m_type, m_subtype = relocate.mime_type(path_name)
+ m_type, m_subtype = fsys.mime_type(path_name)
rel_path_name = os.path.relpath(path_name, spec.prefix)
added = False
diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py
index 5ed46c3278..ea85c6a682 100644
--- a/lib/spack/spack/fetch_strategy.py
+++ b/lib/spack/spack/fetch_strategy.py
@@ -54,7 +54,7 @@ import spack.util.pattern as pattern
import spack.util.url as url_util
import spack.util.web as web_util
import spack.version
-from spack.util.compression import decompressor_for, extension
+from spack.util.compression import decompressor_for, extension_from_path
from spack.util.executable import CommandNotFoundError, which
from spack.util.string import comma_and, quote
@@ -613,7 +613,7 @@ class VCSFetchStrategy(FetchStrategy):
@_needs_stage
def archive(self, destination, **kwargs):
- assert extension(destination) == "tar.gz"
+ assert extension_from_path(destination) == "tar.gz"
assert self.stage.source_path.startswith(self.stage.path)
tar = which("tar", required=True)
diff --git a/lib/spack/spack/relocate.py b/lib/spack/spack/relocate.py
index 8212093a12..3ef332c204 100644
--- a/lib/spack/spack/relocate.py
+++ b/lib/spack/spack/relocate.py
@@ -11,6 +11,7 @@ import shutil
import macholib.mach_o
import macholib.MachO
+import llnl.util.filesystem as fs
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.lang import memoized
@@ -887,7 +888,7 @@ def file_is_relocatable(filename, paths_to_relocate=None):
# Remove the RPATHS from the strings in the executable
set_of_strings = set(strings(filename, output=str).split())
- m_type, m_subtype = mime_type(filename)
+ m_type, m_subtype = fs.mime_type(filename)
if m_type == "application":
tty.debug("{0},{1}".format(m_type, m_subtype), level=2)
@@ -923,7 +924,7 @@ def is_binary(filename):
Returns:
True or False
"""
- m_type, _ = mime_type(filename)
+ m_type, _ = fs.mime_type(filename)
msg = "[{0}] -> ".format(filename)
if m_type == "application":
@@ -934,30 +935,6 @@ def is_binary(filename):
return False
-@llnl.util.lang.memoized
-def _get_mime_type():
- file_cmd = executable.which("file")
- for arg in ["-b", "-h", "--mime-type"]:
- file_cmd.add_default_arg(arg)
- return file_cmd
-
-
-@llnl.util.lang.memoized
-def mime_type(filename):
- """Returns the mime type and subtype of a file.
-
- Args:
- filename: file to be analyzed
-
- Returns:
- Tuple containing the MIME type and subtype
- """
- output = _get_mime_type()(filename, output=str, error=str).strip()
- tty.debug("==> " + output, level=2)
- type, _, subtype = output.partition("/")
- return type, subtype
-
-
# Memoize this due to repeated calls to libraries in the same directory.
@llnl.util.lang.memoized
def _exists_dir(dirname):
@@ -975,7 +952,7 @@ def fixup_macos_rpath(root, filename):
True if fixups were applied, else False
"""
abspath = os.path.join(root, filename)
- if mime_type(abspath) != ("application", "x-mach-binary"):
+ if fs.mime_type(abspath) != ("application", "x-mach-binary"):
return False
# Get Mach-O header commands
diff --git a/lib/spack/spack/test/data/compression/Foo.cxx b/lib/spack/spack/test/data/compression/Foo.cxx
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/lib/spack/spack/test/data/compression/Foo.cxx
diff --git a/lib/spack/spack/test/util/compression.py b/lib/spack/spack/test/util/compression.py
index 13d1a44a73..907b1946b8 100644
--- a/lib/spack/spack/test/util/compression.py
+++ b/lib/spack/spack/test/util/compression.py
@@ -22,6 +22,9 @@ ext_archive = {}
for ext in scomp.ALLOWED_ARCHIVE_TYPES
if "TAR" not in ext
]
+# Spack does not use Python native handling for tarballs or zip
+# Don't test tarballs or zip in native test
+native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key]
def support_stub():
@@ -30,10 +33,9 @@ def support_stub():
@pytest.fixture
def compr_support_check(monkeypatch):
- monkeypatch.setattr(scomp, "lzma_support", support_stub)
- monkeypatch.setattr(scomp, "tar_support", support_stub)
- monkeypatch.setattr(scomp, "gzip_support", support_stub)
- monkeypatch.setattr(scomp, "bz2_support", support_stub)
+ monkeypatch.setattr(scomp, "is_lzma_supported", support_stub)
+ monkeypatch.setattr(scomp, "is_gzip_supported", support_stub)
+ monkeypatch.setattr(scomp, "is_bz2_supported", support_stub)
@pytest.fixture
@@ -46,10 +48,9 @@ def archive_file(tmpdir_factory, request):
return os.path.join(str(tmpdir), "Foo.%s" % extension)
-@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
+@pytest.mark.parametrize("archive_file", native_archive_list, indirect=True)
def test_native_unpacking(tmpdir_factory, archive_file):
- extension = scomp.extension(archive_file)
- util = scomp.decompressor_for(archive_file, extension)
+ util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -63,9 +64,8 @@ def test_native_unpacking(tmpdir_factory, archive_file):
@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
- extension = scomp.extension(archive_file)
# actually run test
- util = scomp.decompressor_for(archive_file, extension)
+ util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("system_comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -78,23 +78,25 @@ def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
def test_unallowed_extension():
- bad_ext_archive = "Foo.py"
+ # use a cxx file as python files included for the test
+ # are picked up by the linter and break style checks
+ bad_ext_archive = "Foo.cxx"
with pytest.raises(CommandNotFoundError):
- scomp.decompressor_for(bad_ext_archive, "py")
+ scomp.decompressor_for(bad_ext_archive)
@pytest.mark.parametrize("archive", ext_archive.values())
def test_get_extension(archive):
- ext = scomp.extension(archive)
+ ext = scomp.extension_from_path(archive)
assert ext_archive[ext] == archive
def test_get_bad_extension():
- archive = "Foo.py"
- ext = scomp.extension(archive)
+ archive = "Foo.cxx"
+ ext = scomp.extension_from_path(archive)
assert ext is None
@pytest.mark.parametrize("path", ext_archive.values())
-def test_allowed_archvie(path):
+def test_allowed_archive(path):
assert scomp.allowed_archive(path)
diff --git a/lib/spack/spack/url.py b/lib/spack/spack/url.py
index 00c7d68063..08eef72e93 100644
--- a/lib/spack/spack/url.py
+++ b/lib/spack/spack/url.py
@@ -36,6 +36,7 @@ from llnl.util.tty.color import cescape, colorize
import spack.error
import spack.util.compression as comp
+import spack.util.path as spath
import spack.version
@@ -366,17 +367,15 @@ def split_url_extension(path):
# Strip off sourceforge download suffix.
# e.g. https://sourceforge.net/projects/glew/files/glew/2.0.0/glew-2.0.0.tgz/download
- match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path)
- if match:
- prefix, suffix = match.groups()
+ prefix, suffix = spath.find_sourceforge_suffix(path)
- ext = comp.extension(prefix)
+ ext = comp.extension_from_path(prefix)
if ext is not None:
prefix = comp.strip_extension(prefix)
else:
prefix, suf = strip_query_and_fragment(prefix)
- ext = comp.extension(prefix)
+ ext = comp.extension_from_path(prefix)
prefix = comp.strip_extension(prefix)
suffix = suf + suffix
if ext is None:
diff --git a/lib/spack/spack/util/compression.py b/lib/spack/spack/util/compression.py
index d9c1f5bd18..2411daa6ad 100644
--- a/lib/spack/spack/util/compression.py
+++ b/lib/spack/spack/util/compression.py
@@ -3,61 +3,67 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+import inspect
+import io
import os
import re
import shutil
import sys
from itertools import product
+from llnl.util import tty
+
+import spack.util.path as spath
from spack.util.executable import CommandNotFoundError, which
# Supported archive extensions.
PRE_EXTS = ["tar", "TAR"]
EXTS = ["gz", "bz2", "xz", "Z"]
-NOTAR_EXTS = ["zip", "tgz", "tbz", "tbz2", "txz"]
+NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"]
# Add PRE_EXTS and EXTS last so that .tar.gz is matched *before* .tar or .gz
ALLOWED_ARCHIVE_TYPES = (
[".".join(ext) for ext in product(PRE_EXTS, EXTS)] + PRE_EXTS + EXTS + NOTAR_EXTS
)
+ALLOWED_SINGLE_EXT_ARCHIVE_TYPES = PRE_EXTS + EXTS + NOTAR_EXTS
+
is_windows = sys.platform == "win32"
+try:
+ import bz2 # noqa
-def bz2_support():
- try:
- import bz2 # noqa: F401
+ _bz2_support = True
+except ImportError:
+ _bz2_support = False
- return True
- except ImportError:
- return False
+try:
+ import gzip # noqa
-def gzip_support():
- try:
- import gzip # noqa: F401
+ _gzip_support = True
+except ImportError:
+ _gzip_support = False
- return True
- except ImportError:
- return False
+try:
+ import lzma # noqa # novermin
-def lzma_support():
- try:
- import lzma # noqa: F401 # novm
+ _lzma_support = True
+except ImportError:
+ _lzma_support = False
- return True
- except ImportError:
- return False
+def is_lzma_supported():
+ return _lzma_support
-def tar_support():
- try:
- import tarfile # noqa: F401
- return True
- except ImportError:
- return False
+def is_gzip_supported():
+ return _gzip_support
+
+
+def is_bz2_supported():
+ return _bz2_support
def allowed_archive(path):
@@ -75,8 +81,7 @@ def _untar(archive_file):
archive_file (str): absolute path to the archive to be extracted.
Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).
"""
- _, ext = os.path.splitext(archive_file)
- outfile = os.path.basename(archive_file.strip(ext))
+ outfile = os.path.basename(strip_extension(archive_file, "tar"))
tar = which("tar", required=True)
tar.add_default_arg("-oxf")
@@ -91,15 +96,12 @@ def _bunzip2(archive_file):
Args:
archive_file (str): absolute path to the bz2 archive to be decompressed
"""
- _, ext = os.path.splitext(archive_file)
compressed_file_name = os.path.basename(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "bz2"))
working_dir = os.getcwd()
archive_out = os.path.join(working_dir, decompressed_file)
copy_path = os.path.join(working_dir, compressed_file_name)
- if bz2_support():
- import bz2
-
+ if is_bz2_supported():
f_bz = bz2.BZ2File(archive_file, mode="rb")
with open(archive_out, "wb") as ar:
shutil.copyfileobj(f_bz, ar)
@@ -121,13 +123,10 @@ def _gunzip(archive_file):
Args:
archive_file (str): absolute path of the file to be decompressed
"""
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
- if gzip_support():
- import gzip
-
+ if is_gzip_supported():
f_in = gzip.open(archive_file, "rb")
with open(destination_abspath, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
@@ -138,8 +137,7 @@ def _gunzip(archive_file):
def _system_gunzip(archive_file):
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
@@ -159,17 +157,16 @@ def _unzip(archive_file):
Args:
archive_file (str): absolute path of the file to be decompressed
"""
-
- destination_abspath = os.getcwd()
- exe = "unzip"
- arg = "-q"
+ extracted_file = os.path.basename(strip_extension(archive_file, "zip"))
if is_windows:
- exe = "tar"
- arg = "-xf"
- unzip = which(exe, required=True)
- unzip.add_default_arg(arg)
- unzip(archive_file)
- return destination_abspath
+ return _untar(archive_file)
+ else:
+ exe = "unzip"
+ arg = "-q"
+ unzip = which(exe, required=True)
+ unzip.add_default_arg(arg)
+ unzip(archive_file)
+ return extracted_file
def _unZ(archive_file):
@@ -185,11 +182,8 @@ def _lzma_decomp(archive_file):
lzma module, but fall back on command line xz tooling
to find available Python support. This is the xz command
on Unix and 7z on Windows"""
- if lzma_support():
- import lzma # novermin
-
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ if is_lzma_supported():
+ decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar:
with lzma.open(archive_file) as lar:
@@ -201,14 +195,41 @@ def _lzma_decomp(archive_file):
return _xz(archive_file)
+def _win_compressed_tarball_handler(archive_file):
+ """Decompress and extract compressed tarballs on Windows.
+ This method uses 7zip in conjunction with the tar utility
+ to perform decompression and extraction in a two step process
+ first using 7zip to decompress, and tar to extract.
+
+ The motivation for this method is the inability of 7zip
+ to directly decompress and extract compressed archives
+ in a single shot without undocumented workarounds, and
+ the Windows tar utility's lack of access to the xz tool (unsupported on Windows)
+ """
+ # perform intermediate extraction step
+ # record name of new archive so we can extract
+ # and later clean up
+ decomped_tarball = _7zip(archive_file)
+ # 7zip is able to one shot extract compressed archives
+ # that have been named .txz. If that is the case, there will
+ # be no intermediate archvie to extract.
+ if check_extension(decomped_tarball, "tar"):
+ # run tar on newly decomped archive
+ outfile = _untar(decomped_tarball)
+ # clean intermediate archive to mimic end result
+ # produced by one shot decomp/extraction
+ os.remove(decomped_tarball)
+ return outfile
+ return decomped_tarball
+
+
def _xz(archive_file):
"""Decompress lzma compressed .xz files via xz command line
tool. Available only on Unix
"""
if is_windows:
raise RuntimeError("XZ tool unavailable on Windows")
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
@@ -234,84 +255,399 @@ def _7zip(archive_file):
Args:
archive_file (str): absolute path of file to be unarchived
"""
- _, ext = os.path.splitext(archive_file)
- outfile = os.path.basename(archive_file.strip(ext))
+ outfile = os.path.basename(strip_last_extension(archive_file))
_7z = which("7z")
if not _7z:
raise CommandNotFoundError(
"7z unavailable,\
unable to extract %s files. 7z can be installed via Spack"
- % ext
+ % extension_from_path(archive_file)
)
_7z.add_default_arg("e")
_7z(archive_file)
return outfile
-def decompressor_for(path, ext):
+def decompressor_for(path, extension=None):
"""Returns a function pointer to appropriate decompression
algorithm based on extension type.
Args:
path (str): path of the archive file requiring decompression
- ext (str): Extension of archive file
"""
- if not allowed_archive(ext):
+ if not extension:
+ extension = extension_from_file(path, decompress=True)
+
+ if not allowed_archive(extension):
raise CommandNotFoundError(
"Cannot extract archive, \
unrecognized file extension: '%s'"
- % ext
+ % extension
)
- if re.match(r"\.?zip$", ext) or path.endswith(".zip"):
+ if re.match(r"\.?zip$", extension) or path.endswith(".zip"):
return _unzip
- if re.match(r"gz", ext):
+ if re.match(r"gz", extension):
return _gunzip
- if re.match(r"bz2", ext):
+ if re.match(r"bz2", extension):
return _bunzip2
# Python does not have native support
# of any kind for .Z files. In these cases,
# we rely on external tools such as tar,
# 7z, or uncompressZ
- if re.match(r"Z$", ext):
+ if re.match(r"Z$", extension):
return _unZ
# Python and platform may not have support for lzma
# compression. If no lzma support, use tools available on systems
# 7zip on Windows and the xz tool on Unix systems.
- if re.match(r"xz", ext):
+ if re.match(r"xz", extension):
return _lzma_decomp
- if ("xz" in ext or "Z" in ext) and is_windows:
- return _7zip
+ # Catch tar.xz/tar.Z files here for Windows
+ # as the tar utility on Windows cannot handle such
+ # compression types directly
+ if ("xz" in extension or "Z" in extension) and is_windows:
+ return _win_compressed_tarball_handler
return _untar
-def strip_extension(path):
- """Get the part of a path that does not include its compressed
- type extension."""
- for type in ALLOWED_ARCHIVE_TYPES:
- suffix = r"\.%s$" % type
- if re.search(suffix, path):
- return re.sub(suffix, "", path)
- return path
+class FileTypeInterface:
+ """
+ Base interface class for describing and querying file type information.
+ FileType describes information about a single file type
+ such as extension, and byte header properties, and provides an interface
+ to check a given file against said type based on magic number.
+
+ This class should be subclassed each time a new type is to be
+ described.
+
+ Note: This class should not be used directly as it does not define any specific
+ file. Attempts to directly use this class will fail, as it does not define
+ a magic number or extension string.
+
+ Subclasses should each describe a different
+ type of file. In order to do so, they must define
+ the extension string, magic number, and header offset (if non zero).
+ If a class has multiple magic numbers, it will need to
+ override the method describin that file types magic numbers and
+ the method that checks a types magic numbers against a given file's.
+ """
+
+ OFFSET = 0
+ compressed = False
+
+ @staticmethod
+ def name():
+ raise NotImplementedError
+
+ @classmethod
+ def magic_number(cls):
+ """Return a list of all potential magic numbers for a filetype"""
+ return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")]
+
+ @classmethod
+ def header_size(cls):
+ """Return size of largest magic number associated with file type"""
+ return max([len(x) for x in cls.magic_number()])
+
+ @classmethod
+ def _bytes_check(cls, magic_bytes):
+ for magic in cls.magic_number():
+ if magic_bytes.startswith(magic):
+ return True
+ return False
+
+ @classmethod
+ def is_file_of_type(cls, iostream):
+ """Query byte stream for appropriate magic number
+
+ Args:
+ iostream: file byte stream
+
+ Returns:
+ Bool denoting whether file is of class file type
+ based on magic number
+ """
+ if not iostream:
+ return False
+ # move to location of magic bytes
+ iostream.seek(cls.OFFSET)
+ magic_bytes = iostream.read(cls.header_size())
+ # return to beginning of file
+ iostream.seek(0)
+ if cls._bytes_check(magic_bytes):
+ return True
+ return False
+
+class CompressedFileTypeInterface(FileTypeInterface):
+ """Interface class for FileTypes that include compression information"""
+
+ compressed = True
-def extension(path):
- """Get the archive extension for a path."""
+ @staticmethod
+ def decomp_in_memory(stream):
+ """This method decompresses and loads the first 200 or so bytes of a compressed file
+ to check for compressed archives. This does not decompress the entire file and should
+ not be used for direct expansion of archives/compressed files
+ """
+ raise NotImplementedError("Implementation by compression subclass required")
+
+
+class BZipFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER = b"\x42\x5a\x68"
+ extension = "bz2"
+
+ @staticmethod
+ def name():
+ return "bzip2 compressed data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ if is_bz2_supported():
+ # checking for underlying archive, only decomp as many bytes
+ # as is absolutely neccesary for largest archive header (tar)
+ comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size())
+ return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream))
+ return None
+
+
+class ZCompressedFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER_LZW = b"\x1f\x9d"
+ _MAGIC_NUMBER_LZH = b"\x1f\xa0"
+ extension = "Z"
+
+ @staticmethod
+ def name():
+ return "compress'd data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ # python has no method of decompressing `.Z` files in memory
+ return None
+
+
+class GZipFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER = b"\x1f\x8b\x08"
+ extension = "gz"
+
+ @staticmethod
+ def name():
+ return "gzip compressed data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ if is_gzip_supported():
+ # checking for underlying archive, only decomp as many bytes
+ # as is absolutely neccesary for largest archive header (tar)
+ return io.BytesIO(
+ initial_bytes=gzip.GzipFile(fileobj=stream).read(
+ TarFileType.OFFSET + TarFileType.header_size()
+ )
+ )
+ return None
+
+
+class LzmaFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER = b"\xfd7zXZ"
+ extension = "xz"
+
+ @staticmethod
+ def name():
+ return "xz compressed data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ if is_lzma_supported():
+ # checking for underlying archive, only decomp as many bytes
+ # as is absolutely neccesary for largest archive header (tar)
+ max_size = TarFileType.OFFSET + TarFileType.header_size()
+ return io.BytesIO(
+ initial_bytes=lzma.LZMADecompressor().decompress(
+ stream.read(max_size), max_length=max_size
+ )
+ )
+ return None
+
+
+class TarFileType(FileTypeInterface):
+ OFFSET = 257
+ _MAGIC_NUMBER_GNU = b"ustar \0"
+ _MAGIC_NUMBER_POSIX = b"ustar\x0000"
+ extension = "tar"
+
+ @staticmethod
+ def name():
+ return "tar archive"
+
+
+class ZipFleType(FileTypeInterface):
+ _MAGIC_NUMBER = b"PK\003\004"
+ extension = "zip"
+
+ @staticmethod
+ def name():
+ return "Zip archive data"
+
+
+# collection of valid Spack recognized archive and compression
+# file type identifier classes.
+VALID_FILETYPES = [
+ BZipFileType,
+ ZCompressedFileType,
+ GZipFileType,
+ LzmaFileType,
+ TarFileType,
+ ZipFleType,
+]
+
+
+def extension_from_stream(stream, decompress=False):
+ """Return extension represented by stream corresponding to archive file
+ If stream does not represent an archive type recongized by Spack
+ (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None
+
+ Extension type is derived by searching for identifying bytes
+ in file stream.
+
+ Args:
+ stream : stream representing a file on system
+ decompress (bool) : if True, compressed files are checked
+ for archive types beneath compression i.e. tar.gz
+ default is False, otherwise, return top level type i.e. gz
+
+ Return:
+ A string represting corresponding archive extension
+ or None as relevant.
+
+ """
+ for arc_type in VALID_FILETYPES:
+ if arc_type.is_file_of_type(stream):
+ suffix_ext = arc_type.extension
+ prefix_ext = ""
+ if arc_type.compressed and decompress:
+ # stream represents compressed file
+ # get decompressed stream (if possible)
+ decomp_stream = arc_type.decomp_in_memory(stream)
+ prefix_ext = extension_from_stream(decomp_stream, decompress=decompress)
+ if not prefix_ext:
+ # We were unable to decompress or unable to derive
+ # a nested extension from decompressed file.
+ # Try to use filename parsing to check for
+ # potential nested extensions if there are any
+ tty.debug(
+ "Cannot derive file extension from magic number;"
+ " falling back to regex path parsing."
+ )
+ return extension_from_path(stream.name)
+ resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext])
+ tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
+ return resultant_ext
+ return None
+
+
+def extension_from_file(file, decompress=False):
+ """Return extension from archive file path
+ Extension is derived based on magic number parsing similar
+ to the `file` utility. Attempts to return abbreviated file extensions
+ whenever a file has an abbreviated extension such as `.tgz` or `.txz`.
+ This distinction in abbreivated extension names is accomplished
+ by string parsing.
+
+ Args:
+ file (os.PathLike): path descibing file on system for which ext
+ will be determined.
+ decompress (bool): If True, method will peek into compressed
+ files to check for archive file types. default is False.
+ If false, method will be unable to distinguish `.tar.gz` from `.gz`
+ or similar.
+ Return:
+ Spack recognized archive file extension as determined by file's magic number and
+ file name. If file is not on system or is of an type not recognized by Spack as
+ an archive or compression type, None is returned.
+ """
+ if os.path.exists(file):
+ with open(file, "rb") as f:
+ ext = extension_from_stream(f, decompress)
+ # based on magic number, file is compressed
+ # tar archive. Check to see if file is abbreviated as
+ # t[xz|gz|bz2|bz]
+ if ext and ext.startswith("tar."):
+ suf = ext.split(".")[1]
+ abbr = "t" + suf
+ if check_extension(file, abbr):
+ return abbr
+ if not ext:
+ # If unable to parse extension from stream,
+ # attempt to fall back to string parsing
+ ext = extension_from_path(file)
+ return ext
+ return None
+
+
+def extension_from_path(path):
+ """Get the allowed archive extension for a path.
+ If path does not include a valid archive extension
+ (see`spack.util.compression.ALLOWED_ARCHIVE_TYPES`) return None
+ """
if path is None:
raise ValueError("Can't call extension() on None")
- # Strip sourceforge suffix.
- if re.search(r"((?:sourceforge.net|sf.net)/.*)/download$", path):
- path = os.path.dirname(path)
-
for t in ALLOWED_ARCHIVE_TYPES:
- suffix = r"\.%s$" % t
- if re.search(suffix, path):
+ if check_extension(path, t):
return t
return None
+
+
+def strip_last_extension(path):
+ """Strips last supported archive extension from path"""
+ if path:
+ for ext in ALLOWED_SINGLE_EXT_ARCHIVE_TYPES:
+ mod_path = check_and_remove_ext(path, ext)
+ if mod_path != path:
+ return mod_path
+ return path
+
+
+def strip_extension(path, ext=None):
+ """Get the part of a path that does not include its compressed
+ type extension."""
+ if ext:
+ return check_and_remove_ext(path, ext)
+ for t in ALLOWED_ARCHIVE_TYPES:
+ mod_path = check_and_remove_ext(path, t)
+ if mod_path != path:
+ return mod_path
+ return path
+
+
+def check_extension(path, ext):
+ """Check if extension is present in path"""
+ # Strip sourceforge suffix.
+ prefix, _ = spath.find_sourceforge_suffix(path)
+ if not ext.startswith(r"\."):
+ ext = r"\.%s$" % ext
+ if re.search(ext, prefix):
+ return True
+ return False
+
+
+def reg_remove_ext(path, ext):
+ """Regex remove ext from path"""
+ if path and ext:
+ suffix = r"\.%s$" % ext
+ return re.sub(suffix, "", path)
+ return path
+
+
+def check_and_remove_ext(path, ext):
+ """If given extension is present in path, remove and return,
+ otherwise just return path"""
+ if check_extension(path, ext):
+ return reg_remove_ext(path, ext)
+ return path
diff --git a/lib/spack/spack/util/path.py b/lib/spack/spack/util/path.py
index 981a6b672d..fe45541321 100644
--- a/lib/spack/spack/util/path.py
+++ b/lib/spack/spack/util/path.py
@@ -71,6 +71,15 @@ def win_exe_ext():
return ".exe"
+def find_sourceforge_suffix(path):
+ """find and match sourceforge filepath components
+ Return match object"""
+ match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path)
+ if match:
+ return match.groups()
+ return path, ""
+
+
def path_to_os_path(*pths):
"""
Takes an arbitrary number of positional parameters