From 9d00e7d15d5da43cd098f695f15d0ee4d1aeaedc Mon Sep 17 00:00:00 2001 From: Massimiliano Culpo Date: Tue, 10 Jan 2023 14:23:42 +0100 Subject: Remove **kwargs from function signatures in llnl.util.filesystem (#34804) Since we dropped support for Python 2.7, we can embrace using keyword only arguments for many functions in Spack that use **kwargs in the function signature. Here this is done for the llnl.util.filesystem module. There were a couple of bugs lurking in the code related to typo-like errors when retrieving from kwargs. Those have been fixed as well. --- lib/spack/llnl/util/filesystem.py | 188 ++++++++++++++++++++++++-------------- 1 file changed, 119 insertions(+), 69 deletions(-) diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py index 8e664cc0a9..219a13192d 100644 --- a/lib/spack/llnl/util/filesystem.py +++ b/lib/spack/llnl/util/filesystem.py @@ -17,6 +17,7 @@ import sys import tempfile from contextlib import contextmanager from sys import platform as _platform +from typing import Callable, List, Match, Optional, Tuple, Union from llnl.util import tty from llnl.util.lang import dedupe, memoized @@ -214,7 +215,16 @@ def same_path(path1, path2): return norm1 == norm2 -def filter_file(regex, repl, *filenames, **kwargs): +def filter_file( + regex: str, + repl: Union[str, Callable[[Match], str]], + *filenames: str, + string: bool = False, + backup: bool = False, + ignore_absent: bool = False, + start_at: Optional[str] = None, + stop_at: Optional[str] = None, +) -> None: r"""Like sed, but uses python regular expressions. Filters every line of each file through regex and replaces the file @@ -226,12 +236,10 @@ def filter_file(regex, repl, *filenames, **kwargs): can contain ``\1``, ``\2``, etc. to represent back-substitution as sed would allow. - Parameters: + Args: regex (str): The regular expression to search for repl (str): The string to replace matches with *filenames: One or more files to search and replace - - Keyword Arguments: string (bool): Treat regex as a plain string. Default it False backup (bool): Make backup file(s) suffixed with ``~``. Default is False ignore_absent (bool): Ignore any files that don't exist. @@ -246,17 +254,11 @@ def filter_file(regex, repl, *filenames, **kwargs): file is copied verbatim. Default is to filter until the end of the file. """ - string = kwargs.get("string", False) - backup = kwargs.get("backup", False) - ignore_absent = kwargs.get("ignore_absent", False) - start_at = kwargs.get("start_at", None) - stop_at = kwargs.get("stop_at", None) - # Allow strings to use \1, \2, etc. for replacement, like sed if not callable(repl): unescaped = repl.replace(r"\\", "\\") - def replace_groups_with_groupid(m): + def replace_groups_with_groupid(m: Match) -> str: def groupid_to_group(x): return m.group(int(x.group(1))) @@ -290,15 +292,14 @@ def filter_file(regex, repl, *filenames, **kwargs): shutil.copy(filename, tmp_filename) try: - # To avoid translating line endings (\n to \r\n and vis versa) + # Open as a text file and filter until the end of the file is + # reached, or we found a marker in the line if it was specified + # + # To avoid translating line endings (\n to \r\n and vice-versa) # we force os.open to ignore translations and use the line endings # the file comes with - extra_kwargs = {"errors": "surrogateescape", "newline": ""} - - # Open as a text file and filter until the end of the file is - # reached or we found a marker in the line if it was specified - with open(tmp_filename, mode="r", **extra_kwargs) as input_file: - with open(filename, mode="w", **extra_kwargs) as output_file: + with open(tmp_filename, mode="r", errors="surrogateescape", newline="") as input_file: + with open(filename, mode="w", errors="surrogateescape", newline="") as output_file: do_filtering = start_at is None # Using iter and readline is a workaround needed not to # disable input_file.tell(), which will happen if we call @@ -321,10 +322,10 @@ def filter_file(regex, repl, *filenames, **kwargs): # If we stopped filtering at some point, reopen the file in # binary mode and copy verbatim the remaining part if current_position and stop_at: - with open(tmp_filename, mode="rb") as input_file: - input_file.seek(current_position) - with open(filename, mode="ab") as output_file: - output_file.writelines(input_file.readlines()) + with open(tmp_filename, mode="rb") as input_binary_buffer: + input_binary_buffer.seek(current_position) + with open(filename, mode="ab") as output_binary_buffer: + output_binary_buffer.writelines(input_binary_buffer.readlines()) except BaseException: # clean up the original file on failure. @@ -343,8 +344,26 @@ class FileFilter(object): def __init__(self, *filenames): self.filenames = filenames - def filter(self, regex, repl, **kwargs): - return filter_file(regex, repl, *self.filenames, **kwargs) + def filter( + self, + regex: str, + repl: Union[str, Callable[[Match], str]], + string: bool = False, + backup: bool = False, + ignore_absent: bool = False, + start_at: Optional[str] = None, + stop_at: Optional[str] = None, + ) -> None: + return filter_file( + regex, + repl, + *self.filenames, + string=string, + backup=backup, + ignore_absent=ignore_absent, + start_at=start_at, + stop_at=stop_at, + ) def change_sed_delimiter(old_delim, new_delim, *filenames): @@ -652,7 +671,13 @@ def resolve_link_target_relative_to_the_link(link): @system_path_filter -def copy_tree(src, dest, symlinks=True, ignore=None, _permissions=False): +def copy_tree( + src: str, + dest: str, + symlinks: bool = True, + ignore: Optional[Callable[[str], bool]] = None, + _permissions: bool = False, +): """Recursively copy an entire directory tree rooted at *src*. If the destination directory *dest* does not already exist, it will @@ -710,7 +735,7 @@ def copy_tree(src, dest, symlinks=True, ignore=None, _permissions=False): abs_src, abs_dest, order="pre", - follow_symlinks=not symlinks, + follow_links=not symlinks, ignore=ignore, follow_nonexisting=True, ): @@ -812,45 +837,32 @@ def chgrp_if_not_world_writable(path, group): chgrp(path, group) -def mkdirp(*paths, **kwargs): +def mkdirp( + *paths: str, + mode: Optional[int] = None, + group: Optional[Union[str, int]] = None, + default_perms: Optional[str] = None, +): """Creates a directory, as well as parent directories if needed. Arguments: - paths (str): paths to create with mkdirp - - Keyword Aguments: - mode (permission bits or None): optional permissions to set - on the created directory -- use OS default if not provided - group (group name or None): optional group for permissions of - final created directory -- use OS default if not provided. Only - used if world write permissions are not set - default_perms (str or None): one of 'parents' or 'args'. The default permissions - that are set for directories that are not themselves an argument - for mkdirp. 'parents' means intermediate directories get the - permissions of their direct parent directory, 'args' means - intermediate get the same permissions specified in the arguments to + paths: paths to create with mkdirp + mode: optional permissions to set on the created directory -- use OS default + if not provided + group: optional group for permissions of final created directory -- use OS + default if not provided. Only used if world write permissions are not set + default_perms: one of 'parents' or 'args'. The default permissions that are set for + directories that are not themselves an argument for mkdirp. 'parents' means + intermediate directories get the permissions of their direct parent directory, + 'args' means intermediate get the same permissions specified in the arguments to mkdirp -- default value is 'args' """ - mode = kwargs.get("mode", None) - group = kwargs.get("group", None) - default_perms = kwargs.get("default_perms", "args") + default_perms = default_perms or "args" paths = path_to_os_path(*paths) for path in paths: if not os.path.exists(path): try: - # detect missing intermediate folders - intermediate_folders = [] - last_parent = "" - - intermediate_path = os.path.dirname(path) - - while intermediate_path: - if os.path.exists(intermediate_path): - last_parent = intermediate_path - break - - intermediate_folders.append(intermediate_path) - intermediate_path = os.path.dirname(intermediate_path) + last_parent, intermediate_folders = longest_existing_parent(path) # create folders os.makedirs(path) @@ -884,7 +896,10 @@ def mkdirp(*paths, **kwargs): os.chmod(intermediate_path, intermediate_mode) if intermediate_group is not None: chgrp_if_not_world_writable(intermediate_path, intermediate_group) - os.chmod(intermediate_path, intermediate_mode) # reset sticky bit after + if intermediate_mode is not None: + os.chmod( + intermediate_path, intermediate_mode + ) # reset sticky bit after except OSError as e: if e.errno != errno.EEXIST or not os.path.isdir(path): @@ -893,6 +908,27 @@ def mkdirp(*paths, **kwargs): raise OSError(errno.EEXIST, "File already exists", path) +def longest_existing_parent(path: str) -> Tuple[str, List[str]]: + """Return the last existing parent and a list of all intermediate directories + to be created for the directory passed as input. + + Args: + path: directory to be created + """ + # detect missing intermediate folders + intermediate_folders = [] + last_parent = "" + intermediate_path = os.path.dirname(path) + while intermediate_path: + if os.path.lexists(intermediate_path): + last_parent = intermediate_path + break + + intermediate_folders.append(intermediate_path) + intermediate_path = os.path.dirname(intermediate_path) + return last_parent, intermediate_folders + + @system_path_filter def force_remove(*paths): """Remove files without printing errors. Like ``rm -f``, does NOT @@ -906,8 +942,8 @@ def force_remove(*paths): @contextmanager @system_path_filter -def working_dir(dirname, **kwargs): - if kwargs.get("create", False): +def working_dir(dirname: str, *, create: bool = False): + if create: mkdirp(dirname) orig_dir = os.getcwd() @@ -1118,7 +1154,16 @@ def can_access(file_name): @system_path_filter -def traverse_tree(source_root, dest_root, rel_path="", **kwargs): +def traverse_tree( + source_root: str, + dest_root: str, + rel_path: str = "", + *, + order: str = "pre", + ignore: Optional[Callable[[str], bool]] = None, + follow_nonexisting: bool = True, + follow_links: bool = False, +): """Traverse two filesystem trees simultaneously. Walks the LinkTree directory in pre or post order. Yields each @@ -1150,16 +1195,11 @@ def traverse_tree(source_root, dest_root, rel_path="", **kwargs): ``src`` that do not exit in ``dest``. Default is True follow_links (bool): Whether to descend into symlinks in ``src`` """ - follow_nonexisting = kwargs.get("follow_nonexisting", True) - follow_links = kwargs.get("follow_link", False) - - # Yield in pre or post order? - order = kwargs.get("order", "pre") if order not in ("pre", "post"): raise ValueError("Order must be 'pre' or 'post'.") # List of relative paths to ignore under the src root. - ignore = kwargs.get("ignore", None) or (lambda filename: False) + ignore = ignore or (lambda filename: False) # Don't descend into ignored directories if ignore(rel_path): @@ -1186,7 +1226,15 @@ def traverse_tree(source_root, dest_root, rel_path="", **kwargs): # When follow_nonexisting isn't set, don't descend into dirs # in source that do not exist in dest if follow_nonexisting or os.path.exists(dest_child): - tuples = traverse_tree(source_root, dest_root, rel_child, **kwargs) + tuples = traverse_tree( + source_root, + dest_root, + rel_child, + order=order, + ignore=ignore, + follow_nonexisting=follow_nonexisting, + follow_links=follow_links, + ) for t in tuples: yield t @@ -2573,13 +2621,15 @@ def keep_modification_time(*filenames): @contextmanager -def temporary_dir(*args, **kwargs): +def temporary_dir( + suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None +): """Create a temporary directory and cd's into it. Delete the directory on exit. Takes the same arguments as tempfile.mkdtemp() """ - tmp_dir = tempfile.mkdtemp(*args, **kwargs) + tmp_dir = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir) try: with working_dir(tmp_dir): yield tmp_dir -- cgit v1.2.3-60-g2f50