diff options
-rw-r--r-- | lib/spack/docs/conf.py | 1 | ||||
-rw-r--r-- | lib/spack/spack/cmd/logs.py | 43 | ||||
-rw-r--r-- | lib/spack/spack/test/util/compression.py | 41 | ||||
-rw-r--r-- | lib/spack/spack/util/compression.py | 589 |
4 files changed, 289 insertions, 385 deletions
diff --git a/lib/spack/docs/conf.py b/lib/spack/docs/conf.py index 8f52edb89c..e0542640a2 100644 --- a/lib/spack/docs/conf.py +++ b/lib/spack/docs/conf.py @@ -199,6 +199,7 @@ nitpick_ignore = [ ("py:class", "contextlib.contextmanager"), ("py:class", "module"), ("py:class", "_io.BufferedReader"), + ("py:class", "_io.BytesIO"), ("py:class", "unittest.case.TestCase"), ("py:class", "_frozen_importlib_external.SourceFileLoader"), ("py:class", "clingo.Control"), diff --git a/lib/spack/spack/cmd/logs.py b/lib/spack/spack/cmd/logs.py index a9ec4dad61..9d5da880ed 100644 --- a/lib/spack/spack/cmd/logs.py +++ b/lib/spack/spack/cmd/logs.py @@ -5,11 +5,13 @@ import errno import gzip +import io import os import shutil import sys import spack.cmd +import spack.spec import spack.util.compression as compression from spack.cmd.common import arguments from spack.main import SpackCommandError @@ -23,45 +25,36 @@ def setup_parser(subparser): arguments.add_common_arguments(subparser, ["spec"]) -def _dump_byte_stream_to_stdout(instream): +def _dump_byte_stream_to_stdout(instream: io.BufferedIOBase) -> None: + # Reopen stdout in binary mode so we don't have to worry about encoding outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False) - shutil.copyfileobj(instream, outstream) -def dump_build_log(package): - with open(package.log_path, "rb") as f: - _dump_byte_stream_to_stdout(f) - - -def _logs(cmdline_spec, concrete_spec): +def _logs(cmdline_spec: spack.spec.Spec, concrete_spec: spack.spec.Spec): if concrete_spec.installed: log_path = concrete_spec.package.install_log_path elif os.path.exists(concrete_spec.package.stage.path): - dump_build_log(concrete_spec.package) - return + # TODO: `spack logs` can currently not show the logs while a package is being built, as the + # combined log file is only written after the build is finished. + log_path = concrete_spec.package.log_path else: raise SpackCommandError(f"{cmdline_spec} is not installed or staged") try: - compression_ext = compression.extension_from_file(log_path) - with open(log_path, "rb") as fstream: - if compression_ext == "gz": - # If the log file is compressed, wrap it with a decompressor - fstream = gzip.open(log_path, "rb") - elif compression_ext: - raise SpackCommandError( - f"Unsupported storage format for {log_path}: {compression_ext}" - ) - - _dump_byte_stream_to_stdout(fstream) + stream = open(log_path, "rb") except OSError as e: if e.errno == errno.ENOENT: raise SpackCommandError(f"No logs are available for {cmdline_spec}") from e - elif e.errno == errno.EPERM: - raise SpackCommandError(f"Permission error accessing {log_path}") from e - else: - raise + raise SpackCommandError(f"Error reading logs for {cmdline_spec}: {e}") from e + + with stream as f: + ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False) + if ext and ext != "gz": + raise SpackCommandError(f"Unsupported storage format for {log_path}: {ext}") + + # If the log file is gzip compressed, wrap it with a decompressor + _dump_byte_stream_to_stdout(gzip.GzipFile(fileobj=f) if ext == "gz" else f) def logs(parser, args): diff --git a/lib/spack/spack/test/util/compression.py b/lib/spack/spack/test/util/compression.py index b3f5c15861..ab38da78ac 100644 --- a/lib/spack/spack/test/util/compression.py +++ b/lib/spack/spack/test/util/compression.py @@ -4,8 +4,10 @@ # SPDX-License-Identifier: (Apache-2.0 OR MIT) +import io import os import shutil +import tarfile from itertools import product import pytest @@ -14,7 +16,7 @@ import llnl.url from llnl.util.filesystem import working_dir from spack.paths import spack_root -from spack.util import compression as scomp +from spack.util import compression from spack.util.executable import CommandNotFoundError datadir = os.path.join(spack_root, "lib", "spack", "spack", "test", "data", "compression") @@ -30,15 +32,11 @@ ext_archive = {} native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key] -def support_stub(): - return False - - @pytest.fixture def compr_support_check(monkeypatch): - monkeypatch.setattr(scomp, "is_lzma_supported", support_stub) - monkeypatch.setattr(scomp, "is_gzip_supported", support_stub) - monkeypatch.setattr(scomp, "is_bz2_supported", support_stub) + monkeypatch.setattr(compression, "LZMA_SUPPORTED", False) + monkeypatch.setattr(compression, "GZIP_SUPPORTED", False) + monkeypatch.setattr(compression, "BZ2_SUPPORTED", False) @pytest.fixture @@ -59,7 +57,7 @@ def archive_file_and_extension(tmpdir_factory, request): ) def test_native_unpacking(tmpdir_factory, archive_file_and_extension): archive_file, extension = archive_file_and_extension - util = scomp.decompressor_for(archive_file, extension) + util = compression.decompressor_for(archive_file, extension) tmpdir = tmpdir_factory.mktemp("comp_test") with working_dir(str(tmpdir)): assert not os.listdir(os.getcwd()) @@ -78,7 +76,7 @@ def test_native_unpacking(tmpdir_factory, archive_file_and_extension): def test_system_unpacking(tmpdir_factory, archive_file_and_extension, compr_support_check): # actually run test archive_file, _ = archive_file_and_extension - util = scomp.decompressor_for(archive_file) + util = compression.decompressor_for(archive_file) tmpdir = tmpdir_factory.mktemp("system_comp_test") with working_dir(str(tmpdir)): assert not os.listdir(os.getcwd()) @@ -95,4 +93,25 @@ def test_unallowed_extension(): # are picked up by the linter and break style checks bad_ext_archive = "Foo.cxx" with pytest.raises(CommandNotFoundError): - scomp.decompressor_for(bad_ext_archive) + compression.decompressor_for(bad_ext_archive) + + +@pytest.mark.parametrize("ext", ["gz", "bz2", "xz"]) +def test_file_type_check_does_not_advance_stream(tmp_path, ext): + # Create a tarball compressed with the given format + path = str(tmp_path / "compressed_tarball") + + try: + with tarfile.open(path, f"w:{ext}") as tar: + tar.addfile(tarfile.TarInfo("test.txt"), fileobj=io.BytesIO(b"test")) + except tarfile.CompressionError: + pytest.skip(f"Cannot create tar.{ext} files") + + # Classify the file from its magic bytes, and check that the stream is not advanced + with open(path, "rb") as f: + computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False) + assert computed_ext == ext + assert f.tell() == 0 + computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=True) + assert computed_ext == f"tar.{ext}" + assert f.tell() == 0 diff --git a/lib/spack/spack/util/compression.py b/lib/spack/spack/util/compression.py index 1e66b3e205..f25841ee70 100644 --- a/lib/spack/spack/util/compression.py +++ b/lib/spack/spack/util/compression.py @@ -3,12 +3,13 @@ # # SPDX-License-Identifier: (Apache-2.0 OR MIT) +import errno import inspect import io import os -import re import shutil import sys +from typing import BinaryIO, Callable, Dict, List, Optional import llnl.url from llnl.util import tty @@ -19,42 +20,29 @@ from spack.util.executable import CommandNotFoundError, which try: import bz2 # noqa - _bz2_support = True + BZ2_SUPPORTED = True except ImportError: - _bz2_support = False + BZ2_SUPPORTED = False try: import gzip # noqa - _gzip_support = True + GZIP_SUPPORTED = True except ImportError: - _gzip_support = False + GZIP_SUPPORTED = False try: import lzma # noqa # novermin - _lzma_support = True + LZMA_SUPPORTED = True except ImportError: - _lzma_support = False + LZMA_SUPPORTED = False -def is_lzma_supported(): - return _lzma_support - - -def is_gzip_supported(): - return _gzip_support - - -def is_bz2_supported(): - return _bz2_support - - -def _system_untar(archive_file, remove_archive_file=False): - """Returns path to unarchived tar file. - Untars archive via system tar. +def _system_untar(archive_file: str, remove_archive_file: bool = False) -> str: + """Returns path to unarchived tar file. Untars archive via system tar. Args: archive_file (str): absolute path to the archive to be extracted. @@ -69,6 +57,11 @@ def _system_untar(archive_file, remove_archive_file=False): archive_file = archive_file_no_ext + "-input" shutil.move(archive_file_no_ext, archive_file) tar = which("tar", required=True) + # GNU tar's --no-same-owner is not as portable, -o works for BSD tar too. This flag is relevant + # when extracting archives as root, where tar attempts to set original ownership of files. This + # is redundant when distributing tarballs, as the tarballs are created on different systems + # than where they are extracted. In certain cases like rootless containers, setting original + # ownership is known to fail, so we need to disable it. tar.add_default_arg("-oxf") tar(archive_file) if remove_archive_file: @@ -79,21 +72,21 @@ def _system_untar(archive_file, remove_archive_file=False): return outfile -def _bunzip2(archive_file): +def _bunzip2(archive_file: str) -> str: """Returns path to decompressed file. Uses Python's bz2 module to decompress bz2 compressed archives Fall back to system utility failing to find Python module `bz2` Args: - archive_file (str): absolute path to the bz2 archive to be decompressed + archive_file: absolute path to the bz2 archive to be decompressed """ - if is_bz2_supported(): + if BZ2_SUPPORTED: return _py_bunzip(archive_file) else: return _system_bunzip(archive_file) -def _py_bunzip(archive_file): +def _py_bunzip(archive_file: str) -> str: """Returns path to decompressed file. Decompresses bz2 compressed archives/files via python's bz2 module""" decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2")) @@ -106,7 +99,7 @@ def _py_bunzip(archive_file): return archive_out -def _system_bunzip(archive_file): +def _system_bunzip(archive_file: str) -> str: """Returns path to decompressed file. Decompresses bz2 compressed archives/files via system bzip2 utility""" compressed_file_name = os.path.basename(archive_file) @@ -121,25 +114,20 @@ def _system_bunzip(archive_file): return archive_out -def _gunzip(archive_file): - """Returns path to gunzip'd file - Decompresses `.gz` extensions. Prefer native Python `gzip` module. - Failing back to system utility gunzip. - Like gunzip, but extracts in the current working directory - instead of in-place. +def _gunzip(archive_file: str) -> str: + """Returns path to gunzip'd file. Decompresses `.gz` extensions. Prefer native Python + `gzip` module. Falling back to system utility gunzip. Like gunzip, but extracts in the current + working directory instead of in-place. Args: - archive_file (str): absolute path of the file to be decompressed + archive_file: absolute path of the file to be decompressed """ - if is_gzip_supported(): - return _py_gunzip(archive_file) - else: - return _system_gunzip(archive_file) + return _py_gunzip(archive_file) if GZIP_SUPPORTED else _system_gunzip(archive_file) -def _py_gunzip(archive_file): - """Returns path to gunzip'd file - Decompresses `.gz` compressed archvies via python gzip module""" +def _py_gunzip(archive_file: str) -> str: + """Returns path to gunzip'd file. Decompresses `.gz` compressed archvies via python gzip + module""" decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "gz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) @@ -150,9 +138,8 @@ def _py_gunzip(archive_file): return destination_abspath -def _system_gunzip(archive_file): - """Returns path to gunzip'd file - Decompresses `.gz` compressed files via system gzip""" +def _system_gunzip(archive_file: str) -> str: + """Returns path to gunzip'd file. Decompresses `.gz` compressed files via system gzip""" archive_file_no_ext = llnl.url.strip_compression_extension(archive_file) if archive_file_no_ext == archive_file: # the zip file has no extension. On Unix gunzip cannot unzip onto itself @@ -170,50 +157,38 @@ def _system_gunzip(archive_file): return destination_abspath -def _unzip(archive_file): - """Returns path to extracted zip archive - Extract Zipfile, searching for unzip system executable - If unavailable, search for 'tar' executable on system and use instead +def _unzip(archive_file: str) -> str: + """Returns path to extracted zip archive. Extract Zipfile, searching for unzip system + executable. If unavailable, search for 'tar' executable on system and use instead. Args: - archive_file (str): absolute path of the file to be decompressed + archive_file: absolute path of the file to be decompressed """ - extracted_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="zip")) if sys.platform == "win32": return _system_untar(archive_file) - else: - exe = "unzip" - arg = "-q" - unzip = which(exe, required=True) - unzip.add_default_arg(arg) - unzip(archive_file) - return extracted_file + unzip = which("unzip", required=True) + unzip.add_default_arg("-q") + unzip(archive_file) + return os.path.basename(llnl.url.strip_extension(archive_file, extension="zip")) -def _system_unZ(archive_file): +def _system_unZ(archive_file: str) -> str: """Returns path to decompressed file Decompress UNIX compress style compression Utilizes gunzip on unix and 7zip on Windows """ if sys.platform == "win32": - result = _system_7zip(archive_file) - else: - result = _system_gunzip(archive_file) - return result + return _system_7zip(archive_file) + return _system_gunzip(archive_file) def _lzma_decomp(archive_file): - """Returns path to decompressed xz file. - Decompress lzma compressed files. Prefer Python native - lzma module, but fall back on command line xz tooling - to find available Python support.""" - if is_lzma_supported(): - return _py_lzma(archive_file) - else: - return _xz(archive_file) + """Returns path to decompressed xz file. Decompress lzma compressed files. Prefer Python native + lzma module, but fall back on command line xz tooling to find available Python support.""" + return _py_lzma(archive_file) if LZMA_SUPPORTED else _xz(archive_file) -def _win_compressed_tarball_handler(decompressor): +def _win_compressed_tarball_handler(decompressor: Callable[[str], str]) -> Callable[[str], str]: """Returns function pointer to two stage decompression and extraction method Decompress and extract compressed tarballs on Windows. @@ -227,7 +202,7 @@ def _win_compressed_tarball_handler(decompressor): can be installed manually or via spack """ - def unarchive(archive_file): + def unarchive(archive_file: str): # perform intermediate extraction step # record name of new archive so we can extract decomped_tarball = decompressor(archive_file) @@ -238,9 +213,9 @@ def _win_compressed_tarball_handler(decompressor): return unarchive -def _py_lzma(archive_file): - """Returns path to decompressed .xz files - Decompress lzma compressed .xz files via python lzma module""" +def _py_lzma(archive_file: str) -> str: + """Returns path to decompressed .xz files. Decompress lzma compressed .xz files via Python + lzma module.""" decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "xz")) archive_out = os.path.join(os.getcwd(), decompressed_file) with open(archive_out, "wb") as ar: @@ -250,10 +225,8 @@ def _py_lzma(archive_file): def _xz(archive_file): - """Returns path to decompressed xz files - Decompress lzma compressed .xz files via xz command line - tool. - """ + """Returns path to decompressed xz files. Decompress lzma compressed .xz files via xz command + line tool.""" decompressed_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="xz")) working_dir = os.getcwd() destination_abspath = os.path.join(working_dir, decompressed_file) @@ -292,19 +265,17 @@ unable to extract %s files. 7z can be installed via Spack" return outfile -def decompressor_for(path, extension=None): +def decompressor_for(path: str, extension: Optional[str] = None): """Returns appropriate decompression/extraction algorithm function pointer for provided extension. If extension is none, it is computed from the `path` and the decompression function is derived from that information.""" if not extension: - extension = extension_from_file(path, decompress=True) + extension = extension_from_magic_numbers(path, decompress=True) - if not llnl.url.allowed_archive(extension): + if not extension or not llnl.url.allowed_archive(extension): raise CommandNotFoundError( - "Cannot extract archive, \ -unrecognized file extension: '%s'" - % extension + f"Cannot extract {path}, unrecognized file extension: '{extension}'" ) if sys.platform == "win32": return decompressor_for_win(extension) @@ -312,58 +283,37 @@ unrecognized file extension: '%s'" return decompressor_for_nix(extension) -def decompressor_for_nix(extension): - """Returns a function pointer to appropriate decompression - algorithm based on extension type and unix specific considerations - i.e. a reasonable expectation system utils like gzip, bzip2, and xz are - available +def decompressor_for_nix(extension: str) -> Callable[[str], str]: + """Returns a function pointer to appropriate decompression algorithm based on extension type + and unix specific considerations i.e. a reasonable expectation system utils like gzip, bzip2, + and xz are available Args: - path (str): path of the archive file requiring decompression + extension: path of the archive file requiring decompression """ - if re.match(r"zip$", extension): - return _unzip - - if re.match(r"gz$", extension): - return _gunzip + extension_to_decompressor: Dict[str, Callable[[str], str]] = { + "zip": _unzip, + "gz": _gunzip, + "bz2": _bunzip2, + "Z": _system_unZ, # no builtin support for .Z files + "xz": _lzma_decomp, + } - if re.match(r"bz2$", extension): - return _bunzip2 + return extension_to_decompressor.get(extension, _system_untar) - # Python does not have native support - # of any kind for .Z files. In these cases, - # we rely on external tools such as tar, - # 7z, or uncompressZ - if re.match(r"Z$", extension): - return _system_unZ - # Python and platform may not have support for lzma - # compression. If no lzma support, use tools available on systems - if re.match(r"xz$", extension): - return _lzma_decomp - - return _system_untar - - -def _determine_py_decomp_archive_strategy(extension): +def _determine_py_decomp_archive_strategy(extension: str) -> Optional[Callable[[str], str]]: """Returns appropriate python based decompression strategy based on extension type""" - # Only rely on Python decompression support for gz - if re.match(r"gz$", extension): - return _py_gunzip - - # Only rely on Python decompression support for bzip2 - if re.match(r"bz2$", extension): - return _py_bunzip - - # Only rely on Python decompression support for xz - if re.match(r"xz$", extension): - return _py_lzma - - return None + extension_to_decompressor: Dict[str, Callable[[str], str]] = { + "gz": _py_gunzip, + "bz2": _py_bunzip, + "xz": _py_lzma, + } + return extension_to_decompressor.get(extension, None) -def decompressor_for_win(extension): +def decompressor_for_win(extension: str) -> Callable[[str], str]: """Returns a function pointer to appropriate decompression algorithm based on extension type and Windows specific considerations @@ -371,34 +321,32 @@ def decompressor_for_win(extension): So we must rely exclusively on Python module support for all compression operations, tar for tarballs and zip files, and 7zip for Z compressed archives and files as Python does not provide support for the UNIX compress algorithm - - Args: - path (str): path of the archive file requiring decompression - extension (str): extension """ extension = llnl.url.expand_contracted_extension(extension) - # Windows native tar can handle .zip extensions, use standard - # unzip method - if re.match(r"zip$", extension): - return _unzip - - # if extension is standard tarball, invoke Windows native tar - if re.match(r"tar$", extension): - return _system_untar - - # Python does not have native support - # of any kind for .Z files. In these cases, - # we rely on 7zip, which must be installed outside - # of spack and added to the PATH or externally detected - if re.match(r"Z$", extension): - return _system_unZ - - # Windows vendors no native decompression tools, attempt to derive - # python based decompression strategy - # Expand extension from contracted extension i.e. tar.gz from .tgz - # no-op on non contracted extensions + extension_to_decompressor: Dict[str, Callable[[str], str]] = { + # Windows native tar can handle .zip extensions, use standard unzip method + "zip": _unzip, + # if extension is standard tarball, invoke Windows native tar + "tar": _system_untar, + # Python does not have native support of any kind for .Z files. In these cases, we rely on + # 7zip, which must be installed outside of Spack and added to the PATH or externally + # detected + "Z": _system_unZ, + "xz": _lzma_decomp, + } + + decompressor = extension_to_decompressor.get(extension) + if decompressor: + return decompressor + + # Windows vendors no native decompression tools, attempt to derive Python based decompression + # strategy. Expand extension from abbreviated ones, i.e. tar.gz from .tgz compression_extension = llnl.url.compression_ext_from_compressed_archive(extension) - decompressor = _determine_py_decomp_archive_strategy(compression_extension) + decompressor = ( + _determine_py_decomp_archive_strategy(compression_extension) + if compression_extension + else None + ) if not decompressor: raise SpackError( "Spack was unable to determine a proper decompression strategy for" @@ -412,103 +360,75 @@ def decompressor_for_win(extension): class FileTypeInterface: - """ - Base interface class for describing and querying file type information. - FileType describes information about a single file type - such as extension, and byte header properties, and provides an interface - to check a given file against said type based on magic number. - - This class should be subclassed each time a new type is to be - described. - - Note: This class should not be used directly as it does not define any specific - file. Attempts to directly use this class will fail, as it does not define - a magic number or extension string. - - Subclasses should each describe a different - type of file. In order to do so, they must define - the extension string, magic number, and header offset (if non zero). - If a class has multiple magic numbers, it will need to - override the method describin that file types magic numbers and - the method that checks a types magic numbers against a given file's. - """ + """Base interface class for describing and querying file type information. FileType describes + information about a single file type such as typical extension and byte header properties, + and provides an interface to check a given file against said type based on magic number. - OFFSET = 0 - compressed = False + This class should be subclassed each time a new type is to be described. + + Subclasses should each describe a different type of file. In order to do so, they must define + the extension string, magic number, and header offset (if non zero). If a class has multiple + magic numbers, it will need to override the method describing that file type's magic numbers + and the method that checks a types magic numbers against a given file's.""" - @staticmethod - def name(): - raise NotImplementedError + OFFSET = 0 + extension: str + name: str @classmethod - def magic_number(cls): + def magic_numbers(cls) -> List[bytes]: """Return a list of all potential magic numbers for a filetype""" - return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")] + return [ + value for name, value in inspect.getmembers(cls) if name.startswith("_MAGIC_NUMBER") + ] @classmethod - def header_size(cls): + def header_size(cls) -> int: """Return size of largest magic number associated with file type""" - return max([len(x) for x in cls.magic_number()]) + return max(len(x) for x in cls.magic_numbers()) - @classmethod - def _bytes_check(cls, magic_bytes): - for magic in cls.magic_number(): - if magic_bytes.startswith(magic): - return True - return False - - @classmethod - def is_file_of_type(cls, iostream): - """Query byte stream for appropriate magic number + def matches_magic(self, stream: BinaryIO) -> bool: + """Returns true if the stream matches the current file type by any of its magic numbers. + Resets stream to original position. Args: - iostream: file byte stream - - Returns: - Bool denoting whether file is of class file type - based on magic number + stream: file byte stream """ - if not iostream: - return False # move to location of magic bytes - iostream.seek(cls.OFFSET) - magic_bytes = iostream.read(cls.header_size()) - # return to beginning of file - iostream.seek(0) - if cls._bytes_check(magic_bytes): - return True - return False + offset = stream.tell() + stream.seek(self.OFFSET) + magic_bytes = stream.read(self.header_size()) + stream.seek(offset) + return any(magic_bytes.startswith(magic) for magic in self.magic_numbers()) class CompressedFileTypeInterface(FileTypeInterface): """Interface class for FileTypes that include compression information""" - compressed = True + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + """This method returns the first num_bytes of a decompressed stream. Returns None if no + builtin support for decompression.""" + return None - @staticmethod - def decomp_in_memory(stream): - """This method decompresses and loads the first 200 or so bytes of a compressed file - to check for compressed archives. This does not decompress the entire file and should - not be used for direct expansion of archives/compressed files - """ - raise NotImplementedError("Implementation by compression subclass required") + +def _decompressed_peek( + decompressed_stream: io.BufferedIOBase, stream: BinaryIO, num_bytes: int +) -> io.BytesIO: + # Read the first num_bytes of the decompressed stream, do not advance the stream position. + pos = stream.tell() + data = decompressed_stream.read(num_bytes) + stream.seek(pos) + return io.BytesIO(data) class BZipFileType(CompressedFileTypeInterface): _MAGIC_NUMBER = b"\x42\x5a\x68" extension = "bz2" + name = "bzip2 compressed data" - @staticmethod - def name(): - return "bzip2 compressed data" - - @staticmethod - def decomp_in_memory(stream): - if is_bz2_supported(): - # checking for underlying archive, only decomp as many bytes - # as is absolutely neccesary for largest archive header (tar) - comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size()) - return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream)) + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + if BZ2_SUPPORTED: + return _decompressed_peek(bz2.BZ2File(stream), stream, num_bytes) return None @@ -516,57 +436,28 @@ class ZCompressedFileType(CompressedFileTypeInterface): _MAGIC_NUMBER_LZW = b"\x1f\x9d" _MAGIC_NUMBER_LZH = b"\x1f\xa0" extension = "Z" - - @staticmethod - def name(): - return "compress'd data" - - @staticmethod - def decomp_in_memory(stream): - # python has no method of decompressing `.Z` files in memory - return None + name = "compress'd data" class GZipFileType(CompressedFileTypeInterface): _MAGIC_NUMBER = b"\x1f\x8b\x08" extension = "gz" + name = "gzip compressed data" - @staticmethod - def name(): - return "gzip compressed data" - - @staticmethod - def decomp_in_memory(stream): - if is_gzip_supported(): - # checking for underlying archive, only decomp as many bytes - # as is absolutely neccesary for largest archive header (tar) - return io.BytesIO( - initial_bytes=gzip.GzipFile(fileobj=stream).read( - TarFileType.OFFSET + TarFileType.header_size() - ) - ) + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + if GZIP_SUPPORTED: + return _decompressed_peek(gzip.GzipFile(fileobj=stream), stream, num_bytes) return None class LzmaFileType(CompressedFileTypeInterface): _MAGIC_NUMBER = b"\xfd7zXZ" extension = "xz" + name = "xz compressed data" - @staticmethod - def name(): - return "xz compressed data" - - @staticmethod - def decomp_in_memory(stream): - if is_lzma_supported(): - # checking for underlying archive, only decomp as many bytes - # as is absolutely neccesary for largest archive header (tar) - max_size = TarFileType.OFFSET + TarFileType.header_size() - return io.BytesIO( - initial_bytes=lzma.LZMADecompressor().decompress( - stream.read(max_size), max_length=max_size - ) - ) + def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]: + if LZMA_SUPPORTED: + return _decompressed_peek(lzma.LZMAFile(stream), stream, num_bytes) return None @@ -575,111 +466,111 @@ class TarFileType(FileTypeInterface): _MAGIC_NUMBER_GNU = b"ustar \0" _MAGIC_NUMBER_POSIX = b"ustar\x0000" extension = "tar" - - @staticmethod - def name(): - return "tar archive" + name = "tar archive" class ZipFleType(FileTypeInterface): _MAGIC_NUMBER = b"PK\003\004" extension = "zip" + name = "Zip archive data" - @staticmethod - def name(): - return "Zip archive data" +#: Maximum number of bytes to read from a file to determine any archive type. Tar is the largest. +MAX_BYTES_ARCHIVE_HEADER = TarFileType.OFFSET + TarFileType.header_size() -# collection of valid Spack recognized archive and compression -# file type identifier classes. -VALID_FILETYPES = [ - BZipFileType, - ZCompressedFileType, - GZipFileType, - LzmaFileType, - TarFileType, - ZipFleType, +#: Collection of supported archive and compression file type identifier classes. +SUPPORTED_FILETYPES: List[FileTypeInterface] = [ + BZipFileType(), + ZCompressedFileType(), + GZipFileType(), + LzmaFileType(), + TarFileType(), + ZipFleType(), ] -def extension_from_stream(stream, decompress=False): - """Return extension represented by stream corresponding to archive file - If stream does not represent an archive type recongized by Spack - (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None +def _extension_of_compressed_file( + file_type: CompressedFileTypeInterface, stream: BinaryIO +) -> Optional[str]: + """Retrieves the extension of a file after decompression from its magic numbers, if it can be + decompressed.""" + # To classify the file we only need to decompress the first so many bytes. + decompressed_magic = file_type.peek(stream, MAX_BYTES_ARCHIVE_HEADER) - Extension type is derived by searching for identifying bytes - in file stream. + if not decompressed_magic: + return None - Args: - stream : stream representing a file on system - decompress (bool) : if True, compressed files are checked - for archive types beneath compression i.e. tar.gz - default is False, otherwise, return top level type i.e. gz + return extension_from_magic_numbers_by_stream(decompressed_magic, decompress=False) - Return: - A string represting corresponding archive extension - or None as relevant. - """ - for arc_type in VALID_FILETYPES: - if arc_type.is_file_of_type(stream): - suffix_ext = arc_type.extension - prefix_ext = "" - if arc_type.compressed and decompress: - # stream represents compressed file - # get decompressed stream (if possible) - decomp_stream = arc_type.decomp_in_memory(stream) - prefix_ext = extension_from_stream(decomp_stream, decompress=decompress) - if not prefix_ext: - # We were unable to decompress or unable to derive - # a nested extension from decompressed file. - # Try to use filename parsing to check for - # potential nested extensions if there are any - tty.debug( - "Cannot derive file extension from magic number;" - " falling back to regex path parsing." - ) - return llnl.url.extension_from_path(stream.name) - resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext]) - tty.debug("File extension %s successfully derived by magic number." % resultant_ext) - return resultant_ext +def extension_from_magic_numbers_by_stream( + stream: BinaryIO, decompress: bool = False +) -> Optional[str]: + """Returns the typical extension for the opened file, without leading ``.``, based on its magic + numbers. + + If the stream does not represent file type recongized by Spack (see + :py:data:`SUPPORTED_FILETYPES`), the method will return None + + Args: + stream: stream representing a file on system + decompress: if True, compressed files are checked for archive types beneath compression. + For example tar.gz if True versus only gz if False.""" + for file_type in SUPPORTED_FILETYPES: + if not file_type.matches_magic(stream): + continue + ext = file_type.extension + if decompress and isinstance(file_type, CompressedFileTypeInterface): + uncompressed_ext = _extension_of_compressed_file(file_type, stream) + if not uncompressed_ext: + tty.debug( + "Cannot derive file extension from magic number;" + " falling back to original file name." + ) + return llnl.url.extension_from_path(stream.name) + ext = f"{uncompressed_ext}.{ext}" + tty.debug(f"File extension {ext} successfully derived by magic number.") + return ext return None -def extension_from_file(file, decompress=False): - """Return extension from archive file path - Extension is derived based on magic number parsing similar - to the `file` utility. Attempts to return abbreviated file extensions - whenever a file has an abbreviated extension such as `.tgz` or `.txz`. - This distinction in abbreivated extension names is accomplished - by string parsing. +def _maybe_abbreviate_extension(path: str, extension: str) -> str: + """If the file is a compressed tar archive, return the abbreviated extension t[xz|gz|bz2|bz] + instead of tar.[xz|gz|bz2|bz] if the file's original name also has an abbreviated extension.""" + if not extension.startswith("tar."): + return extension + abbr = f"t{extension[4:]}" + return abbr if llnl.url.has_extension(path, abbr) else extension + + +def extension_from_magic_numbers(path: str, decompress: bool = False) -> Optional[str]: + """Return typical extension without leading ``.`` of a compressed file or archive at the given + path, based on its magic numbers, similar to the `file` utility. Notice that the extension + returned from this function may not coincide with the file's given extension. Args: - file (os.PathLike): path descibing file on system for which ext - will be determined. - decompress (bool): If True, method will peek into compressed - files to check for archive file types. default is False. - If false, method will be unable to distinguish `.tar.gz` from `.gz` - or similar. - Return: - Spack recognized archive file extension as determined by file's magic number and - file name. If file is not on system or is of an type not recognized by Spack as - an archive or compression type, None is returned. + path: file to determine extension of + decompress: If True, method will peek into decompressed file to check for archive file + types. If False, the method will return only the top-level extension (for example + ``gz`` and not ``tar.gz``). + Returns: + Spack recognized archive file extension as determined by file's magic number and file name. + If file is not on system or is of a type not recognized by Spack as an archive or + compression type, None is returned. If the file is classified as a compressed tarball, the + extension is abbreviated (for instance ``tgz`` not ``tar.gz``) if that matches the file's + given extension. """ - if os.path.exists(file): - with open(file, "rb") as f: - ext = extension_from_stream(f, decompress) - # based on magic number, file is compressed - # tar archive. Check to see if file is abbreviated as - # t[xz|gz|bz2|bz] - if ext and ext.startswith("tar."): - suf = ext.split(".")[1] - abbr = "t" + suf - if llnl.url.has_extension(file, abbr): - return abbr - if not ext: - # If unable to parse extension from stream, - # attempt to fall back to string parsing - ext = llnl.url.extension_from_path(file) - return ext - return None + try: + with open(path, "rb") as f: + ext = extension_from_magic_numbers_by_stream(f, decompress) + except OSError as e: + if e.errno == errno.ENOENT: + return None + raise + + # Return the extension derived from the magic number if possible. + if ext: + return _maybe_abbreviate_extension(path, ext) + + # Otherwise, use the extension from the file name. + return llnl.url.extension_from_path(path) |