summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/spack/llnl/util/filesystem.py65
-rw-r--r--lib/spack/spack/binary_distribution.py3
-rw-r--r--lib/spack/spack/fetch_strategy.py4
-rw-r--r--lib/spack/spack/relocate.py31
-rw-r--r--lib/spack/spack/test/data/compression/Foo.cxx0
-rw-r--r--lib/spack/spack/test/util/compression.py32
-rw-r--r--lib/spack/spack/url.py9
-rw-r--r--lib/spack/spack/util/compression.py508
-rw-r--r--lib/spack/spack/util/path.py9
9 files changed, 524 insertions, 137 deletions
diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py
index a5da826217..ad91e7c876 100644
--- a/lib/spack/llnl/util/filesystem.py
+++ b/lib/spack/llnl/util/filesystem.py
@@ -24,7 +24,7 @@ from llnl.util.compat import Sequence
from llnl.util.lang import dedupe, memoized
from llnl.util.symlink import islink, symlink
-from spack.util.executable import Executable
+from spack.util.executable import CommandNotFoundError, Executable, which
from spack.util.path import path_to_os_path, system_path_filter
is_windows = _platform == "win32"
@@ -113,6 +113,69 @@ def path_contains_subdirectory(path, root):
return norm_path.startswith(norm_root)
+@memoized
+def file_command(*args):
+ """Creates entry point to `file` system command with provided arguments"""
+ try:
+ file_cmd = which("file", required=True)
+ except CommandNotFoundError as e:
+ if is_windows:
+ raise CommandNotFoundError("`file` utility is not available on Windows")
+ else:
+ raise e
+ for arg in args:
+ file_cmd.add_default_arg(arg)
+ return file_cmd
+
+
+@memoized
+def _get_mime_type():
+ """Generate method to call `file` system command to acquire mime type
+ for a specified path
+ """
+ return file_command("-b", "-h", "--mime-type")
+
+
+@memoized
+def _get_mime_type_compressed():
+ """Same as _get_mime_type but attempts to check for
+ compression first
+ """
+ mime_uncompressed = _get_mime_type()
+ mime_uncompressed.add_default_arg("-Z")
+ return mime_uncompressed
+
+
+def mime_type(filename):
+ """Returns the mime type and subtype of a file.
+
+ Args:
+ filename: file to be analyzed
+
+ Returns:
+ Tuple containing the MIME type and subtype
+ """
+ output = _get_mime_type()(filename, output=str, error=str).strip()
+ tty.debug("==> " + output)
+ type, _, subtype = output.partition("/")
+ return type, subtype
+
+
+def compressed_mime_type(filename):
+ """Same as mime_type but checks for type that has been compressed
+
+ Args:
+ filename (str): file to be analyzed
+
+ Returns:
+ Tuple containing the MIME type and subtype
+ """
+ output = _get_mime_type_compressed()(filename, output=str, error=str).strip()
+ tty.debug("==> " + output)
+ type, _, subtype = output.partition("/")
+ return type, subtype
+
+
#: This generates the library filenames that may appear on any OS.
library_extensions = ["a", "la", "so", "tbd", "dylib"]
diff --git a/lib/spack/spack/binary_distribution.py b/lib/spack/spack/binary_distribution.py
index e51d7d4842..c329287de8 100644
--- a/lib/spack/spack/binary_distribution.py
+++ b/lib/spack/spack/binary_distribution.py
@@ -19,6 +19,7 @@ from contextlib import closing
import ruamel.yaml as yaml
from six.moves.urllib.error import HTTPError, URLError
+import llnl.util.filesystem as fsys
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.filesystem import mkdirp
@@ -653,7 +654,7 @@ def get_buildfile_manifest(spec):
for filename in files:
path_name = os.path.join(root, filename)
- m_type, m_subtype = relocate.mime_type(path_name)
+ m_type, m_subtype = fsys.mime_type(path_name)
rel_path_name = os.path.relpath(path_name, spec.prefix)
added = False
diff --git a/lib/spack/spack/fetch_strategy.py b/lib/spack/spack/fetch_strategy.py
index 5ed46c3278..ea85c6a682 100644
--- a/lib/spack/spack/fetch_strategy.py
+++ b/lib/spack/spack/fetch_strategy.py
@@ -54,7 +54,7 @@ import spack.util.pattern as pattern
import spack.util.url as url_util
import spack.util.web as web_util
import spack.version
-from spack.util.compression import decompressor_for, extension
+from spack.util.compression import decompressor_for, extension_from_path
from spack.util.executable import CommandNotFoundError, which
from spack.util.string import comma_and, quote
@@ -613,7 +613,7 @@ class VCSFetchStrategy(FetchStrategy):
@_needs_stage
def archive(self, destination, **kwargs):
- assert extension(destination) == "tar.gz"
+ assert extension_from_path(destination) == "tar.gz"
assert self.stage.source_path.startswith(self.stage.path)
tar = which("tar", required=True)
diff --git a/lib/spack/spack/relocate.py b/lib/spack/spack/relocate.py
index 8212093a12..3ef332c204 100644
--- a/lib/spack/spack/relocate.py
+++ b/lib/spack/spack/relocate.py
@@ -11,6 +11,7 @@ import shutil
import macholib.mach_o
import macholib.MachO
+import llnl.util.filesystem as fs
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.lang import memoized
@@ -887,7 +888,7 @@ def file_is_relocatable(filename, paths_to_relocate=None):
# Remove the RPATHS from the strings in the executable
set_of_strings = set(strings(filename, output=str).split())
- m_type, m_subtype = mime_type(filename)
+ m_type, m_subtype = fs.mime_type(filename)
if m_type == "application":
tty.debug("{0},{1}".format(m_type, m_subtype), level=2)
@@ -923,7 +924,7 @@ def is_binary(filename):
Returns:
True or False
"""
- m_type, _ = mime_type(filename)
+ m_type, _ = fs.mime_type(filename)
msg = "[{0}] -> ".format(filename)
if m_type == "application":
@@ -934,30 +935,6 @@ def is_binary(filename):
return False
-@llnl.util.lang.memoized
-def _get_mime_type():
- file_cmd = executable.which("file")
- for arg in ["-b", "-h", "--mime-type"]:
- file_cmd.add_default_arg(arg)
- return file_cmd
-
-
-@llnl.util.lang.memoized
-def mime_type(filename):
- """Returns the mime type and subtype of a file.
-
- Args:
- filename: file to be analyzed
-
- Returns:
- Tuple containing the MIME type and subtype
- """
- output = _get_mime_type()(filename, output=str, error=str).strip()
- tty.debug("==> " + output, level=2)
- type, _, subtype = output.partition("/")
- return type, subtype
-
-
# Memoize this due to repeated calls to libraries in the same directory.
@llnl.util.lang.memoized
def _exists_dir(dirname):
@@ -975,7 +952,7 @@ def fixup_macos_rpath(root, filename):
True if fixups were applied, else False
"""
abspath = os.path.join(root, filename)
- if mime_type(abspath) != ("application", "x-mach-binary"):
+ if fs.mime_type(abspath) != ("application", "x-mach-binary"):
return False
# Get Mach-O header commands
diff --git a/lib/spack/spack/test/data/compression/Foo.cxx b/lib/spack/spack/test/data/compression/Foo.cxx
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/lib/spack/spack/test/data/compression/Foo.cxx
diff --git a/lib/spack/spack/test/util/compression.py b/lib/spack/spack/test/util/compression.py
index 13d1a44a73..907b1946b8 100644
--- a/lib/spack/spack/test/util/compression.py
+++ b/lib/spack/spack/test/util/compression.py
@@ -22,6 +22,9 @@ ext_archive = {}
for ext in scomp.ALLOWED_ARCHIVE_TYPES
if "TAR" not in ext
]
+# Spack does not use Python native handling for tarballs or zip
+# Don't test tarballs or zip in native test
+native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key]
def support_stub():
@@ -30,10 +33,9 @@ def support_stub():
@pytest.fixture
def compr_support_check(monkeypatch):
- monkeypatch.setattr(scomp, "lzma_support", support_stub)
- monkeypatch.setattr(scomp, "tar_support", support_stub)
- monkeypatch.setattr(scomp, "gzip_support", support_stub)
- monkeypatch.setattr(scomp, "bz2_support", support_stub)
+ monkeypatch.setattr(scomp, "is_lzma_supported", support_stub)
+ monkeypatch.setattr(scomp, "is_gzip_supported", support_stub)
+ monkeypatch.setattr(scomp, "is_bz2_supported", support_stub)
@pytest.fixture
@@ -46,10 +48,9 @@ def archive_file(tmpdir_factory, request):
return os.path.join(str(tmpdir), "Foo.%s" % extension)
-@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
+@pytest.mark.parametrize("archive_file", native_archive_list, indirect=True)
def test_native_unpacking(tmpdir_factory, archive_file):
- extension = scomp.extension(archive_file)
- util = scomp.decompressor_for(archive_file, extension)
+ util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -63,9 +64,8 @@ def test_native_unpacking(tmpdir_factory, archive_file):
@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
- extension = scomp.extension(archive_file)
# actually run test
- util = scomp.decompressor_for(archive_file, extension)
+ util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("system_comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -78,23 +78,25 @@ def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
def test_unallowed_extension():
- bad_ext_archive = "Foo.py"
+ # use a cxx file as python files included for the test
+ # are picked up by the linter and break style checks
+ bad_ext_archive = "Foo.cxx"
with pytest.raises(CommandNotFoundError):
- scomp.decompressor_for(bad_ext_archive, "py")
+ scomp.decompressor_for(bad_ext_archive)
@pytest.mark.parametrize("archive", ext_archive.values())
def test_get_extension(archive):
- ext = scomp.extension(archive)
+ ext = scomp.extension_from_path(archive)
assert ext_archive[ext] == archive
def test_get_bad_extension():
- archive = "Foo.py"
- ext = scomp.extension(archive)
+ archive = "Foo.cxx"
+ ext = scomp.extension_from_path(archive)
assert ext is None
@pytest.mark.parametrize("path", ext_archive.values())
-def test_allowed_archvie(path):
+def test_allowed_archive(path):
assert scomp.allowed_archive(path)
diff --git a/lib/spack/spack/url.py b/lib/spack/spack/url.py
index 00c7d68063..08eef72e93 100644
--- a/lib/spack/spack/url.py
+++ b/lib/spack/spack/url.py
@@ -36,6 +36,7 @@ from llnl.util.tty.color import cescape, colorize
import spack.error
import spack.util.compression as comp
+import spack.util.path as spath
import spack.version
@@ -366,17 +367,15 @@ def split_url_extension(path):
# Strip off sourceforge download suffix.
# e.g. https://sourceforge.net/projects/glew/files/glew/2.0.0/glew-2.0.0.tgz/download
- match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path)
- if match:
- prefix, suffix = match.groups()
+ prefix, suffix = spath.find_sourceforge_suffix(path)
- ext = comp.extension(prefix)
+ ext = comp.extension_from_path(prefix)
if ext is not None:
prefix = comp.strip_extension(prefix)
else:
prefix, suf = strip_query_and_fragment(prefix)
- ext = comp.extension(prefix)
+ ext = comp.extension_from_path(prefix)
prefix = comp.strip_extension(prefix)
suffix = suf + suffix
if ext is None:
diff --git a/lib/spack/spack/util/compression.py b/lib/spack/spack/util/compression.py
index d9c1f5bd18..2411daa6ad 100644
--- a/lib/spack/spack/util/compression.py
+++ b/lib/spack/spack/util/compression.py
@@ -3,61 +3,67 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+import inspect
+import io
import os
import re
import shutil
import sys
from itertools import product
+from llnl.util import tty
+
+import spack.util.path as spath
from spack.util.executable import CommandNotFoundError, which
# Supported archive extensions.
PRE_EXTS = ["tar", "TAR"]
EXTS = ["gz", "bz2", "xz", "Z"]
-NOTAR_EXTS = ["zip", "tgz", "tbz", "tbz2", "txz"]
+NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"]
# Add PRE_EXTS and EXTS last so that .tar.gz is matched *before* .tar or .gz
ALLOWED_ARCHIVE_TYPES = (
[".".join(ext) for ext in product(PRE_EXTS, EXTS)] + PRE_EXTS + EXTS + NOTAR_EXTS
)
+ALLOWED_SINGLE_EXT_ARCHIVE_TYPES = PRE_EXTS + EXTS + NOTAR_EXTS
+
is_windows = sys.platform == "win32"
+try:
+ import bz2 # noqa
-def bz2_support():
- try:
- import bz2 # noqa: F401
+ _bz2_support = True
+except ImportError:
+ _bz2_support = False
- return True
- except ImportError:
- return False
+try:
+ import gzip # noqa
-def gzip_support():
- try:
- import gzip # noqa: F401
+ _gzip_support = True
+except ImportError:
+ _gzip_support = False
- return True
- except ImportError:
- return False
+try:
+ import lzma # noqa # novermin
-def lzma_support():
- try:
- import lzma # noqa: F401 # novm
+ _lzma_support = True
+except ImportError:
+ _lzma_support = False
- return True
- except ImportError:
- return False
+def is_lzma_supported():
+ return _lzma_support
-def tar_support():
- try:
- import tarfile # noqa: F401
- return True
- except ImportError:
- return False
+def is_gzip_supported():
+ return _gzip_support
+
+
+def is_bz2_supported():
+ return _bz2_support
def allowed_archive(path):
@@ -75,8 +81,7 @@ def _untar(archive_file):
archive_file (str): absolute path to the archive to be extracted.
Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).
"""
- _, ext = os.path.splitext(archive_file)
- outfile = os.path.basename(archive_file.strip(ext))
+ outfile = os.path.basename(strip_extension(archive_file, "tar"))
tar = which("tar", required=True)
tar.add_default_arg("-oxf")
@@ -91,15 +96,12 @@ def _bunzip2(archive_file):
Args:
archive_file (str): absolute path to the bz2 archive to be decompressed
"""
- _, ext = os.path.splitext(archive_file)
compressed_file_name = os.path.basename(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "bz2"))
working_dir = os.getcwd()
archive_out = os.path.join(working_dir, decompressed_file)
copy_path = os.path.join(working_dir, compressed_file_name)
- if bz2_support():
- import bz2
-
+ if is_bz2_supported():
f_bz = bz2.BZ2File(archive_file, mode="rb")
with open(archive_out, "wb") as ar:
shutil.copyfileobj(f_bz, ar)
@@ -121,13 +123,10 @@ def _gunzip(archive_file):
Args:
archive_file (str): absolute path of the file to be decompressed
"""
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
- if gzip_support():
- import gzip
-
+ if is_gzip_supported():
f_in = gzip.open(archive_file, "rb")
with open(destination_abspath, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
@@ -138,8 +137,7 @@ def _gunzip(archive_file):
def _system_gunzip(archive_file):
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
@@ -159,17 +157,16 @@ def _unzip(archive_file):
Args:
archive_file (str): absolute path of the file to be decompressed
"""
-
- destination_abspath = os.getcwd()
- exe = "unzip"
- arg = "-q"
+ extracted_file = os.path.basename(strip_extension(archive_file, "zip"))
if is_windows:
- exe = "tar"
- arg = "-xf"
- unzip = which(exe, required=True)
- unzip.add_default_arg(arg)
- unzip(archive_file)
- return destination_abspath
+ return _untar(archive_file)
+ else:
+ exe = "unzip"
+ arg = "-q"
+ unzip = which(exe, required=True)
+ unzip.add_default_arg(arg)
+ unzip(archive_file)
+ return extracted_file
def _unZ(archive_file):
@@ -185,11 +182,8 @@ def _lzma_decomp(archive_file):
lzma module, but fall back on command line xz tooling
to find available Python support. This is the xz command
on Unix and 7z on Windows"""
- if lzma_support():
- import lzma # novermin
-
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ if is_lzma_supported():
+ decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar:
with lzma.open(archive_file) as lar:
@@ -201,14 +195,41 @@ def _lzma_decomp(archive_file):
return _xz(archive_file)
+def _win_compressed_tarball_handler(archive_file):
+ """Decompress and extract compressed tarballs on Windows.
+ This method uses 7zip in conjunction with the tar utility
+ to perform decompression and extraction in a two step process
+ first using 7zip to decompress, and tar to extract.
+
+ The motivation for this method is the inability of 7zip
+ to directly decompress and extract compressed archives
+ in a single shot without undocumented workarounds, and
+ the Windows tar utility's lack of access to the xz tool (unsupported on Windows)
+ """
+ # perform intermediate extraction step
+ # record name of new archive so we can extract
+ # and later clean up
+ decomped_tarball = _7zip(archive_file)
+ # 7zip is able to one shot extract compressed archives
+ # that have been named .txz. If that is the case, there will
+ # be no intermediate archive to extract.
+ if check_extension(decomped_tarball, "tar"):
+ # run tar on newly decomped archive
+ outfile = _untar(decomped_tarball)
+ # clean intermediate archive to mimic end result
+ # produced by one shot decomp/extraction
+ os.remove(decomped_tarball)
+ return outfile
+ return decomped_tarball
+
+
def _xz(archive_file):
"""Decompress lzma compressed .xz files via xz command line
tool. Available only on Unix
"""
if is_windows:
raise RuntimeError("XZ tool unavailable on Windows")
- _, ext = os.path.splitext(archive_file)
- decompressed_file = os.path.basename(archive_file.strip(ext))
+ decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
@@ -234,84 +255,399 @@ def _7zip(archive_file):
Args:
archive_file (str): absolute path of file to be unarchived
"""
- _, ext = os.path.splitext(archive_file)
- outfile = os.path.basename(archive_file.strip(ext))
+ outfile = os.path.basename(strip_last_extension(archive_file))
_7z = which("7z")
if not _7z:
raise CommandNotFoundError(
"7z unavailable,\
unable to extract %s files. 7z can be installed via Spack"
- % ext
+ % extension_from_path(archive_file)
)
_7z.add_default_arg("e")
_7z(archive_file)
return outfile
-def decompressor_for(path, ext):
+def decompressor_for(path, extension=None):
"""Returns a function pointer to appropriate decompression
algorithm based on extension type.
Args:
path (str): path of the archive file requiring decompression
- ext (str): Extension of archive file
"""
- if not allowed_archive(ext):
+ if not extension:
+ extension = extension_from_file(path, decompress=True)
+
+ if not allowed_archive(extension):
raise CommandNotFoundError(
"Cannot extract archive, \
unrecognized file extension: '%s'"
- % ext
+ % extension
)
- if re.match(r"\.?zip$", ext) or path.endswith(".zip"):
+ if re.match(r"\.?zip$", extension) or path.endswith(".zip"):
return _unzip
- if re.match(r"gz", ext):
+ if re.match(r"gz", extension):
return _gunzip
- if re.match(r"bz2", ext):
+ if re.match(r"bz2", extension):
return _bunzip2
# Python does not have native support
# of any kind for .Z files. In these cases,
# we rely on external tools such as tar,
# 7z, or uncompressZ
- if re.match(r"Z$", ext):
+ if re.match(r"Z$", extension):
return _unZ
# Python and platform may not have support for lzma
# compression. If no lzma support, use tools available on systems
# 7zip on Windows and the xz tool on Unix systems.
- if re.match(r"xz", ext):
+ if re.match(r"xz", extension):
return _lzma_decomp
- if ("xz" in ext or "Z" in ext) and is_windows:
- return _7zip
+ # Catch tar.xz/tar.Z files here for Windows
+ # as the tar utility on Windows cannot handle such
+ # compression types directly
+ if ("xz" in extension or "Z" in extension) and is_windows:
+ return _win_compressed_tarball_handler
return _untar
-def strip_extension(path):
- """Get the part of a path that does not include its compressed
- type extension."""
- for type in ALLOWED_ARCHIVE_TYPES:
- suffix = r"\.%s$" % type
- if re.search(suffix, path):
- return re.sub(suffix, "", path)
- return path
+class FileTypeInterface:
+ """
+ Base interface class for describing and querying file type information.
+ FileType describes information about a single file type
+ such as extension, and byte header properties, and provides an interface
+ to check a given file against said type based on magic number.
+
+ This class should be subclassed each time a new type is to be
+ described.
+
+ Note: This class should not be used directly as it does not define any specific
+ file. Attempts to directly use this class will fail, as it does not define
+ a magic number or extension string.
+
+ Subclasses should each describe a different
+ type of file. In order to do so, they must define
+ the extension string, magic number, and header offset (if non zero).
+ If a class has multiple magic numbers, it will need to
+ override the method describing that file type's magic numbers and
+ the method that checks a type's magic numbers against a given file's.
+ """
+
+ OFFSET = 0
+ compressed = False
+
+ @staticmethod
+ def name():
+ raise NotImplementedError
+
+ @classmethod
+ def magic_number(cls):
+ """Return a list of all potential magic numbers for a filetype"""
+ return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")]
+
+ @classmethod
+ def header_size(cls):
+ """Return size of largest magic number associated with file type"""
+ return max([len(x) for x in cls.magic_number()])
+
+ @classmethod
+ def _bytes_check(cls, magic_bytes):
+ for magic in cls.magic_number():
+ if magic_bytes.startswith(magic):
+ return True
+ return False
+
+ @classmethod
+ def is_file_of_type(cls, iostream):
+ """Query byte stream for appropriate magic number
+
+ Args:
+ iostream: file byte stream
+
+ Returns:
+ Bool denoting whether file is of class file type
+ based on magic number
+ """
+ if not iostream:
+ return False
+ # move to location of magic bytes
+ iostream.seek(cls.OFFSET)
+ magic_bytes = iostream.read(cls.header_size())
+ # return to beginning of file
+ iostream.seek(0)
+ if cls._bytes_check(magic_bytes):
+ return True
+ return False
+
+class CompressedFileTypeInterface(FileTypeInterface):
+ """Interface class for FileTypes that include compression information"""
+
+ compressed = True
-def extension(path):
- """Get the archive extension for a path."""
+ @staticmethod
+ def decomp_in_memory(stream):
+ """This method decompresses and loads the first 200 or so bytes of a compressed file
+ to check for compressed archives. This does not decompress the entire file and should
+ not be used for direct expansion of archives/compressed files
+ """
+ raise NotImplementedError("Implementation by compression subclass required")
+
+
+class BZipFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER = b"\x42\x5a\x68"
+ extension = "bz2"
+
+ @staticmethod
+ def name():
+ return "bzip2 compressed data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ if is_bz2_supported():
+ # checking for underlying archive, only decomp as many bytes
+ # as is absolutely necessary for largest archive header (tar)
+ comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size())
+ return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream))
+ return None
+
+
+class ZCompressedFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER_LZW = b"\x1f\x9d"
+ _MAGIC_NUMBER_LZH = b"\x1f\xa0"
+ extension = "Z"
+
+ @staticmethod
+ def name():
+ return "compress'd data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ # python has no method of decompressing `.Z` files in memory
+ return None
+
+
+class GZipFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER = b"\x1f\x8b\x08"
+ extension = "gz"
+
+ @staticmethod
+ def name():
+ return "gzip compressed data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ if is_gzip_supported():
+ # checking for underlying archive, only decomp as many bytes
+ # as is absolutely necessary for largest archive header (tar)
+ return io.BytesIO(
+ initial_bytes=gzip.GzipFile(fileobj=stream).read(
+ TarFileType.OFFSET + TarFileType.header_size()
+ )
+ )
+ return None
+
+
+class LzmaFileType(CompressedFileTypeInterface):
+ _MAGIC_NUMBER = b"\xfd7zXZ"
+ extension = "xz"
+
+ @staticmethod
+ def name():
+ return "xz compressed data"
+
+ @staticmethod
+ def decomp_in_memory(stream):
+ if is_lzma_supported():
+ # checking for underlying archive, only decomp as many bytes
+ # as is absolutely necessary for largest archive header (tar)
+ max_size = TarFileType.OFFSET + TarFileType.header_size()
+ return io.BytesIO(
+ initial_bytes=lzma.LZMADecompressor().decompress(
+ stream.read(max_size), max_length=max_size
+ )
+ )
+ return None
+
+
+class TarFileType(FileTypeInterface):
+ OFFSET = 257
+ _MAGIC_NUMBER_GNU = b"ustar \0"
+ _MAGIC_NUMBER_POSIX = b"ustar\x0000"
+ extension = "tar"
+
+ @staticmethod
+ def name():
+ return "tar archive"
+
+
+class ZipFleType(FileTypeInterface):
+ _MAGIC_NUMBER = b"PK\003\004"
+ extension = "zip"
+
+ @staticmethod
+ def name():
+ return "Zip archive data"
+
+
+# collection of valid Spack recognized archive and compression
+# file type identifier classes.
+VALID_FILETYPES = [
+ BZipFileType,
+ ZCompressedFileType,
+ GZipFileType,
+ LzmaFileType,
+ TarFileType,
+ ZipFleType,
+]
+
+
+def extension_from_stream(stream, decompress=False):
+ """Return extension represented by stream corresponding to archive file
+ If stream does not represent an archive type recognized by Spack
+ (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None
+
+ Extension type is derived by searching for identifying bytes
+ in file stream.
+
+ Args:
+ stream : stream representing a file on system
+ decompress (bool) : if True, compressed files are checked
+ for archive types beneath compression i.e. tar.gz
+ default is False, otherwise, return top level type i.e. gz
+
+ Return:
+ A string representing corresponding archive extension
+ or None as relevant.
+
+ """
+ for arc_type in VALID_FILETYPES:
+ if arc_type.is_file_of_type(stream):
+ suffix_ext = arc_type.extension
+ prefix_ext = ""
+ if arc_type.compressed and decompress:
+ # stream represents compressed file
+ # get decompressed stream (if possible)
+ decomp_stream = arc_type.decomp_in_memory(stream)
+ prefix_ext = extension_from_stream(decomp_stream, decompress=decompress)
+ if not prefix_ext:
+ # We were unable to decompress or unable to derive
+ # a nested extension from decompressed file.
+ # Try to use filename parsing to check for
+ # potential nested extensions if there are any
+ tty.debug(
+ "Cannot derive file extension from magic number;"
+ " falling back to regex path parsing."
+ )
+ return extension_from_path(stream.name)
+ resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext])
+ tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
+ return resultant_ext
+ return None
+
+
+def extension_from_file(file, decompress=False):
+ """Return extension from archive file path
+ Extension is derived based on magic number parsing similar
+ to the `file` utility. Attempts to return abbreviated file extensions
+ whenever a file has an abbreviated extension such as `.tgz` or `.txz`.
+ This distinction in abbreviated extension names is accomplished
+ by string parsing.
+
+ Args:
+ file (os.PathLike): path describing file on system for which ext
+ will be determined.
+ decompress (bool): If True, method will peek into compressed
+ files to check for archive file types. default is False.
+ If false, method will be unable to distinguish `.tar.gz` from `.gz`
+ or similar.
+ Return:
+ Spack recognized archive file extension as determined by file's magic number and
+ file name. If file is not on system or is of a type not recognized by Spack as
+ an archive or compression type, None is returned.
+ """
+ if os.path.exists(file):
+ with open(file, "rb") as f:
+ ext = extension_from_stream(f, decompress)
+ # based on magic number, file is compressed
+ # tar archive. Check to see if file is abbreviated as
+ # t[xz|gz|bz2|bz]
+ if ext and ext.startswith("tar."):
+ suf = ext.split(".")[1]
+ abbr = "t" + suf
+ if check_extension(file, abbr):
+ return abbr
+ if not ext:
+ # If unable to parse extension from stream,
+ # attempt to fall back to string parsing
+ ext = extension_from_path(file)
+ return ext
+ return None
+
+
+def extension_from_path(path):
+ """Get the allowed archive extension for a path.
+ If path does not include a valid archive extension
+ (see`spack.util.compression.ALLOWED_ARCHIVE_TYPES`) return None
+ """
if path is None:
raise ValueError("Can't call extension() on None")
- # Strip sourceforge suffix.
- if re.search(r"((?:sourceforge.net|sf.net)/.*)/download$", path):
- path = os.path.dirname(path)
-
for t in ALLOWED_ARCHIVE_TYPES:
- suffix = r"\.%s$" % t
- if re.search(suffix, path):
+ if check_extension(path, t):
return t
return None
+
+
+def strip_last_extension(path):
+ """Strips last supported archive extension from path"""
+ if path:
+ for ext in ALLOWED_SINGLE_EXT_ARCHIVE_TYPES:
+ mod_path = check_and_remove_ext(path, ext)
+ if mod_path != path:
+ return mod_path
+ return path
+
+
+def strip_extension(path, ext=None):
+ """Get the part of a path that does not include its compressed
+ type extension."""
+ if ext:
+ return check_and_remove_ext(path, ext)
+ for t in ALLOWED_ARCHIVE_TYPES:
+ mod_path = check_and_remove_ext(path, t)
+ if mod_path != path:
+ return mod_path
+ return path
+
+
+def check_extension(path, ext):
+ """Check if extension is present in path"""
+ # Strip sourceforge suffix.
+ prefix, _ = spath.find_sourceforge_suffix(path)
+ if not ext.startswith(r"\."):
+ ext = r"\.%s$" % ext
+ if re.search(ext, prefix):
+ return True
+ return False
+
+
+def reg_remove_ext(path, ext):
+ """Regex remove ext from path"""
+ if path and ext:
+ suffix = r"\.%s$" % ext
+ return re.sub(suffix, "", path)
+ return path
+
+
+def check_and_remove_ext(path, ext):
+ """If given extension is present in path, remove and return,
+ otherwise just return path"""
+ if check_extension(path, ext):
+ return reg_remove_ext(path, ext)
+ return path
diff --git a/lib/spack/spack/util/path.py b/lib/spack/spack/util/path.py
index 981a6b672d..fe45541321 100644
--- a/lib/spack/spack/util/path.py
+++ b/lib/spack/spack/util/path.py
@@ -71,6 +71,15 @@ def win_exe_ext():
return ".exe"
+def find_sourceforge_suffix(path):
+ """find and match sourceforge filepath components
+ Return a (prefix, suffix) tuple; suffix is empty if there is no match"""
+ match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path)
+ if match:
+ return match.groups()
+ return path, ""
+
+
def path_to_os_path(*pths):
"""
Takes an arbitrary number of positional parameters