From 046416479a7de64e53ab8831f870cbe5e45a5aa5 Mon Sep 17 00:00:00 2001 From: Massimiliano Culpo Date: Fri, 3 Mar 2023 22:17:27 +0100 Subject: Polish `spack.util.environment` (#35812) Update `spack.util.environment` to remove legacy idioms. * Remove kwargs from method signature and use a class for traces * Uppercase a few global variables * spack.util.environment: add type-hints * Improve docstrings * Fixed most style issues reported by pylint --- lib/spack/spack/build_environment.py | 4 +- lib/spack/spack/main.py | 2 +- lib/spack/spack/modules/lmod.py | 2 +- lib/spack/spack/test/cc.py | 4 +- lib/spack/spack/test/environment_modifications.py | 10 - lib/spack/spack/test/modules/lmod.py | 2 +- lib/spack/spack/util/environment.py | 653 +++++++++++----------- 7 files changed, 341 insertions(+), 336 deletions(-) diff --git a/lib/spack/spack/build_environment.py b/lib/spack/spack/build_environment.py index 1506233de9..da5127a951 100644 --- a/lib/spack/spack/build_environment.py +++ b/lib/spack/spack/build_environment.py @@ -69,13 +69,13 @@ from spack.error import NoHeadersError, NoLibrariesError from spack.installer import InstallError from spack.util.cpus import cpus_available from spack.util.environment import ( + SYSTEM_DIRS, EnvironmentModifications, env_flag, filter_system_paths, get_path, inspect_path, is_system_path, - system_dirs, validate, ) from spack.util.executable import Executable @@ -397,7 +397,7 @@ def set_compiler_environment_variables(pkg, env): env.set("SPACK_COMPILER_SPEC", str(spec.compiler)) - env.set("SPACK_SYSTEM_DIRS", ":".join(system_dirs)) + env.set("SPACK_SYSTEM_DIRS", ":".join(SYSTEM_DIRS)) compiler.setup_custom_environment(pkg, env) diff --git a/lib/spack/spack/main.py b/lib/spack/spack/main.py index ae6bed7fd0..455bd29b9d 100644 --- a/lib/spack/spack/main.py +++ b/lib/spack/spack/main.py @@ -575,7 +575,7 @@ def setup_main_options(args): if args.debug: spack.util.debug.register_interrupt_handler() spack.config.set("config:debug", True, scope="command_line") - spack.util.environment.tracing_enabled = True + spack.util.environment.TRACING_ENABLED = True if args.timestamp: tty.set_timestamp(True) diff --git a/lib/spack/spack/modules/lmod.py b/lib/spack/spack/modules/lmod.py index b63a21feb8..d0f93d0ad3 100644 --- a/lib/spack/spack/modules/lmod.py +++ b/lib/spack/spack/modules/lmod.py @@ -71,7 +71,7 @@ def guess_core_compilers(name, store=False): # A compiler is considered to be a core compiler if any of the # C, C++ or Fortran compilers reside in a system directory is_system_compiler = any( - os.path.dirname(x) in spack.util.environment.system_dirs + os.path.dirname(x) in spack.util.environment.SYSTEM_DIRS for x in compiler["paths"].values() if x is not None ) diff --git a/lib/spack/spack/test/cc.py b/lib/spack/spack/test/cc.py index 3589eb49f8..48dd7fd8a0 100644 --- a/lib/spack/spack/test/cc.py +++ b/lib/spack/spack/test/cc.py @@ -16,7 +16,7 @@ import spack.build_environment import spack.config import spack.spec from spack.paths import build_env_path -from spack.util.environment import set_env, system_dirs +from spack.util.environment import SYSTEM_DIRS, set_env from spack.util.executable import Executable, ProcessError # @@ -160,7 +160,7 @@ def wrapper_environment(working_env): SPACK_DEBUG_LOG_ID="foo-hashabc", SPACK_COMPILER_SPEC="gcc@4.4.7", SPACK_SHORT_SPEC="foo@1.2 arch=linux-rhel6-x86_64 /hashabc", - SPACK_SYSTEM_DIRS=":".join(system_dirs), + SPACK_SYSTEM_DIRS=":".join(SYSTEM_DIRS), SPACK_CC_RPATH_ARG="-Wl,-rpath,", SPACK_CXX_RPATH_ARG="-Wl,-rpath,", SPACK_F77_RPATH_ARG="-Wl,-rpath,", diff --git a/lib/spack/spack/test/environment_modifications.py b/lib/spack/spack/test/environment_modifications.py index 42ce72ad67..9b743b6728 100644 --- a/lib/spack/spack/test/environment_modifications.py +++ b/lib/spack/spack/test/environment_modifications.py @@ -230,16 +230,6 @@ def test_path_manipulation(env): assert os.environ["PATH_LIST_WITH_DUPLICATES"].count("/duplicate") == 1 -def test_extra_arguments(env): - """Tests that we can attach extra arguments to any command.""" - env.set("A", "dummy value", who="Pkg1") - for x in env: - assert "who" in x.args - - env.apply_modifications() - assert "dummy value" == os.environ["A"] - - def test_extend(env): """Tests that we can construct a list of environment modifications starting from another list. diff --git a/lib/spack/spack/test/modules/lmod.py b/lib/spack/spack/test/modules/lmod.py index 9ca0c3aa50..a43c4f8c7b 100644 --- a/lib/spack/spack/test/modules/lmod.py +++ b/lib/spack/spack/test/modules/lmod.py @@ -237,7 +237,7 @@ class TestLmod(object): module_configuration("missing_core_compilers") # Our mock paths must be detected as system paths - monkeypatch.setattr(spack.util.environment, "system_dirs", ["/path/to"]) + monkeypatch.setattr(spack.util.environment, "SYSTEM_DIRS", ["/path/to"]) # We don't want to really write into user configuration # when running tests diff --git a/lib/spack/spack/util/environment.py b/lib/spack/spack/util/environment.py index cad8552f7b..6be232bf42 100644 --- a/lib/spack/spack/util/environment.py +++ b/lib/spack/spack/util/environment.py @@ -2,7 +2,7 @@ # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) -"""Utilities for setting and modifying environment variables.""" +"""Set, unset or modify environment variables.""" import collections import contextlib import inspect @@ -15,28 +15,34 @@ import re import shlex import socket import sys +from typing import Any, Callable, Dict, List, MutableMapping, Optional, Tuple, Union -import llnl.util.tty as tty +from llnl.util import tty from llnl.util.lang import dedupe -import spack.config import spack.platforms import spack.spec -import spack.util.executable as executable -from spack.util.path import path_to_os_path, system_path_filter -is_windows = sys.platform == "win32" +from .executable import Executable, which +from .path import path_to_os_path, system_path_filter -system_paths = ( - ["/", "/usr", "/usr/local"] - if not is_windows - else ["C:\\", "C:\\Program Files", "C:\\Program Files (x86)", "C:\\Users", "C:\\ProgramData"] -) -suffixes = ["bin", "bin64", "include", "lib", "lib64"] if not is_windows else [] -system_dirs = [os.path.join(p, s) for s in suffixes for p in system_paths] + system_paths +if sys.platform == "win32": + SYSTEM_PATHS = [ + "C:\\", + "C:\\Program Files", + "C:\\Program Files (x86)", + "C:\\Users", + "C:\\ProgramData", + ] + SUFFIXES = [] +else: + SYSTEM_PATHS = ["/", "/usr", "/usr/local"] + SUFFIXES = ["bin", "bin64", "include", "lib", "lib64"] + +SYSTEM_DIRS = [os.path.join(p, s) for s in SUFFIXES for p in SYSTEM_PATHS] + SYSTEM_PATHS -_shell_set_strings = { +_SHELL_SET_STRINGS = { "sh": "export {0}={1};\n", "csh": "setenv {0} {1};\n", "fish": "set -gx {0} {1};\n", @@ -44,7 +50,7 @@ _shell_set_strings = { } -_shell_unset_strings = { +_SHELL_UNSET_STRINGS = { "sh": "unset {0};\n", "csh": "unsetenv {0};\n", "fish": "set -e {0};\n", @@ -52,109 +58,116 @@ _shell_unset_strings = { } -tracing_enabled = False +TRACING_ENABLED = False +Path = str +ModificationList = List[Union["NameModifier", "NameValueModifier"]] -def is_system_path(path): - """Predicate that given a path returns True if it is a system path, - False otherwise. - Args: - path (str): path to a directory +def is_system_path(path: Path) -> bool: + """Returns True if the argument is a system path, False otherwise.""" + return bool(path) and (os.path.normpath(path) in SYSTEM_DIRS) - Returns: - True or False - """ - return path and os.path.normpath(path) in system_dirs - -def filter_system_paths(paths): - """Return only paths that are not system paths.""" +def filter_system_paths(paths: List[Path]) -> List[Path]: + """Returns a copy of the input where system paths are filtered out.""" return [p for p in paths if not is_system_path(p)] -def deprioritize_system_paths(paths): - """Put system paths at the end of paths, otherwise preserving order.""" - filtered_paths = filter_system_paths(paths) - fp = set(filtered_paths) - return filtered_paths + [p for p in paths if p not in fp] +def deprioritize_system_paths(paths: List[Path]) -> List[Path]: + """Reorders input paths by putting system paths at the end of the list, otherwise + preserving order. + """ + return list(sorted(paths, key=is_system_path)) -def prune_duplicate_paths(paths): - """Returns the paths with duplicates removed, order preserved.""" +def prune_duplicate_paths(paths: List[Path]) -> List[Path]: + """Returns the input list with duplicates removed, otherwise preserving order.""" return list(dedupe(paths)) -def get_path(name): +def get_path(name: str) -> List[Path]: + """Given the name of an environment variable containing multiple + paths separated by 'os.pathsep', returns a list of the paths. + """ path = os.environ.get(name, "").strip() if path: return path.split(os.pathsep) - else: - return [] + return [] -def env_flag(name): +def env_flag(name: str) -> bool: + """Given the name of an environment variable, returns True if it is set to + 'true' or to '1', False otherwise. + """ if name in os.environ: value = os.environ[name].lower() - return value == "true" or value == "1" + return value in ("true", "1") return False -def path_set(var_name, directories): +def path_set(var_name: str, directories: List[Path]): + """Sets the variable passed as input to the `os.pathsep` joined list of directories.""" path_str = os.pathsep.join(str(dir) for dir in directories) os.environ[var_name] = path_str -def path_put_first(var_name, directories): +def path_put_first(var_name: str, directories: List[Path]): """Puts the provided directories first in the path, adding them if they're not already there. """ path = os.environ.get(var_name, "").split(os.pathsep) - for dir in directories: - if dir in path: - path.remove(dir) + for directory in directories: + if directory in path: + path.remove(directory) - new_path = tuple(directories) + tuple(path) + new_path = list(directories) + list(path) path_set(var_name, new_path) -bash_function_finder = re.compile(r"BASH_FUNC_(.*?)\(\)") +BASH_FUNCTION_FINDER = re.compile(r"BASH_FUNC_(.*?)\(\)") -def env_var_to_source_line(var, val): +def _env_var_to_source_line(var: str, val: str) -> str: if var.startswith("BASH_FUNC"): source_line = "function {fname}{decl}; export -f {fname}".format( - fname=bash_function_finder.sub(r"\1", var), decl=val + fname=BASH_FUNCTION_FINDER.sub(r"\1", var), decl=val ) else: - source_line = "{var}={val}; export {var}".format(var=var, val=shlex.quote(val)) + source_line = f"{var}={shlex.quote(val)}; export {var}" return source_line @system_path_filter(arg_slice=slice(1)) -def dump_environment(path, environment=None): - """Dump an environment dictionary to a source-able file.""" +def dump_environment(path: Path, environment: Optional[MutableMapping[str, str]] = None): + """Dump an environment dictionary to a source-able file. + + Args: + path: path of the file to write + environment: environment to be writte. If None os.environ is used. + """ use_env = environment or os.environ - hidden_vars = set(["PS1", "PWD", "OLDPWD", "TERM_SESSION_ID"]) + hidden_vars = {"PS1", "PWD", "OLDPWD", "TERM_SESSION_ID"} - fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) - with os.fdopen(fd, "w") as env_file: + file_descriptor = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(file_descriptor, "w") as env_file: for var, val in sorted(use_env.items()): env_file.write( "".join( - ["#" if var in hidden_vars else "", env_var_to_source_line(var, val), "\n"] + ["#" if var in hidden_vars else "", _env_var_to_source_line(var, val), "\n"] ) ) @system_path_filter(arg_slice=slice(1)) -def pickle_environment(path, environment=None): +def pickle_environment(path: Path, environment: Optional[Dict[str, str]] = None): """Pickle an environment dictionary to a file.""" - pickle.dump(dict(environment if environment else os.environ), open(path, "wb"), protocol=2) + with open(path, "wb") as pickle_file: + pickle.dump(dict(environment if environment else os.environ), pickle_file, protocol=2) -def get_host_environment_metadata(): +def get_host_environment_metadata() -> Dict[str, str]: """Get the host environment, reduce to a subset that we can store in the install directory, and add the spack version. """ @@ -171,7 +184,7 @@ def get_host_environment_metadata(): } -def get_host_environment(): +def get_host_environment() -> Dict[str, Any]: """Return a dictionary (lookup) with host information (not including the os.environ). """ @@ -218,55 +231,89 @@ def set_env(**kwargs): del os.environ[var] -class NameModifier(object): - def __init__(self, name, **kwargs): - self.name = name - self.separator = kwargs.get("separator", os.pathsep) - self.args = {"name": name, "separator": self.separator} +class Trace: + """Trace information on a function call""" + + __slots__ = ("filename", "lineno", "context") - self.args.update(kwargs) + def __init__(self, *, filename: str, lineno: int, context: str): + self.filename = filename + self.lineno = lineno + self.context = context - def __eq__(self, other): + def __str__(self): + return f"{self.context} at {self.filename}:{self.lineno}" + + def __repr__(self): + return f"Trace(filename={self.filename}, lineno={self.lineno}, context={self.context})" + + +class NameModifier: + """Base class for modifiers that act on the environment variable as a whole, and thus + store just its name + """ + + __slots__ = ("name", "separator", "trace") + + def __init__(self, name: str, *, separator: str = os.pathsep, trace: Optional[Trace] = None): + self.name = name + self.separator = separator + self.trace = trace + + def __eq__(self, other: object): if not isinstance(other, NameModifier): - return False + return NotImplemented return self.name == other.name - def update_args(self, **kwargs): - self.__dict__.update(kwargs) - self.args.update(kwargs) + def execute(self, env: MutableMapping[str, str]): + """Apply the modification to the mapping passed as input""" + raise NotImplementedError("must be implemented by derived classes") + + +class NameValueModifier: + """Base class for modifiers that modify the value of an environment variable.""" + __slots__ = ("name", "value", "separator", "trace") -class NameValueModifier(object): - def __init__(self, name, value, **kwargs): + def __init__( + self, name: str, value: Any, *, separator: str = os.pathsep, trace: Optional[Trace] = None + ): self.name = name self.value = value - self.separator = kwargs.get("separator", os.pathsep) - self.args = {"name": name, "value": value, "separator": self.separator} - self.args.update(kwargs) + self.separator = separator + self.trace = trace - def __eq__(self, other): + def __eq__(self, other: object): if not isinstance(other, NameValueModifier): - return False + return NotImplemented return ( self.name == other.name and self.value == other.value and self.separator == other.separator ) - def update_args(self, **kwargs): - self.__dict__.update(kwargs) - self.args.update(kwargs) + def execute(self, env: MutableMapping[str, str]): + """Apply the modification to the mapping passed as input""" + raise NotImplementedError("must be implemented by derived classes") class SetEnv(NameValueModifier): - def execute(self, env): - tty.debug("SetEnv: {0}={1}".format(self.name, str(self.value)), level=3) + __slots__ = ("force",) + + def __init__( + self, name: str, value: str, *, trace: Optional[Trace] = None, force: bool = False + ): + super().__init__(name, value, trace=trace) + self.force = force + + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"SetEnv: {self.name}={str(self.value)}", level=3) env[self.name] = str(self.value) class AppendFlagsEnv(NameValueModifier): - def execute(self, env): - tty.debug("AppendFlagsEnv: {0}={1}".format(self.name, str(self.value)), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"AppendFlagsEnv: {self.name}={str(self.value)}", level=3) if self.name in env and env[self.name]: env[self.name] += self.separator + str(self.value) else: @@ -274,15 +321,15 @@ class AppendFlagsEnv(NameValueModifier): class UnsetEnv(NameModifier): - def execute(self, env): - tty.debug("UnsetEnv: {0}".format(self.name), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"UnsetEnv: {self.name}", level=3) # Avoid throwing if the variable was not set env.pop(self.name, None) class RemoveFlagsEnv(NameValueModifier): - def execute(self, env): - tty.debug("RemoveFlagsEnv: {0}-{1}".format(self.name, str(self.value)), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"RemoveFlagsEnv: {self.name}-{str(self.value)}", level=3) environment_value = env.get(self.name, "") flags = environment_value.split(self.separator) if environment_value else [] flags = [f for f in flags if f != self.value] @@ -290,15 +337,15 @@ class RemoveFlagsEnv(NameValueModifier): class SetPath(NameValueModifier): - def execute(self, env): - string_path = concatenate_paths(self.value, separator=self.separator) - tty.debug("SetPath: {0}={1}".format(self.name, string_path), level=3) + def execute(self, env: MutableMapping[str, str]): + string_path = self.separator.join(str(item) for item in self.value) + tty.debug(f"SetPath: {self.name}={string_path}", level=3) env[self.name] = string_path class AppendPath(NameValueModifier): - def execute(self, env): - tty.debug("AppendPath: {0}+{1}".format(self.name, str(self.value)), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"AppendPath: {self.name}+{str(self.value)}", level=3) environment_value = env.get(self.name, "") directories = environment_value.split(self.separator) if environment_value else [] directories.append(path_to_os_path(os.path.normpath(self.value)).pop()) @@ -306,8 +353,8 @@ class AppendPath(NameValueModifier): class PrependPath(NameValueModifier): - def execute(self, env): - tty.debug("PrependPath: {0}+{1}".format(self.name, str(self.value)), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"PrependPath: {self.name}+{str(self.value)}", level=3) environment_value = env.get(self.name, "") directories = environment_value.split(self.separator) if environment_value else [] directories = [path_to_os_path(os.path.normpath(self.value)).pop()] + directories @@ -315,8 +362,8 @@ class PrependPath(NameValueModifier): class RemovePath(NameValueModifier): - def execute(self, env): - tty.debug("RemovePath: {0}-{1}".format(self.name, str(self.value)), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"RemovePath: {self.name}-{str(self.value)}", level=3) environment_value = env.get(self.name, "") directories = environment_value.split(self.separator) if environment_value else [] directories = [ @@ -328,8 +375,8 @@ class RemovePath(NameValueModifier): class DeprioritizeSystemPaths(NameModifier): - def execute(self, env): - tty.debug("DeprioritizeSystemPaths: {0}".format(self.name), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"DeprioritizeSystemPaths: {self.name}", level=3) environment_value = env.get(self.name, "") directories = environment_value.split(self.separator) if environment_value else [] directories = deprioritize_system_paths( @@ -339,8 +386,8 @@ class DeprioritizeSystemPaths(NameModifier): class PruneDuplicatePaths(NameModifier): - def execute(self, env): - tty.debug("PruneDuplicatePaths: {0}".format(self.name), level=3) + def execute(self, env: MutableMapping[str, str]): + tty.debug(f"PruneDuplicatePaths: {self.name}", level=3) environment_value = env.get(self.name, "") directories = environment_value.split(self.separator) if environment_value else [] directories = prune_duplicate_paths( @@ -349,29 +396,22 @@ class PruneDuplicatePaths(NameModifier): env[self.name] = self.separator.join(directories) -class EnvironmentModifications(object): - """Keeps track of requests to modify the current environment. +class EnvironmentModifications: + """Keeps track of requests to modify the current environment.""" - Each call to a method to modify the environment stores the extra - information on the caller in the request: - - * 'filename' : filename of the module where the caller is defined - * 'lineno': line number where the request occurred - * 'context' : line of code that issued the request that failed - """ - - def __init__(self, other=None, traced=None): + def __init__( + self, other: Optional["EnvironmentModifications"] = None, traced: Union[None, bool] = None + ): """Initializes a new instance, copying commands from 'other' if it is not None. Args: - other (EnvironmentModifications): list of environment modifications - to be extended (optional) - traced (bool): enable or disable stack trace inspection to log the origin - of the environment modifications. + other: list of environment modifications to be extended (optional) + traced: enable or disable stack trace inspection to log the origin + of the environment modifications """ - self.traced = tracing_enabled if traced is None else bool(traced) - self.env_modifications = [] + self.traced = TRACING_ENABLED if traced is None else bool(traced) + self.env_modifications: List[Union[NameModifier, NameValueModifier]] = [] if other is not None: self.extend(other) @@ -381,179 +421,163 @@ class EnvironmentModifications(object): def __len__(self): return len(self.env_modifications) - def extend(self, other): + def extend(self, other: "EnvironmentModifications"): self._check_other(other) self.env_modifications.extend(other.env_modifications) @staticmethod - def _check_other(other): + def _check_other(other: "EnvironmentModifications"): if not isinstance(other, EnvironmentModifications): raise TypeError("other must be an instance of EnvironmentModifications") - def _maybe_trace(self, kwargs): - """Provide the modification with stack trace info so that we can track its - origin to find issues in packages. This is very slow and expensive.""" + def _trace(self) -> Optional[Trace]: + """Returns a trace object if tracing is enabled, else None.""" if not self.traced: - return + return None stack = inspect.stack() try: _, filename, lineno, _, context, index = stack[2] - context = context[index].strip() + assert index is not None, "index must be an integer" + current_context = context[index].strip() if context is not None else "unknown context" except Exception: filename = "unknown file" - lineno = "unknown line" - context = "unknown context" - kwargs.update({"filename": filename, "lineno": lineno, "context": context}) + lineno = -1 + current_context = "unknown context" + + return Trace(filename=filename, lineno=lineno, context=current_context) - def set(self, name, value, **kwargs): + def set(self, name: str, value: str, *, force: bool = False): """Stores a request to set an environment variable. Args: - name: name of the environment variable to be set + name: name of the environment variable value: value of the environment variable + force: if True, audit will not consider this modification a warning """ - self._maybe_trace(kwargs) - item = SetEnv(name, value, **kwargs) + item = SetEnv(name, value, trace=self._trace(), force=force) self.env_modifications.append(item) - def append_flags(self, name, value, sep=" ", **kwargs): - """ - Stores in the current object a request to append to an env variable + def append_flags(self, name: str, value: str, sep: str = " "): + """Stores a request to append 'flags' to an environment variable. Args: - name: name of the environment variable to be appended to - value: value to append to the environment variable - Appends with spaces separating different additions to the variable + name: name of the environment variable + value: flags to be appended + sep: separator for the flags (default: " ") """ - self._maybe_trace(kwargs) - kwargs.update({"separator": sep}) - item = AppendFlagsEnv(name, value, **kwargs) + item = AppendFlagsEnv(name, value, separator=sep, trace=self._trace()) self.env_modifications.append(item) - def unset(self, name, **kwargs): + def unset(self, name: str): """Stores a request to unset an environment variable. Args: - name: name of the environment variable to be unset + name: name of the environment variable """ - self._maybe_trace(kwargs) - item = UnsetEnv(name, **kwargs) + item = UnsetEnv(name, trace=self._trace()) self.env_modifications.append(item) - def remove_flags(self, name, value, sep=" ", **kwargs): - """ - Stores in the current object a request to remove flags from an - env variable + def remove_flags(self, name: str, value: str, sep: str = " "): + """Stores a request to remove flags from an environment variable Args: - name: name of the environment variable to be removed from - value: value to remove to the environment variable - sep: separator to assume for environment variable + name: name of the environment variable + value: flags to be removed + sep: separator for the flags (default: " ") """ - self._maybe_trace(kwargs) - kwargs.update({"separator": sep}) - item = RemoveFlagsEnv(name, value, **kwargs) + item = RemoveFlagsEnv(name, value, separator=sep, trace=self._trace()) self.env_modifications.append(item) - def set_path(self, name, elements, **kwargs): - """Stores a request to set a path generated from a list. + def set_path(self, name: str, elements: List[str], separator: str = os.pathsep): + """Stores a request to set an environment variable to a list of paths, + separated by a character defined in input. Args: - name: name o the environment variable to be set. - elements: elements of the path to set. + name: name of the environment variable + elements: ordered list paths + separator: separator for the paths (default: os.pathsep) """ - self._maybe_trace(kwargs) - item = SetPath(name, elements, **kwargs) + item = SetPath(name, elements, separator=separator, trace=self._trace()) self.env_modifications.append(item) - def append_path(self, name, path, **kwargs): - """Stores a request to append a path to a path list. + def append_path(self, name: str, path: str, separator: str = os.pathsep): + """Stores a request to append a path to list of paths. Args: - name: name of the path list in the environment + name: name of the environment variable path: path to be appended + separator: separator for the paths (default: os.pathsep) """ - self._maybe_trace(kwargs) - item = AppendPath(name, path, **kwargs) + item = AppendPath(name, path, separator=separator, trace=self._trace()) self.env_modifications.append(item) - def prepend_path(self, name, path, **kwargs): - """Same as `append_path`, but the path is pre-pended. + def prepend_path(self, name: str, path: str, separator: str = os.pathsep): + """Stores a request to prepend a path to list of paths. Args: - name: name of the path list in the environment - path: path to be pre-pended + name: name of the environment variable + path: path to be prepended + separator: separator for the paths (default: os.pathsep) """ - self._maybe_trace(kwargs) - item = PrependPath(name, path, **kwargs) + item = PrependPath(name, path, separator=separator, trace=self._trace()) self.env_modifications.append(item) - def remove_path(self, name, path, **kwargs): - """Stores a request to remove a path from a path list. + def remove_path(self, name: str, path: str, separator: str = os.pathsep): + """Stores a request to remove a path from a list of paths. Args: - name: name of the path list in the environment + name: name of the environment variable path: path to be removed + separator: separator for the paths (default: os.pathsep) """ - self._maybe_trace(kwargs) - item = RemovePath(name, path, **kwargs) + item = RemovePath(name, path, separator=separator, trace=self._trace()) self.env_modifications.append(item) - def deprioritize_system_paths(self, name, **kwargs): + def deprioritize_system_paths(self, name: str, separator: str = os.pathsep): """Stores a request to deprioritize system paths in a path list, otherwise preserving the order. Args: - name: name of the path list in the environment. + name: name of the environment variable + separator: separator for the paths (default: os.pathsep) """ - self._maybe_trace(kwargs) - item = DeprioritizeSystemPaths(name, **kwargs) + item = DeprioritizeSystemPaths(name, separator=separator, trace=self._trace()) self.env_modifications.append(item) - def prune_duplicate_paths(self, name, **kwargs): + def prune_duplicate_paths(self, name: str, separator: str = os.pathsep): """Stores a request to remove duplicates from a path list, otherwise preserving the order. Args: - name: name of the path list in the environment. + name: name of the environment variable + separator: separator for the paths (default: os.pathsep) """ - self._maybe_trace(kwargs) - item = PruneDuplicatePaths(name, **kwargs) + item = PruneDuplicatePaths(name, separator=separator, trace=self._trace()) self.env_modifications.append(item) - def group_by_name(self): - """Returns a dict of the modifications grouped by variable name. - - Returns: - dict mapping the environment variable name to the modifications to - be done on it - """ + def group_by_name(self) -> Dict[str, ModificationList]: + """Returns a dict of the current modifications keyed by variable name.""" modifications = collections.defaultdict(list) for item in self: modifications[item.name].append(item) return modifications - def is_unset(self, var_name): + def is_unset(self, variable_name: str) -> bool: + """Returns True if the last modification to a variable is to unset it, False otherwise.""" modifications = self.group_by_name() - var_updates = modifications.get(var_name, None) - if not var_updates: - # We did not explicitly unset it + if variable_name not in modifications: return False - # The last modification must unset the variable for it to be considered - # unset - return type(var_updates[-1]) == UnsetEnv + # The last modification must unset the variable for it to be considered unset + return isinstance(modifications[variable_name][-1], UnsetEnv) def clear(self): - """ - Clears the current list of modifications - """ + """Clears the current list of modifications.""" self.env_modifications = [] - def reversed(self): - """ - Returns the EnvironmentModifications object that will reverse self + def reversed(self) -> "EnvironmentModifications": + """Returns the EnvironmentModifications object that will reverse self Only creates reversals for additions to the environment, as reversing ``unset`` and ``remove_path`` modifications is impossible. @@ -564,54 +588,55 @@ class EnvironmentModifications(object): rev = EnvironmentModifications() for envmod in reversed(self.env_modifications): - if type(envmod) == SetEnv: - tty.debug("Reversing `Set` environment operation may lose " "original value") + if isinstance(envmod, SetEnv): + tty.debug("Reversing `Set` environment operation may lose the original value") rev.unset(envmod.name) - elif type(envmod) == AppendPath: + elif isinstance(envmod, AppendPath): rev.remove_path(envmod.name, envmod.value) - elif type(envmod) == PrependPath: + elif isinstance(envmod, PrependPath): rev.remove_path(envmod.name, envmod.value) - elif type(envmod) == SetPath: - tty.debug("Reversing `SetPath` environment operation may lose " "original value") + elif isinstance(envmod, SetPath): + tty.debug("Reversing `SetPath` environment operation may lose the original value") rev.unset(envmod.name) - elif type(envmod) == AppendFlagsEnv: + elif isinstance(envmod, AppendFlagsEnv): rev.remove_flags(envmod.name, envmod.value) else: - # This is an un-reversable operation tty.warn( - "Skipping reversal of unreversable operation" - "%s %s" % (type(envmod), envmod.name) + f"Skipping reversal of unreversable operation {type(envmod)} {envmod.name}" ) return rev - def apply_modifications(self, env=None): - """Applies the modifications and clears the list.""" - # Use os.environ if not specified - # Do not copy, we want to modify it in place - if env is None: - env = os.environ + def apply_modifications(self, env: Optional[MutableMapping[str, str]] = None): + """Applies the modifications and clears the list. - modifications = self.group_by_name() - # Apply modifications one variable at a time - for name, actions in sorted(modifications.items()): - for x in actions: - x.execute(env) + Args: + env: environment to be modified. If None, os.environ will be used. + """ + env = os.environ if env is None else env - def shell_modifications(self, shell="sh", explicit=False, env=None): + modifications = self.group_by_name() + for _, actions in sorted(modifications.items()): + for modifier in actions: + modifier.execute(env) + + def shell_modifications( + self, + shell: str = "sh", + explicit: bool = False, + env: Optional[MutableMapping[str, str]] = None, + ) -> str: """Return shell code to apply the modifications and clears the list.""" modifications = self.group_by_name() - if env is None: - env = os.environ - - new_env = env.copy() + env = os.environ if env is None else env + new_env = dict(env.items()) - for name, actions in sorted(modifications.items()): - for x in actions: - x.execute(new_env) + for _, actions in sorted(modifications.items()): + for modifier in actions: + modifier.execute(new_env) - if "MANPATH" in new_env and not new_env.get("MANPATH").endswith(":"): + if "MANPATH" in new_env and not new_env["MANPATH"].endswith(":"): new_env["MANPATH"] += ":" cmds = "" @@ -621,24 +646,25 @@ class EnvironmentModifications(object): old = env.get(name, None) if explicit or new != old: if new is None: - cmds += _shell_unset_strings[shell].format(name) + cmds += _SHELL_UNSET_STRINGS[shell].format(name) else: if sys.platform != "win32": - cmd = _shell_set_strings[shell].format(name, shlex.quote(new_env[name])) + cmd = _SHELL_SET_STRINGS[shell].format(name, shlex.quote(new_env[name])) else: - cmd = _shell_set_strings[shell].format(name, new_env[name]) + cmd = _SHELL_SET_STRINGS[shell].format(name, new_env[name]) cmds += cmd return cmds @staticmethod - def from_sourcing_file(filename, *arguments, **kwargs): - """Constructs an instance of a - :py:class:`spack.util.environment.EnvironmentModifications` object - that has the same effect as sourcing a file. + def from_sourcing_file( + filename: Path, *arguments: str, **kwargs: Any + ) -> "EnvironmentModifications": + """Returns the environment modifications that have the same effect as + sourcing the input file. Args: - filename (str): the file to be sourced - *arguments (list): arguments to pass on the command line + filename: the file to be sourced + *arguments: arguments to pass on the command line Keyword Args: shell (str): the shell to use (default: ``bash``) @@ -655,10 +681,10 @@ class EnvironmentModifications(object): clean (bool): in addition to removing empty entries, also remove duplicate entries (default: False). """ - tty.debug("EnvironmentModifications.from_sourcing_file: {0}".format(filename)) + tty.debug(f"EnvironmentModifications.from_sourcing_file: {filename}") # Check if the file actually exists if not os.path.isfile(filename): - msg = "Trying to source non-existing file: {0}".format(filename) + msg = f"Trying to source non-existing file: {filename}" raise RuntimeError(msg) # Prepare include and exclude lists of environment variable names @@ -707,16 +733,15 @@ class EnvironmentModifications(object): return EnvironmentModifications.from_environment_diff(before, after, clean) @staticmethod - def from_environment_diff(before, after, clean=False): - """Constructs an instance of a - :py:class:`spack.util.environment.EnvironmentModifications` object - from the diff of two dictionaries. + def from_environment_diff( + before: MutableMapping[str, str], after: MutableMapping[str, str], clean: bool = False + ) -> "EnvironmentModifications": + """Constructs the environment modifications from the diff of two environments. Args: - before (dict): environment before the modifications are applied - after (dict): environment after the modifications are applied - clean (bool): in addition to removing empty entries, also remove - duplicate entries + before: environment before the modifications are applied + after: environment after the modifications are applied + clean: in addition to removing empty entries, also remove duplicate entries """ # Fill the EnvironmentModifications instance env = EnvironmentModifications() @@ -743,22 +768,22 @@ class EnvironmentModifications(object): # Add variables to env. # Assume that variables with 'PATH' in the name or that contain # separators like ':' or ';' are more likely to be paths - for x in new_variables: - sep = return_separator_if_any(after[x]) + for variable_name in new_variables: + sep = return_separator_if_any(after[variable_name]) if sep: - env.prepend_path(x, after[x], separator=sep) - elif "PATH" in x: - env.prepend_path(x, after[x]) + env.prepend_path(variable_name, after[variable_name], separator=sep) + elif "PATH" in variable_name: + env.prepend_path(variable_name, after[variable_name]) else: # We just need to set the variable to the new value - env.set(x, after[x]) + env.set(variable_name, after[variable_name]) - for x in unset_variables: - env.unset(x) + for variable_name in unset_variables: + env.unset(variable_name) - for x in modified_variables: - value_before = before[x] - value_after = after[x] + for variable_name in modified_variables: + value_before = before[variable_name] + value_after = after[variable_name] sep = return_separator_if_any(value_before, value_after) if sep: before_list = value_before.split(sep) @@ -786,12 +811,12 @@ class EnvironmentModifications(object): end = after_list.index(remaining_list[-1]) search = sep.join(after_list[start : end + 1]) except IndexError: - env.prepend_path(x, value_after) + env.prepend_path(variable_name, value_after) continue if search not in value_before: # We just need to set the variable to the new value - env.prepend_path(x, value_after) + env.prepend_path(variable_name, value_after) else: try: prepend_list = after_list[:start] @@ -804,52 +829,39 @@ class EnvironmentModifications(object): append_list = [] for item in remove_list: - env.remove_path(x, item) + env.remove_path(variable_name, item) for item in append_list: - env.append_path(x, item) + env.append_path(variable_name, item) for item in prepend_list: - env.prepend_path(x, item) + env.prepend_path(variable_name, item) else: # We just need to set the variable to the new value - env.set(x, value_after) + env.set(variable_name, value_after) return env -def concatenate_paths(paths, separator=os.pathsep): - """Concatenates an iterable of paths into a string of paths separated by - separator, defaulting to colon. - - Args: - paths: iterable of paths - separator: the separator to use, default ';' windows, ':' otherwise - - Returns: - string - """ - return separator.join(str(item) for item in paths) - - -def set_or_unset_not_first(variable, changes, errstream): +def _set_or_unset_not_first( + variable: str, changes: ModificationList, errstream: Callable[[str], None] +): """Check if we are going to set or unset something after other modifications have already been requested. """ indexes = [ ii for ii, item in enumerate(changes) - if ii != 0 and not item.args.get("force", False) and type(item) in [SetEnv, UnsetEnv] + if ii != 0 and isinstance(item, (SetEnv, UnsetEnv)) and not getattr(item, "force", False) ] if indexes: - good = "\t \t{context} at {filename}:{lineno}" - nogood = "\t--->\t{context} at {filename}:{lineno}" - message = "Suspicious requests to set or unset '{var}' found" - errstream(message.format(var=variable)) - for ii, item in enumerate(changes): - print_format = nogood if ii in indexes else good - errstream(print_format.format(**item.args)) + good = "\t \t{}" + nogood = "\t--->\t{}" + errstream(f"Different requests to set/unset '{variable}' have been found") + for idx, item in enumerate(changes): + print_format = nogood if idx in indexes else good + errstream(print_format.format(item.trace)) -def validate(env, errstream): +def validate(env: EnvironmentModifications, errstream: Callable[[str], None]): """Validates the environment modifications to check for the presence of suspicious patterns. Prompts a warning for everything that was found. @@ -858,27 +870,30 @@ def validate(env, errstream): Args: env: list of environment modifications + errstream: callable to log error messages """ if not env.traced: return modifications = env.group_by_name() for variable, list_of_changes in sorted(modifications.items()): - set_or_unset_not_first(variable, list_of_changes, errstream) + _set_or_unset_not_first(variable, list_of_changes, errstream) -def inspect_path(root, inspections, exclude=None): +def inspect_path( + root: Path, + inspections: MutableMapping[str, List[str]], + exclude: Optional[Callable[[Path], bool]] = None, +) -> EnvironmentModifications: """Inspects ``root`` to search for the subdirectories in ``inspections``. Adds every path found to a list of prepend-path commands and returns it. Args: - root (str): absolute path where to search for subdirectories - - inspections (dict): maps relative paths to a list of environment + root: absolute path where to search for subdirectories + inspections: maps relative paths to a list of environment variables that will be modified if the path exists. The modifications are not performed immediately, but stored in a command object that is returned to client - - exclude (typing.Callable): optional callable. If present it must accept an + exclude: optional callable. If present it must accept an absolute path and return True if it should be excluded from the inspection @@ -901,10 +916,6 @@ def inspect_path(root, inspections, exclude=None): # Eventually execute the commands env.apply_modifications() - - Returns: - instance of EnvironmentModifications containing the requested - modifications """ if exclude is None: exclude = lambda x: False @@ -922,7 +933,7 @@ def inspect_path(root, inspections, exclude=None): @contextlib.contextmanager -def preserve_environment(*variables): +def preserve_environment(*variables: str): """Ensures that the value of the environment variables passed as arguments is the same before entering to the context manager and after exiting it. @@ -931,7 +942,7 @@ def preserve_environment(*variables): explicitly unset on exit. Args: - variables (list): list of environment variables to be preserved + variables: list of environment variables to be preserved """ cache = {} for var in variables: @@ -959,7 +970,9 @@ def preserve_environment(*variables): del os.environ[var] -def environment_after_sourcing_files(*files, **kwargs): +def environment_after_sourcing_files( + *files: Union[Path, Tuple[str, ...]], **kwargs +) -> Dict[str, str]: """Returns a dictionary with the environment that one would have after sourcing the files passed as argument. @@ -986,7 +999,7 @@ def environment_after_sourcing_files(*files, **kwargs): suppress_output = kwargs.get("suppress_output", "&> /dev/null") concatenate_on_success = kwargs.get("concatenate_on_success", "&&") - shell = executable.Executable(" ".join([shell_cmd, shell_options])) + shell = Executable(" ".join([shell_cmd, shell_options])) def _source_single_file(file_and_args, environment): source_file = [source_command] @@ -996,31 +1009,33 @@ def environment_after_sourcing_files(*files, **kwargs): # If the environment contains 'python' use it, if not # go with sys.executable. Below we just need a working # Python interpreter, not necessarily sys.executable. - python_cmd = executable.which("python3", "python", "python2") + python_cmd = which("python3", "python", "python2") python_cmd = python_cmd.path if python_cmd else sys.executable dump_cmd = "import os, json; print(json.dumps(dict(os.environ)))" - dump_environment = python_cmd + ' -E -c "{0}"'.format(dump_cmd) + dump_environment_cmd = python_cmd + f' -E -c "{dump_cmd}"' # Try to source the file source_file_arguments = " ".join( - [source_file, suppress_output, concatenate_on_success, dump_environment] + [source_file, suppress_output, concatenate_on_success, dump_environment_cmd] ) output = shell(source_file_arguments, output=str, env=environment, ignore_quotes=True) return json.loads(output) current_environment = kwargs.get("env", dict(os.environ)) - for f in files: + for file in files: # Normalize the input to the helper function - if isinstance(f, str): - f = [f] + if isinstance(file, str): + file = (file,) - current_environment = _source_single_file(f, environment=current_environment) + current_environment = _source_single_file(file, environment=current_environment) return current_environment -def sanitize(environment, exclude, include): +def sanitize( + environment: MutableMapping[str, str], exclude: List[str], include: List[str] +) -> Dict[str, str]: """Returns a copy of the input dictionary where all the keys that match an excluded pattern and don't match an included pattern are removed. -- cgit v1.2.3-70-g09d2