summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/spack/llnl/util/tty/log.py146
-rw-r--r--lib/spack/spack/build_environment.py56
2 files changed, 68 insertions, 134 deletions
diff --git a/lib/spack/llnl/util/tty/log.py b/lib/spack/llnl/util/tty/log.py
index 9d346fdabb..12d79e8d04 100644
--- a/lib/spack/llnl/util/tty/log.py
+++ b/lib/spack/llnl/util/tty/log.py
@@ -10,7 +10,6 @@ import ctypes
import errno
import io
import multiprocessing
-import multiprocessing.connection
import os
import re
import select
@@ -19,9 +18,10 @@ import sys
import threading
import traceback
from contextlib import contextmanager
+from multiprocessing.connection import Connection
from threading import Thread
from types import ModuleType
-from typing import Optional
+from typing import Callable, Optional
import llnl.util.tty as tty
@@ -345,48 +345,6 @@ class FileWrapper:
self.file.close()
-class MultiProcessFd:
- """Return an object which stores a file descriptor and can be passed as an
- argument to a function run with ``multiprocessing.Process``, such that
- the file descriptor is available in the subprocess. It provides access via
- the `fd` property.
-
- This object takes control over the associated FD: files opened from this
- using `fdopen` need to use `closefd=False`.
- """
-
- # As for why you have to fdopen(..., closefd=False): when a
- # multiprocessing.connection.Connection object stores an fd, it assumes
- # control over it, and will attempt to close it when gc'ed during __del__;
- # if you fdopen(multiprocessfd.fd, closefd=True) then the resulting file
- # will also assume control, and you can see warnings when there is an
- # attempted double close.
-
- def __init__(self, fd):
- self._connection = None
- self._fd = None
- if sys.version_info >= (3, 8):
- self._connection = multiprocessing.connection.Connection(fd)
- else:
- self._fd = fd
-
- @property
- def fd(self):
- if self._connection:
- return self._connection.fileno()
- else:
- return self._fd
-
- def close(self):
- """Rather than `.close()`ing any file opened from the associated
- `.fd`, the `MultiProcessFd` should be closed with this.
- """
- if self._connection:
- self._connection.close()
- else:
- os.close(self._fd)
-
-
@contextmanager
def replace_environment(env):
"""Replace the current environment (`os.environ`) with `env`.
@@ -545,9 +503,7 @@ class nixlog:
self._saved_debug = tty._debug
# OS-level pipe for redirecting output to logger
- read_fd, write_fd = os.pipe()
-
- read_multiprocess_fd = MultiProcessFd(read_fd)
+ read_fd, write_fd = multiprocessing.Pipe(duplex=False)
# Multiprocessing pipe for communication back from the daemon
# Currently only used to save echo value between uses
@@ -556,10 +512,10 @@ class nixlog:
# Sets a daemon that writes to file what it reads from a pipe
try:
# need to pass this b/c multiprocessing closes stdin in child.
- input_multiprocess_fd = None
+ input_fd = None
try:
if sys.stdin.isatty():
- input_multiprocess_fd = MultiProcessFd(os.dup(sys.stdin.fileno()))
+ input_fd = Connection(os.dup(sys.stdin.fileno()))
except BaseException:
# just don't forward input if this fails
pass
@@ -568,8 +524,8 @@ class nixlog:
self.process = multiprocessing.Process(
target=_writer_daemon,
args=(
- input_multiprocess_fd,
- read_multiprocess_fd,
+ input_fd,
+ read_fd,
write_fd,
self.echo,
self.log_file,
@@ -581,9 +537,9 @@ class nixlog:
self.process.start()
finally:
- if input_multiprocess_fd:
- input_multiprocess_fd.close()
- read_multiprocess_fd.close()
+ if input_fd:
+ input_fd.close()
+ read_fd.close()
# Flush immediately before redirecting so that anything buffered
# goes to the original stream
@@ -601,9 +557,9 @@ class nixlog:
self._saved_stderr = os.dup(sys.stderr.fileno())
# redirect to the pipe we created above
- os.dup2(write_fd, sys.stdout.fileno())
- os.dup2(write_fd, sys.stderr.fileno())
- os.close(write_fd)
+ os.dup2(write_fd.fileno(), sys.stdout.fileno())
+ os.dup2(write_fd.fileno(), sys.stderr.fileno())
+ write_fd.close()
else:
# Handle I/O the Python way. This won't redirect lower-level
@@ -616,7 +572,7 @@ class nixlog:
self._saved_stderr = sys.stderr
# create a file object for the pipe; redirect to it.
- pipe_fd_out = os.fdopen(write_fd, "w")
+ pipe_fd_out = os.fdopen(write_fd.fileno(), "w", closefd=False)
sys.stdout = pipe_fd_out
sys.stderr = pipe_fd_out
@@ -865,14 +821,14 @@ class winlog:
def _writer_daemon(
- stdin_multiprocess_fd,
- read_multiprocess_fd,
- write_fd,
- echo,
- log_file_wrapper,
- control_pipe,
- filter_fn,
-):
+ stdin_fd: Optional[Connection],
+ read_fd: Connection,
+ write_fd: Connection,
+ echo: bool,
+ log_file_wrapper: FileWrapper,
+ control_fd: Connection,
+ filter_fn: Optional[Callable[[str], str]],
+) -> None:
"""Daemon used by ``log_output`` to write to a log file and to ``stdout``.
The daemon receives output from the parent process and writes it both
@@ -909,43 +865,37 @@ def _writer_daemon(
``StringIO`` in the parent. This is mainly for testing.
Arguments:
- stdin_multiprocess_fd (int): input from the terminal
- read_multiprocess_fd (int): pipe for reading from parent's redirected
- stdout
- echo (bool): initial echo setting -- controlled by user and
- preserved across multiple writer daemons
- log_file_wrapper (FileWrapper): file to log all output
- control_pipe (Pipe): multiprocessing pipe on which to send control
- information to the parent
- filter_fn (callable, optional): function to filter each line of output
+ stdin_fd: optional input from the terminal
+ read_fd: pipe for reading from parent's redirected stdout
+ echo: initial echo setting -- controlled by user and preserved across multiple writer
+ daemons
+ log_file_wrapper: file to log all output
+ control_pipe: multiprocessing pipe on which to send control information to the parent
+ filter_fn: optional function to filter each line of output
"""
- # If this process was forked, then it will inherit file descriptors from
- # the parent process. This process depends on closing all instances of
- # write_fd to terminate the reading loop, so we close the file descriptor
- # here. Forking is the process spawning method everywhere except Mac OS
- # for Python >= 3.8 and on Windows
- if sys.version_info < (3, 8) or sys.platform != "darwin":
- os.close(write_fd)
+ # This process depends on closing all instances of write_pipe to terminate the reading loop
+ write_fd.close()
# 1. Use line buffering (3rd param = 1) since Python 3 has a bug
# that prevents unbuffered text I/O.
# 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default
- in_pipe = os.fdopen(read_multiprocess_fd.fd, "r", 1, encoding="utf-8", closefd=False)
+ # 3. closefd=False because Connection has "ownership"
+ read_file = os.fdopen(read_fd.fileno(), "r", 1, encoding="utf-8", closefd=False)
- if stdin_multiprocess_fd:
- stdin = os.fdopen(stdin_multiprocess_fd.fd, closefd=False)
+ if stdin_fd:
+ stdin_file = os.fdopen(stdin_fd.fileno(), closefd=False)
else:
- stdin = None
+ stdin_file = None
# list of streams to select from
- istreams = [in_pipe, stdin] if stdin else [in_pipe]
+ istreams = [read_file, stdin_file] if stdin_file else [read_file]
force_echo = False # parent can force echo for certain output
log_file = log_file_wrapper.unwrap()
try:
- with keyboard_input(stdin) as kb:
+ with keyboard_input(stdin_file) as kb:
while True:
# fix the terminal settings if we recently came to
# the foreground
@@ -958,12 +908,12 @@ def _writer_daemon(
# Allow user to toggle echo with 'v' key.
# Currently ignores other chars.
# only read stdin if we're in the foreground
- if stdin in rlist and not _is_background_tty(stdin):
+ if stdin_file and stdin_file in rlist and not _is_background_tty(stdin_file):
# it's possible to be backgrounded between the above
# check and the read, so we ignore SIGTTIN here.
with ignore_signal(signal.SIGTTIN):
try:
- if stdin.read(1) == "v":
+ if stdin_file.read(1) == "v":
echo = not echo
except IOError as e:
# If SIGTTIN is ignored, the system gives EIO
@@ -972,13 +922,13 @@ def _writer_daemon(
if e.errno != errno.EIO:
raise
- if in_pipe in rlist:
+ if read_file in rlist:
line_count = 0
try:
while line_count < 100:
# Handle output from the calling process.
try:
- line = _retry(in_pipe.readline)()
+ line = _retry(read_file.readline)()
except UnicodeDecodeError:
# installs like --test=root gpgme produce non-UTF8 logs
line = "<line lost: output was not encoded as UTF-8>\n"
@@ -1007,7 +957,7 @@ def _writer_daemon(
if xoff in controls:
force_echo = False
- if not _input_available(in_pipe):
+ if not _input_available(read_file):
break
finally:
if line_count > 0:
@@ -1022,14 +972,14 @@ def _writer_daemon(
finally:
# send written data back to parent if we used a StringIO
if isinstance(log_file, io.StringIO):
- control_pipe.send(log_file.getvalue())
+ control_fd.send(log_file.getvalue())
log_file_wrapper.close()
- read_multiprocess_fd.close()
- if stdin_multiprocess_fd:
- stdin_multiprocess_fd.close()
+ read_fd.close()
+ if stdin_fd:
+ stdin_fd.close()
# send echo value back to the parent so it can be preserved.
- control_pipe.send(echo)
+ control_fd.send(echo)
def _retry(function):
diff --git a/lib/spack/spack/build_environment.py b/lib/spack/spack/build_environment.py
index e620357022..33586eccde 100644
--- a/lib/spack/spack/build_environment.py
+++ b/lib/spack/spack/build_environment.py
@@ -44,6 +44,7 @@ import types
from collections import defaultdict
from enum import Flag, auto
from itertools import chain
+from multiprocessing.connection import Connection
from typing import Callable, Dict, List, Optional, Set, Tuple
import archspec.cpu
@@ -54,7 +55,6 @@ from llnl.util.filesystem import join_path
from llnl.util.lang import dedupe, stable_partition
from llnl.util.symlink import symlink
from llnl.util.tty.color import cescape, colorize
-from llnl.util.tty.log import MultiProcessFd
import spack.build_systems._checks
import spack.build_systems.cmake
@@ -1143,10 +1143,10 @@ def _setup_pkg_and_run(
serialized_pkg: "spack.subprocess_context.PackageInstallContext",
function: Callable,
kwargs: Dict,
- write_pipe: multiprocessing.connection.Connection,
- input_multiprocess_fd: Optional[MultiProcessFd],
- jsfd1: Optional[MultiProcessFd],
- jsfd2: Optional[MultiProcessFd],
+ write_pipe: Connection,
+ input_pipe: Optional[Connection],
+ jsfd1: Optional[Connection],
+ jsfd2: Optional[Connection],
):
"""Main entry point in the child process for Spack builds.
@@ -1188,13 +1188,12 @@ def _setup_pkg_and_run(
context: str = kwargs.get("context", "build")
try:
- # We are in the child process. Python sets sys.stdin to
- # open(os.devnull) to prevent our process and its parent from
- # simultaneously reading from the original stdin. But, we assume
- # that the parent process is not going to read from it till we
- # are done with the child, so we undo Python's precaution.
- if input_multiprocess_fd is not None:
- sys.stdin = os.fdopen(input_multiprocess_fd.fd, closefd=False)
+ # We are in the child process. Python sets sys.stdin to open(os.devnull) to prevent our
+ # process and its parent from simultaneously reading from the original stdin. But, we
+ # assume that the parent process is not going to read from it till we are done with the
+ # child, so we undo Python's precaution. closefd=False since Connection has ownership.
+ if input_pipe is not None:
+ sys.stdin = os.fdopen(input_pipe.fileno(), closefd=False)
pkg = serialized_pkg.restore()
@@ -1263,8 +1262,8 @@ def _setup_pkg_and_run(
finally:
write_pipe.close()
- if input_multiprocess_fd is not None:
- input_multiprocess_fd.close()
+ if input_pipe is not None:
+ input_pipe.close()
def start_build_process(pkg, function, kwargs):
@@ -1291,23 +1290,9 @@ def start_build_process(pkg, function, kwargs):
If something goes wrong, the child process catches the error and
passes it to the parent wrapped in a ChildError. The parent is
expected to handle (or re-raise) the ChildError.
-
- This uses `multiprocessing.Process` to create the child process. The
- mechanism used to create the process differs on different operating
- systems and for different versions of Python. In some cases "fork"
- is used (i.e. the "fork" system call) and some cases it starts an
- entirely new Python interpreter process (in the docs this is referred
- to as the "spawn" start method). Breaking it down by OS:
-
- - Linux always uses fork.
- - Mac OS uses fork before Python 3.8 and "spawn" for 3.8 and after.
- - Windows always uses the "spawn" start method.
-
- For more information on `multiprocessing` child process creation
- mechanisms, see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
"""
read_pipe, write_pipe = multiprocessing.Pipe(duplex=False)
- input_multiprocess_fd = None
+ input_fd = None
jobserver_fd1 = None
jobserver_fd2 = None
@@ -1316,14 +1301,13 @@ def start_build_process(pkg, function, kwargs):
try:
# Forward sys.stdin when appropriate, to allow toggling verbosity
if sys.platform != "win32" and sys.stdin.isatty() and hasattr(sys.stdin, "fileno"):
- input_fd = os.dup(sys.stdin.fileno())
- input_multiprocess_fd = MultiProcessFd(input_fd)
+ input_fd = Connection(os.dup(sys.stdin.fileno()))
mflags = os.environ.get("MAKEFLAGS", False)
if mflags:
m = re.search(r"--jobserver-[^=]*=(\d),(\d)", mflags)
if m:
- jobserver_fd1 = MultiProcessFd(int(m.group(1)))
- jobserver_fd2 = MultiProcessFd(int(m.group(2)))
+ jobserver_fd1 = Connection(int(m.group(1)))
+ jobserver_fd2 = Connection(int(m.group(2)))
p = multiprocessing.Process(
target=_setup_pkg_and_run,
@@ -1332,7 +1316,7 @@ def start_build_process(pkg, function, kwargs):
function,
kwargs,
write_pipe,
- input_multiprocess_fd,
+ input_fd,
jobserver_fd1,
jobserver_fd2,
),
@@ -1352,8 +1336,8 @@ def start_build_process(pkg, function, kwargs):
finally:
# Close the input stream in the parent process
- if input_multiprocess_fd is not None:
- input_multiprocess_fd.close()
+ if input_fd is not None:
+ input_fd.close()
def exitcode_msg(p):
typ = "exit" if p.exitcode >= 0 else "signal"