1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
|
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import inspect
import io
import os
import re
import shutil
import sys
import llnl.url
from llnl.util import tty
from spack.error import SpackError
from spack.util.executable import CommandNotFoundError, which
# Each compression module is probed at import time and the outcome is recorded
# in a module-level flag, so the rest of the file can choose between the
# Python implementation and a system utility.
try:
    import bz2  # noqa
except ImportError:
    _bz2_support = False
else:
    _bz2_support = True

try:
    import gzip  # noqa
except ImportError:
    _gzip_support = False
else:
    _gzip_support = True

try:
    import lzma  # noqa # novermin
except ImportError:
    _lzma_support = False
else:
    _lzma_support = True
def is_lzma_supported():
    """Return the module-level flag recording whether ``import lzma`` succeeded."""
    return _lzma_support
def is_gzip_supported():
    """Return the module-level flag recording whether ``import gzip`` succeeded."""
    return _gzip_support
def is_bz2_supported():
    """Return the module-level flag recording whether ``import bz2`` succeeded."""
    return _bz2_support
def _system_untar(archive_file, remove_archive_file=False):
    """Extract an archive with the system ``tar`` and return the output name.

    Args:
        archive_file (str): absolute path to the archive to be extracted.
            Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).
        remove_archive_file (bool): when True, the file that was handed to
            ``tar`` is deleted after extraction.

    Returns:
        str: base name of ``archive_file`` with its extension stripped.
    """
    stripped_path = llnl.url.strip_extension(archive_file)
    extracted_name = os.path.basename(stripped_path)

    if stripped_path == archive_file:
        # Nothing was stripped, so the archive has no extension. Tar on Windows
        # cannot untar onto itself, so the input is renamed first. The input
        # can be a plain tar file (the problematic case on Windows) but on Unix
        # it may also be a tgz, tbz2, ... that simply lacks its suffix.
        archive_file = stripped_path + "-input"
        shutil.move(stripped_path, archive_file)

    untar = which("tar", required=True)
    untar.add_default_arg("-oxf")
    untar(archive_file)

    if remove_archive_file:
        # Drop the input so that two stage extractions are not treated as
        # exploding archives by the fetcher.
        os.remove(archive_file)

    return extracted_name
def _bunzip2(archive_file):
    """Decompress a bz2 file and return the path to the decompressed result.

    The Python ``bz2`` module is used when it is available; otherwise the
    work is delegated to the system utility.

    Args:
        archive_file (str): absolute path to the bz2 archive to be decompressed
    """
    decompress = _py_bunzip if is_bz2_supported() else _system_bunzip
    return decompress(archive_file)
def _py_bunzip(archive_file):
    """Decompress a bz2 file with Python's ``bz2`` module.

    The output is written to the current working directory under the
    archive's base name with the ``bz2`` compression extension stripped.

    Args:
        archive_file (str): path to the bz2 compressed file

    Returns:
        str: path of the decompressed file inside the current working directory
    """
    decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2"))
    archive_out = os.path.join(os.getcwd(), decompressed_file)
    # Both handles are managed by the same ``with`` so the compressed input is
    # closed even when opening the output or copying raises.
    with bz2.BZ2File(archive_file, mode="rb") as f_bz, open(archive_out, "wb") as ar:
        shutil.copyfileobj(f_bz, ar)
    return archive_out
def _system_bunzip(archive_file):
    """Decompress a bz2 file with the system ``bunzip2`` executable.

    The archive is first copied into the current working directory and
    ``bunzip2 -q`` is run on that copy.

    Returns:
        str: path in the current working directory made from the archive's
        base name with the ``bz2`` compression extension stripped.
    """
    cwd = os.getcwd()
    stripped_name = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2"))
    staged_copy = os.path.join(cwd, os.path.basename(archive_file))

    # Work on a copy rather than on the caller's file.
    shutil.copy(archive_file, staged_copy)

    bunzip_exe = which("bunzip2", required=True)
    bunzip_exe.add_default_arg("-q")
    bunzip_exe(staged_copy)

    return os.path.join(cwd, stripped_name)
def _gunzip(archive_file):
    """Decompress a ``.gz`` file and return the path to the result.

    The Python ``gzip`` module is preferred; when it is unavailable the
    system utility is used instead. Like gunzip, but the output is placed in
    the current working directory instead of next to the input.

    Args:
        archive_file (str): absolute path of the file to be decompressed
    """
    decompress = _py_gunzip if is_gzip_supported() else _system_gunzip
    return decompress(archive_file)
def _py_gunzip(archive_file):
    """Decompress a ``.gz`` file with Python's ``gzip`` module.

    The output is written to the current working directory under the
    archive's base name with the ``gz`` compression extension stripped.

    Args:
        archive_file (str): path to the gzip compressed file

    Returns:
        str: path of the decompressed file inside the current working directory
    """
    decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "gz"))
    destination_abspath = os.path.join(os.getcwd(), decompressed_file)
    # Both handles are managed by the same ``with`` so the compressed input is
    # closed even when opening the output or copying raises.
    with gzip.open(archive_file, "rb") as f_in, open(destination_abspath, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    return destination_abspath
def _system_gunzip(archive_file):
    """Decompress a ``.gz`` compressed file with the system ``gzip`` executable.

    The archive is copied into the current working directory and ``gzip -d``
    is run on that copy.

    Returns:
        str: path in the current working directory made from the archive's
        base name with its compression extension stripped.
    """
    stripped_path = llnl.url.strip_compression_extension(archive_file)
    if stripped_path == archive_file:
        # the zip file has no extension. On Unix gunzip cannot unzip onto
        # itself, so give the input a ``.gz`` suffix first.
        archive_file = archive_file + ".gz"
        shutil.move(stripped_path, archive_file)

    cwd = os.getcwd()
    staged_copy = os.path.join(cwd, os.path.basename(archive_file))
    shutil.copy(archive_file, staged_copy)

    gzip_exe = which("gzip", required=True)
    gzip_exe.add_default_arg("-d")
    gzip_exe(staged_copy)

    return os.path.join(cwd, os.path.basename(stripped_path))
def _unzip(archive_file):
    """Extract a zip archive and return the path to the extracted result.

    On Windows the archive is handed to the system ``tar`` helper; everywhere
    else the ``unzip`` executable is required and run with ``-q``.

    Args:
        archive_file (str): absolute path of the file to be decompressed
    """
    extracted_name = os.path.basename(llnl.url.strip_extension(archive_file, extension="zip"))

    if sys.platform == "win32":
        return _system_untar(archive_file)

    unzip_exe = which("unzip", required=True)
    unzip_exe.add_default_arg("-q")
    unzip_exe(archive_file)
    return extracted_name
def _system_unZ(archive_file):
    """Decompress UNIX ``compress`` style (``.Z``) data and return the output path.

    Delegates to the 7zip helper on Windows and to the system gunzip helper
    on every other platform.
    """
    if sys.platform == "win32":
        return _system_7zip(archive_file)
    return _system_gunzip(archive_file)
def _lzma_decomp(archive_file):
    """Decompress an lzma (``.xz``) file and return the path to the result.

    The Python ``lzma`` module is preferred; when it is unavailable the ``xz``
    command line helper is used instead.
    """
    decompress = _py_lzma if is_lzma_supported() else _xz
    return decompress(archive_file)
def _win_compressed_tarball_handler(decompressor):
    """Build a two stage "decompress, then untar" function for compressed tarballs.

    The returned callable takes an archive path, runs ``decompressor`` on it
    to obtain an intermediate tarball, then extracts that tarball with the
    system ``tar`` helper and removes the intermediate file.

    The motivation for this method is Windows tar utility's lack of access to
    the xz tool (unsupported natively on Windows, though it can be installed
    manually or via spack).

    Args:
        decompressor: callable accepting an archive path and returning the
            path of the decompressed tarball.
    """

    def unarchive(archive_file):
        # Stage one produces the plain tarball; stage two runs tar on it and
        # deletes the intermediate file once extraction is done.
        return _system_untar(decompressor(archive_file), remove_archive_file=True)

    return unarchive
def _py_lzma(archive_file):
    """Decompress an ``.xz`` file with Python's ``lzma`` module.

    Returns:
        str: path of the decompressed file, placed in the current working
        directory under the archive's base name without the ``xz`` extension.
    """
    stripped_name = os.path.basename(llnl.url.strip_compression_extension(archive_file, "xz"))
    archive_out = os.path.join(os.getcwd(), stripped_name)
    # The output is opened before the compressed input, as it always has been.
    with open(archive_out, "wb") as sink, lzma.open(archive_file) as source:
        shutil.copyfileobj(source, sink)
    return archive_out
def _xz(archive_file):
    """Decompress an ``.xz`` file with the ``xz`` command line tool.

    The archive is copied into the current working directory and ``xz -d`` is
    run on that copy.

    Returns:
        str: path in the current working directory made from the archive's
        base name with the ``xz`` extension stripped.
    """
    cwd = os.getcwd()
    stripped_name = os.path.basename(llnl.url.strip_extension(archive_file, extension="xz"))
    staged_copy = os.path.join(cwd, os.path.basename(archive_file))

    shutil.copy(archive_file, staged_copy)

    xz_exe = which("xz", required=True)
    xz_exe.add_default_arg("-d")
    xz_exe(staged_copy)

    return os.path.join(cwd, stripped_name)
def _system_7zip(archive_file):
    """Unpack/decompress a file with the ``7z`` executable.

    7z is able to handle a number of file extensions, however it may not be
    available on the system. Without 7z, Windows users with certain versions
    of Python may be unable to extract .xz files, and all Windows users will
    be unable to extract .Z files. If 7z cannot be found, this fails and
    informs the user that 7z can be installed via `spack install 7zip`.

    Args:
        archive_file (str): absolute path of file to be unarchived

    Returns:
        str: base name of ``archive_file`` with its compression extension
        stripped.

    Raises:
        CommandNotFoundError: if no ``7z`` executable can be located.
    """
    outfile = os.path.basename(llnl.url.strip_compression_extension(archive_file))
    _7z = which("7z")
    if not _7z:
        # Written as a single literal: a backslash continuation inside the
        # string would glue the next source line (and its indentation) onto
        # the message.
        raise CommandNotFoundError(
            "7z unavailable, unable to extract %s files. 7z can be installed via Spack"
            % llnl.url.extension_from_path(archive_file)
        )
    _7z.add_default_arg("e")
    _7z(archive_file)
    return outfile
def decompressor_for(path, extension=None):
    """Return the decompression/extraction function for an archive.

    Args:
        path: path of the archive; only consulted when ``extension`` is not
            given, in which case the extension is derived from the file.
        extension: archive extension; when falsy it is computed from ``path``.

    Returns:
        The platform specific decompression function for the extension.

    Raises:
        CommandNotFoundError: if the extension is not an allowed archive type.
    """
    if not extension:
        extension = extension_from_file(path, decompress=True)

    if not llnl.url.allowed_archive(extension):
        # Single literal rather than a backslash continuation inside the
        # string, so the message does not depend on source indentation.
        raise CommandNotFoundError(
            "Cannot extract archive, unrecognized file extension: '%s'" % extension
        )

    if sys.platform == "win32":
        return decompressor_for_win(extension)
    return decompressor_for_nix(extension)
def decompressor_for_nix(extension):
    """Return the decompression function for ``extension`` on unix-like systems.

    Assumes a reasonable expectation that system utilities like gzip, bzip2,
    and xz are available.

    Args:
        extension (str): extension of the archive requiring decompression
    """
    # Patterns are tried in order with ``re.match``, which anchors at the start
    # of the string, so e.g. ``tar.gz`` does not hit the ``gz$`` entry and
    # falls through to tar.
    dispatch = (
        (r"zip$", _unzip),
        (r"gz$", _gunzip),
        (r"bz2$", _bunzip2),
        # Python does not have native support of any kind for .Z files. In
        # these cases, we rely on external tools such as tar, 7z, or
        # uncompressZ
        (r"Z$", _system_unZ),
        # Python and platform may not have support for lzma compression. If
        # no lzma support, use tools available on systems
        (r"xz$", _lzma_decomp),
    )
    for pattern, handler in dispatch:
        if re.match(pattern, extension):
            return handler
    return _system_untar
def _determine_py_decomp_archive_strategy(extension):
    """Return the pure-Python decompression function for ``extension``.

    Only ``gz``, ``bz2`` and ``xz`` are handled; any other extension yields
    None.
    """

    def matches(pattern):
        # ``re.match`` anchors at the start of ``extension``.
        return re.match(pattern, extension) is not None

    if matches(r"gz$"):
        strategy = _py_gunzip
    elif matches(r"bz2$"):
        strategy = _py_bunzip
    elif matches(r"xz$"):
        strategy = _py_lzma
    else:
        strategy = None
    return strategy
def decompressor_for_win(extension):
    """Return the decompression function for ``extension`` on Windows.

    Windows natively vendors *only* tar, no other archive/compression
    utilities. So we must rely exclusively on Python module support for all
    compression operations, tar for tarballs and zip files, and 7zip for Z
    compressed archives and files, as Python does not provide support for the
    UNIX compress algorithm.

    Args:
        extension (str): archive extension; contracted forms are expanded
            before a strategy is selected.

    Returns:
        The function to call with the archive path.

    Raises:
        SpackError: if no Python based strategy exists for the extension.
    """
    # Expand extension from contracted extension i.e. tar.gz from .tgz
    # no-op on non contracted extensions
    extension = llnl.url.expand_contracted_extension(extension)

    # Windows native tar can handle .zip extensions, use standard
    # unzip method
    if re.match(r"zip$", extension):
        return _unzip

    # if extension is standard tarball, invoke Windows native tar
    if re.match(r"tar$", extension):
        return _system_untar

    # Python does not have native support
    # of any kind for .Z files. In these cases,
    # we rely on 7zip, which must be installed outside
    # of spack and added to the PATH or externally detected
    if re.match(r"Z$", extension):
        return _system_unZ

    # Windows vendors no native decompression tools, attempt to derive
    # python based decompression strategy
    compression_extension = llnl.url.compression_ext_from_compressed_archive(extension)
    decompressor = _determine_py_decomp_archive_strategy(compression_extension)
    if not decompressor:
        # The pieces are joined by implicit concatenation, so each one carries
        # its own separator.
        raise SpackError(
            "Spack was unable to determine a proper decompression strategy for "
            f"valid extension: {extension}. "
            "This is a bug, please file an issue at https://github.com/spack/spack/issues"
        )

    if "tar" not in extension:
        return decompressor

    # Compressed tarball: decompress first, then untar the result.
    return _win_compressed_tarball_handler(decompressor)
class FileTypeInterface:
    """
    Base interface class for describing and querying file type information.

    A file type bundles an extension with its byte header properties and
    offers a way to check a given file against that type by magic number.

    This class should be subclassed each time a new type is to be described.

    Note: this class should not be used directly, as it does not define a
    magic number or extension string; attempts to do so will fail.

    Each subclass describes one type of file by defining the extension
    string, its magic number(s) as class attributes whose names start with
    ``_MAGIC_NUMBER``, and the header offset (if non zero). A subclass needing
    different handling of multiple magic numbers can override the method
    describing the magic numbers and the method checking them against a file.
    """

    # Byte position in the file at which the magic number starts.
    OFFSET = 0
    # True for types that describe compressed data.
    compressed = False

    @staticmethod
    def name():
        raise NotImplementedError

    @classmethod
    def magic_number(cls):
        """Return a list of all potential magic numbers for a filetype"""
        return [
            value
            for attr, value in inspect.getmembers(cls)
            if attr.startswith("_MAGIC_NUMBER")
        ]

    @classmethod
    def header_size(cls):
        """Return size of largest magic number associated with file type"""
        return max(len(magic) for magic in cls.magic_number())

    @classmethod
    def _bytes_check(cls, magic_bytes):
        """Return True when ``magic_bytes`` starts with any of the type's magic numbers"""
        return any(magic_bytes.startswith(magic) for magic in cls.magic_number())

    @classmethod
    def is_file_of_type(cls, iostream):
        """Query byte stream for appropriate magic number

        Args:
            iostream: file byte stream

        Returns:
            Bool denoting whether file is of class file type
            based on magic number
        """
        if not iostream:
            return False

        # Read the header from where the magic bytes live, then rewind so the
        # stream is left at the beginning of the file.
        iostream.seek(cls.OFFSET)
        header = iostream.read(cls.header_size())
        iostream.seek(0)

        return bool(cls._bytes_check(header))
class CompressedFileTypeInterface(FileTypeInterface):
    """Interface class for FileTypes that include compression information"""

    compressed = True

    @staticmethod
    def decomp_in_memory(stream):
        """Decompress the leading bytes of a compressed stream in memory.

        Implementations decompress and load only the first 200 or so bytes of
        a compressed file so it can be checked for a compressed archive. This
        does not decompress the entire file and should not be used for direct
        expansion of archives/compressed files.
        """
        raise NotImplementedError("Implementation by compression subclass required")
class BZipFileType(CompressedFileTypeInterface):
    """File type description for bzip2 compressed data."""

    _MAGIC_NUMBER = b"\x42\x5a\x68"
    extension = "bz2"

    @staticmethod
    def name():
        return "bzip2 compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Return a BytesIO holding the decompressed head of ``stream``.

        Returns None when the ``bz2`` module is unavailable.
        """
        if not is_bz2_supported():
            return None

        # checking for underlying archive, only read as many bytes
        # as is absolutely necessary for largest archive header (tar)
        peek_size = TarFileType.OFFSET + TarFileType.header_size()
        compressed_head = stream.read(peek_size)
        return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(compressed_head))
class ZCompressedFileType(CompressedFileTypeInterface):
    """File type description for UNIX ``compress`` (``.Z``) data."""

    _MAGIC_NUMBER_LZW = b"\x1f\x9d"
    _MAGIC_NUMBER_LZH = b"\x1f\xa0"
    extension = "Z"

    @staticmethod
    def name():
        return "compress'd data"

    @staticmethod
    def decomp_in_memory(stream):
        """Always return None.

        python has no method of decompressing `.Z` files in memory
        """
        return None
class GZipFileType(CompressedFileTypeInterface):
    """File type description for gzip compressed data."""

    _MAGIC_NUMBER = b"\x1f\x8b\x08"
    extension = "gz"

    @staticmethod
    def name():
        return "gzip compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Return a BytesIO holding the decompressed head of ``stream``.

        Returns None when the ``gzip`` module is unavailable.
        """
        if not is_gzip_supported():
            return None

        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        reader = gzip.GzipFile(fileobj=stream)
        peek_size = TarFileType.OFFSET + TarFileType.header_size()
        return io.BytesIO(initial_bytes=reader.read(peek_size))
class LzmaFileType(CompressedFileTypeInterface):
    """File type description for xz compressed data."""

    _MAGIC_NUMBER = b"\xfd7zXZ"
    extension = "xz"

    @staticmethod
    def name():
        return "xz compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Return a BytesIO holding the decompressed head of ``stream``.

        Returns None when the ``lzma`` module is unavailable.
        """
        if not is_lzma_supported():
            return None

        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        max_size = TarFileType.OFFSET + TarFileType.header_size()
        compressed_head = stream.read(max_size)
        decompressed_head = lzma.LZMADecompressor().decompress(compressed_head, max_length=max_size)
        return io.BytesIO(initial_bytes=decompressed_head)
class TarFileType(FileTypeInterface):
    """File type description for tar archives.

    The magic number is looked up at byte 257 of the file (``OFFSET``).
    """

    OFFSET = 257
    # GNU tar writes "ustar", two spaces and a NUL. The spaces are spelled as
    # escapes so whitespace normalization of the source cannot collapse them.
    _MAGIC_NUMBER_GNU = b"ustar\x20\x20\x00"
    # POSIX ustar writes "ustar", a NUL, then the two characters "00".
    _MAGIC_NUMBER_POSIX = b"ustar\x0000"
    extension = "tar"

    @staticmethod
    def name():
        return "tar archive"
class ZipFleType(FileTypeInterface):
    """File type description for zip archives."""

    _MAGIC_NUMBER = b"PK\003\004"
    extension = "zip"

    @staticmethod
    def name():
        return "Zip archive data"
# Collection of the archive and compression file type identifier classes
# recognized by Spack. extension_from_stream tries them in this order and
# returns on the first one whose magic number matches.
VALID_FILETYPES = [
BZipFileType,
ZCompressedFileType,
GZipFileType,
LzmaFileType,
TarFileType,
ZipFleType,
]
def extension_from_stream(stream, decompress=False):
    """Return extension represented by stream corresponding to archive file

    If stream does not represent an archive type recognized by Spack
    (one of ``VALID_FILETYPES``) the method will return None.

    Extension type is derived by searching for identifying bytes
    in file stream.

    Args:
        stream : stream representing a file on system
        decompress (bool) : if True, compressed files are checked
            for archive types beneath compression i.e. tar.gz
            default is False, otherwise, return top level type i.e. gz

    Return:
        A string representing corresponding archive extension
        or None as relevant.
    """
    for arc_type in VALID_FILETYPES:
        if not arc_type.is_file_of_type(stream):
            continue

        suffix_ext = arc_type.extension
        prefix_ext = ""
        if arc_type.compressed and decompress:
            # stream represents compressed file
            # get decompressed stream (if possible)
            decomp_stream = arc_type.decomp_in_memory(stream)
            prefix_ext = extension_from_stream(decomp_stream, decompress=decompress)
            # In-memory streams created while recursing into nested
            # compression carry no ``name``; path parsing is only possible
            # for streams backed by a named file.
            stream_name = getattr(stream, "name", None)
            if not prefix_ext and stream_name is not None:
                # We were unable to decompress or unable to derive
                # a nested extension from decompressed file.
                # Try to use filename parsing to check for
                # potential nested extensions if there are any
                tty.debug(
                    "Cannot derive file extension from magic number;"
                    " falling back to regex path parsing."
                )
                return llnl.url.extension_from_path(stream_name)

        # Without a nested extension (or a name to parse) report the type
        # that was positively identified by its magic number.
        resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext])
        tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
        return resultant_ext
    return None
def extension_from_file(file, decompress=False):
    """Return extension from archive file path

    The extension is derived from the file's magic number, similar to the
    `file` utility. When the magic number says "compressed tar archive" and
    the file name carries the matching abbreviated extension (such as `.tgz`
    or `.txz`), the abbreviated form is returned instead; that distinction is
    made by string parsing of the file name.

    Args:
        file (os.PathLike): path describing file on system for which ext
            will be determined.
        decompress (bool): If True, method will peek into compressed
            files to check for archive file types. default is False.
            If false, method will be unable to distinguish `.tar.gz` from `.gz`
            or similar.

    Return:
        Spack recognized archive file extension as determined by file's magic
        number and file name. None is returned if the path does not exist.
        When the magic number yields nothing, the result of parsing the file
        name is returned instead.
    """
    if not os.path.exists(file):
        return None

    with open(file, "rb") as handle:
        detected = extension_from_stream(handle, decompress)

    if not detected:
        # If unable to parse extension from stream,
        # attempt to fall back to string parsing
        return llnl.url.extension_from_path(file)

    if detected.startswith("tar."):
        # based on magic number, file is compressed
        # tar archive. Check to see if file is abbreviated as
        # t[xz|gz|bz2|bz]
        short_form = "t" + detected.split(".")[1]
        if llnl.url.has_extension(file, short_form):
            return short_form

    return detected
|