-rw-r--r--  lib/spack/docs/conf.py                     1
-rw-r--r--  lib/spack/spack/cmd/logs.py               43
-rw-r--r--  lib/spack/spack/test/util/compression.py  41
-rw-r--r--  lib/spack/spack/util/compression.py      589
4 files changed, 289 insertions, 385 deletions
diff --git a/lib/spack/docs/conf.py b/lib/spack/docs/conf.py
index 8f52edb89c..e0542640a2 100644
--- a/lib/spack/docs/conf.py
+++ b/lib/spack/docs/conf.py
@@ -199,6 +199,7 @@ nitpick_ignore = [
("py:class", "contextlib.contextmanager"),
("py:class", "module"),
("py:class", "_io.BufferedReader"),
+ ("py:class", "_io.BytesIO"),
("py:class", "unittest.case.TestCase"),
("py:class", "_frozen_importlib_external.SourceFileLoader"),
("py:class", "clingo.Control"),
diff --git a/lib/spack/spack/cmd/logs.py b/lib/spack/spack/cmd/logs.py
index a9ec4dad61..9d5da880ed 100644
--- a/lib/spack/spack/cmd/logs.py
+++ b/lib/spack/spack/cmd/logs.py
@@ -5,11 +5,13 @@
import errno
import gzip
+import io
import os
import shutil
import sys
import spack.cmd
+import spack.spec
import spack.util.compression as compression
from spack.cmd.common import arguments
from spack.main import SpackCommandError
@@ -23,45 +25,36 @@ def setup_parser(subparser):
arguments.add_common_arguments(subparser, ["spec"])
-def _dump_byte_stream_to_stdout(instream):
+def _dump_byte_stream_to_stdout(instream: io.BufferedIOBase) -> None:
+ # Reopen stdout in binary mode so we don't have to worry about encoding
outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
-
shutil.copyfileobj(instream, outstream)
-def dump_build_log(package):
- with open(package.log_path, "rb") as f:
- _dump_byte_stream_to_stdout(f)
-
-
-def _logs(cmdline_spec, concrete_spec):
+def _logs(cmdline_spec: spack.spec.Spec, concrete_spec: spack.spec.Spec):
if concrete_spec.installed:
log_path = concrete_spec.package.install_log_path
elif os.path.exists(concrete_spec.package.stage.path):
- dump_build_log(concrete_spec.package)
- return
+ # TODO: `spack logs` currently cannot show the logs while a package is being built, as the
+ # combined log file is only written after the build is finished.
+ log_path = concrete_spec.package.log_path
else:
raise SpackCommandError(f"{cmdline_spec} is not installed or staged")
try:
- compression_ext = compression.extension_from_file(log_path)
- with open(log_path, "rb") as fstream:
- if compression_ext == "gz":
- # If the log file is compressed, wrap it with a decompressor
- fstream = gzip.open(log_path, "rb")
- elif compression_ext:
- raise SpackCommandError(
- f"Unsupported storage format for {log_path}: {compression_ext}"
- )
-
- _dump_byte_stream_to_stdout(fstream)
+ stream = open(log_path, "rb")
except OSError as e:
if e.errno == errno.ENOENT:
raise SpackCommandError(f"No logs are available for {cmdline_spec}") from e
- elif e.errno == errno.EPERM:
- raise SpackCommandError(f"Permission error accessing {log_path}") from e
- else:
- raise
+ raise SpackCommandError(f"Error reading logs for {cmdline_spec}: {e}") from e
+
+ with stream as f:
+ ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False)
+ if ext and ext != "gz":
+ raise SpackCommandError(f"Unsupported storage format for {log_path}: {ext}")
+
+ # If the log file is gzip compressed, wrap it with a decompressor
+ _dump_byte_stream_to_stdout(gzip.GzipFile(fileobj=f) if ext == "gz" else f)
def logs(parser, args):
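
For reference, the streaming pattern used above can be sketched with the standard library alone:
sniff the gzip magic bytes without consuming the stream, then copy either the raw file or a
GzipFile wrapper to binary stdout. The helper name `dump_possibly_gzipped` below is hypothetical
and not part of Spack.

    import gzip
    import os
    import shutil
    import sys

    def dump_possibly_gzipped(path: str) -> None:
        with open(path, "rb") as f:
            is_gzip = f.read(2) == b"\x1f\x8b"  # gzip magic number
            f.seek(0)  # rewind so the whole file is copied
            # Reopen stdout in binary mode, as the command above does
            out = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
            shutil.copyfileobj(gzip.GzipFile(fileobj=f) if is_gzip else f, out)
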
diff --git a/lib/spack/spack/test/util/compression.py b/lib/spack/spack/test/util/compression.py
index b3f5c15861..ab38da78ac 100644
--- a/lib/spack/spack/test/util/compression.py
+++ b/lib/spack/spack/test/util/compression.py
@@ -4,8 +4,10 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+import io
import os
import shutil
+import tarfile
from itertools import product
import pytest
@@ -14,7 +16,7 @@ import llnl.url
from llnl.util.filesystem import working_dir
from spack.paths import spack_root
-from spack.util import compression as scomp
+from spack.util import compression
from spack.util.executable import CommandNotFoundError
datadir = os.path.join(spack_root, "lib", "spack", "spack", "test", "data", "compression")
@@ -30,15 +32,11 @@ ext_archive = {}
native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key]
-def support_stub():
- return False
-
-
@pytest.fixture
def compr_support_check(monkeypatch):
- monkeypatch.setattr(scomp, "is_lzma_supported", support_stub)
- monkeypatch.setattr(scomp, "is_gzip_supported", support_stub)
- monkeypatch.setattr(scomp, "is_bz2_supported", support_stub)
+ monkeypatch.setattr(compression, "LZMA_SUPPORTED", False)
+ monkeypatch.setattr(compression, "GZIP_SUPPORTED", False)
+ monkeypatch.setattr(compression, "BZ2_SUPPORTED", False)
@pytest.fixture
@@ -59,7 +57,7 @@ def archive_file_and_extension(tmpdir_factory, request):
)
def test_native_unpacking(tmpdir_factory, archive_file_and_extension):
archive_file, extension = archive_file_and_extension
- util = scomp.decompressor_for(archive_file, extension)
+ util = compression.decompressor_for(archive_file, extension)
tmpdir = tmpdir_factory.mktemp("comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -78,7 +76,7 @@ def test_native_unpacking(tmpdir_factory, archive_file_and_extension):
def test_system_unpacking(tmpdir_factory, archive_file_and_extension, compr_support_check):
# actually run test
archive_file, _ = archive_file_and_extension
- util = scomp.decompressor_for(archive_file)
+ util = compression.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("system_comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -95,4 +93,25 @@ def test_unallowed_extension():
# are picked up by the linter and break style checks
bad_ext_archive = "Foo.cxx"
with pytest.raises(CommandNotFoundError):
- scomp.decompressor_for(bad_ext_archive)
+ compression.decompressor_for(bad_ext_archive)
+
+
+@pytest.mark.parametrize("ext", ["gz", "bz2", "xz"])
+def test_file_type_check_does_not_advance_stream(tmp_path, ext):
+ # Create a tarball compressed with the given format
+ path = str(tmp_path / "compressed_tarball")
+
+ try:
+ with tarfile.open(path, f"w:{ext}") as tar:
+ tar.addfile(tarfile.TarInfo("test.txt"), fileobj=io.BytesIO(b"test"))
+ except tarfile.CompressionError:
+ pytest.skip(f"Cannot create tar.{ext} files")
+
+ # Classify the file from its magic bytes, and check that the stream is not advanced
+ with open(path, "rb") as f:
+ computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False)
+ assert computed_ext == ext
+ assert f.tell() == 0
+ computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=True)
+ assert computed_ext == f"tar.{ext}"
+ assert f.tell() == 0
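
The same check can be exercised fully in memory; the following is only a sketch of what the
parametrized test above verifies, and it assumes the interpreter ships the gzip module so the
decompress=True case can peek through the compression layer.

    import io
    import tarfile

    from spack.util import compression

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.addfile(tarfile.TarInfo("test.txt"), fileobj=io.BytesIO(b"test"))
    buf.seek(0)

    assert compression.extension_from_magic_numbers_by_stream(buf, decompress=False) == "gz"
    assert compression.extension_from_magic_numbers_by_stream(buf, decompress=True) == "tar.gz"
    assert buf.tell() == 0  # classification must not advance the stream
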
diff --git a/lib/spack/spack/util/compression.py b/lib/spack/spack/util/compression.py
index 1e66b3e205..f25841ee70 100644
--- a/lib/spack/spack/util/compression.py
+++ b/lib/spack/spack/util/compression.py
@@ -3,12 +3,13 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+import errno
import inspect
import io
import os
-import re
import shutil
import sys
+from typing import BinaryIO, Callable, Dict, List, Optional
import llnl.url
from llnl.util import tty
@@ -19,42 +20,29 @@ from spack.util.executable import CommandNotFoundError, which
try:
import bz2 # noqa
- _bz2_support = True
+ BZ2_SUPPORTED = True
except ImportError:
- _bz2_support = False
+ BZ2_SUPPORTED = False
try:
import gzip # noqa
- _gzip_support = True
+ GZIP_SUPPORTED = True
except ImportError:
- _gzip_support = False
+ GZIP_SUPPORTED = False
try:
import lzma # noqa # novermin
- _lzma_support = True
+ LZMA_SUPPORTED = True
except ImportError:
- _lzma_support = False
+ LZMA_SUPPORTED = False
-def is_lzma_supported():
- return _lzma_support
-
-
-def is_gzip_supported():
- return _gzip_support
-
-
-def is_bz2_supported():
- return _bz2_support
-
-
-def _system_untar(archive_file, remove_archive_file=False):
- """Returns path to unarchived tar file.
- Untars archive via system tar.
+def _system_untar(archive_file: str, remove_archive_file: bool = False) -> str:
+ """Returns path to unarchived tar file. Untars archive via system tar.
Args:
archive_file (str): absolute path to the archive to be extracted.
@@ -69,6 +57,11 @@ def _system_untar(archive_file, remove_archive_file=False):
archive_file = archive_file_no_ext + "-input"
shutil.move(archive_file_no_ext, archive_file)
tar = which("tar", required=True)
+ # GNU tar's --no-same-owner is not as portable; -o works for BSD tar too. This flag is relevant
+ # when extracting archives as root, where tar attempts to set original ownership of files. This
+ # is redundant when distributing tarballs, as the tarballs are created on different systems
+ # than where they are extracted. In certain cases like rootless containers, setting original
+ # ownership is known to fail, so we need to disable it.
tar.add_default_arg("-oxf")
tar(archive_file)
if remove_archive_file:
@@ -79,21 +72,21 @@ def _system_untar(archive_file, remove_archive_file=False):
return outfile
-def _bunzip2(archive_file):
+def _bunzip2(archive_file: str) -> str:
"""Returns path to decompressed file.
Uses Python's bz2 module to decompress bz2 compressed archives
Fall back to system utility failing to find Python module `bz2`
Args:
- archive_file (str): absolute path to the bz2 archive to be decompressed
+ archive_file: absolute path to the bz2 archive to be decompressed
"""
- if is_bz2_supported():
+ if BZ2_SUPPORTED:
return _py_bunzip(archive_file)
else:
return _system_bunzip(archive_file)
-def _py_bunzip(archive_file):
+def _py_bunzip(archive_file: str) -> str:
"""Returns path to decompressed file.
Decompresses bz2 compressed archives/files via python's bz2 module"""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2"))
@@ -106,7 +99,7 @@ def _py_bunzip(archive_file):
return archive_out
-def _system_bunzip(archive_file):
+def _system_bunzip(archive_file: str) -> str:
"""Returns path to decompressed file.
Decompresses bz2 compressed archives/files via system bzip2 utility"""
compressed_file_name = os.path.basename(archive_file)
@@ -121,25 +114,20 @@ def _system_bunzip(archive_file):
return archive_out
-def _gunzip(archive_file):
- """Returns path to gunzip'd file
- Decompresses `.gz` extensions. Prefer native Python `gzip` module.
- Failing back to system utility gunzip.
- Like gunzip, but extracts in the current working directory
- instead of in-place.
+def _gunzip(archive_file: str) -> str:
+ """Returns path to gunzip'd file. Decompresses `.gz` extensions. Prefer native Python
+ `gzip` module. Falling back to system utility gunzip. Like gunzip, but extracts in the current
+ working directory instead of in-place.
Args:
- archive_file (str): absolute path of the file to be decompressed
+ archive_file: absolute path of the file to be decompressed
"""
- if is_gzip_supported():
- return _py_gunzip(archive_file)
- else:
- return _system_gunzip(archive_file)
+ return _py_gunzip(archive_file) if GZIP_SUPPORTED else _system_gunzip(archive_file)
-def _py_gunzip(archive_file):
- """Returns path to gunzip'd file
- Decompresses `.gz` compressed archvies via python gzip module"""
+def _py_gunzip(archive_file: str) -> str:
+ """Returns path to gunzip'd file. Decompresses `.gz` compressed archvies via python gzip
+ module"""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
@@ -150,9 +138,8 @@ def _py_gunzip(archive_file):
return destination_abspath
-def _system_gunzip(archive_file):
- """Returns path to gunzip'd file
- Decompresses `.gz` compressed files via system gzip"""
+def _system_gunzip(archive_file: str) -> str:
+ """Returns path to gunzip'd file. Decompresses `.gz` compressed files via system gzip"""
archive_file_no_ext = llnl.url.strip_compression_extension(archive_file)
if archive_file_no_ext == archive_file:
# the zip file has no extension. On Unix gunzip cannot unzip onto itself
@@ -170,50 +157,38 @@ def _system_gunzip(archive_file):
return destination_abspath
-def _unzip(archive_file):
- """Returns path to extracted zip archive
- Extract Zipfile, searching for unzip system executable
- If unavailable, search for 'tar' executable on system and use instead
+def _unzip(archive_file: str) -> str:
+ """Returns path to extracted zip archive. Extract Zipfile, searching for unzip system
+ executable. If unavailable, search for 'tar' executable on system and use instead.
Args:
- archive_file (str): absolute path of the file to be decompressed
+ archive_file: absolute path of the file to be decompressed
"""
- extracted_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="zip"))
if sys.platform == "win32":
return _system_untar(archive_file)
- else:
- exe = "unzip"
- arg = "-q"
- unzip = which(exe, required=True)
- unzip.add_default_arg(arg)
- unzip(archive_file)
- return extracted_file
+ unzip = which("unzip", required=True)
+ unzip.add_default_arg("-q")
+ unzip(archive_file)
+ return os.path.basename(llnl.url.strip_extension(archive_file, extension="zip"))
-def _system_unZ(archive_file):
+def _system_unZ(archive_file: str) -> str:
"""Returns path to decompressed file
Decompress UNIX compress style compression
Utilizes gunzip on unix and 7zip on Windows
"""
if sys.platform == "win32":
- result = _system_7zip(archive_file)
- else:
- result = _system_gunzip(archive_file)
- return result
+ return _system_7zip(archive_file)
+ return _system_gunzip(archive_file)
def _lzma_decomp(archive_file):
- """Returns path to decompressed xz file.
- Decompress lzma compressed files. Prefer Python native
- lzma module, but fall back on command line xz tooling
- to find available Python support."""
- if is_lzma_supported():
- return _py_lzma(archive_file)
- else:
- return _xz(archive_file)
+ """Returns path to decompressed xz file. Decompress lzma compressed files. Prefer Python native
+ lzma module, but fall back on command line xz tooling to find available Python support."""
+ return _py_lzma(archive_file) if LZMA_SUPPORTED else _xz(archive_file)
-def _win_compressed_tarball_handler(decompressor):
+def _win_compressed_tarball_handler(decompressor: Callable[[str], str]) -> Callable[[str], str]:
"""Returns function pointer to two stage decompression
and extraction method
Decompress and extract compressed tarballs on Windows.
@@ -227,7 +202,7 @@ def _win_compressed_tarball_handler(decompressor):
can be installed manually or via spack
"""
- def unarchive(archive_file):
+ def unarchive(archive_file: str):
# perform intermediate extraction step
# record name of new archive so we can extract
decomped_tarball = decompressor(archive_file)
@@ -238,9 +213,9 @@ def _win_compressed_tarball_handler(decompressor):
return unarchive
-def _py_lzma(archive_file):
- """Returns path to decompressed .xz files
- Decompress lzma compressed .xz files via python lzma module"""
+def _py_lzma(archive_file: str) -> str:
+ """Returns path to decompressed .xz files. Decompress lzma compressed .xz files via Python
+ lzma module."""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar:
@@ -250,10 +225,8 @@ def _py_lzma(archive_file):
def _xz(archive_file):
- """Returns path to decompressed xz files
- Decompress lzma compressed .xz files via xz command line
- tool.
- """
+ """Returns path to decompressed xz files. Decompress lzma compressed .xz files via xz command
+ line tool."""
decompressed_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="xz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
@@ -292,19 +265,17 @@ unable to extract %s files. 7z can be installed via Spack"
return outfile
-def decompressor_for(path, extension=None):
+def decompressor_for(path: str, extension: Optional[str] = None):
"""Returns appropriate decompression/extraction algorithm function pointer
for provided extension. If extension is none, it is computed
from the `path` and the decompression function is derived
from that information."""
if not extension:
- extension = extension_from_file(path, decompress=True)
+ extension = extension_from_magic_numbers(path, decompress=True)
- if not llnl.url.allowed_archive(extension):
+ if not extension or not llnl.url.allowed_archive(extension):
raise CommandNotFoundError(
- "Cannot extract archive, \
-unrecognized file extension: '%s'"
- % extension
+ f"Cannot extract {path}, unrecognized file extension: '{extension}'"
)
if sys.platform == "win32":
return decompressor_for_win(extension)
@@ -312,58 +283,37 @@ unrecognized file extension: '%s'"
return decompressor_for_nix(extension)
-def decompressor_for_nix(extension):
- """Returns a function pointer to appropriate decompression
- algorithm based on extension type and unix specific considerations
- i.e. a reasonable expectation system utils like gzip, bzip2, and xz are
- available
+def decompressor_for_nix(extension: str) -> Callable[[str], str]:
+ """Returns a function pointer to appropriate decompression algorithm based on extension type
+ and Unix-specific considerations, i.e. a reasonable expectation that system utilities like
+ gzip, bzip2, and xz are available
Args:
- path (str): path of the archive file requiring decompression
+ extension: extension of the archive file requiring decompression
"""
- if re.match(r"zip$", extension):
- return _unzip
-
- if re.match(r"gz$", extension):
- return _gunzip
+ extension_to_decompressor: Dict[str, Callable[[str], str]] = {
+ "zip": _unzip,
+ "gz": _gunzip,
+ "bz2": _bunzip2,
+ "Z": _system_unZ, # no builtin support for .Z files
+ "xz": _lzma_decomp,
+ }
- if re.match(r"bz2$", extension):
- return _bunzip2
+ return extension_to_decompressor.get(extension, _system_untar)
- # Python does not have native support
- # of any kind for .Z files. In these cases,
- # we rely on external tools such as tar,
- # 7z, or uncompressZ
- if re.match(r"Z$", extension):
- return _system_unZ
- # Python and platform may not have support for lzma
- # compression. If no lzma support, use tools available on systems
- if re.match(r"xz$", extension):
- return _lzma_decomp
-
- return _system_untar
-
-
-def _determine_py_decomp_archive_strategy(extension):
+def _determine_py_decomp_archive_strategy(extension: str) -> Optional[Callable[[str], str]]:
"""Returns appropriate python based decompression strategy
based on extension type"""
- # Only rely on Python decompression support for gz
- if re.match(r"gz$", extension):
- return _py_gunzip
-
- # Only rely on Python decompression support for bzip2
- if re.match(r"bz2$", extension):
- return _py_bunzip
-
- # Only rely on Python decompression support for xz
- if re.match(r"xz$", extension):
- return _py_lzma
-
- return None
+ extension_to_decompressor: Dict[str, Callable[[str], str]] = {
+ "gz": _py_gunzip,
+ "bz2": _py_bunzip,
+ "xz": _py_lzma,
+ }
+ return extension_to_decompressor.get(extension, None)
-def decompressor_for_win(extension):
+def decompressor_for_win(extension: str) -> Callable[[str], str]:
"""Returns a function pointer to appropriate decompression
algorithm based on extension type and Windows specific considerations
@@ -371,34 +321,32 @@ def decompressor_for_win(extension):
So we must rely exclusively on Python module support for all compression
operations, tar for tarballs and zip files, and 7zip for Z compressed archives
and files as Python does not provide support for the UNIX compress algorithm
-
- Args:
- path (str): path of the archive file requiring decompression
- extension (str): extension
"""
extension = llnl.url.expand_contracted_extension(extension)
- # Windows native tar can handle .zip extensions, use standard
- # unzip method
- if re.match(r"zip$", extension):
- return _unzip
-
- # if extension is standard tarball, invoke Windows native tar
- if re.match(r"tar$", extension):
- return _system_untar
-
- # Python does not have native support
- # of any kind for .Z files. In these cases,
- # we rely on 7zip, which must be installed outside
- # of spack and added to the PATH or externally detected
- if re.match(r"Z$", extension):
- return _system_unZ
-
- # Windows vendors no native decompression tools, attempt to derive
- # python based decompression strategy
- # Expand extension from contracted extension i.e. tar.gz from .tgz
- # no-op on non contracted extensions
+ extension_to_decompressor: Dict[str, Callable[[str], str]] = {
+ # Windows native tar can handle .zip extensions, use standard unzip method
+ "zip": _unzip,
+ # if extension is standard tarball, invoke Windows native tar
+ "tar": _system_untar,
+ # Python does not have native support of any kind for .Z files. In these cases, we rely on
+ # 7zip, which must be installed outside of Spack and added to the PATH or externally
+ # detected
+ "Z": _system_unZ,
+ "xz": _lzma_decomp,
+ }
+
+ decompressor = extension_to_decompressor.get(extension)
+ if decompressor:
+ return decompressor
+
+ # Windows vendors no native decompression tools, attempt to derive Python based decompression
+ # strategy. Expand extension from abbreviated ones, i.e. tar.gz from .tgz
compression_extension = llnl.url.compression_ext_from_compressed_archive(extension)
- decompressor = _determine_py_decomp_archive_strategy(compression_extension)
+ decompressor = (
+ _determine_py_decomp_archive_strategy(compression_extension)
+ if compression_extension
+ else None
+ )
if not decompressor:
raise SpackError(
"Spack was unable to determine a proper decompression strategy for"
@@ -412,103 +360,75 @@ def decompressor_for_win(extension):
class FileTypeInterface:
- """
- Base interface class for describing and querying file type information.
- FileType describes information about a single file type
- such as extension, and byte header properties, and provides an interface
- to check a given file against said type based on magic number.
-
- This class should be subclassed each time a new type is to be
- described.
-
- Note: This class should not be used directly as it does not define any specific
- file. Attempts to directly use this class will fail, as it does not define
- a magic number or extension string.
-
- Subclasses should each describe a different
- type of file. In order to do so, they must define
- the extension string, magic number, and header offset (if non zero).
- If a class has multiple magic numbers, it will need to
- override the method describin that file types magic numbers and
- the method that checks a types magic numbers against a given file's.
- """
+ """Base interface class for describing and querying file type information. FileType describes
+ information about a single file type such as typical extension and byte header properties,
+ and provides an interface to check a given file against said type based on magic number.
- OFFSET = 0
- compressed = False
+ This class should be subclassed each time a new type is to be described.
+
+ Subclasses should each describe a different type of file. In order to do so, they must define
+ the extension string, magic number, and header offset (if non zero). If a class has multiple
+ magic numbers, it will need to override the method describing that file type's magic numbers
+ and the method that checks a type's magic numbers against a given file's."""
- @staticmethod
- def name():
- raise NotImplementedError
+ OFFSET = 0
+ extension: str
+ name: str
@classmethod
- def magic_number(cls):
+ def magic_numbers(cls) -> List[bytes]:
"""Return a list of all potential magic numbers for a filetype"""
- return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")]
+ return [
+ value for name, value in inspect.getmembers(cls) if name.startswith("_MAGIC_NUMBER")
+ ]
@classmethod
- def header_size(cls):
+ def header_size(cls) -> int:
"""Return size of largest magic number associated with file type"""
- return max([len(x) for x in cls.magic_number()])
+ return max(len(x) for x in cls.magic_numbers())
- @classmethod
- def _bytes_check(cls, magic_bytes):
- for magic in cls.magic_number():
- if magic_bytes.startswith(magic):
- return True
- return False
-
- @classmethod
- def is_file_of_type(cls, iostream):
- """Query byte stream for appropriate magic number
+ def matches_magic(self, stream: BinaryIO) -> bool:
+ """Returns true if the stream matches the current file type by any of its magic numbers.
+ Resets stream to original position.
Args:
- iostream: file byte stream
-
- Returns:
- Bool denoting whether file is of class file type
- based on magic number
+ stream: file byte stream
"""
- if not iostream:
- return False
# move to location of magic bytes
- iostream.seek(cls.OFFSET)
- magic_bytes = iostream.read(cls.header_size())
- # return to beginning of file
- iostream.seek(0)
- if cls._bytes_check(magic_bytes):
- return True
- return False
+ offset = stream.tell()
+ stream.seek(self.OFFSET)
+ magic_bytes = stream.read(self.header_size())
+ stream.seek(offset)
+ return any(magic_bytes.startswith(magic) for magic in self.magic_numbers())
class CompressedFileTypeInterface(FileTypeInterface):
"""Interface class for FileTypes that include compression information"""
- compressed = True
+ def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
+ """This method returns the first num_bytes of a decompressed stream. Returns None if no
+ builtin support for decompression."""
+ return None
- @staticmethod
- def decomp_in_memory(stream):
- """This method decompresses and loads the first 200 or so bytes of a compressed file
- to check for compressed archives. This does not decompress the entire file and should
- not be used for direct expansion of archives/compressed files
- """
- raise NotImplementedError("Implementation by compression subclass required")
+
+def _decompressed_peek(
+ decompressed_stream: io.BufferedIOBase, stream: BinaryIO, num_bytes: int
+) -> io.BytesIO:
+ # Read the first num_bytes of the decompressed stream, do not advance the stream position.
+ pos = stream.tell()
+ data = decompressed_stream.read(num_bytes)
+ stream.seek(pos)
+ return io.BytesIO(data)
class BZipFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\x42\x5a\x68"
extension = "bz2"
+ name = "bzip2 compressed data"
- @staticmethod
- def name():
- return "bzip2 compressed data"
-
- @staticmethod
- def decomp_in_memory(stream):
- if is_bz2_supported():
- # checking for underlying archive, only decomp as many bytes
- # as is absolutely neccesary for largest archive header (tar)
- comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size())
- return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream))
+ def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
+ if BZ2_SUPPORTED:
+ return _decompressed_peek(bz2.BZ2File(stream), stream, num_bytes)
return None
@@ -516,57 +436,28 @@ class ZCompressedFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER_LZW = b"\x1f\x9d"
_MAGIC_NUMBER_LZH = b"\x1f\xa0"
extension = "Z"
-
- @staticmethod
- def name():
- return "compress'd data"
-
- @staticmethod
- def decomp_in_memory(stream):
- # python has no method of decompressing `.Z` files in memory
- return None
+ name = "compress'd data"
class GZipFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\x1f\x8b\x08"
extension = "gz"
+ name = "gzip compressed data"
- @staticmethod
- def name():
- return "gzip compressed data"
-
- @staticmethod
- def decomp_in_memory(stream):
- if is_gzip_supported():
- # checking for underlying archive, only decomp as many bytes
- # as is absolutely neccesary for largest archive header (tar)
- return io.BytesIO(
- initial_bytes=gzip.GzipFile(fileobj=stream).read(
- TarFileType.OFFSET + TarFileType.header_size()
- )
- )
+ def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
+ if GZIP_SUPPORTED:
+ return _decompressed_peek(gzip.GzipFile(fileobj=stream), stream, num_bytes)
return None
class LzmaFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\xfd7zXZ"
extension = "xz"
+ name = "xz compressed data"
- @staticmethod
- def name():
- return "xz compressed data"
-
- @staticmethod
- def decomp_in_memory(stream):
- if is_lzma_supported():
- # checking for underlying archive, only decomp as many bytes
- # as is absolutely neccesary for largest archive header (tar)
- max_size = TarFileType.OFFSET + TarFileType.header_size()
- return io.BytesIO(
- initial_bytes=lzma.LZMADecompressor().decompress(
- stream.read(max_size), max_length=max_size
- )
- )
+ def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
+ if LZMA_SUPPORTED:
+ return _decompressed_peek(lzma.LZMAFile(stream), stream, num_bytes)
return None
@@ -575,111 +466,111 @@ class TarFileType(FileTypeInterface):
_MAGIC_NUMBER_GNU = b"ustar \0"
_MAGIC_NUMBER_POSIX = b"ustar\x0000"
extension = "tar"
-
- @staticmethod
- def name():
- return "tar archive"
+ name = "tar archive"
class ZipFleType(FileTypeInterface):
_MAGIC_NUMBER = b"PK\003\004"
extension = "zip"
+ name = "Zip archive data"
- @staticmethod
- def name():
- return "Zip archive data"
+#: Maximum number of bytes to read from a file to determine any archive type. Tar is the largest.
+MAX_BYTES_ARCHIVE_HEADER = TarFileType.OFFSET + TarFileType.header_size()
-# collection of valid Spack recognized archive and compression
-# file type identifier classes.
-VALID_FILETYPES = [
- BZipFileType,
- ZCompressedFileType,
- GZipFileType,
- LzmaFileType,
- TarFileType,
- ZipFleType,
+#: Collection of supported archive and compression file type identifier classes.
+SUPPORTED_FILETYPES: List[FileTypeInterface] = [
+ BZipFileType(),
+ ZCompressedFileType(),
+ GZipFileType(),
+ LzmaFileType(),
+ TarFileType(),
+ ZipFleType(),
]
-def extension_from_stream(stream, decompress=False):
- """Return extension represented by stream corresponding to archive file
- If stream does not represent an archive type recongized by Spack
- (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None
+def _extension_of_compressed_file(
+ file_type: CompressedFileTypeInterface, stream: BinaryIO
+) -> Optional[str]:
+ """Retrieves the extension of a file after decompression from its magic numbers, if it can be
+ decompressed."""
+ # To classify the file we only need to decompress the first so many bytes.
+ decompressed_magic = file_type.peek(stream, MAX_BYTES_ARCHIVE_HEADER)
- Extension type is derived by searching for identifying bytes
- in file stream.
+ if not decompressed_magic:
+ return None
- Args:
- stream : stream representing a file on system
- decompress (bool) : if True, compressed files are checked
- for archive types beneath compression i.e. tar.gz
- default is False, otherwise, return top level type i.e. gz
+ return extension_from_magic_numbers_by_stream(decompressed_magic, decompress=False)
- Return:
- A string represting corresponding archive extension
- or None as relevant.
- """
- for arc_type in VALID_FILETYPES:
- if arc_type.is_file_of_type(stream):
- suffix_ext = arc_type.extension
- prefix_ext = ""
- if arc_type.compressed and decompress:
- # stream represents compressed file
- # get decompressed stream (if possible)
- decomp_stream = arc_type.decomp_in_memory(stream)
- prefix_ext = extension_from_stream(decomp_stream, decompress=decompress)
- if not prefix_ext:
- # We were unable to decompress or unable to derive
- # a nested extension from decompressed file.
- # Try to use filename parsing to check for
- # potential nested extensions if there are any
- tty.debug(
- "Cannot derive file extension from magic number;"
- " falling back to regex path parsing."
- )
- return llnl.url.extension_from_path(stream.name)
- resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext])
- tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
- return resultant_ext
+def extension_from_magic_numbers_by_stream(
+ stream: BinaryIO, decompress: bool = False
+) -> Optional[str]:
+ """Returns the typical extension for the opened file, without leading ``.``, based on its magic
+ numbers.
+
+ If the stream does not represent a file type recognized by Spack (see
+ :py:data:`SUPPORTED_FILETYPES`), the method will return None
+
+ Args:
+ stream: stream representing a file on system
+ decompress: if True, compressed files are checked for archive types beneath compression.
+ For example, tar.gz is returned if True, versus only gz if False."""
+ for file_type in SUPPORTED_FILETYPES:
+ if not file_type.matches_magic(stream):
+ continue
+ ext = file_type.extension
+ if decompress and isinstance(file_type, CompressedFileTypeInterface):
+ uncompressed_ext = _extension_of_compressed_file(file_type, stream)
+ if not uncompressed_ext:
+ tty.debug(
+ "Cannot derive file extension from magic number;"
+ " falling back to original file name."
+ )
+ return llnl.url.extension_from_path(stream.name)
+ ext = f"{uncompressed_ext}.{ext}"
+ tty.debug(f"File extension {ext} successfully derived by magic number.")
+ return ext
return None
-def extension_from_file(file, decompress=False):
- """Return extension from archive file path
- Extension is derived based on magic number parsing similar
- to the `file` utility. Attempts to return abbreviated file extensions
- whenever a file has an abbreviated extension such as `.tgz` or `.txz`.
- This distinction in abbreivated extension names is accomplished
- by string parsing.
+def _maybe_abbreviate_extension(path: str, extension: str) -> str:
+ """If the file is a compressed tar archive, return the abbreviated extension t[xz|gz|bz2|bz]
+ instead of tar.[xz|gz|bz2|bz] if the file's original name also has an abbreviated extension."""
+ if not extension.startswith("tar."):
+ return extension
+ abbr = f"t{extension[4:]}"
+ return abbr if llnl.url.has_extension(path, abbr) else extension
+
+
+def extension_from_magic_numbers(path: str, decompress: bool = False) -> Optional[str]:
+ """Return typical extension without leading ``.`` of a compressed file or archive at the given
+ path, based on its magic numbers, similar to the `file` utility. Notice that the extension
+ returned from this function may not coincide with the file's given extension.
Args:
- file (os.PathLike): path descibing file on system for which ext
- will be determined.
- decompress (bool): If True, method will peek into compressed
- files to check for archive file types. default is False.
- If false, method will be unable to distinguish `.tar.gz` from `.gz`
- or similar.
- Return:
- Spack recognized archive file extension as determined by file's magic number and
- file name. If file is not on system or is of an type not recognized by Spack as
- an archive or compression type, None is returned.
+ path: file to determine extension of
+ decompress: If True, method will peek into decompressed file to check for archive file
+ types. If False, the method will return only the top-level extension (for example
+ ``gz`` and not ``tar.gz``).
+ Returns:
+ Spack recognized archive file extension as determined by file's magic number and file name.
+ If file is not on system or is of a type not recognized by Spack as an archive or
+ compression type, None is returned. If the file is classified as a compressed tarball, the
+ extension is abbreviated (for instance ``tgz`` not ``tar.gz``) if that matches the file's
+ given extension.
"""
- if os.path.exists(file):
- with open(file, "rb") as f:
- ext = extension_from_stream(f, decompress)
- # based on magic number, file is compressed
- # tar archive. Check to see if file is abbreviated as
- # t[xz|gz|bz2|bz]
- if ext and ext.startswith("tar."):
- suf = ext.split(".")[1]
- abbr = "t" + suf
- if llnl.url.has_extension(file, abbr):
- return abbr
- if not ext:
- # If unable to parse extension from stream,
- # attempt to fall back to string parsing
- ext = llnl.url.extension_from_path(file)
- return ext
- return None
+ try:
+ with open(path, "rb") as f:
+ ext = extension_from_magic_numbers_by_stream(f, decompress)
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ return None
+ raise
+
+ # Return the extension derived from the magic number if possible.
+ if ext:
+ return _maybe_abbreviate_extension(path, ext)
+
+ # Otherwise, use the extension from the file name.
+ return llnl.url.extension_from_path(path)
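
To illustrate how the reworked pieces fit together, here is a hypothetical end-to-end use of the
public API touched by this diff; the archive path is made up for illustration, and the comments
state expected results under the assumption that the file really is a gzip-compressed tarball.

    import spack.util.compression as compression

    archive = "/tmp/example-1.0.tgz"  # hypothetical path

    # Classify by magic numbers. With decompress=True the gzip layer is peeked through, and
    # because the file name ends in .tgz the abbreviated extension is returned ("tar.gz"
    # otherwise).
    ext = compression.extension_from_magic_numbers(archive, decompress=True)

    # Pick the matching decompressor (a compressed tarball dispatches to system tar on Unix)
    # and extract into the current working directory.
    unpack = compression.decompressor_for(archive, ext)
    unpack(archive)
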