summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/spack/build_environment.py4
-rw-r--r--lib/spack/spack/main.py2
-rw-r--r--lib/spack/spack/modules/lmod.py2
-rw-r--r--lib/spack/spack/test/cc.py4
-rw-r--r--lib/spack/spack/test/environment_modifications.py10
-rw-r--r--lib/spack/spack/test/modules/lmod.py2
-rw-r--r--lib/spack/spack/util/environment.py653
7 files changed, 341 insertions, 336 deletions
diff --git a/lib/spack/spack/build_environment.py b/lib/spack/spack/build_environment.py
index 1506233de9..da5127a951 100644
--- a/lib/spack/spack/build_environment.py
+++ b/lib/spack/spack/build_environment.py
@@ -69,13 +69,13 @@ from spack.error import NoHeadersError, NoLibrariesError
from spack.installer import InstallError
from spack.util.cpus import cpus_available
from spack.util.environment import (
+ SYSTEM_DIRS,
EnvironmentModifications,
env_flag,
filter_system_paths,
get_path,
inspect_path,
is_system_path,
- system_dirs,
validate,
)
from spack.util.executable import Executable
@@ -397,7 +397,7 @@ def set_compiler_environment_variables(pkg, env):
env.set("SPACK_COMPILER_SPEC", str(spec.compiler))
- env.set("SPACK_SYSTEM_DIRS", ":".join(system_dirs))
+ env.set("SPACK_SYSTEM_DIRS", ":".join(SYSTEM_DIRS))
compiler.setup_custom_environment(pkg, env)
diff --git a/lib/spack/spack/main.py b/lib/spack/spack/main.py
index ae6bed7fd0..455bd29b9d 100644
--- a/lib/spack/spack/main.py
+++ b/lib/spack/spack/main.py
@@ -575,7 +575,7 @@ def setup_main_options(args):
if args.debug:
spack.util.debug.register_interrupt_handler()
spack.config.set("config:debug", True, scope="command_line")
- spack.util.environment.tracing_enabled = True
+ spack.util.environment.TRACING_ENABLED = True
if args.timestamp:
tty.set_timestamp(True)
diff --git a/lib/spack/spack/modules/lmod.py b/lib/spack/spack/modules/lmod.py
index b63a21feb8..d0f93d0ad3 100644
--- a/lib/spack/spack/modules/lmod.py
+++ b/lib/spack/spack/modules/lmod.py
@@ -71,7 +71,7 @@ def guess_core_compilers(name, store=False):
# A compiler is considered to be a core compiler if any of the
# C, C++ or Fortran compilers reside in a system directory
is_system_compiler = any(
- os.path.dirname(x) in spack.util.environment.system_dirs
+ os.path.dirname(x) in spack.util.environment.SYSTEM_DIRS
for x in compiler["paths"].values()
if x is not None
)
diff --git a/lib/spack/spack/test/cc.py b/lib/spack/spack/test/cc.py
index 3589eb49f8..48dd7fd8a0 100644
--- a/lib/spack/spack/test/cc.py
+++ b/lib/spack/spack/test/cc.py
@@ -16,7 +16,7 @@ import spack.build_environment
import spack.config
import spack.spec
from spack.paths import build_env_path
-from spack.util.environment import set_env, system_dirs
+from spack.util.environment import SYSTEM_DIRS, set_env
from spack.util.executable import Executable, ProcessError
#
@@ -160,7 +160,7 @@ def wrapper_environment(working_env):
SPACK_DEBUG_LOG_ID="foo-hashabc",
SPACK_COMPILER_SPEC="gcc@4.4.7",
SPACK_SHORT_SPEC="foo@1.2 arch=linux-rhel6-x86_64 /hashabc",
- SPACK_SYSTEM_DIRS=":".join(system_dirs),
+ SPACK_SYSTEM_DIRS=":".join(SYSTEM_DIRS),
SPACK_CC_RPATH_ARG="-Wl,-rpath,",
SPACK_CXX_RPATH_ARG="-Wl,-rpath,",
SPACK_F77_RPATH_ARG="-Wl,-rpath,",
diff --git a/lib/spack/spack/test/environment_modifications.py b/lib/spack/spack/test/environment_modifications.py
index 42ce72ad67..9b743b6728 100644
--- a/lib/spack/spack/test/environment_modifications.py
+++ b/lib/spack/spack/test/environment_modifications.py
@@ -230,16 +230,6 @@ def test_path_manipulation(env):
assert os.environ["PATH_LIST_WITH_DUPLICATES"].count("/duplicate") == 1
-def test_extra_arguments(env):
- """Tests that we can attach extra arguments to any command."""
- env.set("A", "dummy value", who="Pkg1")
- for x in env:
- assert "who" in x.args
-
- env.apply_modifications()
- assert "dummy value" == os.environ["A"]
-
-
def test_extend(env):
"""Tests that we can construct a list of environment modifications
starting from another list.
diff --git a/lib/spack/spack/test/modules/lmod.py b/lib/spack/spack/test/modules/lmod.py
index 9ca0c3aa50..a43c4f8c7b 100644
--- a/lib/spack/spack/test/modules/lmod.py
+++ b/lib/spack/spack/test/modules/lmod.py
@@ -237,7 +237,7 @@ class TestLmod(object):
module_configuration("missing_core_compilers")
# Our mock paths must be detected as system paths
- monkeypatch.setattr(spack.util.environment, "system_dirs", ["/path/to"])
+ monkeypatch.setattr(spack.util.environment, "SYSTEM_DIRS", ["/path/to"])
# We don't want to really write into user configuration
# when running tests
diff --git a/lib/spack/spack/util/environment.py b/lib/spack/spack/util/environment.py
index cad8552f7b..6be232bf42 100644
--- a/lib/spack/spack/util/environment.py
+++ b/lib/spack/spack/util/environment.py
@@ -2,7 +2,7 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
-"""Utilities for setting and modifying environment variables."""
+"""Set, unset or modify environment variables."""
import collections
import contextlib
import inspect
@@ -15,28 +15,34 @@ import re
import shlex
import socket
import sys
+from typing import Any, Callable, Dict, List, MutableMapping, Optional, Tuple, Union
-import llnl.util.tty as tty
+from llnl.util import tty
from llnl.util.lang import dedupe
-import spack.config
import spack.platforms
import spack.spec
-import spack.util.executable as executable
-from spack.util.path import path_to_os_path, system_path_filter
-is_windows = sys.platform == "win32"
+from .executable import Executable, which
+from .path import path_to_os_path, system_path_filter
-system_paths = (
- ["/", "/usr", "/usr/local"]
- if not is_windows
- else ["C:\\", "C:\\Program Files", "C:\\Program Files (x86)", "C:\\Users", "C:\\ProgramData"]
-)
-suffixes = ["bin", "bin64", "include", "lib", "lib64"] if not is_windows else []
-system_dirs = [os.path.join(p, s) for s in suffixes for p in system_paths] + system_paths
+if sys.platform == "win32":
+ SYSTEM_PATHS = [
+ "C:\\",
+ "C:\\Program Files",
+ "C:\\Program Files (x86)",
+ "C:\\Users",
+ "C:\\ProgramData",
+ ]
+ SUFFIXES = []
+else:
+ SYSTEM_PATHS = ["/", "/usr", "/usr/local"]
+ SUFFIXES = ["bin", "bin64", "include", "lib", "lib64"]
+
+SYSTEM_DIRS = [os.path.join(p, s) for s in SUFFIXES for p in SYSTEM_PATHS] + SYSTEM_PATHS
-_shell_set_strings = {
+_SHELL_SET_STRINGS = {
"sh": "export {0}={1};\n",
"csh": "setenv {0} {1};\n",
"fish": "set -gx {0} {1};\n",
@@ -44,7 +50,7 @@ _shell_set_strings = {
}
-_shell_unset_strings = {
+_SHELL_UNSET_STRINGS = {
"sh": "unset {0};\n",
"csh": "unsetenv {0};\n",
"fish": "set -e {0};\n",
@@ -52,109 +58,116 @@ _shell_unset_strings = {
}
-tracing_enabled = False
+TRACING_ENABLED = False
+Path = str
+ModificationList = List[Union["NameModifier", "NameValueModifier"]]
-def is_system_path(path):
- """Predicate that given a path returns True if it is a system path,
- False otherwise.
- Args:
- path (str): path to a directory
+def is_system_path(path: Path) -> bool:
+ """Returns True if the argument is a system path, False otherwise."""
+ return bool(path) and (os.path.normpath(path) in SYSTEM_DIRS)
- Returns:
- True or False
- """
- return path and os.path.normpath(path) in system_dirs
-
-def filter_system_paths(paths):
- """Return only paths that are not system paths."""
+def filter_system_paths(paths: List[Path]) -> List[Path]:
+ """Returns a copy of the input where system paths are filtered out."""
return [p for p in paths if not is_system_path(p)]
-def deprioritize_system_paths(paths):
- """Put system paths at the end of paths, otherwise preserving order."""
- filtered_paths = filter_system_paths(paths)
- fp = set(filtered_paths)
- return filtered_paths + [p for p in paths if p not in fp]
+def deprioritize_system_paths(paths: List[Path]) -> List[Path]:
+ """Reorders input paths by putting system paths at the end of the list, otherwise
+ preserving order.
+ """
+ return list(sorted(paths, key=is_system_path))
-def prune_duplicate_paths(paths):
- """Returns the paths with duplicates removed, order preserved."""
+def prune_duplicate_paths(paths: List[Path]) -> List[Path]:
+ """Returns the input list with duplicates removed, otherwise preserving order."""
return list(dedupe(paths))
-def get_path(name):
+def get_path(name: str) -> List[Path]:
+ """Given the name of an environment variable containing multiple
+ paths separated by 'os.pathsep', returns a list of the paths.
+ """
path = os.environ.get(name, "").strip()
if path:
return path.split(os.pathsep)
- else:
- return []
+ return []
-def env_flag(name):
+def env_flag(name: str) -> bool:
+ """Given the name of an environment variable, returns True if it is set to
+ 'true' or to '1', False otherwise.
+ """
if name in os.environ:
value = os.environ[name].lower()
- return value == "true" or value == "1"
+ return value in ("true", "1")
return False
-def path_set(var_name, directories):
+def path_set(var_name: str, directories: List[Path]):
+ """Sets the variable passed as input to the `os.pathsep` joined list of directories."""
path_str = os.pathsep.join(str(dir) for dir in directories)
os.environ[var_name] = path_str
-def path_put_first(var_name, directories):
+def path_put_first(var_name: str, directories: List[Path]):
"""Puts the provided directories first in the path, adding them
if they're not already there.
"""
path = os.environ.get(var_name, "").split(os.pathsep)
- for dir in directories:
- if dir in path:
- path.remove(dir)
+ for directory in directories:
+ if directory in path:
+ path.remove(directory)
- new_path = tuple(directories) + tuple(path)
+ new_path = list(directories) + list(path)
path_set(var_name, new_path)
-bash_function_finder = re.compile(r"BASH_FUNC_(.*?)\(\)")
+BASH_FUNCTION_FINDER = re.compile(r"BASH_FUNC_(.*?)\(\)")
-def env_var_to_source_line(var, val):
+def _env_var_to_source_line(var: str, val: str) -> str:
if var.startswith("BASH_FUNC"):
source_line = "function {fname}{decl}; export -f {fname}".format(
- fname=bash_function_finder.sub(r"\1", var), decl=val
+ fname=BASH_FUNCTION_FINDER.sub(r"\1", var), decl=val
)
else:
- source_line = "{var}={val}; export {var}".format(var=var, val=shlex.quote(val))
+ source_line = f"{var}={shlex.quote(val)}; export {var}"
return source_line
@system_path_filter(arg_slice=slice(1))
-def dump_environment(path, environment=None):
- """Dump an environment dictionary to a source-able file."""
+def dump_environment(path: Path, environment: Optional[MutableMapping[str, str]] = None):
+ """Dump an environment dictionary to a source-able file.
+
+ Args:
+ path: path of the file to write
+ environment: environment to be writte. If None os.environ is used.
+ """
use_env = environment or os.environ
- hidden_vars = set(["PS1", "PWD", "OLDPWD", "TERM_SESSION_ID"])
+ hidden_vars = {"PS1", "PWD", "OLDPWD", "TERM_SESSION_ID"}
- fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
- with os.fdopen(fd, "w") as env_file:
+ file_descriptor = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
+ with os.fdopen(file_descriptor, "w") as env_file:
for var, val in sorted(use_env.items()):
env_file.write(
"".join(
- ["#" if var in hidden_vars else "", env_var_to_source_line(var, val), "\n"]
+ ["#" if var in hidden_vars else "", _env_var_to_source_line(var, val), "\n"]
)
)
@system_path_filter(arg_slice=slice(1))
-def pickle_environment(path, environment=None):
+def pickle_environment(path: Path, environment: Optional[Dict[str, str]] = None):
"""Pickle an environment dictionary to a file."""
- pickle.dump(dict(environment if environment else os.environ), open(path, "wb"), protocol=2)
+ with open(path, "wb") as pickle_file:
+ pickle.dump(dict(environment if environment else os.environ), pickle_file, protocol=2)
-def get_host_environment_metadata():
+def get_host_environment_metadata() -> Dict[str, str]:
"""Get the host environment, reduce to a subset that we can store in
the install directory, and add the spack version.
"""
@@ -171,7 +184,7 @@ def get_host_environment_metadata():
}
-def get_host_environment():
+def get_host_environment() -> Dict[str, Any]:
"""Return a dictionary (lookup) with host information (not including the
os.environ).
"""
@@ -218,55 +231,89 @@ def set_env(**kwargs):
del os.environ[var]
-class NameModifier(object):
- def __init__(self, name, **kwargs):
- self.name = name
- self.separator = kwargs.get("separator", os.pathsep)
- self.args = {"name": name, "separator": self.separator}
+class Trace:
+ """Trace information on a function call"""
+
+ __slots__ = ("filename", "lineno", "context")
- self.args.update(kwargs)
+ def __init__(self, *, filename: str, lineno: int, context: str):
+ self.filename = filename
+ self.lineno = lineno
+ self.context = context
- def __eq__(self, other):
+ def __str__(self):
+ return f"{self.context} at {self.filename}:{self.lineno}"
+
+ def __repr__(self):
+ return f"Trace(filename={self.filename}, lineno={self.lineno}, context={self.context})"
+
+
+class NameModifier:
+ """Base class for modifiers that act on the environment variable as a whole, and thus
+ store just its name
+ """
+
+ __slots__ = ("name", "separator", "trace")
+
+ def __init__(self, name: str, *, separator: str = os.pathsep, trace: Optional[Trace] = None):
+ self.name = name
+ self.separator = separator
+ self.trace = trace
+
+ def __eq__(self, other: object):
if not isinstance(other, NameModifier):
- return False
+ return NotImplemented
return self.name == other.name
- def update_args(self, **kwargs):
- self.__dict__.update(kwargs)
- self.args.update(kwargs)
+ def execute(self, env: MutableMapping[str, str]):
+ """Apply the modification to the mapping passed as input"""
+ raise NotImplementedError("must be implemented by derived classes")
+
+
+class NameValueModifier:
+ """Base class for modifiers that modify the value of an environment variable."""
+ __slots__ = ("name", "value", "separator", "trace")
-class NameValueModifier(object):
- def __init__(self, name, value, **kwargs):
+ def __init__(
+ self, name: str, value: Any, *, separator: str = os.pathsep, trace: Optional[Trace] = None
+ ):
self.name = name
self.value = value
- self.separator = kwargs.get("separator", os.pathsep)
- self.args = {"name": name, "value": value, "separator": self.separator}
- self.args.update(kwargs)
+ self.separator = separator
+ self.trace = trace
- def __eq__(self, other):
+ def __eq__(self, other: object):
if not isinstance(other, NameValueModifier):
- return False
+ return NotImplemented
return (
self.name == other.name
and self.value == other.value
and self.separator == other.separator
)
- def update_args(self, **kwargs):
- self.__dict__.update(kwargs)
- self.args.update(kwargs)
+ def execute(self, env: MutableMapping[str, str]):
+ """Apply the modification to the mapping passed as input"""
+ raise NotImplementedError("must be implemented by derived classes")
class SetEnv(NameValueModifier):
- def execute(self, env):
- tty.debug("SetEnv: {0}={1}".format(self.name, str(self.value)), level=3)
+ __slots__ = ("force",)
+
+ def __init__(
+ self, name: str, value: str, *, trace: Optional[Trace] = None, force: bool = False
+ ):
+ super().__init__(name, value, trace=trace)
+ self.force = force
+
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"SetEnv: {self.name}={str(self.value)}", level=3)
env[self.name] = str(self.value)
class AppendFlagsEnv(NameValueModifier):
- def execute(self, env):
- tty.debug("AppendFlagsEnv: {0}={1}".format(self.name, str(self.value)), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"AppendFlagsEnv: {self.name}={str(self.value)}", level=3)
if self.name in env and env[self.name]:
env[self.name] += self.separator + str(self.value)
else:
@@ -274,15 +321,15 @@ class AppendFlagsEnv(NameValueModifier):
class UnsetEnv(NameModifier):
- def execute(self, env):
- tty.debug("UnsetEnv: {0}".format(self.name), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"UnsetEnv: {self.name}", level=3)
# Avoid throwing if the variable was not set
env.pop(self.name, None)
class RemoveFlagsEnv(NameValueModifier):
- def execute(self, env):
- tty.debug("RemoveFlagsEnv: {0}-{1}".format(self.name, str(self.value)), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"RemoveFlagsEnv: {self.name}-{str(self.value)}", level=3)
environment_value = env.get(self.name, "")
flags = environment_value.split(self.separator) if environment_value else []
flags = [f for f in flags if f != self.value]
@@ -290,15 +337,15 @@ class RemoveFlagsEnv(NameValueModifier):
class SetPath(NameValueModifier):
- def execute(self, env):
- string_path = concatenate_paths(self.value, separator=self.separator)
- tty.debug("SetPath: {0}={1}".format(self.name, string_path), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ string_path = self.separator.join(str(item) for item in self.value)
+ tty.debug(f"SetPath: {self.name}={string_path}", level=3)
env[self.name] = string_path
class AppendPath(NameValueModifier):
- def execute(self, env):
- tty.debug("AppendPath: {0}+{1}".format(self.name, str(self.value)), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"AppendPath: {self.name}+{str(self.value)}", level=3)
environment_value = env.get(self.name, "")
directories = environment_value.split(self.separator) if environment_value else []
directories.append(path_to_os_path(os.path.normpath(self.value)).pop())
@@ -306,8 +353,8 @@ class AppendPath(NameValueModifier):
class PrependPath(NameValueModifier):
- def execute(self, env):
- tty.debug("PrependPath: {0}+{1}".format(self.name, str(self.value)), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"PrependPath: {self.name}+{str(self.value)}", level=3)
environment_value = env.get(self.name, "")
directories = environment_value.split(self.separator) if environment_value else []
directories = [path_to_os_path(os.path.normpath(self.value)).pop()] + directories
@@ -315,8 +362,8 @@ class PrependPath(NameValueModifier):
class RemovePath(NameValueModifier):
- def execute(self, env):
- tty.debug("RemovePath: {0}-{1}".format(self.name, str(self.value)), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"RemovePath: {self.name}-{str(self.value)}", level=3)
environment_value = env.get(self.name, "")
directories = environment_value.split(self.separator) if environment_value else []
directories = [
@@ -328,8 +375,8 @@ class RemovePath(NameValueModifier):
class DeprioritizeSystemPaths(NameModifier):
- def execute(self, env):
- tty.debug("DeprioritizeSystemPaths: {0}".format(self.name), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"DeprioritizeSystemPaths: {self.name}", level=3)
environment_value = env.get(self.name, "")
directories = environment_value.split(self.separator) if environment_value else []
directories = deprioritize_system_paths(
@@ -339,8 +386,8 @@ class DeprioritizeSystemPaths(NameModifier):
class PruneDuplicatePaths(NameModifier):
- def execute(self, env):
- tty.debug("PruneDuplicatePaths: {0}".format(self.name), level=3)
+ def execute(self, env: MutableMapping[str, str]):
+ tty.debug(f"PruneDuplicatePaths: {self.name}", level=3)
environment_value = env.get(self.name, "")
directories = environment_value.split(self.separator) if environment_value else []
directories = prune_duplicate_paths(
@@ -349,29 +396,22 @@ class PruneDuplicatePaths(NameModifier):
env[self.name] = self.separator.join(directories)
-class EnvironmentModifications(object):
- """Keeps track of requests to modify the current environment.
+class EnvironmentModifications:
+ """Keeps track of requests to modify the current environment."""
- Each call to a method to modify the environment stores the extra
- information on the caller in the request:
-
- * 'filename' : filename of the module where the caller is defined
- * 'lineno': line number where the request occurred
- * 'context' : line of code that issued the request that failed
- """
-
- def __init__(self, other=None, traced=None):
+ def __init__(
+ self, other: Optional["EnvironmentModifications"] = None, traced: Union[None, bool] = None
+ ):
"""Initializes a new instance, copying commands from 'other'
if it is not None.
Args:
- other (EnvironmentModifications): list of environment modifications
- to be extended (optional)
- traced (bool): enable or disable stack trace inspection to log the origin
- of the environment modifications.
+ other: list of environment modifications to be extended (optional)
+ traced: enable or disable stack trace inspection to log the origin
+ of the environment modifications
"""
- self.traced = tracing_enabled if traced is None else bool(traced)
- self.env_modifications = []
+ self.traced = TRACING_ENABLED if traced is None else bool(traced)
+ self.env_modifications: List[Union[NameModifier, NameValueModifier]] = []
if other is not None:
self.extend(other)
@@ -381,179 +421,163 @@ class EnvironmentModifications(object):
def __len__(self):
return len(self.env_modifications)
- def extend(self, other):
+ def extend(self, other: "EnvironmentModifications"):
self._check_other(other)
self.env_modifications.extend(other.env_modifications)
@staticmethod
- def _check_other(other):
+ def _check_other(other: "EnvironmentModifications"):
if not isinstance(other, EnvironmentModifications):
raise TypeError("other must be an instance of EnvironmentModifications")
- def _maybe_trace(self, kwargs):
- """Provide the modification with stack trace info so that we can track its
- origin to find issues in packages. This is very slow and expensive."""
+ def _trace(self) -> Optional[Trace]:
+ """Returns a trace object if tracing is enabled, else None."""
if not self.traced:
- return
+ return None
stack = inspect.stack()
try:
_, filename, lineno, _, context, index = stack[2]
- context = context[index].strip()
+ assert index is not None, "index must be an integer"
+ current_context = context[index].strip() if context is not None else "unknown context"
except Exception:
filename = "unknown file"
- lineno = "unknown line"
- context = "unknown context"
- kwargs.update({"filename": filename, "lineno": lineno, "context": context})
+ lineno = -1
+ current_context = "unknown context"
+
+ return Trace(filename=filename, lineno=lineno, context=current_context)
- def set(self, name, value, **kwargs):
+ def set(self, name: str, value: str, *, force: bool = False):
"""Stores a request to set an environment variable.
Args:
- name: name of the environment variable to be set
+ name: name of the environment variable
value: value of the environment variable
+ force: if True, audit will not consider this modification a warning
"""
- self._maybe_trace(kwargs)
- item = SetEnv(name, value, **kwargs)
+ item = SetEnv(name, value, trace=self._trace(), force=force)
self.env_modifications.append(item)
- def append_flags(self, name, value, sep=" ", **kwargs):
- """
- Stores in the current object a request to append to an env variable
+ def append_flags(self, name: str, value: str, sep: str = " "):
+ """Stores a request to append 'flags' to an environment variable.
Args:
- name: name of the environment variable to be appended to
- value: value to append to the environment variable
- Appends with spaces separating different additions to the variable
+ name: name of the environment variable
+ value: flags to be appended
+ sep: separator for the flags (default: " ")
"""
- self._maybe_trace(kwargs)
- kwargs.update({"separator": sep})
- item = AppendFlagsEnv(name, value, **kwargs)
+ item = AppendFlagsEnv(name, value, separator=sep, trace=self._trace())
self.env_modifications.append(item)
- def unset(self, name, **kwargs):
+ def unset(self, name: str):
"""Stores a request to unset an environment variable.
Args:
- name: name of the environment variable to be unset
+ name: name of the environment variable
"""
- self._maybe_trace(kwargs)
- item = UnsetEnv(name, **kwargs)
+ item = UnsetEnv(name, trace=self._trace())
self.env_modifications.append(item)
- def remove_flags(self, name, value, sep=" ", **kwargs):
- """
- Stores in the current object a request to remove flags from an
- env variable
+ def remove_flags(self, name: str, value: str, sep: str = " "):
+ """Stores a request to remove flags from an environment variable
Args:
- name: name of the environment variable to be removed from
- value: value to remove to the environment variable
- sep: separator to assume for environment variable
+ name: name of the environment variable
+ value: flags to be removed
+ sep: separator for the flags (default: " ")
"""
- self._maybe_trace(kwargs)
- kwargs.update({"separator": sep})
- item = RemoveFlagsEnv(name, value, **kwargs)
+ item = RemoveFlagsEnv(name, value, separator=sep, trace=self._trace())
self.env_modifications.append(item)
- def set_path(self, name, elements, **kwargs):
- """Stores a request to set a path generated from a list.
+ def set_path(self, name: str, elements: List[str], separator: str = os.pathsep):
+ """Stores a request to set an environment variable to a list of paths,
+ separated by a character defined in input.
Args:
- name: name o the environment variable to be set.
- elements: elements of the path to set.
+ name: name of the environment variable
+ elements: ordered list paths
+ separator: separator for the paths (default: os.pathsep)
"""
- self._maybe_trace(kwargs)
- item = SetPath(name, elements, **kwargs)
+ item = SetPath(name, elements, separator=separator, trace=self._trace())
self.env_modifications.append(item)
- def append_path(self, name, path, **kwargs):
- """Stores a request to append a path to a path list.
+ def append_path(self, name: str, path: str, separator: str = os.pathsep):
+ """Stores a request to append a path to list of paths.
Args:
- name: name of the path list in the environment
+ name: name of the environment variable
path: path to be appended
+ separator: separator for the paths (default: os.pathsep)
"""
- self._maybe_trace(kwargs)
- item = AppendPath(name, path, **kwargs)
+ item = AppendPath(name, path, separator=separator, trace=self._trace())
self.env_modifications.append(item)
- def prepend_path(self, name, path, **kwargs):
- """Same as `append_path`, but the path is pre-pended.
+ def prepend_path(self, name: str, path: str, separator: str = os.pathsep):
+ """Stores a request to prepend a path to list of paths.
Args:
- name: name of the path list in the environment
- path: path to be pre-pended
+ name: name of the environment variable
+ path: path to be prepended
+ separator: separator for the paths (default: os.pathsep)
"""
- self._maybe_trace(kwargs)
- item = PrependPath(name, path, **kwargs)
+ item = PrependPath(name, path, separator=separator, trace=self._trace())
self.env_modifications.append(item)
- def remove_path(self, name, path, **kwargs):
- """Stores a request to remove a path from a path list.
+ def remove_path(self, name: str, path: str, separator: str = os.pathsep):
+ """Stores a request to remove a path from a list of paths.
Args:
- name: name of the path list in the environment
+ name: name of the environment variable
path: path to be removed
+ separator: separator for the paths (default: os.pathsep)
"""
- self._maybe_trace(kwargs)
- item = RemovePath(name, path, **kwargs)
+ item = RemovePath(name, path, separator=separator, trace=self._trace())
self.env_modifications.append(item)
- def deprioritize_system_paths(self, name, **kwargs):
+ def deprioritize_system_paths(self, name: str, separator: str = os.pathsep):
"""Stores a request to deprioritize system paths in a path list,
otherwise preserving the order.
Args:
- name: name of the path list in the environment.
+ name: name of the environment variable
+ separator: separator for the paths (default: os.pathsep)
"""
- self._maybe_trace(kwargs)
- item = DeprioritizeSystemPaths(name, **kwargs)
+ item = DeprioritizeSystemPaths(name, separator=separator, trace=self._trace())
self.env_modifications.append(item)
- def prune_duplicate_paths(self, name, **kwargs):
+ def prune_duplicate_paths(self, name: str, separator: str = os.pathsep):
"""Stores a request to remove duplicates from a path list, otherwise
preserving the order.
Args:
- name: name of the path list in the environment.
+ name: name of the environment variable
+ separator: separator for the paths (default: os.pathsep)
"""
- self._maybe_trace(kwargs)
- item = PruneDuplicatePaths(name, **kwargs)
+ item = PruneDuplicatePaths(name, separator=separator, trace=self._trace())
self.env_modifications.append(item)
- def group_by_name(self):
- """Returns a dict of the modifications grouped by variable name.
-
- Returns:
- dict mapping the environment variable name to the modifications to
- be done on it
- """
+ def group_by_name(self) -> Dict[str, ModificationList]:
+ """Returns a dict of the current modifications keyed by variable name."""
modifications = collections.defaultdict(list)
for item in self:
modifications[item.name].append(item)
return modifications
- def is_unset(self, var_name):
+ def is_unset(self, variable_name: str) -> bool:
+ """Returns True if the last modification to a variable is to unset it, False otherwise."""
modifications = self.group_by_name()
- var_updates = modifications.get(var_name, None)
- if not var_updates:
- # We did not explicitly unset it
+ if variable_name not in modifications:
return False
- # The last modification must unset the variable for it to be considered
- # unset
- return type(var_updates[-1]) == UnsetEnv
+ # The last modification must unset the variable for it to be considered unset
+ return isinstance(modifications[variable_name][-1], UnsetEnv)
def clear(self):
- """
- Clears the current list of modifications
- """
+ """Clears the current list of modifications."""
self.env_modifications = []
- def reversed(self):
- """
- Returns the EnvironmentModifications object that will reverse self
+ def reversed(self) -> "EnvironmentModifications":
+ """Returns the EnvironmentModifications object that will reverse self
Only creates reversals for additions to the environment, as reversing
``unset`` and ``remove_path`` modifications is impossible.
@@ -564,54 +588,55 @@ class EnvironmentModifications(object):
rev = EnvironmentModifications()
for envmod in reversed(self.env_modifications):
- if type(envmod) == SetEnv:
- tty.debug("Reversing `Set` environment operation may lose " "original value")
+ if isinstance(envmod, SetEnv):
+ tty.debug("Reversing `Set` environment operation may lose the original value")
rev.unset(envmod.name)
- elif type(envmod) == AppendPath:
+ elif isinstance(envmod, AppendPath):
rev.remove_path(envmod.name, envmod.value)
- elif type(envmod) == PrependPath:
+ elif isinstance(envmod, PrependPath):
rev.remove_path(envmod.name, envmod.value)
- elif type(envmod) == SetPath:
- tty.debug("Reversing `SetPath` environment operation may lose " "original value")
+ elif isinstance(envmod, SetPath):
+ tty.debug("Reversing `SetPath` environment operation may lose the original value")
rev.unset(envmod.name)
- elif type(envmod) == AppendFlagsEnv:
+ elif isinstance(envmod, AppendFlagsEnv):
rev.remove_flags(envmod.name, envmod.value)
else:
- # This is an un-reversable operation
tty.warn(
- "Skipping reversal of unreversable operation"
- "%s %s" % (type(envmod), envmod.name)
+ f"Skipping reversal of unreversable operation {type(envmod)} {envmod.name}"
)
return rev
- def apply_modifications(self, env=None):
- """Applies the modifications and clears the list."""
- # Use os.environ if not specified
- # Do not copy, we want to modify it in place
- if env is None:
- env = os.environ
+ def apply_modifications(self, env: Optional[MutableMapping[str, str]] = None):
+ """Applies the modifications and clears the list.
- modifications = self.group_by_name()
- # Apply modifications one variable at a time
- for name, actions in sorted(modifications.items()):
- for x in actions:
- x.execute(env)
+ Args:
+ env: environment to be modified. If None, os.environ will be used.
+ """
+ env = os.environ if env is None else env
- def shell_modifications(self, shell="sh", explicit=False, env=None):
+ modifications = self.group_by_name()
+ for _, actions in sorted(modifications.items()):
+ for modifier in actions:
+ modifier.execute(env)
+
+ def shell_modifications(
+ self,
+ shell: str = "sh",
+ explicit: bool = False,
+ env: Optional[MutableMapping[str, str]] = None,
+ ) -> str:
"""Return shell code to apply the modifications and clears the list."""
modifications = self.group_by_name()
- if env is None:
- env = os.environ
-
- new_env = env.copy()
+ env = os.environ if env is None else env
+ new_env = dict(env.items())
- for name, actions in sorted(modifications.items()):
- for x in actions:
- x.execute(new_env)
+ for _, actions in sorted(modifications.items()):
+ for modifier in actions:
+ modifier.execute(new_env)
- if "MANPATH" in new_env and not new_env.get("MANPATH").endswith(":"):
+ if "MANPATH" in new_env and not new_env["MANPATH"].endswith(":"):
new_env["MANPATH"] += ":"
cmds = ""
@@ -621,24 +646,25 @@ class EnvironmentModifications(object):
old = env.get(name, None)
if explicit or new != old:
if new is None:
- cmds += _shell_unset_strings[shell].format(name)
+ cmds += _SHELL_UNSET_STRINGS[shell].format(name)
else:
if sys.platform != "win32":
- cmd = _shell_set_strings[shell].format(name, shlex.quote(new_env[name]))
+ cmd = _SHELL_SET_STRINGS[shell].format(name, shlex.quote(new_env[name]))
else:
- cmd = _shell_set_strings[shell].format(name, new_env[name])
+ cmd = _SHELL_SET_STRINGS[shell].format(name, new_env[name])
cmds += cmd
return cmds
@staticmethod
- def from_sourcing_file(filename, *arguments, **kwargs):
- """Constructs an instance of a
- :py:class:`spack.util.environment.EnvironmentModifications` object
- that has the same effect as sourcing a file.
+ def from_sourcing_file(
+ filename: Path, *arguments: str, **kwargs: Any
+ ) -> "EnvironmentModifications":
+ """Returns the environment modifications that have the same effect as
+ sourcing the input file.
Args:
- filename (str): the file to be sourced
- *arguments (list): arguments to pass on the command line
+ filename: the file to be sourced
+ *arguments: arguments to pass on the command line
Keyword Args:
shell (str): the shell to use (default: ``bash``)
@@ -655,10 +681,10 @@ class EnvironmentModifications(object):
clean (bool): in addition to removing empty entries,
also remove duplicate entries (default: False).
"""
- tty.debug("EnvironmentModifications.from_sourcing_file: {0}".format(filename))
+ tty.debug(f"EnvironmentModifications.from_sourcing_file: {filename}")
# Check if the file actually exists
if not os.path.isfile(filename):
- msg = "Trying to source non-existing file: {0}".format(filename)
+ msg = f"Trying to source non-existing file: {filename}"
raise RuntimeError(msg)
# Prepare include and exclude lists of environment variable names
@@ -707,16 +733,15 @@ class EnvironmentModifications(object):
return EnvironmentModifications.from_environment_diff(before, after, clean)
@staticmethod
- def from_environment_diff(before, after, clean=False):
- """Constructs an instance of a
- :py:class:`spack.util.environment.EnvironmentModifications` object
- from the diff of two dictionaries.
+ def from_environment_diff(
+ before: MutableMapping[str, str], after: MutableMapping[str, str], clean: bool = False
+ ) -> "EnvironmentModifications":
+ """Constructs the environment modifications from the diff of two environments.
Args:
- before (dict): environment before the modifications are applied
- after (dict): environment after the modifications are applied
- clean (bool): in addition to removing empty entries, also remove
- duplicate entries
+ before: environment before the modifications are applied
+ after: environment after the modifications are applied
+ clean: in addition to removing empty entries, also remove duplicate entries
"""
# Fill the EnvironmentModifications instance
env = EnvironmentModifications()
@@ -743,22 +768,22 @@ class EnvironmentModifications(object):
# Add variables to env.
# Assume that variables with 'PATH' in the name or that contain
# separators like ':' or ';' are more likely to be paths
- for x in new_variables:
- sep = return_separator_if_any(after[x])
+ for variable_name in new_variables:
+ sep = return_separator_if_any(after[variable_name])
if sep:
- env.prepend_path(x, after[x], separator=sep)
- elif "PATH" in x:
- env.prepend_path(x, after[x])
+ env.prepend_path(variable_name, after[variable_name], separator=sep)
+ elif "PATH" in variable_name:
+ env.prepend_path(variable_name, after[variable_name])
else:
# We just need to set the variable to the new value
- env.set(x, after[x])
+ env.set(variable_name, after[variable_name])
- for x in unset_variables:
- env.unset(x)
+ for variable_name in unset_variables:
+ env.unset(variable_name)
- for x in modified_variables:
- value_before = before[x]
- value_after = after[x]
+ for variable_name in modified_variables:
+ value_before = before[variable_name]
+ value_after = after[variable_name]
sep = return_separator_if_any(value_before, value_after)
if sep:
before_list = value_before.split(sep)
@@ -786,12 +811,12 @@ class EnvironmentModifications(object):
end = after_list.index(remaining_list[-1])
search = sep.join(after_list[start : end + 1])
except IndexError:
- env.prepend_path(x, value_after)
+ env.prepend_path(variable_name, value_after)
continue
if search not in value_before:
# We just need to set the variable to the new value
- env.prepend_path(x, value_after)
+ env.prepend_path(variable_name, value_after)
else:
try:
prepend_list = after_list[:start]
@@ -804,52 +829,39 @@ class EnvironmentModifications(object):
append_list = []
for item in remove_list:
- env.remove_path(x, item)
+ env.remove_path(variable_name, item)
for item in append_list:
- env.append_path(x, item)
+ env.append_path(variable_name, item)
for item in prepend_list:
- env.prepend_path(x, item)
+ env.prepend_path(variable_name, item)
else:
# We just need to set the variable to the new value
- env.set(x, value_after)
+ env.set(variable_name, value_after)
return env
-def concatenate_paths(paths, separator=os.pathsep):
- """Concatenates an iterable of paths into a string of paths separated by
- separator, defaulting to colon.
-
- Args:
- paths: iterable of paths
- separator: the separator to use, default ';' windows, ':' otherwise
-
- Returns:
- string
- """
- return separator.join(str(item) for item in paths)
-
-
-def set_or_unset_not_first(variable, changes, errstream):
+def _set_or_unset_not_first(
+ variable: str, changes: ModificationList, errstream: Callable[[str], None]
+):
"""Check if we are going to set or unset something after other
modifications have already been requested.
"""
indexes = [
ii
for ii, item in enumerate(changes)
- if ii != 0 and not item.args.get("force", False) and type(item) in [SetEnv, UnsetEnv]
+ if ii != 0 and isinstance(item, (SetEnv, UnsetEnv)) and not getattr(item, "force", False)
]
if indexes:
- good = "\t \t{context} at {filename}:{lineno}"
- nogood = "\t--->\t{context} at {filename}:{lineno}"
- message = "Suspicious requests to set or unset '{var}' found"
- errstream(message.format(var=variable))
- for ii, item in enumerate(changes):
- print_format = nogood if ii in indexes else good
- errstream(print_format.format(**item.args))
+ good = "\t \t{}"
+ nogood = "\t--->\t{}"
+ errstream(f"Different requests to set/unset '{variable}' have been found")
+ for idx, item in enumerate(changes):
+ print_format = nogood if idx in indexes else good
+ errstream(print_format.format(item.trace))
-def validate(env, errstream):
+def validate(env: EnvironmentModifications, errstream: Callable[[str], None]):
"""Validates the environment modifications to check for the presence of
suspicious patterns. Prompts a warning for everything that was found.
@@ -858,27 +870,30 @@ def validate(env, errstream):
Args:
env: list of environment modifications
+ errstream: callable to log error messages
"""
if not env.traced:
return
modifications = env.group_by_name()
for variable, list_of_changes in sorted(modifications.items()):
- set_or_unset_not_first(variable, list_of_changes, errstream)
+ _set_or_unset_not_first(variable, list_of_changes, errstream)
-def inspect_path(root, inspections, exclude=None):
+def inspect_path(
+ root: Path,
+ inspections: MutableMapping[str, List[str]],
+ exclude: Optional[Callable[[Path], bool]] = None,
+) -> EnvironmentModifications:
"""Inspects ``root`` to search for the subdirectories in ``inspections``.
Adds every path found to a list of prepend-path commands and returns it.
Args:
- root (str): absolute path where to search for subdirectories
-
- inspections (dict): maps relative paths to a list of environment
+ root: absolute path where to search for subdirectories
+ inspections: maps relative paths to a list of environment
variables that will be modified if the path exists. The
modifications are not performed immediately, but stored in a
command object that is returned to client
-
- exclude (typing.Callable): optional callable. If present it must accept an
+ exclude: optional callable. If present it must accept an
absolute path and return True if it should be excluded from the
inspection
@@ -901,10 +916,6 @@ def inspect_path(root, inspections, exclude=None):
# Eventually execute the commands
env.apply_modifications()
-
- Returns:
- instance of EnvironmentModifications containing the requested
- modifications
"""
if exclude is None:
exclude = lambda x: False
@@ -922,7 +933,7 @@ def inspect_path(root, inspections, exclude=None):
@contextlib.contextmanager
-def preserve_environment(*variables):
+def preserve_environment(*variables: str):
"""Ensures that the value of the environment variables passed as
arguments is the same before entering to the context manager and after
exiting it.
@@ -931,7 +942,7 @@ def preserve_environment(*variables):
explicitly unset on exit.
Args:
- variables (list): list of environment variables to be preserved
+ variables: list of environment variables to be preserved
"""
cache = {}
for var in variables:
@@ -959,7 +970,9 @@ def preserve_environment(*variables):
del os.environ[var]
-def environment_after_sourcing_files(*files, **kwargs):
+def environment_after_sourcing_files(
+ *files: Union[Path, Tuple[str, ...]], **kwargs
+) -> Dict[str, str]:
"""Returns a dictionary with the environment that one would have
after sourcing the files passed as argument.
@@ -986,7 +999,7 @@ def environment_after_sourcing_files(*files, **kwargs):
suppress_output = kwargs.get("suppress_output", "&> /dev/null")
concatenate_on_success = kwargs.get("concatenate_on_success", "&&")
- shell = executable.Executable(" ".join([shell_cmd, shell_options]))
+ shell = Executable(" ".join([shell_cmd, shell_options]))
def _source_single_file(file_and_args, environment):
source_file = [source_command]
@@ -996,31 +1009,33 @@ def environment_after_sourcing_files(*files, **kwargs):
# If the environment contains 'python' use it, if not
# go with sys.executable. Below we just need a working
# Python interpreter, not necessarily sys.executable.
- python_cmd = executable.which("python3", "python", "python2")
+ python_cmd = which("python3", "python", "python2")
python_cmd = python_cmd.path if python_cmd else sys.executable
dump_cmd = "import os, json; print(json.dumps(dict(os.environ)))"
- dump_environment = python_cmd + ' -E -c "{0}"'.format(dump_cmd)
+ dump_environment_cmd = python_cmd + f' -E -c "{dump_cmd}"'
# Try to source the file
source_file_arguments = " ".join(
- [source_file, suppress_output, concatenate_on_success, dump_environment]
+ [source_file, suppress_output, concatenate_on_success, dump_environment_cmd]
)
output = shell(source_file_arguments, output=str, env=environment, ignore_quotes=True)
return json.loads(output)
current_environment = kwargs.get("env", dict(os.environ))
- for f in files:
+ for file in files:
# Normalize the input to the helper function
- if isinstance(f, str):
- f = [f]
+ if isinstance(file, str):
+ file = (file,)
- current_environment = _source_single_file(f, environment=current_environment)
+ current_environment = _source_single_file(file, environment=current_environment)
return current_environment
-def sanitize(environment, exclude, include):
+def sanitize(
+ environment: MutableMapping[str, str], exclude: List[str], include: List[str]
+) -> Dict[str, str]:
"""Returns a copy of the input dictionary where all the keys that
match an excluded pattern and don't match an included pattern are
removed.