summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHarmen Stoppels <me@harmenstoppels.nl>2024-10-23 15:06:13 +0200
committerTodd Gamblin <tgamblin@llnl.gov>2024-10-25 10:16:44 -0700
commitb63cbe4e6e59595b831803f277c8a49b1e4f9d72 (patch)
tree17644d57b1b66460fc5335aa5b6a8bc7642ee99d
parentef220daaca8eff90cd2001ff179b8cb37fab0390 (diff)
downloadspack-b63cbe4e6e59595b831803f277c8a49b1e4f9d72.tar.gz
spack-b63cbe4e6e59595b831803f277c8a49b1e4f9d72.tar.bz2
spack-b63cbe4e6e59595b831803f277c8a49b1e4f9d72.tar.xz
spack-b63cbe4e6e59595b831803f277c8a49b1e4f9d72.zip
Replace MultiProcessFd with Connection objects
Connection objects are Python version, platform and multiprocessing start method independent, so better to use those than a mix of plain file descriptors and inadequate guesses in the child process whether it was forked or not. This also allows us to delete the now redundant MultiProcessFd class, hopefully making things a bit easier to follow.
-rw-r--r--lib/spack/llnl/util/tty/log.py146
-rw-r--r--lib/spack/spack/build_environment.py56
2 files changed, 68 insertions, 134 deletions
diff --git a/lib/spack/llnl/util/tty/log.py b/lib/spack/llnl/util/tty/log.py
index 9d346fdabb..12d79e8d04 100644
--- a/lib/spack/llnl/util/tty/log.py
+++ b/lib/spack/llnl/util/tty/log.py
@@ -10,7 +10,6 @@ import ctypes
import errno
import io
import multiprocessing
-import multiprocessing.connection
import os
import re
import select
@@ -19,9 +18,10 @@ import sys
import threading
import traceback
from contextlib import contextmanager
+from multiprocessing.connection import Connection
from threading import Thread
from types import ModuleType
-from typing import Optional
+from typing import Callable, Optional
import llnl.util.tty as tty
@@ -345,48 +345,6 @@ class FileWrapper:
self.file.close()
-class MultiProcessFd:
- """Return an object which stores a file descriptor and can be passed as an
- argument to a function run with ``multiprocessing.Process``, such that
- the file descriptor is available in the subprocess. It provides access via
- the `fd` property.
-
- This object takes control over the associated FD: files opened from this
- using `fdopen` need to use `closefd=False`.
- """
-
- # As for why you have to fdopen(..., closefd=False): when a
- # multiprocessing.connection.Connection object stores an fd, it assumes
- # control over it, and will attempt to close it when gc'ed during __del__;
- # if you fdopen(multiprocessfd.fd, closefd=True) then the resulting file
- # will also assume control, and you can see warnings when there is an
- # attempted double close.
-
- def __init__(self, fd):
- self._connection = None
- self._fd = None
- if sys.version_info >= (3, 8):
- self._connection = multiprocessing.connection.Connection(fd)
- else:
- self._fd = fd
-
- @property
- def fd(self):
- if self._connection:
- return self._connection.fileno()
- else:
- return self._fd
-
- def close(self):
- """Rather than `.close()`ing any file opened from the associated
- `.fd`, the `MultiProcessFd` should be closed with this.
- """
- if self._connection:
- self._connection.close()
- else:
- os.close(self._fd)
-
-
@contextmanager
def replace_environment(env):
"""Replace the current environment (`os.environ`) with `env`.
@@ -545,9 +503,7 @@ class nixlog:
self._saved_debug = tty._debug
# OS-level pipe for redirecting output to logger
- read_fd, write_fd = os.pipe()
-
- read_multiprocess_fd = MultiProcessFd(read_fd)
+ read_fd, write_fd = multiprocessing.Pipe(duplex=False)
# Multiprocessing pipe for communication back from the daemon
# Currently only used to save echo value between uses
@@ -556,10 +512,10 @@ class nixlog:
# Sets a daemon that writes to file what it reads from a pipe
try:
# need to pass this b/c multiprocessing closes stdin in child.
- input_multiprocess_fd = None
+ input_fd = None
try:
if sys.stdin.isatty():
- input_multiprocess_fd = MultiProcessFd(os.dup(sys.stdin.fileno()))
+ input_fd = Connection(os.dup(sys.stdin.fileno()))
except BaseException:
# just don't forward input if this fails
pass
@@ -568,8 +524,8 @@ class nixlog:
self.process = multiprocessing.Process(
target=_writer_daemon,
args=(
- input_multiprocess_fd,
- read_multiprocess_fd,
+ input_fd,
+ read_fd,
write_fd,
self.echo,
self.log_file,
@@ -581,9 +537,9 @@ class nixlog:
self.process.start()
finally:
- if input_multiprocess_fd:
- input_multiprocess_fd.close()
- read_multiprocess_fd.close()
+ if input_fd:
+ input_fd.close()
+ read_fd.close()
# Flush immediately before redirecting so that anything buffered
# goes to the original stream
@@ -601,9 +557,9 @@ class nixlog:
self._saved_stderr = os.dup(sys.stderr.fileno())
# redirect to the pipe we created above
- os.dup2(write_fd, sys.stdout.fileno())
- os.dup2(write_fd, sys.stderr.fileno())
- os.close(write_fd)
+ os.dup2(write_fd.fileno(), sys.stdout.fileno())
+ os.dup2(write_fd.fileno(), sys.stderr.fileno())
+ write_fd.close()
else:
# Handle I/O the Python way. This won't redirect lower-level
@@ -616,7 +572,7 @@ class nixlog:
self._saved_stderr = sys.stderr
# create a file object for the pipe; redirect to it.
- pipe_fd_out = os.fdopen(write_fd, "w")
+ pipe_fd_out = os.fdopen(write_fd.fileno(), "w", closefd=False)
sys.stdout = pipe_fd_out
sys.stderr = pipe_fd_out
@@ -865,14 +821,14 @@ class winlog:
def _writer_daemon(
- stdin_multiprocess_fd,
- read_multiprocess_fd,
- write_fd,
- echo,
- log_file_wrapper,
- control_pipe,
- filter_fn,
-):
+ stdin_fd: Optional[Connection],
+ read_fd: Connection,
+ write_fd: Connection,
+ echo: bool,
+ log_file_wrapper: FileWrapper,
+ control_fd: Connection,
+ filter_fn: Optional[Callable[[str], str]],
+) -> None:
"""Daemon used by ``log_output`` to write to a log file and to ``stdout``.
The daemon receives output from the parent process and writes it both
@@ -909,43 +865,37 @@ def _writer_daemon(
``StringIO`` in the parent. This is mainly for testing.
Arguments:
- stdin_multiprocess_fd (int): input from the terminal
- read_multiprocess_fd (int): pipe for reading from parent's redirected
- stdout
- echo (bool): initial echo setting -- controlled by user and
- preserved across multiple writer daemons
- log_file_wrapper (FileWrapper): file to log all output
- control_pipe (Pipe): multiprocessing pipe on which to send control
- information to the parent
- filter_fn (callable, optional): function to filter each line of output
+ stdin_fd: optional input from the terminal
+ read_fd: pipe for reading from parent's redirected stdout
+ echo: initial echo setting -- controlled by user and preserved across multiple writer
+ daemons
+ log_file_wrapper: file to log all output
+ control_fd: multiprocessing connection on which to send control information to the parent
+ filter_fn: optional function to filter each line of output
"""
- # If this process was forked, then it will inherit file descriptors from
- # the parent process. This process depends on closing all instances of
- # write_fd to terminate the reading loop, so we close the file descriptor
- # here. Forking is the process spawning method everywhere except Mac OS
- # for Python >= 3.8 and on Windows
- if sys.version_info < (3, 8) or sys.platform != "darwin":
- os.close(write_fd)
+ # This process depends on closing all instances of write_pipe to terminate the reading loop
+ write_fd.close()
# 1. Use line buffering (3rd param = 1) since Python 3 has a bug
# that prevents unbuffered text I/O.
# 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default
- in_pipe = os.fdopen(read_multiprocess_fd.fd, "r", 1, encoding="utf-8", closefd=False)
+ # 3. closefd=False because Connection has "ownership"
+ read_file = os.fdopen(read_fd.fileno(), "r", 1, encoding="utf-8", closefd=False)
- if stdin_multiprocess_fd:
- stdin = os.fdopen(stdin_multiprocess_fd.fd, closefd=False)
+ if stdin_fd:
+ stdin_file = os.fdopen(stdin_fd.fileno(), closefd=False)
else:
- stdin = None
+ stdin_file = None
# list of streams to select from
- istreams = [in_pipe, stdin] if stdin else [in_pipe]
+ istreams = [read_file, stdin_file] if stdin_file else [read_file]
force_echo = False # parent can force echo for certain output
log_file = log_file_wrapper.unwrap()
try:
- with keyboard_input(stdin) as kb:
+ with keyboard_input(stdin_file) as kb:
while True:
# fix the terminal settings if we recently came to
# the foreground
@@ -958,12 +908,12 @@ def _writer_daemon(
# Allow user to toggle echo with 'v' key.
# Currently ignores other chars.
# only read stdin if we're in the foreground
- if stdin in rlist and not _is_background_tty(stdin):
+ if stdin_file and stdin_file in rlist and not _is_background_tty(stdin_file):
# it's possible to be backgrounded between the above
# check and the read, so we ignore SIGTTIN here.
with ignore_signal(signal.SIGTTIN):
try:
- if stdin.read(1) == "v":
+ if stdin_file.read(1) == "v":
echo = not echo
except IOError as e:
# If SIGTTIN is ignored, the system gives EIO
@@ -972,13 +922,13 @@ def _writer_daemon(
if e.errno != errno.EIO:
raise
- if in_pipe in rlist:
+ if read_file in rlist:
line_count = 0
try:
while line_count < 100:
# Handle output from the calling process.
try:
- line = _retry(in_pipe.readline)()
+ line = _retry(read_file.readline)()
except UnicodeDecodeError:
# installs like --test=root gpgme produce non-UTF8 logs
line = "<line lost: output was not encoded as UTF-8>\n"
@@ -1007,7 +957,7 @@ def _writer_daemon(
if xoff in controls:
force_echo = False
- if not _input_available(in_pipe):
+ if not _input_available(read_file):
break
finally:
if line_count > 0:
@@ -1022,14 +972,14 @@ def _writer_daemon(
finally:
# send written data back to parent if we used a StringIO
if isinstance(log_file, io.StringIO):
- control_pipe.send(log_file.getvalue())
+ control_fd.send(log_file.getvalue())
log_file_wrapper.close()
- read_multiprocess_fd.close()
- if stdin_multiprocess_fd:
- stdin_multiprocess_fd.close()
+ read_fd.close()
+ if stdin_fd:
+ stdin_fd.close()
# send echo value back to the parent so it can be preserved.
- control_pipe.send(echo)
+ control_fd.send(echo)
def _retry(function):
diff --git a/lib/spack/spack/build_environment.py b/lib/spack/spack/build_environment.py
index e620357022..33586eccde 100644
--- a/lib/spack/spack/build_environment.py
+++ b/lib/spack/spack/build_environment.py
@@ -44,6 +44,7 @@ import types
from collections import defaultdict
from enum import Flag, auto
from itertools import chain
+from multiprocessing.connection import Connection
from typing import Callable, Dict, List, Optional, Set, Tuple
import archspec.cpu
@@ -54,7 +55,6 @@ from llnl.util.filesystem import join_path
from llnl.util.lang import dedupe, stable_partition
from llnl.util.symlink import symlink
from llnl.util.tty.color import cescape, colorize
-from llnl.util.tty.log import MultiProcessFd
import spack.build_systems._checks
import spack.build_systems.cmake
@@ -1143,10 +1143,10 @@ def _setup_pkg_and_run(
serialized_pkg: "spack.subprocess_context.PackageInstallContext",
function: Callable,
kwargs: Dict,
- write_pipe: multiprocessing.connection.Connection,
- input_multiprocess_fd: Optional[MultiProcessFd],
- jsfd1: Optional[MultiProcessFd],
- jsfd2: Optional[MultiProcessFd],
+ write_pipe: Connection,
+ input_pipe: Optional[Connection],
+ jsfd1: Optional[Connection],
+ jsfd2: Optional[Connection],
):
"""Main entry point in the child process for Spack builds.
@@ -1188,13 +1188,12 @@ def _setup_pkg_and_run(
context: str = kwargs.get("context", "build")
try:
- # We are in the child process. Python sets sys.stdin to
- # open(os.devnull) to prevent our process and its parent from
- # simultaneously reading from the original stdin. But, we assume
- # that the parent process is not going to read from it till we
- # are done with the child, so we undo Python's precaution.
- if input_multiprocess_fd is not None:
- sys.stdin = os.fdopen(input_multiprocess_fd.fd, closefd=False)
+ # We are in the child process. Python sets sys.stdin to open(os.devnull) to prevent our
+ # process and its parent from simultaneously reading from the original stdin. But, we
+ # assume that the parent process is not going to read from it till we are done with the
+ # child, so we undo Python's precaution. closefd=False since Connection has ownership.
+ if input_pipe is not None:
+ sys.stdin = os.fdopen(input_pipe.fileno(), closefd=False)
pkg = serialized_pkg.restore()
@@ -1263,8 +1262,8 @@ def _setup_pkg_and_run(
finally:
write_pipe.close()
- if input_multiprocess_fd is not None:
- input_multiprocess_fd.close()
+ if input_pipe is not None:
+ input_pipe.close()
def start_build_process(pkg, function, kwargs):
@@ -1291,23 +1290,9 @@ def start_build_process(pkg, function, kwargs):
If something goes wrong, the child process catches the error and
passes it to the parent wrapped in a ChildError. The parent is
expected to handle (or re-raise) the ChildError.
-
- This uses `multiprocessing.Process` to create the child process. The
- mechanism used to create the process differs on different operating
- systems and for different versions of Python. In some cases "fork"
- is used (i.e. the "fork" system call) and some cases it starts an
- entirely new Python interpreter process (in the docs this is referred
- to as the "spawn" start method). Breaking it down by OS:
-
- - Linux always uses fork.
- - Mac OS uses fork before Python 3.8 and "spawn" for 3.8 and after.
- - Windows always uses the "spawn" start method.
-
- For more information on `multiprocessing` child process creation
- mechanisms, see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
"""
read_pipe, write_pipe = multiprocessing.Pipe(duplex=False)
- input_multiprocess_fd = None
+ input_fd = None
jobserver_fd1 = None
jobserver_fd2 = None
@@ -1316,14 +1301,13 @@ def start_build_process(pkg, function, kwargs):
try:
# Forward sys.stdin when appropriate, to allow toggling verbosity
if sys.platform != "win32" and sys.stdin.isatty() and hasattr(sys.stdin, "fileno"):
- input_fd = os.dup(sys.stdin.fileno())
- input_multiprocess_fd = MultiProcessFd(input_fd)
+ input_fd = Connection(os.dup(sys.stdin.fileno()))
mflags = os.environ.get("MAKEFLAGS", False)
if mflags:
m = re.search(r"--jobserver-[^=]*=(\d),(\d)", mflags)
if m:
- jobserver_fd1 = MultiProcessFd(int(m.group(1)))
- jobserver_fd2 = MultiProcessFd(int(m.group(2)))
+ jobserver_fd1 = Connection(int(m.group(1)))
+ jobserver_fd2 = Connection(int(m.group(2)))
p = multiprocessing.Process(
target=_setup_pkg_and_run,
@@ -1332,7 +1316,7 @@ def start_build_process(pkg, function, kwargs):
function,
kwargs,
write_pipe,
- input_multiprocess_fd,
+ input_fd,
jobserver_fd1,
jobserver_fd2,
),
@@ -1352,8 +1336,8 @@ def start_build_process(pkg, function, kwargs):
finally:
# Close the input stream in the parent process
- if input_multiprocess_fd is not None:
- input_multiprocess_fd.close()
+ if input_fd is not None:
+ input_fd.close()
def exitcode_msg(p):
typ = "exit" if p.exitcode >= 0 else "signal"