diff options
author | Harmen Stoppels <me@harmenstoppels.nl> | 2024-01-31 07:59:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 07:59:07 +0100 |
commit | 517dac6ff84df742e85590e53785daaeed792913 (patch) | |
tree | f28d19a6bb8633e08f9b86d844dc308b494be31a /lib | |
parent | 376653ec3dd81adae058643f22a91f02a4661818 (diff) | |
download | spack-517dac6ff84df742e85590e53785daaeed792913.tar.gz spack-517dac6ff84df742e85590e53785daaeed792913.tar.bz2 spack-517dac6ff84df742e85590e53785daaeed792913.tar.xz spack-517dac6ff84df742e85590e53785daaeed792913.zip |
compression.py: refactor + bug fix (#42367)
Improve naming, so it's clear that file "extensions" are not taken in the
`PurePath(path).suffix` sense, as the original function name suggests,
but rather that the files are opened and classified by their magic
bytes.
Add type hints.
Fix a bug where `stream.read(num_bytes)` was run on the compressed
stream instead of the uncompressed stream, which can potentially break
detection of tar.bz2 files.
Ensure that when peeking into streams for magic bytes, the streams are
reset to their original position upon return.
Use new API in `spack logs`.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/spack/docs/conf.py | 1 | ||||
-rw-r--r-- | lib/spack/spack/cmd/logs.py | 43 | ||||
-rw-r--r-- | lib/spack/spack/test/util/compression.py | 41 | ||||
-rw-r--r-- | lib/spack/spack/util/compression.py | 589 |
4 files changed, 289 insertions, 385 deletions
diff --git a/lib/spack/docs/conf.py b/lib/spack/docs/conf.py index 8f52edb89c..e0542640a2 100644 --- a/lib/spack/docs/conf.py +++ b/lib/spack/docs/conf.py @@ -199,6 +199,7 @@ nitpick_ignore = [ ("py:class", "contextlib.contextmanager"), ("py:class", "module"), ("py:class", "_io.BufferedReader"), + ("py:class", "_io.BytesIO"), ("py:class", "unittest.case.TestCase"), ("py:class", "_frozen_importlib_external.SourceFileLoader"), ("py:class", "clingo.Control"), diff --git a/lib/spack/spack/cmd/logs.py b/lib/spack/spack/cmd/logs.py index a9ec4dad61..9d5da880ed 100644 --- a/lib/spack/spack/cmd/logs.py +++ b/lib/spack/spack/cmd/logs.py @@ -5,11 +5,13 @@ import errno import gzip +import io import os import shutil import sys import spack.cmd +import spack.spec import spack.util.compression as compression from spack.cmd.common import arguments from spack.main import SpackCommandError @@ -23,45 +25,36 @@ def setup_parser(subparser): arguments.add_common_arguments(subparser, ["spec"]) -def _dump_byte_stream_to_stdout(instream): +def _dump_byte_stream_to_stdout(instream: io.BufferedIOBase) -> None: + # Reopen stdout in binary mode so we don't have to worry about encoding outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False) - shutil.copyfileobj(instream, outstream) -def dump_build_log(package): - with open(package.log_path, "rb") as f: - _dump_byte_stream_to_stdout(f) - - -def _logs(cmdline_spec, concrete_spec): +def _logs(cmdline_spec: spack.spec.Spec, concrete_spec: spack.spec.Spec): if concrete_spec.installed: log_path = concrete_spec.package.install_log_path elif os.path.exists(concrete_spec.package.stage.path): - dump_build_log(concrete_spec.package) - return + # TODO: `spack logs` can currently not show the logs while a package is being built, as the + # combined log file is only written after the build is finished. 
+ log_path = concrete_spec.package.log_path else: raise SpackCommandError(f"{cmdline_spec} is not installed or staged") try: - compression_ext = compression.extension_from_file(log_path) - with open(log_path, "rb") as fstream: - if compression_ext == "gz": - # If the log file is compressed, wrap it with a decompressor - fstream = gzip.open(log_path, "rb") - elif compression_ext: - raise SpackCommandError( - f"Unsupported storage format for {log_path}: {compression_ext}" - ) - - _dump_byte_stream_to_stdout(fstream) + stream = open(log_path, "rb") except OSError as e: if e.errno == errno.ENOENT: raise SpackCommandError(f"No logs are available for {cmdline_spec}") from e - elif e.errno == errno.EPERM: - raise SpackCommandError(f"Permission error accessing {log_path}") from e - else: - raise + raise SpackCommandError(f"Error reading logs for {cmdline_spec}: {e}") from e + + with stream as f: + ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False) + if ext and ext != "gz": + raise SpackCommandError(f"Unsupported storage format for {log_path}: {ext}") + + # If the log file is gzip compressed, wrap it with a decompressor + _dump_byte_stream_to_stdout(gzip.GzipFile(fileobj=f) if ext == "gz" else f) def logs(parser, args): diff --git a/lib/spack/spack/test/util/compression.py b/lib/spack/spack/test/util/compression.py index b3f5c15861..ab38da78ac 100644 --- a/lib/spack/spack/test/util/compression.py +++ b/lib/spack/spack/test/util/compression.py @@ -4,8 +4,10 @@ # SPDX-License-Identifier: (Apache-2.0 OR MIT) +import io import os import shutil +import tarfile from itertools import product import pytest @@ -14,7 +16,7 @@ import llnl.url from llnl.util.filesystem import working_dir from spack.paths import spack_root -from spack.util import compression as scomp +from spack.util import compression from spack.util.executable import CommandNotFoundError datadir = os.path.join(spack_root, "lib", "spack", "spack", "test", "data", "compression") @@ -30,15 
+32,11 @@ ext_archive = {} native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key] -def support_stub(): - return False - - @pytest.fixture def compr_support_check(monkeypatch): - monkeypatch.setattr(scomp, "is_lzma_supported", support_stub) - monkeypatch.setattr(scomp, "is_gzip_supported", support_stub) - monkeypatch.setattr(scomp, "is_bz2_supported", support_stub) + monkeypatch.setattr(compression, "LZMA_SUPPORTED", False) + monkeypatch.setattr(compression, "GZIP_SUPPORTED", False) + monkeypatch.setattr(compression, "BZ2_SUPPORTED", False) @pytest.fixture @@ -59,7 +57,7 @@ def archive_file_and_extension(tmpdir_factory, request): ) def test_native_unpacking(tmpdir_factory, archive_file_and_extension): archive_file, extension = archive_file_and_extension - util = scomp.decompressor_for(archive_file, extension) + util = compression.decompressor_for(archive_file, extension) tmpdir = tmpdir_factory.mktemp("comp_test") with working_dir(str(tmpdir)): assert not os.listdir(os.getcwd()) @@ -78,7 +76,7 @@ def test_native_unpacking(tmpdir_factory, archive_file_and_extension): def test_system_unpacking(tmpdir_factory, archive_file_and_extension, compr_support_check): # actually run test archive_file, _ = archive_file_and_extension - util = scomp.decompressor_for(archive_file) + util = compression.decompressor_for(archive_file) tmpdir = tmpdir_factory.mktemp("system_comp_test") with working_dir(str(tmpdir)): assert not os.listdir(os.getcwd()) @@ -95,4 +93,25 @@ def test_unallowed_extension(): # are picked up by the linter and break style checks bad_ext_archive = "Foo.cxx" with pytest.raises(CommandNotFoundError): - scomp.decompressor_for(bad_ext_archive) + compression.decompressor_for(bad_ext_archive) + + +@pytest.mark.parametrize("ext", ["gz", "bz2", "xz"]) +def test_file_type_check_does_not_advance_stream(tmp_path, ext): + # Create a tarball compressed with the given format + path = str(tmp_path / "compressed_tarball") + + try: + 
with tarfile.open(path, f"w:{ext}") as tar: + tar.addfile(tarfile.TarInfo("test.txt"), fileobj=io.BytesIO(b"test")) + except tarfile.CompressionError: + pytest.skip(f"Cannot create tar.{ext} files") + + # Classify the file from its magic bytes, and check that the stream is not advanced + with open(path, "rb") as f: + computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False) + assert computed_ext == ext + assert f.tell() == 0 + computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=True) + assert computed_ext == f"tar.{ext}" + assert f.tell() == 0 diff --git a/lib/spack/spack/util/compression.py b/lib/spack/spack/util/compression.py index 1e66b3e205..f25841ee70 100644 --- a/lib/spack/spack/util/compression.py +++ b/lib/spack/spack/util/compression.py @@ -3,12 +3,13 @@ # # SPDX-License-Identifier: (Apache-2.0 OR MIT) +import errno import inspect import io import os -import re import shutil import sys +from typing import BinaryIO, Callable, Dict, List, Optional import llnl.url from llnl.util import tty @@ -19,42 +20,29 @@ from spack.util.executable import CommandNotFoundError, which try: import bz2 # noqa - _bz2_support = True + BZ2_SUPPORTED = True except ImportError: - _bz2_support = False + BZ2_SUPPORTED = False try: import gzip # noqa - _gzip_support = True + GZIP_SUPPORTED = True except ImportError: - _gzip_support = False + GZIP_SUPPORTED = False try: import lzma # noqa # novermin - _lzma_support = True + LZMA_SUPPORTED = True except ImportError: - _lzma_support = False + LZMA_SUPPORTED = False -def is_lzma_supported(): - return _lzma_support - - -def is_gzip_supported(): - return _gzip_support - - -def is_bz2_supported(): - return _bz2_support - - -def _system_untar(archive_file, remove_archive_file=False): - """Returns path to unarchived tar file. - Untars archive via system tar. +def _system_untar(archive_file: str, remove_archive_file: bool = False) -> str: + """Returns path to unarchived tar file. 
Untars archive via system tar. Args: archive_file (str): absolute path to the archive to be extracted. @@ -69,6 +57,11 @@ def _system_untar(archive_file, remove_archive_file=False): archive_file = archive_file_no_ext + "-input" shutil.move(archive_file_no_ext, archive_file) tar = which("tar", required=True) + # GNU tar's --no-same-owner is not as portable, -o works for BSD tar too. This flag is relevant + # when extracting archives as root, where tar attempts to set original ownership of files. This + # is redundant when distributing tarballs, as the tarballs are created on different systems + # than where they are extracted. In certain cases like rootless containers, setting original + # ownership is known to fail, so we need to disable it. tar.add_default_arg("-oxf") tar(archive_file) if remove_archive_file: @@ -79,21 +72,21 @@ def _system_untar(archive_file, remove_archive_file=False): return outfile -def _bunzip2(archive_file): +def _bunzip2(archive_file: str) -> str: """Returns path to decompressed file. Uses Python's bz2 module to decompress bz2 compressed archives Fall back to system utility failing to find Python module `bz2` Args: - archive_file (str): absolute path to the bz2 archive to be decompressed + archive_file: absolute path to the bz2 archive to be decompressed """ - if is_bz2_supported(): + if BZ2_SUPPORTED: return _py_bunzip(archive_file) else: return _system_bunzip(archive_file) -def _py_bunzip(archive_file): +def _py_bunzip(archive_file: str) -> str: """Returns path to decompressed file. Decompresses bz2 compressed archives/files via python's bz2 module""" decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2")) @@ -106,7 +99,7 @@ def _py_bunzip(archive_file): return archive_out -def _system_bunzip(archive_file): +def _system_bunzip(archive_file: str) -> str: """Returns path to decompressed file. 
Decompresses bz2 compressed archives/files via system bzip2 utility""" compressed_file_name = os.path.basename(archive_file) @@ -121,25 +114,20 @@ def _system_bunzip(archive_file): return archive_out -def _gunzip(archive_file): - """Returns path to gunzip'd file - Decompresses `.gz` extensions. Prefer native Python `gzip` module. - Failing back to system utility gunzip. - Like gunzip, but extracts in the current working directory - instead of in-place. +def _gunzip(archive_file: str) -> str: + """Returns path to gunzip'd file. Decompresses `.gz` extensions. Prefer native Python + `gzip` module. Falling back to system utility gunzip. Like gunzip, but extracts in the current + working directory instead of in-place. Args: - archive_file (str): absolute path of the file to be decompressed + archive_file: absolute path of the file to be decompressed """ - if is_gzip_supported(): - return _py_gunzip(archive_file) - else: - return _system_gunzip(archive_file) + return _py_gunzip(archive_file) if GZIP_SUPPORTED else _system_gunzip(archive_file) -def _py_gunzip(archive_file): - """Returns path to gunzip'd file - Decompresses `.gz` compressed archvies via python gzip module""" +def _py_gunzip(archive_file: str) -> str: + """Returns path to gunzip'd file. Decompresses `.gz` compressed archvies via python gzip + module""" decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "gz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) @@ -150,9 +138,8 @@ def _py_gunzip(archive_file): return destination_abspath -def _system_gunzip(archive_file): - """Returns path to gunzip'd file - Decompresses `.gz` compressed files via system gzip""" +def _system_gunzip(archive_file: str) -> str: + """Returns path to gunzip'd file. 
Decompresses `.gz` compressed files via system gzip""" archive_file_no_ext = llnl.url.strip_compression_extension(archive_file) if archive_file_no_ext == archive_file: # the zip file has no extension. On Unix gunzip cannot unzip onto itself @@ -170,50 +157,38 @@ def _system_gunzip(archive_file): return destination_abspath -def _unzip(archive_file): - """Returns path to extracted zip archive - Extract Zipfile, searching for unzip system executable - If unavailable, search for 'tar' executable on system and use instead +def _unzip(archive_file: str) -> str: + """Returns path to extracted zip archive. Extract Zipfile, searching for unzip system + executable. If unavailable, search for 'tar' executable on system and use instead. Args: - archive_file (str): absolute path of the file to be decompressed + archive_file: absolute path of the file to be decompressed """ - extracted_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="zip")) if sys.platform == "win32": return _system_untar(archive_file) - else: - exe = "unzip" - arg = "-q" - unzip = which(exe, required=True) - unzip.add_default_arg(arg) - unzip(archive_file) - return extracted_file + unzip = which("unzip", required=True) + unzip.add_default_arg("-q") + unzip(archive_file) + return os.path.basename(llnl.url.strip_extension(archive_file, extension="zip")) -def _system_unZ(archive_file): +def _system_unZ(archive_file: str) -> str: """Returns path to decompressed file Decompress UNIX compress style compression Utilizes gunzip on unix and 7zip on Windows """ if sys.platform == "win32": - result = _system_7zip(archive_file) - else: - result = _system_gunzip(archive_file) - return result + return _system_7zip(archive_file) + return _system_gunzip(archive_file) def _lzma_decomp(archive_file): - """Returns path to decompressed xz file. - Decompress lzma compressed files. 
Prefer Python native - lzma module, but fall back on command line xz tooling - to find available Python support.""" - if is_lzma_supported(): - return _py_lzma(archive_file) - else: - return _xz(archive_file) + """Returns path to decompressed xz file. Decompress lzma compressed files. Prefer Python native + lzma module, but fall back on command line xz tooling to find available Python support.""" + return _py_lzma(archive_file) if LZMA_SUPPORTED else _xz(archive_file) -def _win_compressed_tarball_handler(decompressor): +def _win_compressed_tarball_handler(decompressor: Callable[[str], str]) -> Callable[[str], str]: """Returns function pointer to two stage decompression and extraction method Decompress and extract compressed tarballs on Windows. @@ -227,7 +202,7 @@ def _win_compressed_tarball_handler(decompressor): can be installed manually or via spack """ - def unarchive(archive_file): + def unarchive(archive_file: str): # perform intermediate extraction step # record name of new archive so we can extract decomped_tarball = decompressor(archive_file) @@ -238,9 +213,9 @@ def _win_compressed_tarball_handler(decompressor): return unarchive -def _py_lzma(archive_file): - """Returns path to decompressed .xz files - Decompress lzma compressed .xz files via python lzma module""" +def _py_lzma(archive_file: str) -> str: + """Returns path to decompressed .xz files. Decompress lzma compressed .xz files via Python + lzma module.""" decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "xz")) archive_out = os.path.join(os.getcwd(), decompressed_file) with open(archive_out, "wb") as ar: @@ -250,10 +225,8 @@ def _py_lzma(archive_file): def _xz(archive_file): - """Returns path to decompressed xz files - Decompress lzma compressed .xz files via xz command line - tool. - """ + """Returns path to decompressed xz files. 
Decompress lzma compressed .xz files via xz command + line tool.""" decompressed_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="xz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) @@ -292,19 +265,17 @@ unable to extract %s files. 7z can be installed via Spack" return outfile -def decompressor_for(path, extension=None): +def decompressor_for(path: str, extension: Optional[str] = None): """Returns appropriate decompression/extraction algorithm function pointer for provided extension. If extension is none, it is computed from the `path` and the decompression function is derived from that information.""" if not extension: - extension = extension_from_file(path, decompress=True) + extension = extension_from_magic_numbers(path, decompress=True) - if not llnl.url.allowed_archive(extension): + if not extension or not llnl.url.allowed_archive(extension): raise CommandNotFoundError( - "Cannot extract archive, \ -unrecognized file extension: '%s'" - % extension + f"Cannot extract {path}, unrecognized file extension: '{extension}'" ) if sys.platform == "win32": return decompressor_for_win(extension) @@ -312,58 +283,37 @@ unrecognized file extension: '%s'" return decompressor_for_nix(extension) -def decompressor_for_nix(extension): - """Returns a function pointer to appropriate decompression - algorithm based on extension type and unix specific considerations - i.e. a reasonable expectation system utils like gzip, bzip2, and xz are - available +def decompressor_for_nix(extension: str) -> Callable[[str], str]: + """Returns a function pointer to appropriate decompression algorithm based on extension type + and unix specific considerations i.e. 
a reasonable expectation system utils like gzip, bzip2, + and xz are available Args: - path (str): path of the archive file requiring decompression + extension: path of the archive file requiring decompression """ - if re.match(r"zip$", extension): - return _unzip - - if re.match(r"gz$", extension): - return _gunzip + extension_to_decompressor: Dict[str, Callable[[str], str]] = { + "zip": _unzip, + "gz": _gunzip, + "bz2": _bunzip2, + "Z": _system_unZ, # no builtin support for .Z files + "xz": _lzma_decomp, + } - if re.match(r"bz2$", extension): - return _bunzip2 + return extension_to_decompressor.get(extension, _system_untar) - # Python does not have native support - # of any kind for .Z files. In these cases, - # we rely on external tools such as tar, - # 7z, or uncompressZ - if re.match(r"Z$", extension): - return _system_unZ - # Python and platform may not have support for lzma - # compression. If no lzma support, use tools available on systems - if re.match(r"xz$", extension): - return _lzma_decomp - - return _system_untar - - -def _determine_py_decomp_archive_strategy(extension): +def _determine_py_decomp_archive_strategy(extension: str) -> Optional[Callable[[str], str]]: """Returns appropriate python based decompression strategy based on extension type""" - # Only rely on Python decompression support for gz - if re.match(r"gz$", extension): - return _py_gunzip - - # Only rely on Python decompression support for bzip2 - if re.match(r"bz2$", extension): - return _py_bunzip - - # Only rely on Python decompression support for xz - if re.match(r"xz$", extension): - return _py_lzma - - return None + extension_to_decompressor: Dict[str, Callable[[str], str]] = { + "gz": _py_gunzip, + "bz2": _py_bunzip, + "xz": _py_lzma, + } + return extension_to_decompressor.get(extension, None) -def decompressor_for_win(extension): +def decompressor_for_win(extension: str) -> Callable[[str], str]: """Returns a function pointer to appropriate decompression algorithm based on 
extension type and Windows specific considerations @@ -371,34 +321,32 @@ def decompressor_for_win(extension): So we must rely exclusively on Python module support for all compression operations, tar for tarballs and zip files, and 7zip for Z compressed archives and files as Python does not provide support for the UNIX compress algorithm - - Args: - path (str): path of the archive file requiring decompression - extension (str): extension """ extension = llnl.url.expand_contracted_extension(extension) - # Windows native tar can handle .zip extensions, use standard - # unzip method - if re.match(r"zip$", extension): - return _unzip - - # if extension is standard tarball, invoke Windows native tar - if re.match(r"tar$", extension): - return _system_untar - - # Python does not have native support - # of any kind for .Z files. In these cases, - # we rely on 7zip, which must be installed outside - # of spack and added to the PATH or externally detected - if re.match(r"Z$", extension): - return _system_unZ - - # Windows vendors no native decompression tools, attempt to derive - # python based decompression strategy - # Expand extension from contracted extension i.e. tar.gz from .tgz - # no-op on non contracted extensions + extension_to_decompressor: Dict[str, Callable[[str], str]] = { + # Windows native tar can handle .zip extensions, use standard unzip method + "zip": _unzip, + # if extension is standard tarball, invoke Windows native tar + "tar": _system_untar, + # Python does not have native support of any kind for .Z files. In these cases, we rely on + # 7zip, which must be installed outside of Spack and added to the PATH or externally + # detected + "Z": _system_unZ, + "xz": _lzma_decomp, + } + + decompressor = extension_to_decompressor.get(extension) + if decompressor: + return decompressor + + # Windows vendors no native decompression tools, attempt to derive Python based decompression + # strategy. Expand extension from abbreviated ones, i.e. 
tar.gz from .tgz compression_extension = llnl.url.compression_ext_from_compressed_archive(extension) - decompressor = _determine_py_decomp_archive_strategy(compression_extension) + decompressor = ( + _determine_py_decomp_archive_strategy(compression_extension) + if compression_extension + else None + ) if not decompressor: raise SpackError( "Spack was unable to determine a proper decompression strategy for" @@ -412,103 +360,75 @@ def decompressor_for_win(extension): class FileTypeInterface: - """ - Base interface class for describing and querying file type information. - FileType describes information about a single file type - such as extension, and byte header properties, and provides an interface - to check a given file against said type based on magic number. - - This class should be subclassed each time a new type is to be - described. - - Note: This class should not be used directly as it does not define any specific - file. Attempts to directly use this class will fail, as it does not define - a magic number or extension string. - - Subclasses should each describe a different - type of file. In order to do so, they must define - the extension string, magic number, and header offset (if non zero). - If a class has multiple magic numbers, it will need to - override the method describin that file types magic numbers and - the method that checks a types magic numbers against a given file's. - """ + """Base interface class for describing and querying file type information. FileType describes + information about a single file type such as typical extension and byte header properties, + and provides an interface to check a given file against said type based on magic number. - OFFSET = 0 - compressed = False + This class should be subclassed each time a new type is to be described. + + Subclasses should each describe a different type of file. In order to do so, they must define + the extension string, magic number, and header offset (if non zero). 
If a class has multiple + magic numbers, it will need to override the method describing that file type's magic numbers + and the method that checks a types magic numbers against a given file's.""" - @staticmethod - def name(): - raise NotImplementedError + OFFSET = 0 + extension: str + name: str @classmethod - def magic_number(cls): + def magic_numbers(cls) -> List[bytes]: """Return a list of all potential magic numbers for a filetype""" - return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")] + return [ + value for name, value in inspect.getmembers(cls) if name.startswith("_MAGIC_NUMBER") + ] @classmethod - def header_size(cls): + def header_size(cls) -> int: """Return size of largest magic number associated with file type""" - return max([len(x) for x in cls.magic_number()]) + return max(len(x) for x in cls.magic_numbers()) - @classmethod - def _bytes_check(cls, magic_bytes): - for magic in cls.magic_number(): - if magic_bytes.startswith(magic): - return True - return False - - @classmethod - def is_file_of_type(cls, iostream): - """Query byte stream for appropriate magic number + def matches_magic(self, stream: BinaryIO) -> bool: + """Returns true if the stream matches the current file type by any of its magic numbers. + Resets stream to original position. 
Args: - iostream: file byte stream - - Returns: - Bool denoting whether file is of class file type - based on magic number + stream: file byte stream """ - if not iostream: - return False # move to location of magic bytes - iostream.seek(cls.OFFSET) - magic_bytes = iostream.read(cls.header_size()) - # return to beginning of file - iostream.seek(0) - if cls._bytes_check(magic_bytes): - return True - return False + offset = stream.tell() + stream.seek(self.OFFSET) + magic_bytes = stream.read(self.header_size()) + stream.seek(offset) + return any(magic_bytes.startswith(magic) for magic in self.magic_numbers()) class CompressedFileTypeInterface(FileTypeInterface): """Interface class for FileTypes that include compression information""" - compressed = True + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + """This method returns the first num_bytes of a decompressed stream. Returns None if no + builtin support for decompression.""" + return None - @staticmethod - def decomp_in_memory(stream): - """This method decompresses and loads the first 200 or so bytes of a compressed file - to check for compressed archives. This does not decompress the entire file and should - not be used for direct expansion of archives/compressed files - """ - raise NotImplementedError("Implementation by compression subclass required") + +def _decompressed_peek( + decompressed_stream: io.BufferedIOBase, stream: BinaryIO, num_bytes: int +) -> io.BytesIO: + # Read the first num_bytes of the decompressed stream, do not advance the stream position. 
+ pos = stream.tell() + data = decompressed_stream.read(num_bytes) + stream.seek(pos) + return io.BytesIO(data) class BZipFileType(CompressedFileTypeInterface): _MAGIC_NUMBER = b"\x42\x5a\x68" extension = "bz2" + name = "bzip2 compressed data" - @staticmethod - def name(): - return "bzip2 compressed data" - - @staticmethod - def decomp_in_memory(stream): - if is_bz2_supported(): - # checking for underlying archive, only decomp as many bytes - # as is absolutely neccesary for largest archive header (tar) - comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size()) - return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream)) + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + if BZ2_SUPPORTED: + return _decompressed_peek(bz2.BZ2File(stream), stream, num_bytes) return None @@ -516,57 +436,28 @@ class ZCompressedFileType(CompressedFileTypeInterface): _MAGIC_NUMBER_LZW = b"\x1f\x9d" _MAGIC_NUMBER_LZH = b"\x1f\xa0" extension = "Z" - - @staticmethod - def name(): - return "compress'd data" - - @staticmethod - def decomp_in_memory(stream): - # python has no method of decompressing `.Z` files in memory - return None + name = "compress'd data" class GZipFileType(CompressedFileTypeInterface): _MAGIC_NUMBER = b"\x1f\x8b\x08" extension = "gz" + name = "gzip compressed data" - @staticmethod - def name(): - return "gzip compressed data" - - @staticmethod - def decomp_in_memory(stream): - if is_gzip_supported(): - # checking for underlying archive, only decomp as many bytes - # as is absolutely neccesary for largest archive header (tar) - return io.BytesIO( - initial_bytes=gzip.GzipFile(fileobj=stream).read( - TarFileType.OFFSET + TarFileType.header_size() - ) - ) + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + if GZIP_SUPPORTED: + return _decompressed_peek(gzip.GzipFile(fileobj=stream), stream, num_bytes) return None class LzmaFileType(CompressedFileTypeInterface): _MAGIC_NUMBER = 
b"\xfd7zXZ" extension = "xz" + name = "xz compressed data" - @staticmethod - def name(): - return "xz compressed data" - - @staticmethod - def decomp_in_memory(stream): - if is_lzma_supported(): - # checking for underlying archive, only decomp as many bytes - # as is absolutely neccesary for largest archive header (tar) - max_size = TarFileType.OFFSET + TarFileType.header_size() - return io.BytesIO( - initial_bytes=lzma.LZMADecompressor().decompress( - stream.read(max_size), max_length=max_size - ) - ) + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + if LZMA_SUPPORTED: + return _decompressed_peek(lzma.LZMAFile(stream), stream, num_bytes) return None @@ -575,111 +466,111 @@ class TarFileType(FileTypeInterface): _MAGIC_NUMBER_GNU = b"ustar \0" _MAGIC_NUMBER_POSIX = b"ustar\x0000" extension = "tar" - - @staticmethod - def name(): - return "tar archive" + name = "tar archive" class ZipFleType(FileTypeInterface): _MAGIC_NUMBER = b"PK\003\004" extension = "zip" + name = "Zip archive data" - @staticmethod - def name(): - return "Zip archive data" +#: Maximum number of bytes to read from a file to determine any archive type. Tar is the largest. +MAX_BYTES_ARCHIVE_HEADER = TarFileType.OFFSET + TarFileType.header_size() -# collection of valid Spack recognized archive and compression -# file type identifier classes. -VALID_FILETYPES = [ - BZipFileType, - ZCompressedFileType, - GZipFileType, - LzmaFileType, - TarFileType, - ZipFleType, +#: Collection of supported archive and compression file type identifier classes. 
+SUPPORTED_FILETYPES: List[FileTypeInterface] = [ + BZipFileType(), + ZCompressedFileType(), + GZipFileType(), + LzmaFileType(), + TarFileType(), + ZipFleType(), ] -def extension_from_stream(stream, decompress=False): - """Return extension represented by stream corresponding to archive file - If stream does not represent an archive type recongized by Spack - (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None +def _extension_of_compressed_file( + file_type: CompressedFileTypeInterface, stream: BinaryIO +) -> Optional[str]: + """Retrieves the extension of a file after decompression from its magic numbers, if it can be + decompressed.""" + # To classify the file we only need to decompress the first so many bytes. + decompressed_magic = file_type.peek(stream, MAX_BYTES_ARCHIVE_HEADER) - Extension type is derived by searching for identifying bytes - in file stream. + if not decompressed_magic: + return None - Args: - stream : stream representing a file on system - decompress (bool) : if True, compressed files are checked - for archive types beneath compression i.e. tar.gz - default is False, otherwise, return top level type i.e. gz + return extension_from_magic_numbers_by_stream(decompressed_magic, decompress=False) - Return: - A string represting corresponding archive extension - or None as relevant. - """ - for arc_type in VALID_FILETYPES: - if arc_type.is_file_of_type(stream): - suffix_ext = arc_type.extension - prefix_ext = "" - if arc_type.compressed and decompress: - # stream represents compressed file - # get decompressed stream (if possible) - decomp_stream = arc_type.decomp_in_memory(stream) - prefix_ext = extension_from_stream(decomp_stream, decompress=decompress) - if not prefix_ext: - # We were unable to decompress or unable to derive - # a nested extension from decompressed file. 
- # Try to use filename parsing to check for - # potential nested extensions if there are any - tty.debug( - "Cannot derive file extension from magic number;" - " falling back to regex path parsing." - ) - return llnl.url.extension_from_path(stream.name) - resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext]) - tty.debug("File extension %s successfully derived by magic number." % resultant_ext) - return resultant_ext +def extension_from_magic_numbers_by_stream( + stream: BinaryIO, decompress: bool = False +) -> Optional[str]: + """Returns the typical extension for the opened file, without leading ``.``, based on its magic + numbers. + + If the stream does not represent file type recongized by Spack (see + :py:data:`SUPPORTED_FILETYPES`), the method will return None + + Args: + stream: stream representing a file on system + decompress: if True, compressed files are checked for archive types beneath compression. + For example tar.gz if True versus only gz if False.""" + for file_type in SUPPORTED_FILETYPES: + if not file_type.matches_magic(stream): + continue + ext = file_type.extension + if decompress and isinstance(file_type, CompressedFileTypeInterface): + uncompressed_ext = _extension_of_compressed_file(file_type, stream) + if not uncompressed_ext: + tty.debug( + "Cannot derive file extension from magic number;" + " falling back to original file name." + ) + return llnl.url.extension_from_path(stream.name) + ext = f"{uncompressed_ext}.{ext}" + tty.debug(f"File extension {ext} successfully derived by magic number.") + return ext return None -def extension_from_file(file, decompress=False): - """Return extension from archive file path - Extension is derived based on magic number parsing similar - to the `file` utility. Attempts to return abbreviated file extensions - whenever a file has an abbreviated extension such as `.tgz` or `.txz`. - This distinction in abbreivated extension names is accomplished - by string parsing. 
+def _maybe_abbreviate_extension(path: str, extension: str) -> str: + """If the file is a compressed tar archive, return the abbreviated extension t[xz|gz|bz2|bz] + instead of tar.[xz|gz|bz2|bz] if the file's original name also has an abbreviated extension.""" + if not extension.startswith("tar."): + return extension + abbr = f"t{extension[4:]}" + return abbr if llnl.url.has_extension(path, abbr) else extension + + +def extension_from_magic_numbers(path: str, decompress: bool = False) -> Optional[str]: + """Return typical extension without leading ``.`` of a compressed file or archive at the given + path, based on its magic numbers, similar to the `file` utility. Notice that the extension + returned from this function may not coincide with the file's given extension. Args: - file (os.PathLike): path descibing file on system for which ext - will be determined. - decompress (bool): If True, method will peek into compressed - files to check for archive file types. default is False. - If false, method will be unable to distinguish `.tar.gz` from `.gz` - or similar. - Return: - Spack recognized archive file extension as determined by file's magic number and - file name. If file is not on system or is of an type not recognized by Spack as - an archive or compression type, None is returned. + path: file to determine extension of + decompress: If True, method will peek into decompressed file to check for archive file + types. If False, the method will return only the top-level extension (for example + ``gz`` and not ``tar.gz``). + Returns: + Spack recognized archive file extension as determined by file's magic number and file name. + If file is not on system or is of a type not recognized by Spack as an archive or + compression type, None is returned. If the file is classified as a compressed tarball, the + extension is abbreviated (for instance ``tgz`` not ``tar.gz``) if that matches the file's + given extension. 
""" - if os.path.exists(file): - with open(file, "rb") as f: - ext = extension_from_stream(f, decompress) - # based on magic number, file is compressed - # tar archive. Check to see if file is abbreviated as - # t[xz|gz|bz2|bz] - if ext and ext.startswith("tar."): - suf = ext.split(".")[1] - abbr = "t" + suf - if llnl.url.has_extension(file, abbr): - return abbr - if not ext: - # If unable to parse extension from stream, - # attempt to fall back to string parsing - ext = llnl.url.extension_from_path(file) - return ext - return None + try: + with open(path, "rb") as f: + ext = extension_from_magic_numbers_by_stream(f, decompress) + except OSError as e: + if e.errno == errno.ENOENT: + return None + raise + + # Return the extension derived from the magic number if possible. + if ext: + return _maybe_abbreviate_extension(path, ext) + + # Otherwise, use the extension from the file name. + return llnl.url.extension_from_path(path) |