diff options
-rw-r--r-- | lib/spack/llnl/util/tty/log.py | 146 | ||||
-rw-r--r-- | lib/spack/spack/build_environment.py | 56 |
2 files changed, 68 insertions, 134 deletions
diff --git a/lib/spack/llnl/util/tty/log.py b/lib/spack/llnl/util/tty/log.py index 9d346fdabb..12d79e8d04 100644 --- a/lib/spack/llnl/util/tty/log.py +++ b/lib/spack/llnl/util/tty/log.py @@ -10,7 +10,6 @@ import ctypes import errno import io import multiprocessing -import multiprocessing.connection import os import re import select @@ -19,9 +18,10 @@ import sys import threading import traceback from contextlib import contextmanager +from multiprocessing.connection import Connection from threading import Thread from types import ModuleType -from typing import Optional +from typing import Callable, Optional import llnl.util.tty as tty @@ -345,48 +345,6 @@ class FileWrapper: self.file.close() -class MultiProcessFd: - """Return an object which stores a file descriptor and can be passed as an - argument to a function run with ``multiprocessing.Process``, such that - the file descriptor is available in the subprocess. It provides access via - the `fd` property. - - This object takes control over the associated FD: files opened from this - using `fdopen` need to use `closefd=False`. - """ - - # As for why you have to fdopen(..., closefd=False): when a - # multiprocessing.connection.Connection object stores an fd, it assumes - # control over it, and will attempt to close it when gc'ed during __del__; - # if you fdopen(multiprocessfd.fd, closefd=True) then the resulting file - # will also assume control, and you can see warnings when there is an - # attempted double close. - - def __init__(self, fd): - self._connection = None - self._fd = None - if sys.version_info >= (3, 8): - self._connection = multiprocessing.connection.Connection(fd) - else: - self._fd = fd - - @property - def fd(self): - if self._connection: - return self._connection.fileno() - else: - return self._fd - - def close(self): - """Rather than `.close()`ing any file opened from the associated - `.fd`, the `MultiProcessFd` should be closed with this. 
- """ - if self._connection: - self._connection.close() - else: - os.close(self._fd) - - @contextmanager def replace_environment(env): """Replace the current environment (`os.environ`) with `env`. @@ -545,9 +503,7 @@ class nixlog: self._saved_debug = tty._debug # OS-level pipe for redirecting output to logger - read_fd, write_fd = os.pipe() - - read_multiprocess_fd = MultiProcessFd(read_fd) + read_fd, write_fd = multiprocessing.Pipe(duplex=False) # Multiprocessing pipe for communication back from the daemon # Currently only used to save echo value between uses @@ -556,10 +512,10 @@ class nixlog: # Sets a daemon that writes to file what it reads from a pipe try: # need to pass this b/c multiprocessing closes stdin in child. - input_multiprocess_fd = None + input_fd = None try: if sys.stdin.isatty(): - input_multiprocess_fd = MultiProcessFd(os.dup(sys.stdin.fileno())) + input_fd = Connection(os.dup(sys.stdin.fileno())) except BaseException: # just don't forward input if this fails pass @@ -568,8 +524,8 @@ class nixlog: self.process = multiprocessing.Process( target=_writer_daemon, args=( - input_multiprocess_fd, - read_multiprocess_fd, + input_fd, + read_fd, write_fd, self.echo, self.log_file, @@ -581,9 +537,9 @@ class nixlog: self.process.start() finally: - if input_multiprocess_fd: - input_multiprocess_fd.close() - read_multiprocess_fd.close() + if input_fd: + input_fd.close() + read_fd.close() # Flush immediately before redirecting so that anything buffered # goes to the original stream @@ -601,9 +557,9 @@ class nixlog: self._saved_stderr = os.dup(sys.stderr.fileno()) # redirect to the pipe we created above - os.dup2(write_fd, sys.stdout.fileno()) - os.dup2(write_fd, sys.stderr.fileno()) - os.close(write_fd) + os.dup2(write_fd.fileno(), sys.stdout.fileno()) + os.dup2(write_fd.fileno(), sys.stderr.fileno()) + write_fd.close() else: # Handle I/O the Python way. 
This won't redirect lower-level @@ -616,7 +572,7 @@ class nixlog: self._saved_stderr = sys.stderr # create a file object for the pipe; redirect to it. - pipe_fd_out = os.fdopen(write_fd, "w") + pipe_fd_out = os.fdopen(write_fd.fileno(), "w", closefd=False) sys.stdout = pipe_fd_out sys.stderr = pipe_fd_out @@ -865,14 +821,14 @@ class winlog: def _writer_daemon( - stdin_multiprocess_fd, - read_multiprocess_fd, - write_fd, - echo, - log_file_wrapper, - control_pipe, - filter_fn, -): + stdin_fd: Optional[Connection], + read_fd: Connection, + write_fd: Connection, + echo: bool, + log_file_wrapper: FileWrapper, + control_fd: Connection, + filter_fn: Optional[Callable[[str], str]], +) -> None: """Daemon used by ``log_output`` to write to a log file and to ``stdout``. The daemon receives output from the parent process and writes it both @@ -909,43 +865,37 @@ def _writer_daemon( ``StringIO`` in the parent. This is mainly for testing. Arguments: - stdin_multiprocess_fd (int): input from the terminal - read_multiprocess_fd (int): pipe for reading from parent's redirected - stdout - echo (bool): initial echo setting -- controlled by user and - preserved across multiple writer daemons - log_file_wrapper (FileWrapper): file to log all output - control_pipe (Pipe): multiprocessing pipe on which to send control - information to the parent - filter_fn (callable, optional): function to filter each line of output + stdin_fd: optional input from the terminal + read_fd: pipe for reading from parent's redirected stdout + echo: initial echo setting -- controlled by user and preserved across multiple writer + daemons + log_file_wrapper: file to log all output + control_fd: multiprocessing pipe on which to send control information to the parent + filter_fn: optional function to filter each line of output """ - # If this process was forked, then it will inherit file descriptors from - # the parent process.
This process depends on closing all instances of - # write_fd to terminate the reading loop, so we close the file descriptor - # here. Forking is the process spawning method everywhere except Mac OS - # for Python >= 3.8 and on Windows - if sys.version_info < (3, 8) or sys.platform != "darwin": - os.close(write_fd) + # This process depends on closing all instances of write_fd to terminate the reading loop + write_fd.close() # 1. Use line buffering (3rd param = 1) since Python 3 has a bug # that prevents unbuffered text I/O. # 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default - in_pipe = os.fdopen(read_multiprocess_fd.fd, "r", 1, encoding="utf-8", closefd=False) + # 3. closefd=False because Connection has "ownership" + read_file = os.fdopen(read_fd.fileno(), "r", 1, encoding="utf-8", closefd=False) - if stdin_multiprocess_fd: - stdin = os.fdopen(stdin_multiprocess_fd.fd, closefd=False) + if stdin_fd: + stdin_file = os.fdopen(stdin_fd.fileno(), closefd=False) else: - stdin = None + stdin_file = None # list of streams to select from - istreams = [in_pipe, stdin] if stdin else [in_pipe] + istreams = [read_file, stdin_file] if stdin_file else [read_file] force_echo = False # parent can force echo for certain output log_file = log_file_wrapper.unwrap() try: - with keyboard_input(stdin) as kb: + with keyboard_input(stdin_file) as kb: while True: # fix the terminal settings if we recently came to # the foreground @@ -958,12 +908,12 @@ def _writer_daemon( # Allow user to toggle echo with 'v' key. # Currently ignores other chars. # only read stdin if we're in the foreground - if stdin in rlist and not _is_background_tty(stdin): + if stdin_file and stdin_file in rlist and not _is_background_tty(stdin_file): # it's possible to be backgrounded between the above # check and the read, so we ignore SIGTTIN here.
with ignore_signal(signal.SIGTTIN): try: - if stdin.read(1) == "v": + if stdin_file.read(1) == "v": echo = not echo except IOError as e: # If SIGTTIN is ignored, the system gives EIO @@ -972,13 +922,13 @@ def _writer_daemon( if e.errno != errno.EIO: raise - if in_pipe in rlist: + if read_file in rlist: line_count = 0 try: while line_count < 100: # Handle output from the calling process. try: - line = _retry(in_pipe.readline)() + line = _retry(read_file.readline)() except UnicodeDecodeError: # installs like --test=root gpgme produce non-UTF8 logs line = "<line lost: output was not encoded as UTF-8>\n" @@ -1007,7 +957,7 @@ def _writer_daemon( if xoff in controls: force_echo = False - if not _input_available(in_pipe): + if not _input_available(read_file): break finally: if line_count > 0: @@ -1022,14 +972,14 @@ def _writer_daemon( finally: # send written data back to parent if we used a StringIO if isinstance(log_file, io.StringIO): - control_pipe.send(log_file.getvalue()) + control_fd.send(log_file.getvalue()) log_file_wrapper.close() - read_multiprocess_fd.close() - if stdin_multiprocess_fd: - stdin_multiprocess_fd.close() + read_fd.close() + if stdin_fd: + stdin_fd.close() # send echo value back to the parent so it can be preserved. 
- control_pipe.send(echo) + control_fd.send(echo) def _retry(function): diff --git a/lib/spack/spack/build_environment.py b/lib/spack/spack/build_environment.py index e620357022..33586eccde 100644 --- a/lib/spack/spack/build_environment.py +++ b/lib/spack/spack/build_environment.py @@ -44,6 +44,7 @@ import types from collections import defaultdict from enum import Flag, auto from itertools import chain +from multiprocessing.connection import Connection from typing import Callable, Dict, List, Optional, Set, Tuple import archspec.cpu @@ -54,7 +55,6 @@ from llnl.util.filesystem import join_path from llnl.util.lang import dedupe, stable_partition from llnl.util.symlink import symlink from llnl.util.tty.color import cescape, colorize -from llnl.util.tty.log import MultiProcessFd import spack.build_systems._checks import spack.build_systems.cmake @@ -1143,10 +1143,10 @@ def _setup_pkg_and_run( serialized_pkg: "spack.subprocess_context.PackageInstallContext", function: Callable, kwargs: Dict, - write_pipe: multiprocessing.connection.Connection, - input_multiprocess_fd: Optional[MultiProcessFd], - jsfd1: Optional[MultiProcessFd], - jsfd2: Optional[MultiProcessFd], + write_pipe: Connection, + input_pipe: Optional[Connection], + jsfd1: Optional[Connection], + jsfd2: Optional[Connection], ): """Main entry point in the child process for Spack builds. @@ -1188,13 +1188,12 @@ def _setup_pkg_and_run( context: str = kwargs.get("context", "build") try: - # We are in the child process. Python sets sys.stdin to - # open(os.devnull) to prevent our process and its parent from - # simultaneously reading from the original stdin. But, we assume - # that the parent process is not going to read from it till we - # are done with the child, so we undo Python's precaution. - if input_multiprocess_fd is not None: - sys.stdin = os.fdopen(input_multiprocess_fd.fd, closefd=False) + # We are in the child process. 
Python sets sys.stdin to open(os.devnull) to prevent our + # process and its parent from simultaneously reading from the original stdin. But, we + # assume that the parent process is not going to read from it till we are done with the + # child, so we undo Python's precaution. closefd=False since Connection has ownership. + if input_pipe is not None: + sys.stdin = os.fdopen(input_pipe.fileno(), closefd=False) pkg = serialized_pkg.restore() @@ -1263,8 +1262,8 @@ def _setup_pkg_and_run( finally: write_pipe.close() - if input_multiprocess_fd is not None: - input_multiprocess_fd.close() + if input_pipe is not None: + input_pipe.close() def start_build_process(pkg, function, kwargs): @@ -1291,23 +1290,9 @@ def start_build_process(pkg, function, kwargs): If something goes wrong, the child process catches the error and passes it to the parent wrapped in a ChildError. The parent is expected to handle (or re-raise) the ChildError. - - This uses `multiprocessing.Process` to create the child process. The - mechanism used to create the process differs on different operating - systems and for different versions of Python. In some cases "fork" - is used (i.e. the "fork" system call) and some cases it starts an - entirely new Python interpreter process (in the docs this is referred - to as the "spawn" start method). Breaking it down by OS: - - - Linux always uses fork. - - Mac OS uses fork before Python 3.8 and "spawn" for 3.8 and after. - - Windows always uses the "spawn" start method. 
- - For more information on `multiprocessing` child process creation - mechanisms, see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods """ read_pipe, write_pipe = multiprocessing.Pipe(duplex=False) - input_multiprocess_fd = None + input_fd = None jobserver_fd1 = None jobserver_fd2 = None @@ -1316,14 +1301,13 @@ def start_build_process(pkg, function, kwargs): try: # Forward sys.stdin when appropriate, to allow toggling verbosity if sys.platform != "win32" and sys.stdin.isatty() and hasattr(sys.stdin, "fileno"): - input_fd = os.dup(sys.stdin.fileno()) - input_multiprocess_fd = MultiProcessFd(input_fd) + input_fd = Connection(os.dup(sys.stdin.fileno())) mflags = os.environ.get("MAKEFLAGS", False) if mflags: m = re.search(r"--jobserver-[^=]*=(\d),(\d)", mflags) if m: - jobserver_fd1 = MultiProcessFd(int(m.group(1))) - jobserver_fd2 = MultiProcessFd(int(m.group(2))) + jobserver_fd1 = Connection(int(m.group(1))) + jobserver_fd2 = Connection(int(m.group(2))) p = multiprocessing.Process( target=_setup_pkg_and_run, @@ -1332,7 +1316,7 @@ def start_build_process(pkg, function, kwargs): function, kwargs, write_pipe, - input_multiprocess_fd, + input_fd, jobserver_fd1, jobserver_fd2, ), @@ -1352,8 +1336,8 @@ def start_build_process(pkg, function, kwargs): finally: # Close the input stream in the parent process - if input_multiprocess_fd is not None: - input_multiprocess_fd.close() + if input_fd is not None: + input_fd.close() def exitcode_msg(p): typ = "exit" if p.exitcode >= 0 else "signal" |