summaryrefslogtreecommitdiff
path: root/lib/spack/llnl/util/tty/pty.py
blob: 1d9fe724fcde91b9653c2264a5cb9569e3272262 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

"""The pty module handles pseudo-terminals.

Currently, the infrastructure here is only used to test llnl.util.tty.log.

If this is used outside a testing environment, we will want to reconsider
things like timeouts in ``ProcessController.wait()``, which are set to
get tests done quickly, not to avoid high CPU usage.

Note: The functionality in this module is unsupported on Windows
"""
import multiprocessing
import os
import re
import signal
import sys
import time
import traceback

import llnl.util.tty.log as log

from spack.util.executable import which

# termios is treated as optional: if the import fails, ``termios`` stays ``None`` so
# that this module can still be imported (the module docstring notes that this
# functionality is unsupported on Windows).
termios = None
try:
    import termios as term_mod

    termios = term_mod
except ImportError:
    pass


class ProcessController:
    """Wrapper around some fundamental process control operations.

    This allows one process (the controller) to drive another (the
    minion) similar to the way a shell would, by sending signals and I/O.

    """

    def __init__(self, pid, controller_fd, timeout=1, sleep_time=1e-1, debug=False):
        """Create a controller to manipulate the process with id ``pid``

        Args:
            pid (int): id of process to control
            controller_fd (int): controller fd attached to pid's stdin
            timeout (float): time in seconds for wait operations to time out
                (default 1 second)
            sleep_time (float): time to sleep after signals, to control the
                signal rate of the controller (default 1e-1)
            debug (bool): whether ``horizontal_line()`` and ``status()`` should
                produce output when called (default False)

        ``sleep_time`` allows the caller to insert delays after calls
        that signal or modify the controlled process. Python behaves very
        poorly if signals arrive too fast, and drowning a Python process
        with a Python handler with signals can kill the process and hang
        our tests, so we throttle this to a closer-to-interactive rate.

        """
        self.pid = pid
        self.pgid = os.getpgid(pid)
        self.controller_fd = controller_fd
        self.timeout = timeout
        self.sleep_time = sleep_time
        self.debug = debug

        # we need the ps command to wait for process statuses
        self.ps = which("ps", required=True)

    def get_canon_echo_attrs(self):
        """Get echo and canon attributes of the terminal of controller_fd.

        Returns:
            tuple: ``(canon, echo)`` booleans read from the local-mode flags
            (index 3 of ``termios.tcgetattr``).
        """
        cfg = termios.tcgetattr(self.controller_fd)
        return (bool(cfg[3] & termios.ICANON), bool(cfg[3] & termios.ECHO))

    def horizontal_line(self, name):
        """Labeled horizontal line for debugging (only printed if ``debug``)."""
        if self.debug:
            sys.stderr.write("------------------------------------------- %s\n" % name)

    def status(self):
        """Print debug message with status info for the minion."""
        if self.debug:
            canon, echo = self.get_canon_echo_attrs()
            sys.stderr.write(
                "canon: %s, echo: %s\n" % ("on" if canon else "off", "on" if echo else "off")
            )
            sys.stderr.write("input: %s\n" % self.input_on())
            sys.stderr.write("bg: %s\n" % self.background())
            sys.stderr.write("\n")

    def input_on(self):
        """True if keyboard input is enabled on the controller_fd pty.

        "Enabled" here means both canonical mode and echo are switched off.
        """
        return self.get_canon_echo_attrs() == (False, False)

    def background(self):
        """True if pgid is in a background pgroup of controller_fd's tty."""
        return self.pgid != os.tcgetpgrp(self.controller_fd)

    def tstp(self):
        """Send SIGTSTP to the controlled process group, then sleep ``sleep_time``."""
        self.horizontal_line("tstp")
        os.killpg(self.pgid, signal.SIGTSTP)
        time.sleep(self.sleep_time)

    def cont(self):
        """Send SIGCONT to the controlled process group, then sleep ``sleep_time``."""
        self.horizontal_line("cont")
        os.killpg(self.pgid, signal.SIGCONT)
        time.sleep(self.sleep_time)

    def fg(self):
        """Make the minion's process group the foreground group of the tty.

        SIGTTOU is ignored for the duration of the ``tcsetpgrp`` call.
        """
        self.horizontal_line("fg")
        with log.ignore_signal(signal.SIGTTOU):
            os.tcsetpgrp(self.controller_fd, os.getpgid(self.pid))
        time.sleep(self.sleep_time)

    def bg(self):
        """Make the calling process's own group the foreground group of the tty.

        This leaves the minion's process group in the background. SIGTTOU is
        ignored for the duration of the ``tcsetpgrp`` call.
        """
        self.horizontal_line("bg")
        with log.ignore_signal(signal.SIGTTOU):
            os.tcsetpgrp(self.controller_fd, os.getpgrp())
        time.sleep(self.sleep_time)

    def write(self, byte_string):
        """Write ``byte_string`` (bytes) to ``controller_fd``."""
        # Only build the debug label when it will actually be printed, and never
        # let an undecodable payload prevent the write itself.
        if self.debug:
            self.horizontal_line("write '%s'" % byte_string.decode("utf-8", errors="replace"))
        os.write(self.controller_fd, byte_string)

    def wait(self, condition):
        """Poll ``condition()`` until it is true or ``timeout`` seconds elapse.

        Raises:
            AssertionError: if ``condition()`` is still false after the timeout
        """
        start = time.time()
        while ((time.time() - start) < self.timeout) and not condition():
            time.sleep(1e-2)
        assert condition()

    def wait_enabled(self):
        """Wait until input is on and the minion is in the foreground."""
        self.wait(lambda: self.input_on() and not self.background())

    def wait_disabled(self):
        """Wait until input is off and the minion is in the background."""
        self.wait(lambda: not self.input_on() and self.background())

    def wait_disabled_fg(self):
        """Wait until input is off and the minion is in the foreground."""
        self.wait(lambda: not self.input_on() and not self.background())

    def proc_status(self):
        """Return the second whitespace-separated field of ``ps -p <pid> -o stat``.

        Index 0 is expected to be the column header printed by ``ps``, so
        index 1 is the minion's state string.
        """
        status = self.ps("-p", str(self.pid), "-o", "stat", output=str)
        # NOTE: the third positional argument of re.split is ``maxsplit``, not
        # ``flags``; no flag is needed here since the pattern has no anchors.
        status = re.split(r"\s+", status.strip())
        return status[1]

    def wait_stopped(self):
        """Wait until the minion's ``ps`` state contains ``T``."""
        self.wait(lambda: "T" in self.proc_status())

    def wait_running(self):
        """Wait until the minion's ``ps`` state no longer contains ``T``."""
        self.wait(lambda: "T" not in self.proc_status())


class PseudoShell:
    """Sets up controller and minion processes with a PTY.

    You can create a ``PseudoShell`` if you want to test how some
    function responds to terminal input.  This is a pseudo-shell from a
    job control perspective; ``controller_function`` and ``minion_function``
    are set up with a pseudoterminal (pty) so that the controller can drive
    the minion through process control signals and I/O.

    The two functions should have signatures like this::

        def controller_function(proc, ctl, **kwargs)
        def minion_function(**kwargs)

    ``controller_function`` is spawned in its own process and passed three
    arguments:

    proc
        the ``multiprocessing.Process`` object representing the minion
    ctl
        a ``ProcessController`` object tied to the minion
    kwargs
        keyword arguments passed from ``PseudoShell.start()``.

    ``minion_function`` is only passed ``kwargs`` delegated from
    ``PseudoShell.start()``.

    The ``ctl`` object will have its ``controller_fd`` connected to
    ``sys.stdin`` in the minion process. Both processes will share the
    same ``sys.stdout`` and ``sys.stderr`` as the process instantiating
    ``PseudoShell``.

    Here are the relationships between processes created::

        ._________________________________________________________.
        | Minion Process                                          | pid     2
        | - runs minion_function                                  | pgroup  2
        |_________________________________________________________| session 1
            ^
            | create process with controller_fd connected to stdin
            | stdout, stderr are the same as caller
        ._________________________________________________________.
        | Controller Process                                      | pid     1
        | - runs controller_function                              | pgroup  1
        | - uses ProcessController and controller_fd to           | session 1
        |   control minion                                        |
        |_________________________________________________________|
            ^
            | create process
            | stdin, stdout, stderr are the same as caller
        ._________________________________________________________.
        | Caller                                                  |  pid     0
        | - Constructs, starts, joins PseudoShell                 |  pgroup  0
        | - provides controller_function, minion_function         |  session 0
        |_________________________________________________________|

    """

    def __init__(self, controller_function, minion_function):
        """Store the two functions; no process is created until ``start()``.

        Args:
            controller_function: callable run in the controller process
            minion_function: callable run in the minion process
        """
        self.proc = None
        self.controller_function = controller_function
        self.minion_function = minion_function

        # these can be optionally set to change defaults
        self.controller_timeout = 3
        self.sleep_time = 0.1

    def start(self, **kwargs):
        """Start the controller and minion processes.

        Arguments:
            kwargs (dict): arbitrary keyword arguments that will be
                passed to controller and minion functions

        The controller process will create the minion, then call
        ``controller_function``.  The minion process will call
        ``minion_function``.

        """
        self.proc = multiprocessing.Process(
            target=PseudoShell._set_up_and_run_controller_function,
            args=(
                self.controller_function,
                self.minion_function,
                self.controller_timeout,
                self.sleep_time,
            ),
            kwargs=kwargs,
        )
        self.proc.start()

    def join(self):
        """Wait for the controller process to finish, and return its exit code.

        The controller process itself joins the minion before it exits.
        """
        self.proc.join()
        return self.proc.exitcode

    @staticmethod
    def _set_up_and_run_minion_function(
        tty_name, stdout_fd, stderr_fd, ready, minion_function, **kwargs
    ):
        """Minion process wrapper for PseudoShell.

        Handles the mechanics of setting up a PTY, then calls
        ``minion_function``.

        Any exception raised by ``minion_function`` is printed with
        ``traceback.print_exc()`` and not re-raised.

        """
        # new process group, like a command or pipeline launched by a shell
        os.setpgrp()

        # take controlling terminal and set up pty IO
        stdin_fd = os.open(tty_name, os.O_RDWR)
        os.dup2(stdin_fd, sys.stdin.fileno())
        os.dup2(stdout_fd, sys.stdout.fileno())
        os.dup2(stderr_fd, sys.stderr.fileno())
        os.close(stdin_fd)

        if kwargs.get("debug"):
            sys.stderr.write("minion: stdin.isatty(): %s\n" % sys.stdin.isatty())

        # tell the parent that we're really running
        if kwargs.get("debug"):
            sys.stderr.write("minion: ready!\n")
        ready.value = True

        try:
            minion_function(**kwargs)
        except BaseException:
            traceback.print_exc()

    @staticmethod
    def _set_up_and_run_controller_function(
        controller_function, minion_function, controller_timeout, sleep_time, **kwargs
    ):
        """Set up a pty, spawn a minion process, execute controller_function.

        Handles the mechanics of setting up a PTY, then calls
        ``controller_function``.

        If the controller function fails (raises, or returns a truthy value)
        or the minion exits with a non-zero code, this process exits with a
        non-zero status so that ``PseudoShell.join()`` can report it.

        Raises:
            RuntimeError: if the minion process exits before signaling that
                it is ready

        """
        os.setsid()  # new session; this process is the controller

        controller_fd, minion_fd = os.openpty()
        pty_name = os.ttyname(minion_fd)

        # take controlling terminal
        pty_fd = os.open(pty_name, os.O_RDWR)
        os.close(pty_fd)

        ready = multiprocessing.Value("i", False)
        minion_process = multiprocessing.Process(
            target=PseudoShell._set_up_and_run_minion_function,
            args=(pty_name, sys.stdout.fileno(), sys.stderr.fileno(), ready, minion_function),
            kwargs=kwargs,
        )
        minion_process.start()

        # wait for subprocess to be running and connected.
        while not ready.value:
            # If the minion died during setup it will never set ``ready``; bail out
            # instead of spinning forever. ``ready`` is re-checked after the liveness
            # test because the minion may set it and exit in between.
            if not minion_process.is_alive() and not ready.value:
                raise RuntimeError("minion process exited before signaling that it was ready")
            time.sleep(1e-5)

        if kwargs.get("debug"):
            sys.stderr.write("pid:        %d\n" % os.getpid())
            sys.stderr.write("pgid:       %d\n" % os.getpgrp())
            sys.stderr.write("sid:        %d\n" % os.getsid(0))
            sys.stderr.write("tcgetpgrp:  %d\n" % os.tcgetpgrp(controller_fd))
            sys.stderr.write("\n")

            minion_pgid = os.getpgid(minion_process.pid)
            sys.stderr.write("minion pid:  %d\n" % minion_process.pid)
            sys.stderr.write("minion pgid: %d\n" % minion_pgid)
            sys.stderr.write("minion sid:  %d\n" % os.getsid(minion_process.pid))
            sys.stderr.write("\n")
            sys.stderr.flush()
        # set up controller to ignore SIGTSTP, like a shell
        signal.signal(signal.SIGTSTP, signal.SIG_IGN)

        # call the controller function once the minion is ready
        try:
            controller = ProcessController(
                minion_process.pid, controller_fd, debug=kwargs.get("debug")
            )
            controller.timeout = controller_timeout
            controller.sleep_time = sleep_time
            error = controller_function(minion_process, controller, **kwargs)
        except BaseException:
            error = 1
            traceback.print_exc()

        minion_process.join()

        # multiprocessing discards the return value of a Process target, so a failure
        # of either the controller or the minion has to be reported through this
        # process's exit status for ``PseudoShell.join()`` to see it.
        exit_status = error or minion_process.exitcode
        if exit_status:
            sys.exit(exit_status)
        return exit_status