summaryrefslogtreecommitdiff
path: root/lib/spack/spack/util/gpg.py
blob: ea1e42131d91cefbe5c299da1f7cca9b14079ddb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import contextlib
import errno
import functools
import os
import re

import spack.error
import spack.paths
import spack.util.executable
import spack.version

#: Executable instance for "gpg", initialized lazily
GPG = None
#: Executable instance for "gpgconf", initialized lazily
GPGCONF = None
#: Socket directory required if a non default home directory is used
SOCKET_DIR = None
#: GNUPGHOME environment variable in the context of this Python module
GNUPGHOME = None


def clear():
    """Reset the global state to uninitialized."""
    global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME
    GPG, GPGCONF, SOCKET_DIR, GNUPGHOME = None, None, None, None


def init(gnupghome=None, force=False):
    """Initialize the global objects in the module, if not set.

    When calling any gpg executable, the GNUPGHOME environment
    variable is set to:

    1. The value of the `gnupghome` argument, if not None
    2. The value of the "SPACK_GNUPGHOME" environment variable, if set
    3. The default gpg path for Spack otherwise

    Args:
        gnupghome (str): value to be used for GNUPGHOME when calling
            GnuPG executables
        force (bool): if True forces the re-initialization even if the
            global objects are set already
    """
    global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME
    import spack.bootstrap

    if force:
        clear()

    # If the executables are already set, there's nothing to do
    if GPG and GNUPGHOME:
        return

    # Set the value of GNUPGHOME to be used in this module
    GNUPGHOME = gnupghome or os.getenv("SPACK_GNUPGHOME") or spack.paths.gpg_path

    # Set the executable objects for "gpg" and "gpgconf"
    with spack.bootstrap.ensure_bootstrap_configuration():
        spack.bootstrap.ensure_core_dependencies()
        GPG, GPGCONF = _gpg(), _gpgconf()

    GPG.add_default_env("GNUPGHOME", GNUPGHOME)
    if GPGCONF:
        GPGCONF.add_default_env("GNUPGHOME", GNUPGHOME)
        # Set the socket dir if not using GnuPG defaults
        SOCKET_DIR = _socket_dir(GPGCONF)

    # Make sure that the GNUPGHOME exists
    if not os.path.exists(GNUPGHOME):
        os.makedirs(GNUPGHOME)
        os.chmod(GNUPGHOME, 0o700)

    if not os.path.isdir(GNUPGHOME):
        msg = 'GNUPGHOME "{0}" exists and is not a directory'.format(GNUPGHOME)
        raise SpackGPGError(msg)

    if SOCKET_DIR is not None:
        GPGCONF("--create-socketdir")


def _autoinit(func):
    """Decorator to ensure that global variables have been initialized before
    running the decorated function.

    Args:
        func (callable): decorated function
    """

    @functools.wraps(func)
    def _wrapped(*args, **kwargs):
        init()
        return func(*args, **kwargs)

    return _wrapped


@contextlib.contextmanager
def gnupghome_override(dir):
    """Set the GNUPGHOME to a new location for this context.

    Args:
        dir (str): new value for GNUPGHOME
    """
    global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME

    # Store backup values
    _GPG, _GPGCONF = GPG, GPGCONF
    _SOCKET_DIR, _GNUPGHOME = SOCKET_DIR, GNUPGHOME
    clear()

    # Clear global state
    init(gnupghome=dir, force=True)

    yield

    clear()
    GPG, GPGCONF = _GPG, _GPGCONF
    SOCKET_DIR, GNUPGHOME = _SOCKET_DIR, _GNUPGHOME


def _parse_secret_keys_output(output):
    keys = []
    found_sec = False
    for line in output.split("\n"):
        if found_sec:
            if line.startswith("fpr"):
                keys.append(line.split(":")[9])
                found_sec = False
            elif line.startswith("ssb"):
                found_sec = False
        elif line.startswith("sec"):
            found_sec = True
    return keys


def _parse_public_keys_output(output):
    """
    Returns a list of public keys with their fingerprints
    """
    keys = []
    found_pub = False
    current_pub_key = ""
    for line in output.split("\n"):
        if found_pub:
            if line.startswith("fpr"):
                keys.append((current_pub_key, line.split(":")[9]))
                found_pub = False
            elif line.startswith("ssb"):
                found_pub = False
        elif line.startswith("pub"):
            current_pub_key = line.split(":")[4]
            found_pub = True
    return keys


def _get_unimported_public_keys(output):
    keys = []
    for line in output.split("\n"):
        if line.startswith("pub"):
            keys.append(line.split(":")[4])
    return keys


class SpackGPGError(spack.error.SpackError):
    """Class raised when GPG errors are detected."""


@_autoinit
def create(**kwargs):
    """Create a new key pair."""
    r, w = os.pipe()
    with contextlib.closing(os.fdopen(r, "r")) as r:
        with contextlib.closing(os.fdopen(w, "w")) as w:
            w.write(
                """
Key-Type: rsa
Key-Length: 4096
Key-Usage: sign
Name-Real: %(name)s
Name-Email: %(email)s
Name-Comment: %(comment)s
Expire-Date: %(expires)s
%%no-protection
%%commit
"""
                % kwargs
            )
        GPG("--gen-key", "--batch", input=r)


@_autoinit
def signing_keys(*args):
    """Return the keys that can be used to sign binaries."""
    output = GPG("--list-secret-keys", "--with-colons", "--fingerprint", *args, output=str)
    return _parse_secret_keys_output(output)


@_autoinit
def public_keys_to_fingerprint(*args):
    """Return the keys that can be used to verify binaries."""
    output = GPG("--list-public-keys", "--with-colons", "--fingerprint", *args, output=str)
    return _parse_public_keys_output(output)


@_autoinit
def public_keys(*args):
    """Return a list of fingerprints"""
    keys_and_fpr = public_keys_to_fingerprint(*args)
    return [key_and_fpr[1] for key_and_fpr in keys_and_fpr]


@_autoinit
def export_keys(location, keys, secret=False):
    """Export public keys to a location passed as argument.

    Args:
        location (str): where to export the keys
        keys (list): keys to be exported
        secret (bool): whether to export secret keys or not
    """
    if secret:
        GPG("--export-secret-keys", "--armor", "--output", location, *keys)
    else:
        GPG("--batch", "--yes", "--armor", "--export", "--output", location, *keys)


@_autoinit
def trust(keyfile):
    """Import a public key from a file and trust it.

    Args:
        keyfile (str): file with the public key
    """
    # Get the public keys we are about to import
    output = GPG("--with-colons", keyfile, output=str, error=str)
    keys = _get_unimported_public_keys(output)

    # Import them
    GPG("--batch", "--import", keyfile)

    # Set trust to ultimate
    key_to_fpr = dict(public_keys_to_fingerprint())
    for key in keys:
        # Skip over keys we cannot find a fingerprint for.
        if key not in key_to_fpr:
            continue

        fpr = key_to_fpr[key]
        r, w = os.pipe()
        with contextlib.closing(os.fdopen(r, "r")) as r:
            with contextlib.closing(os.fdopen(w, "w")) as w:
                w.write("{0}:6:\n".format(fpr))
            GPG("--import-ownertrust", input=r)


@_autoinit
def untrust(signing, *keys):
    """Delete known keys.

    Args:
        signing (bool): if True deletes the secret keys
        *keys: keys to be deleted
    """
    if signing:
        skeys = signing_keys(*keys)
        GPG("--batch", "--yes", "--delete-secret-keys", *skeys)

    pkeys = public_keys(*keys)
    GPG("--batch", "--yes", "--delete-keys", *pkeys)


@_autoinit
def sign(key, file, output, clearsign=False):
    """Sign a file with a key.

    Args:
        key: key to be used to sign
        file (str): file to be signed
        output (str): output file (either the clearsigned file or
            the detached signature)
        clearsign (bool): if True wraps the document in an ASCII-armored
            signature, if False creates a detached signature
    """
    signopt = "--clearsign" if clearsign else "--detach-sign"
    GPG(signopt, "--armor", "--local-user", key, "--output", output, file)


@_autoinit
def verify(signature, file=None, suppress_warnings=False):
    """Verify the signature on a file.

    Args:
        signature (str): signature of the file (or clearsigned file)
        file (str): file to be verified.  If None, then signature is
            assumed to be a clearsigned file.
        suppress_warnings (bool): whether or not to suppress warnings
            from GnuPG
    """
    args = [signature]
    if file:
        args.append(file)
    kwargs = {"error": str} if suppress_warnings else {}
    GPG("--verify", *args, **kwargs)


@_autoinit
def list(trusted, signing):
    """List known keys.

    Args:
        trusted (bool): if True list public keys
        signing (bool): if True list private keys
    """
    if trusted:
        GPG("--list-public-keys")

    if signing:
        GPG("--list-secret-keys")


def _verify_exe_or_raise(exe):
    msg = (
        "Spack requires gpgconf version >= 2\n"
        "  To install a suitable version using Spack, run\n"
        "    spack install gnupg@2:\n"
        "  and load it by running\n"
        "    spack load gnupg@2:"
    )
    if not exe:
        raise SpackGPGError(msg)

    output = exe("--version", output=str)
    match = re.search(r"^gpg(conf)? \(GnuPG\) (.*)$", output, re.M)
    if not match:
        raise SpackGPGError('Could not determine "{0}" version'.format(exe.name))

    if spack.version.Version(match.group(2)) < spack.version.Version("2"):
        raise SpackGPGError(msg)


def _gpgconf():
    exe = spack.util.executable.which("gpgconf", "gpg2conf", "gpgconf2")
    _verify_exe_or_raise(exe)

    # ensure that the gpgconf we found can run "gpgconf --create-socketdir"
    try:
        exe("--dry-run", "--create-socketdir", output=os.devnull, error=os.devnull)
    except spack.util.executable.ProcessError:
        # no dice
        exe = None

    return exe


def _gpg():
    exe = spack.util.executable.which("gpg2", "gpg")
    _verify_exe_or_raise(exe)
    return exe


def _socket_dir(gpgconf):
    # Try to ensure that (/var)/run/user/$(id -u) exists so that
    # `gpgconf --create-socketdir` can be run later.
    #
    # NOTE(opadron): This action helps prevent a large class of
    #                "file-name-too-long" errors in gpg.

    # If there is no suitable gpgconf, don't even bother trying to
    # pre-create a user run dir.
    if not gpgconf:
        return None

    result = None
    for var_run in ("/run", "/var/run"):
        if not os.path.exists(var_run):
            continue

        var_run_user = os.path.join(var_run, "user")
        try:
            if not os.path.exists(var_run_user):
                os.mkdir(var_run_user)
                os.chmod(var_run_user, 0o777)

            user_dir = os.path.join(var_run_user, str(os.getuid()))

            if not os.path.exists(user_dir):
                os.mkdir(user_dir)
                os.chmod(user_dir, 0o700)

        # If the above operation fails due to lack of permissions, then
        # just carry on without running gpgconf and hope for the best.
        #
        # NOTE(opadron): Without a dir in which to create a socket for IPC,
        #                gnupg may fail if GNUPGHOME is set to a path that
        #                is too long, where "too long" in this context is
        #                actually quite short; somewhere in the
        #                neighborhood of more than 100 characters.
        #
        # TODO(opadron): Maybe a warning should be printed in this case?
        except OSError as exc:
            if exc.errno not in (errno.EPERM, errno.EACCES):
                raise
            user_dir = None

        # return the last iteration that provides a usable user run dir
        if user_dir is not None:
            result = user_dir

    return result