summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMassimiliano Culpo <massimiliano.culpo@gmail.com>2023-01-10 14:23:42 +0100
committerGitHub <noreply@github.com>2023-01-10 14:23:42 +0100
commit9d00e7d15d5da43cd098f695f15d0ee4d1aeaedc (patch)
treede1e797c25c80a375ec6a889994a3ac6339e6db6 /lib
parentcc333b600c797264aec7ca6f0d77cff1fb658c38 (diff)
downloadspack-9d00e7d15d5da43cd098f695f15d0ee4d1aeaedc.tar.gz
spack-9d00e7d15d5da43cd098f695f15d0ee4d1aeaedc.tar.bz2
spack-9d00e7d15d5da43cd098f695f15d0ee4d1aeaedc.tar.xz
spack-9d00e7d15d5da43cd098f695f15d0ee4d1aeaedc.zip
Remove **kwargs from function signatures in llnl.util.filesystem (#34804)
Since we dropped support for Python 2.7, we can embrace using keyword only arguments for many functions in Spack that use **kwargs in the function signature. Here this is done for the llnl.util.filesystem module. There were a couple of bugs lurking in the code related to typo-like errors when retrieving from kwargs. Those have been fixed as well.
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/llnl/util/filesystem.py188
1 files changed, 119 insertions, 69 deletions
diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py
index 8e664cc0a9..219a13192d 100644
--- a/lib/spack/llnl/util/filesystem.py
+++ b/lib/spack/llnl/util/filesystem.py
@@ -17,6 +17,7 @@ import sys
import tempfile
from contextlib import contextmanager
from sys import platform as _platform
+from typing import Callable, List, Match, Optional, Tuple, Union
from llnl.util import tty
from llnl.util.lang import dedupe, memoized
@@ -214,7 +215,16 @@ def same_path(path1, path2):
return norm1 == norm2
-def filter_file(regex, repl, *filenames, **kwargs):
+def filter_file(
+ regex: str,
+ repl: Union[str, Callable[[Match], str]],
+ *filenames: str,
+ string: bool = False,
+ backup: bool = False,
+ ignore_absent: bool = False,
+ start_at: Optional[str] = None,
+ stop_at: Optional[str] = None,
+) -> None:
r"""Like sed, but uses python regular expressions.
Filters every line of each file through regex and replaces the file
@@ -226,12 +236,10 @@ def filter_file(regex, repl, *filenames, **kwargs):
can contain ``\1``, ``\2``, etc. to represent back-substitution
as sed would allow.
- Parameters:
+ Args:
regex (str): The regular expression to search for
repl (str): The string to replace matches with
*filenames: One or more files to search and replace
-
- Keyword Arguments:
string (bool): Treat regex as a plain string. Default it False
backup (bool): Make backup file(s) suffixed with ``~``. Default is False
ignore_absent (bool): Ignore any files that don't exist.
@@ -246,17 +254,11 @@ def filter_file(regex, repl, *filenames, **kwargs):
file is copied verbatim. Default is to filter until the end of the
file.
"""
- string = kwargs.get("string", False)
- backup = kwargs.get("backup", False)
- ignore_absent = kwargs.get("ignore_absent", False)
- start_at = kwargs.get("start_at", None)
- stop_at = kwargs.get("stop_at", None)
-
# Allow strings to use \1, \2, etc. for replacement, like sed
if not callable(repl):
unescaped = repl.replace(r"\\", "\\")
- def replace_groups_with_groupid(m):
+ def replace_groups_with_groupid(m: Match) -> str:
def groupid_to_group(x):
return m.group(int(x.group(1)))
@@ -290,15 +292,14 @@ def filter_file(regex, repl, *filenames, **kwargs):
shutil.copy(filename, tmp_filename)
try:
- # To avoid translating line endings (\n to \r\n and vis versa)
+ # Open as a text file and filter until the end of the file is
+ # reached, or we found a marker in the line if it was specified
+ #
+ # To avoid translating line endings (\n to \r\n and vice-versa)
# we force os.open to ignore translations and use the line endings
# the file comes with
- extra_kwargs = {"errors": "surrogateescape", "newline": ""}
-
- # Open as a text file and filter until the end of the file is
- # reached or we found a marker in the line if it was specified
- with open(tmp_filename, mode="r", **extra_kwargs) as input_file:
- with open(filename, mode="w", **extra_kwargs) as output_file:
+ with open(tmp_filename, mode="r", errors="surrogateescape", newline="") as input_file:
+ with open(filename, mode="w", errors="surrogateescape", newline="") as output_file:
do_filtering = start_at is None
# Using iter and readline is a workaround needed not to
# disable input_file.tell(), which will happen if we call
@@ -321,10 +322,10 @@ def filter_file(regex, repl, *filenames, **kwargs):
# If we stopped filtering at some point, reopen the file in
# binary mode and copy verbatim the remaining part
if current_position and stop_at:
- with open(tmp_filename, mode="rb") as input_file:
- input_file.seek(current_position)
- with open(filename, mode="ab") as output_file:
- output_file.writelines(input_file.readlines())
+ with open(tmp_filename, mode="rb") as input_binary_buffer:
+ input_binary_buffer.seek(current_position)
+ with open(filename, mode="ab") as output_binary_buffer:
+ output_binary_buffer.writelines(input_binary_buffer.readlines())
except BaseException:
# clean up the original file on failure.
@@ -343,8 +344,26 @@ class FileFilter(object):
def __init__(self, *filenames):
self.filenames = filenames
- def filter(self, regex, repl, **kwargs):
- return filter_file(regex, repl, *self.filenames, **kwargs)
+ def filter(
+ self,
+ regex: str,
+ repl: Union[str, Callable[[Match], str]],
+ string: bool = False,
+ backup: bool = False,
+ ignore_absent: bool = False,
+ start_at: Optional[str] = None,
+ stop_at: Optional[str] = None,
+ ) -> None:
+ return filter_file(
+ regex,
+ repl,
+ *self.filenames,
+ string=string,
+ backup=backup,
+ ignore_absent=ignore_absent,
+ start_at=start_at,
+ stop_at=stop_at,
+ )
def change_sed_delimiter(old_delim, new_delim, *filenames):
@@ -652,7 +671,13 @@ def resolve_link_target_relative_to_the_link(link):
@system_path_filter
-def copy_tree(src, dest, symlinks=True, ignore=None, _permissions=False):
+def copy_tree(
+ src: str,
+ dest: str,
+ symlinks: bool = True,
+ ignore: Optional[Callable[[str], bool]] = None,
+ _permissions: bool = False,
+):
"""Recursively copy an entire directory tree rooted at *src*.
If the destination directory *dest* does not already exist, it will
@@ -710,7 +735,7 @@ def copy_tree(src, dest, symlinks=True, ignore=None, _permissions=False):
abs_src,
abs_dest,
order="pre",
- follow_symlinks=not symlinks,
+ follow_links=not symlinks,
ignore=ignore,
follow_nonexisting=True,
):
@@ -812,45 +837,32 @@ def chgrp_if_not_world_writable(path, group):
chgrp(path, group)
-def mkdirp(*paths, **kwargs):
+def mkdirp(
+ *paths: str,
+ mode: Optional[int] = None,
+ group: Optional[Union[str, int]] = None,
+ default_perms: Optional[str] = None,
+):
"""Creates a directory, as well as parent directories if needed.
Arguments:
- paths (str): paths to create with mkdirp
-
- Keyword Aguments:
- mode (permission bits or None): optional permissions to set
- on the created directory -- use OS default if not provided
- group (group name or None): optional group for permissions of
- final created directory -- use OS default if not provided. Only
- used if world write permissions are not set
- default_perms (str or None): one of 'parents' or 'args'. The default permissions
- that are set for directories that are not themselves an argument
- for mkdirp. 'parents' means intermediate directories get the
- permissions of their direct parent directory, 'args' means
- intermediate get the same permissions specified in the arguments to
+ paths: paths to create with mkdirp
+ mode: optional permissions to set on the created directory -- use OS default
+ if not provided
+ group: optional group for permissions of final created directory -- use OS
+ default if not provided. Only used if world write permissions are not set
+ default_perms: one of 'parents' or 'args'. The default permissions that are set for
+ directories that are not themselves an argument for mkdirp. 'parents' means
+ intermediate directories get the permissions of their direct parent directory,
+ 'args' means intermediate get the same permissions specified in the arguments to
mkdirp -- default value is 'args'
"""
- mode = kwargs.get("mode", None)
- group = kwargs.get("group", None)
- default_perms = kwargs.get("default_perms", "args")
+ default_perms = default_perms or "args"
paths = path_to_os_path(*paths)
for path in paths:
if not os.path.exists(path):
try:
- # detect missing intermediate folders
- intermediate_folders = []
- last_parent = ""
-
- intermediate_path = os.path.dirname(path)
-
- while intermediate_path:
- if os.path.exists(intermediate_path):
- last_parent = intermediate_path
- break
-
- intermediate_folders.append(intermediate_path)
- intermediate_path = os.path.dirname(intermediate_path)
+ last_parent, intermediate_folders = longest_existing_parent(path)
# create folders
os.makedirs(path)
@@ -884,7 +896,10 @@ def mkdirp(*paths, **kwargs):
os.chmod(intermediate_path, intermediate_mode)
if intermediate_group is not None:
chgrp_if_not_world_writable(intermediate_path, intermediate_group)
- os.chmod(intermediate_path, intermediate_mode) # reset sticky bit after
+ if intermediate_mode is not None:
+ os.chmod(
+ intermediate_path, intermediate_mode
+ ) # reset sticky bit after
except OSError as e:
if e.errno != errno.EEXIST or not os.path.isdir(path):
@@ -893,6 +908,27 @@ def mkdirp(*paths, **kwargs):
raise OSError(errno.EEXIST, "File already exists", path)
+def longest_existing_parent(path: str) -> Tuple[str, List[str]]:
+ """Return the last existing parent and a list of all intermediate directories
+ to be created for the directory passed as input.
+
+ Args:
+ path: directory to be created
+ """
+ # detect missing intermediate folders
+ intermediate_folders = []
+ last_parent = ""
+ intermediate_path = os.path.dirname(path)
+ while intermediate_path:
+ if os.path.lexists(intermediate_path):
+ last_parent = intermediate_path
+ break
+
+ intermediate_folders.append(intermediate_path)
+ intermediate_path = os.path.dirname(intermediate_path)
+ return last_parent, intermediate_folders
+
+
@system_path_filter
def force_remove(*paths):
"""Remove files without printing errors. Like ``rm -f``, does NOT
@@ -906,8 +942,8 @@ def force_remove(*paths):
@contextmanager
@system_path_filter
-def working_dir(dirname, **kwargs):
- if kwargs.get("create", False):
+def working_dir(dirname: str, *, create: bool = False):
+ if create:
mkdirp(dirname)
orig_dir = os.getcwd()
@@ -1118,7 +1154,16 @@ def can_access(file_name):
@system_path_filter
-def traverse_tree(source_root, dest_root, rel_path="", **kwargs):
+def traverse_tree(
+ source_root: str,
+ dest_root: str,
+ rel_path: str = "",
+ *,
+ order: str = "pre",
+ ignore: Optional[Callable[[str], bool]] = None,
+ follow_nonexisting: bool = True,
+ follow_links: bool = False,
+):
"""Traverse two filesystem trees simultaneously.
Walks the LinkTree directory in pre or post order. Yields each
@@ -1150,16 +1195,11 @@ def traverse_tree(source_root, dest_root, rel_path="", **kwargs):
``src`` that do not exit in ``dest``. Default is True
follow_links (bool): Whether to descend into symlinks in ``src``
"""
- follow_nonexisting = kwargs.get("follow_nonexisting", True)
- follow_links = kwargs.get("follow_link", False)
-
- # Yield in pre or post order?
- order = kwargs.get("order", "pre")
if order not in ("pre", "post"):
raise ValueError("Order must be 'pre' or 'post'.")
# List of relative paths to ignore under the src root.
- ignore = kwargs.get("ignore", None) or (lambda filename: False)
+ ignore = ignore or (lambda filename: False)
# Don't descend into ignored directories
if ignore(rel_path):
@@ -1186,7 +1226,15 @@ def traverse_tree(source_root, dest_root, rel_path="", **kwargs):
# When follow_nonexisting isn't set, don't descend into dirs
# in source that do not exist in dest
if follow_nonexisting or os.path.exists(dest_child):
- tuples = traverse_tree(source_root, dest_root, rel_child, **kwargs)
+ tuples = traverse_tree(
+ source_root,
+ dest_root,
+ rel_child,
+ order=order,
+ ignore=ignore,
+ follow_nonexisting=follow_nonexisting,
+ follow_links=follow_links,
+ )
for t in tuples:
yield t
@@ -2573,13 +2621,15 @@ def keep_modification_time(*filenames):
@contextmanager
-def temporary_dir(*args, **kwargs):
+def temporary_dir(
+ suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None
+):
"""Create a temporary directory and cd's into it. Delete the directory
on exit.
Takes the same arguments as tempfile.mkdtemp()
"""
- tmp_dir = tempfile.mkdtemp(*args, **kwargs)
+ tmp_dir = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
try:
with working_dir(tmp_dir):
yield tmp_dir