diff options
-rw-r--r-- | lib/spack/spack/cmd/tutorial.py | 2 | ||||
-rw-r--r-- | lib/spack/spack/test/bindist.py | 6 | ||||
-rw-r--r-- | lib/spack/spack/test/ci.py | 2 | ||||
-rw-r--r-- | lib/spack/spack/test/cmd/buildcache.py | 2 | ||||
-rw-r--r-- | lib/spack/spack/test/cmd/ci.py | 4 | ||||
-rw-r--r-- | lib/spack/spack/test/cmd/gpg.py | 14 | ||||
-rw-r--r-- | lib/spack/spack/test/conftest.py | 8 | ||||
-rw-r--r-- | lib/spack/spack/test/packaging.py | 2 | ||||
-rw-r--r-- | lib/spack/spack/test/util/util_gpg.py | 33 | ||||
-rw-r--r-- | lib/spack/spack/util/gpg.py | 599 |
10 files changed, 304 insertions, 368 deletions
diff --git a/lib/spack/spack/cmd/tutorial.py b/lib/spack/spack/cmd/tutorial.py index a8bc1f1ddd..10dd8f84c3 100644 --- a/lib/spack/spack/cmd/tutorial.py +++ b/lib/spack/spack/cmd/tutorial.py @@ -73,7 +73,7 @@ def tutorial(parser, args): tty.msg("Ensuring that we trust tutorial binaries", "spack gpg trust %s" % tutorial_key) - spack.util.gpg.Gpg().trust(tutorial_key) + spack.util.gpg.trust(tutorial_key) # Note that checkout MUST be last. It changes Spack under our feet. # If you don't put this last, you'll get import errors for the code diff --git a/lib/spack/spack/test/bindist.py b/lib/spack/spack/test/bindist.py index fb2db23365..d7e4ddb50f 100644 --- a/lib/spack/spack/test/bindist.py +++ b/lib/spack/spack/test/bindist.py @@ -318,8 +318,6 @@ def test_relative_rpaths_install_nondefault(mirror_dir): buildcache_cmd('install', '-auf', cspec.name) -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='This test requires gpg') def test_push_and_fetch_keys(mock_gnupghome): testpath = str(mock_gnupghome) @@ -333,7 +331,7 @@ def test_push_and_fetch_keys(mock_gnupghome): # dir 1: create a new key, record its fingerprint, and push it to a new # mirror - with spack.util.gpg.gnupg_home_override(gpg_dir1): + with spack.util.gpg.gnupghome_override(gpg_dir1): spack.util.gpg.create(name='test-key', email='fake@test.key', expires='0', @@ -347,7 +345,7 @@ def test_push_and_fetch_keys(mock_gnupghome): # dir 2: import the key from the mirror, and confirm that its fingerprint # matches the one created above - with spack.util.gpg.gnupg_home_override(gpg_dir2): + with spack.util.gpg.gnupghome_override(gpg_dir2): assert len(spack.util.gpg.public_keys()) == 0 bindist.get_keys(mirrors=mirrors, install=True, trust=True, force=True) diff --git a/lib/spack/spack/test/ci.py b/lib/spack/spack/test/ci.py index 5599d18d5b..d9c337e4f8 100644 --- a/lib/spack/spack/test/ci.py +++ b/lib/spack/spack/test/ci.py @@ -56,8 +56,6 @@ def test_urlencode_string(): assert(s_enc == 'Spack+Test+Project') -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='This test requires gpg') def test_import_signing_key(mock_gnupghome): signing_key_dir = spack_paths.mock_gpg_keys_path signing_key_path = os.path.join(signing_key_dir, 'package-signing-key') diff --git a/lib/spack/spack/test/cmd/buildcache.py b/lib/spack/spack/test/cmd/buildcache.py index 5c056fb33b..addb286a41 100644 --- a/lib/spack/spack/test/cmd/buildcache.py +++ b/lib/spack/spack/test/cmd/buildcache.py @@ -138,8 +138,6 @@ def test_buildcache_create_fail_on_perm_denied( tmpdir.chmod(0o700) -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='This test requires gpg') def test_update_key_index(tmpdir, mutable_mock_env_path, install_mockery, mock_packages, mock_fetch, mock_stage, mock_gnupghome): diff --git a/lib/spack/spack/test/cmd/ci.py b/lib/spack/spack/test/cmd/ci.py index 64ab6d13b9..a6fe1b4c9d 100644 --- a/lib/spack/spack/test/cmd/ci.py +++ b/lib/spack/spack/test/cmd/ci.py @@ -647,8 +647,6 @@ spack: assert not any('externaltool' in key for key in yaml_contents) -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='This test requires gpg') def test_ci_rebuild(tmpdir, mutable_mock_env_path, env_deactivate, install_mockery, mock_packages, monkeypatch, mock_gnupghome, mock_fetch): @@ -864,8 +862,6 @@ spack: @pytest.mark.disable_clean_stage_check -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='This test requires gpg') def test_push_mirror_contents(tmpdir, mutable_mock_env_path, env_deactivate, install_mockery, mock_packages, mock_fetch, mock_stage, mock_gnupghome): diff --git a/lib/spack/spack/test/cmd/gpg.py b/lib/spack/spack/test/cmd/gpg.py index eb7eab2734..3834876f58 100644 --- a/lib/spack/spack/test/cmd/gpg.py +++ b/lib/spack/spack/test/cmd/gpg.py @@ -41,24 +41,20 @@ def test_find_gpg(cmd_name, version, tmpdir, mock_gnupghome, monkeypatch): monkeypatch.setitem(os.environ, "PATH", str(tmpdir)) if version == 'undetectable' or version.endswith('1.3.4'): with pytest.raises(spack.util.gpg.SpackGPGError): - spack.util.gpg.ensure_gpg(reevaluate=True) + spack.util.gpg.init(force=True) else: - spack.util.gpg.ensure_gpg(reevaluate=True) - gpg_exe = spack.util.gpg.get_global_gpg_instance().gpg_exe - assert isinstance(gpg_exe, spack.util.executable.Executable) - gpgconf_exe = spack.util.gpg.get_global_gpg_instance().gpgconf_exe - assert isinstance(gpgconf_exe, spack.util.executable.Executable) + spack.util.gpg.init(force=True) + assert spack.util.gpg.GPG is not None + assert spack.util.gpg.GPGCONF is not None def test_no_gpg_in_path(tmpdir, mock_gnupghome, monkeypatch): monkeypatch.setitem(os.environ, "PATH", str(tmpdir)) with pytest.raises(spack.util.gpg.SpackGPGError): - spack.util.gpg.ensure_gpg(reevaluate=True) + spack.util.gpg.init(force=True) @pytest.mark.maybeslow -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='These tests require gnupg2') def test_gpg(tmpdir, mock_gnupghome): # Verify a file with an empty keyring. with pytest.raises(ProcessError): diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py index 8fe566fee7..6e003dd870 100644 --- a/lib/spack/spack/test/conftest.py +++ b/lib/spack/spack/test/conftest.py @@ -842,8 +842,14 @@ def mock_gnupghome(monkeypatch): # have to make our own tmpdir with a shorter name than pytest's. # This comes up because tmp paths on macOS are already long-ish, and # pytest makes them longer. + try: + spack.util.gpg.init() + except spack.util.gpg.SpackGPGError: + if not spack.util.gpg.GPG: + pytest.skip('This test requires gpg') + short_name_tmpdir = tempfile.mkdtemp() - with spack.util.gpg.gnupg_home_override(short_name_tmpdir): + with spack.util.gpg.gnupghome_override(short_name_tmpdir): yield short_name_tmpdir # clean up, since we are doing this manually diff --git a/lib/spack/spack/test/packaging.py b/lib/spack/spack/test/packaging.py index 57446b6e2d..bd02214512 100644 --- a/lib/spack/spack/test/packaging.py +++ b/lib/spack/spack/test/packaging.py @@ -39,8 +39,6 @@ def fake_fetchify(url, pkg): pkg.fetcher = fetcher -@pytest.mark.skipif(not spack.util.gpg.has_gpg(), - reason='This test requires gpg') @pytest.mark.usefixtures('install_mockery', 'mock_gnupghome') def test_buildcache(mock_archive, tmpdir): # tweak patchelf to only do a download diff --git a/lib/spack/spack/test/util/util_gpg.py b/lib/spack/spack/test/util/util_gpg.py index 987dcf7f0e..82a410561c 100644 --- a/lib/spack/spack/test/util/util_gpg.py +++ b/lib/spack/spack/test/util/util_gpg.py @@ -9,6 +9,12 @@ import pytest import spack.util.gpg +@pytest.fixture() +def has_socket_dir(): + spack.util.gpg.init() + return bool(spack.util.gpg.SOCKET_DIR) + + def test_parse_gpg_output_case_one(): # Two keys, fingerprint for primary keys, but not subkeys output = """sec::2048:1:AAAAAAAAAAAAAAAA:AAAAAAAAAA:AAAAAAAAAA::::::::: @@ -20,7 +26,7 @@ fpr:::::::::YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY: uid:::::::AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA::Joe (Test) <j.s@s.com>: ssb::2048:1:AAAAAAAAAAAAAAAA:AAAAAAAAAA:::::::::: """ - keys = spack.util.gpg.parse_secret_keys_output(output) + keys = spack.util.gpg._parse_secret_keys_output(output) assert len(keys) == 2 assert keys[0] == 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' @@ -37,7 +43,7 @@ ssb:-:2048:1:AAAAAAAAA::::::esa:::+:::23: fpr:::::::::YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY: grp:::::::::AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA: """ - keys = spack.util.gpg.parse_secret_keys_output(output) + keys = spack.util.gpg._parse_secret_keys_output(output) assert len(keys) == 1 assert keys[0] == 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' @@ -56,19 +62,19 @@ uid:::::::AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA::Joe (Test) <j.s@s.com>: ssb::2048:1:AAAAAAAAAAAAAAAA:AAAAAAAAAA:::::::::: fpr:::::::::ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ:""" - keys = spack.util.gpg.parse_secret_keys_output(output) + keys = spack.util.gpg._parse_secret_keys_output(output) assert len(keys) == 2 assert keys[0] == 'WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW' assert keys[1] == 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY' -@pytest.mark.skipif(not spack.util.gpg.GpgConstants.user_run_dir, - reason='This test requires /var/run/user/$(id -u)') @pytest.mark.requires_executables('gpg2') -def test_really_long_gnupg_home_dir(tmpdir): - N = 960 +def test_really_long_gnupghome_dir(tmpdir, has_socket_dir): + if not has_socket_dir: + pytest.skip('This test requires /var/run/user/$(id -u)') + N = 960 tdir = str(tmpdir) while len(tdir) < N: tdir = os.path.join(tdir, 'filler') @@ -76,10 +82,11 @@ def test_really_long_gnupg_home_dir(tmpdir): tdir = tdir[:N].rstrip(os.sep) tdir += '0' * (N - len(tdir)) - with spack.util.gpg.gnupg_home_override(tdir): - spack.util.gpg.create(name='Spack testing 1', - email='test@spack.io', - comment='Spack testing key', - expires='0') - + with spack.util.gpg.gnupghome_override(tdir): + spack.util.gpg.create( + name='Spack testing 1', + email='test@spack.io', + comment='Spack testing key', + expires='0' + ) spack.util.gpg.list(True, True) diff --git a/lib/spack/spack/util/gpg.py b/lib/spack/spack/util/gpg.py index 03b33af234..e224c2978f 100644 --- a/lib/spack/spack/util/gpg.py +++ b/lib/spack/spack/util/gpg.py @@ -2,74 +2,123 @@ # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) - import contextlib import errno import functools import os import re -import llnl.util.lang - import spack.error import spack.paths import spack.util.executable import spack.version -_gnupg_version_re = r"^gpg(conf)? \(GnuPG\) (.*)$" -_gnupg_home_override = None -_global_gpg_instance = None +#: Executable instance for "gpg", initialized lazily +GPG = None +#: Executable instance for "gpgconf", initialized lazily +GPGCONF = None +#: Socket directory required if a non default home directory is used +SOCKET_DIR = None +#: GNUPGHOME environment variable in the context of this Python module +GNUPGHOME = None -def get_gnupg_home(gnupg_home=None): - """Returns the directory that should be used as the GNUPGHOME environment - variable when calling gpg. +def clear(): + """Reset the global state to uninitialized.""" + global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME + GPG, GPGCONF, SOCKET_DIR, GNUPGHOME = None, None, None, None - If a [gnupg_home] is passed directly (and not None), that value will be - used. - Otherwise, if there is an override set (and it is not None), then that - value will be used. +def init(gnupghome=None, force=False): + """Initialize the global objects in the module, if not set. - Otherwise, if the environment variable "SPACK_GNUPGHOME" is set, then that - value will be used. + When calling any gpg executable, the GNUPGHOME environment + variable is set to: - Otherwise, the default gpg path for Spack will be used. + 1. The value of the `gnupghome` argument, if not None + 2. The value of the "SPACK_GNUPGHOME" environment variable, if set + 3. The default gpg path for Spack otherwise - See also: gnupg_home_override() + Args: + gnupghome (str): value to be used for GNUPGHOME when calling + GnuPG executables + force (bool): if True forces the re-initialization even if the + global objects are set already + """ + global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME + if force: + clear() + + # If the executables are already set, there's nothing to do + if GPG and GNUPGHOME: + return + + # Set the value of GNUPGHOME to be used in this module + GNUPGHOME = (gnupghome or + os.getenv('SPACK_GNUPGHOME') or + spack.paths.gpg_path) + + # Set the executable objects for "gpg" and "gpgconf" + GPG, GPGCONF = _gpg(), _gpgconf() + GPG.add_default_env('GNUPGHOME', GNUPGHOME) + if GPGCONF: + GPGCONF.add_default_env('GNUPGHOME', GNUPGHOME) + # Set the socket dir if not using GnuPG defaults + SOCKET_DIR = _socket_dir(GPGCONF) + + # Make sure that the GNUPGHOME exists + if not os.path.exists(GNUPGHOME): + os.makedirs(GNUPGHOME) + os.chmod(GNUPGHOME, 0o700) + + if not os.path.isdir(GNUPGHOME): + msg = 'GNUPGHOME "{0}" exists and is not a directory'.format(GNUPGHOME) + raise SpackGPGError(msg) + + if SOCKET_DIR is not None: + GPGCONF('--create-socketdir') + + +def _autoinit(func): + """Decorator to ensure that global variables have been initialized before + running the decorated function. + + Args: + func (callable): decorated function """ - return (gnupg_home or - _gnupg_home_override or - os.getenv('SPACK_GNUPGHOME') or - spack.paths.gpg_path) + @functools.wraps(func) + def _wrapped(*args, **kwargs): + init() + return func(*args, **kwargs) + return _wrapped @contextlib.contextmanager -def gnupg_home_override(new_gnupg_home): - global _gnupg_home_override - global _global_gpg_instance +def gnupghome_override(dir): + """Set the GNUPGHOME to a new location for this context. - old_gnupg_home_override = _gnupg_home_override - old_global_gpg_instance = _global_gpg_instance - - _gnupg_home_override = new_gnupg_home - _global_gpg_instance = None + Args: + dir (str): new value for GNUPGHOME + """ + global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME - yield + # Store backup values + _GPG, _GPGCONF = GPG, GPGCONF + _SOCKET_DIR, _GNUPGHOME = SOCKET_DIR, GNUPGHOME + clear() - _gnupg_home_override = old_gnupg_home_override - _global_gpg_instance = old_global_gpg_instance + # Clear global state + init(gnupghome=dir, force=True) + yield -def get_global_gpg_instance(): - global _global_gpg_instance - if _global_gpg_instance is None: - _global_gpg_instance = Gpg() - return _global_gpg_instance + clear() + GPG, GPGCONF = _GPG, _GPGCONF + SOCKET_DIR, GNUPGHOME = _SOCKET_DIR, _GNUPGHOME -def parse_secret_keys_output(output): +def _parse_secret_keys_output(output): keys = [] found_sec = False for line in output.split('\n'): @@ -84,7 +133,7 @@ def parse_secret_keys_output(output): return keys -def parse_public_keys_output(output): +def _parse_public_keys_output(output): keys = [] found_pub = False for line in output.split('\n'): @@ -99,334 +148,224 @@ def parse_public_keys_output(output): return keys -cached_property = getattr(functools, 'cached_property', None) - -# If older python version has no cached_property, emulate it here. -# TODO(opadron): maybe this shim should be moved to llnl.util.lang? -if not cached_property: - def cached_property(*args, **kwargs): - result = property(llnl.util.lang.memoized(*args, **kwargs)) - attr = result.fget.__name__ - - @result.deleter - def result(self): - getattr(type(self), attr).fget.cache.pop((self,), None) - - return result - - -class _GpgConstants(object): - @cached_property - def target_version(self): - return spack.version.Version('2') - - @cached_property - def gpgconf_string(self): - exe_str = spack.util.executable.which_string( - 'gpgconf', 'gpg2conf', 'gpgconf2') - - no_gpgconf_msg = ( - 'Spack requires gpgconf version >= 2\n' - ' To install a suitable version using Spack, run\n' - ' spack install gnupg@2:\n' - ' and load it by running\n' - ' spack load gnupg@2:') - - if not exe_str: - raise SpackGPGError(no_gpgconf_msg) - - exe = spack.util.executable.Executable(exe_str) - output = exe('--version', output=str) - match = re.search(_gnupg_version_re, output, re.M) - - if not match: - raise SpackGPGError('Could not determine gpgconf version') - - if spack.version.Version(match.group(2)) < self.target_version: - raise SpackGPGError(no_gpgconf_msg) - - # ensure that the gpgconf we found can run "gpgconf --create-socketdir" - try: - exe('--dry-run', '--create-socketdir') - except spack.util.executable.ProcessError: - # no dice - exe_str = None - - return exe_str +class SpackGPGError(spack.error.SpackError): + """Class raised when GPG errors are detected.""" - @cached_property - def gpg_string(self): - exe_str = spack.util.executable.which_string('gpg2', 'gpg') - no_gpg_msg = ( - 'Spack requires gpg version >= 2\n' - ' To install a suitable version using Spack, run\n' - ' spack install gnupg@2:\n' - ' and load it by running\n' - ' spack load gnupg@2:') +@_autoinit +def create(**kwargs): + """Create a new key pair.""" + r, w = os.pipe() + with contextlib.closing(os.fdopen(r, 'r')) as r: + with contextlib.closing(os.fdopen(w, 'w')) as w: + w.write(''' +Key-Type: rsa +Key-Length: 4096 +Key-Usage: sign +Name-Real: %(name)s +Name-Email: %(email)s +Name-Comment: %(comment)s +Expire-Date: %(expires)s +%%no-protection +%%commit +''' % kwargs) + GPG('--gen-key', '--batch', input=r) + + +@_autoinit +def signing_keys(*args): + """Return the keys that can be used to sign binaries.""" + output = GPG( + '--list-secret-keys', '--with-colons', '--fingerprint', + *args, output=str + ) + return _parse_secret_keys_output(output) + + +@_autoinit +def public_keys(*args): + """Return the keys that can be used to verify binaries.""" + output = GPG( + '--list-public-keys', '--with-colons', '--fingerprint', + *args, output=str + ) + return _parse_public_keys_output(output) + + +@_autoinit +def export_keys(location, keys, secret=False): + """Export public keys to a location passed as argument. + + Args: + location (str): where to export the keys + keys (list): keys to be exported + secret (bool): whether to export secret keys or not + """ + if secret: + GPG("--export-secret-keys", "--armor", "--output", location, *keys) + else: + GPG("--batch", "--yes", "--armor", "--export", "--output", location, *keys) - if not exe_str: - raise SpackGPGError(no_gpg_msg) - exe = spack.util.executable.Executable(exe_str) - output = exe('--version', output=str) - match = re.search(_gnupg_version_re, output, re.M) +@_autoinit +def trust(keyfile): + """Import a public key from a file. - if not match: - raise SpackGPGError('Could not determine gpg version') + Args: + keyfile (str): file with the public key + """ + GPG('--import', keyfile) - if spack.version.Version(match.group(2)) < self.target_version: - raise SpackGPGError(no_gpg_msg) - return exe_str +@_autoinit +def untrust(signing, *keys): + """Delete known keys. - @cached_property - def user_run_dir(self): - # Try to ensure that (/var)/run/user/$(id -u) exists so that - # `gpgconf --create-socketdir` can be run later. - # - # NOTE(opadron): This action helps prevent a large class of - # "file-name-too-long" errors in gpg. + Args: + signing (bool): if True deletes the secret keys + *keys: keys to be deleted + """ + if signing: + skeys = signing_keys(*keys) + GPG('--batch', '--yes', '--delete-secret-keys', *skeys) - try: - has_suitable_gpgconf = bool(GpgConstants.gpgconf_string) - except SpackGPGError: - has_suitable_gpgconf = False + pkeys = public_keys(*keys) + GPG('--batch', '--yes', '--delete-keys', *pkeys) - # If there is no suitable gpgconf, don't even bother trying to - # precreate a user run dir. - if not has_suitable_gpgconf: - return None - result = None - for var_run in ('/run', '/var/run'): - if not os.path.exists(var_run): - continue +@_autoinit +def sign(key, file, output, clearsign=False): + """Sign a file with a key. - var_run_user = os.path.join(var_run, 'user') - try: - if not os.path.exists(var_run_user): - os.mkdir(var_run_user) - os.chmod(var_run_user, 0o777) + Args: + key: key to be used to sign + file (str): file to be signed + output (str): output file (either the clearsigned file or + the detached signature) + clearsign (bool): if True wraps the document in an ASCII-armored + signature, if False creates a detached signature + """ + signopt = '--clearsign' if clearsign else '--detach-sign' + GPG(signopt, '--armor', '--default-key', key, '--output', output, file) - user_dir = os.path.join(var_run_user, str(os.getuid())) - if not os.path.exists(user_dir): - os.mkdir(user_dir) - os.chmod(user_dir, 0o700) +@_autoinit +def verify(signature, file, suppress_warnings=False): + """Verify the signature on a file. - # If the above operation fails due to lack of permissions, then - # just carry on without running gpgconf and hope for the best. - # - # NOTE(opadron): Without a dir in which to create a socket for IPC, - # gnupg may fail if GNUPGHOME is set to a path that - # is too long, where "too long" in this context is - # actually quite short; somewhere in the - # neighborhood of more than 100 characters. - # - # TODO(opadron): Maybe a warning should be printed in this case? - except OSError as exc: - if exc.errno not in (errno.EPERM, errno.EACCES): - raise - user_dir = None + Args: + signature (str): signature of the file + file (str): file to be verified + suppress_warnings (bool): whether or not to suppress warnings + from GnuPG + """ + kwargs = {'error': str} if suppress_warnings else {} + GPG('--verify', signature, file, **kwargs) - # return the last iteration that provides a usable user run dir - if user_dir is not None: - result = user_dir - return result +@_autoinit +def list(trusted, signing): + """List known keys. - def clear(self): - for attr in ('gpgconf_string', 'gpg_string', 'user_run_dir'): - try: - delattr(self, attr) - except AttributeError: - pass + Args: + trusted (bool): if True list public keys + signing (bool): if True list private keys + """ + if trusted: + GPG('--list-public-keys') + if signing: + GPG('--list-secret-keys') -GpgConstants = _GpgConstants() +def _verify_exe_or_raise(exe): + msg = ( + 'Spack requires gpgconf version >= 2\n' + ' To install a suitable version using Spack, run\n' + ' spack install gnupg@2:\n' + ' and load it by running\n' + ' spack load gnupg@2:' + ) + if not exe: + raise SpackGPGError(msg) -def ensure_gpg(reevaluate=False): - if reevaluate: - GpgConstants.clear() + output = exe('--version', output=str) + match = re.search(r"^gpg(conf)? \(GnuPG\) (.*)$", output, re.M) + if not match: + raise SpackGPGError( + 'Could not determine "{0}" version'.format(exe.name) + ) - if GpgConstants.user_run_dir is not None: - GpgConstants.gpgconf_string + if spack.version.Version(match.group(2)) < spack.version.Version('2'): + raise SpackGPGError(msg) - GpgConstants.gpg_string - return True +def _gpgconf(): + exe = spack.util.executable.which('gpgconf', 'gpg2conf', 'gpgconf2') + _verify_exe_or_raise(exe) -def has_gpg(*args, **kwargs): + # ensure that the gpgconf we found can run "gpgconf --create-socketdir" try: - return ensure_gpg(*args, **kwargs) - except SpackGPGError: - return False - - -# NOTE(opadron): When adding methods to this class, consider adding convenience -# wrapper functions further down in this file. -class Gpg(object): - def __init__(self, gnupg_home=None): - self.gnupg_home = get_gnupg_home(gnupg_home) - - @cached_property - def prep(self): - # Make sure that suitable versions of gpgconf and gpg are available - ensure_gpg() - - # Make sure that the GNUPGHOME exists - if not os.path.exists(self.gnupg_home): - os.makedirs(self.gnupg_home) - os.chmod(self.gnupg_home, 0o700) - - if not os.path.isdir(self.gnupg_home): - raise SpackGPGError( - 'GNUPGHOME "{0}" exists and is not a directory'.format( - self.gnupg_home)) - - if GpgConstants.user_run_dir is not None: - self.gpgconf_exe('--create-socketdir') - - return True - - @cached_property - def gpgconf_exe(self): - exe = spack.util.executable.Executable(GpgConstants.gpgconf_string) - exe.add_default_env('GNUPGHOME', self.gnupg_home) - return exe - - @cached_property - def gpg_exe(self): - exe = spack.util.executable.Executable(GpgConstants.gpg_string) - exe.add_default_env('GNUPGHOME', self.gnupg_home) - return exe - - def __call__(self, *args, **kwargs): - if self.prep: - return self.gpg_exe(*args, **kwargs) - - def create(self, **kwargs): - r, w = os.pipe() - r = os.fdopen(r, 'r') - w = os.fdopen(w, 'w') - w.write(''' - Key-Type: rsa - Key-Length: 4096 - Key-Usage: sign - Name-Real: %(name)s - Name-Email: %(email)s - Name-Comment: %(comment)s - Expire-Date: %(expires)s - %%no-protection - %%commit - ''' % kwargs) - w.close() - self('--gen-key', '--batch', input=r) - r.close() - - def signing_keys(self, *args): - output = self('--list-secret-keys', '--with-colons', '--fingerprint', - *args, output=str) - return parse_secret_keys_output(output) - - def public_keys(self, *args): - output = self('--list-public-keys', '--with-colons', '--fingerprint', - *args, output=str) - return parse_public_keys_output(output) - - def export_keys(self, location, keys, secret=False): - if secret: - self("--export-secret-keys", "--armor", "--output", location, *keys) - else: - self('--batch', '--yes', '--armor', '--export', '--output', - location, *keys) - - def trust(self, keyfile): - self('--import', keyfile) - - def untrust(self, signing, *keys): - if signing: - skeys = self.signing_keys(*keys) - self('--batch', '--yes', '--delete-secret-keys', *skeys) - - pkeys = self.public_keys(*keys) - self('--batch', '--yes', '--delete-keys', *pkeys) - - def sign(self, key, file, output, clearsign=False): - self(('--clearsign' if clearsign else '--detach-sign'), - '--armor', '--default-key', key, - '--output', output, file) - - def verify(self, signature, file, suppress_warnings=False): - self('--verify', signature, file, - **({'error': str} if suppress_warnings else {})) - - def list(self, trusted, signing): - if trusted: - self('--list-public-keys') - - if signing: - self('--list-secret-keys') - - -class SpackGPGError(spack.error.SpackError): - """Class raised when GPG errors are detected.""" - + exe('--dry-run', '--create-socketdir') + except spack.util.executable.ProcessError: + # no dice + exe = None -# Convenience wrappers for methods of the Gpg class + return exe -# __call__ is a bit of a special case, since the Gpg instance is, itself, the -# "thing" that is being called. -@functools.wraps(Gpg.__call__) -def gpg(*args, **kwargs): - return get_global_gpg_instance()(*args, **kwargs) +def _gpg(): + exe = spack.util.executable.which('gpg2', 'gpg') + _verify_exe_or_raise(exe) + return exe -gpg.name = 'gpg' # type: ignore[attr-defined] +def _socket_dir(gpgconf): + # Try to ensure that (/var)/run/user/$(id -u) exists so that + # `gpgconf --create-socketdir` can be run later. + # + # NOTE(opadron): This action helps prevent a large class of + # "file-name-too-long" errors in gpg. -@functools.wraps(Gpg.create) -def create(*args, **kwargs): - return get_global_gpg_instance().create(*args, **kwargs) + # If there is no suitable gpgconf, don't even bother trying to + # pre-create a user run dir. + if not gpgconf: + return None + result = None + for var_run in ('/run', '/var/run'): + if not os.path.exists(var_run): + continue -@functools.wraps(Gpg.signing_keys) -def signing_keys(*args, **kwargs): - return get_global_gpg_instance().signing_keys(*args, **kwargs) - - -@functools.wraps(Gpg.public_keys) -def public_keys(*args, **kwargs): - return get_global_gpg_instance().public_keys(*args, **kwargs) - - -@functools.wraps(Gpg.export_keys) -def export_keys(*args, **kwargs): - return get_global_gpg_instance().export_keys(*args, **kwargs) - - -@functools.wraps(Gpg.trust) -def trust(*args, **kwargs): - return get_global_gpg_instance().trust(*args, **kwargs) - - -@functools.wraps(Gpg.untrust) -def untrust(*args, **kwargs): - return get_global_gpg_instance().untrust(*args, **kwargs) - + var_run_user = os.path.join(var_run, 'user') + try: + if not os.path.exists(var_run_user): + os.mkdir(var_run_user) + os.chmod(var_run_user, 0o777) -@functools.wraps(Gpg.sign) -def sign(*args, **kwargs): - return get_global_gpg_instance().sign(*args, **kwargs) + user_dir = os.path.join(var_run_user, str(os.getuid())) + if not os.path.exists(user_dir): + os.mkdir(user_dir) + os.chmod(user_dir, 0o700) -@functools.wraps(Gpg.verify) -def verify(*args, **kwargs): - return get_global_gpg_instance().verify(*args, **kwargs) + # If the above operation fails due to lack of permissions, then + # just carry on without running gpgconf and hope for the best. + # + # NOTE(opadron): Without a dir in which to create a socket for IPC, + # gnupg may fail if GNUPGHOME is set to a path that + # is too long, where "too long" in this context is + # actually quite short; somewhere in the + # neighborhood of more than 100 characters. + # + # TODO(opadron): Maybe a warning should be printed in this case? + except OSError as exc: + if exc.errno not in (errno.EPERM, errno.EACCES): + raise + user_dir = None + # return the last iteration that provides a usable user run dir + if user_dir is not None: + result = user_dir -@functools.wraps(Gpg.list) -def list(*args, **kwargs): - return get_global_gpg_instance().list(*args, **kwargs) + return result |