summaryrefslogtreecommitdiff
path: root/lib/spack/spack/test/llnl/util/lock.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/spack/spack/test/llnl/util/lock.py')
-rw-r--r--lib/spack/spack/test/llnl/util/lock.py105
1 files changed, 104 insertions, 1 deletions
diff --git a/lib/spack/spack/test/llnl/util/lock.py b/lib/spack/spack/test/llnl/util/lock.py
index c84573e75e..c279d7e918 100644
--- a/lib/spack/spack/test/llnl/util/lock.py
+++ b/lib/spack/spack/test/llnl/util/lock.py
@@ -73,8 +73,11 @@ from multiprocessing import Process
import pytest
from llnl.util.filesystem import touch
+
+import spack.util.lock
+from spack.util.executable import which
from spack.util.multiproc import Barrier
-from llnl.util.lock import Lock, WriteTransaction, ReadTransaction, LockError
+from spack.util.lock import Lock, WriteTransaction, ReadTransaction, LockError
#
@@ -183,15 +186,23 @@ def private_lock_path(lock_dir):
lock_file = os.path.join(lock_dir, 'lockfile')
if mpi:
lock_file += '.%s' % comm.rank
+
yield lock_file
+ if os.path.exists(lock_file):
+ os.unlink(lock_file)
+
@pytest.fixture
def lock_path(lock_dir):
"""This lock is shared among all processes in a multiproc test."""
lock_file = os.path.join(lock_dir, 'lockfile')
+
yield lock_file
+ if os.path.exists(lock_file):
+ os.unlink(lock_file)
+
def local_multiproc_test(*functions):
"""Order some processes using simple barrier synchronization."""
@@ -900,3 +911,95 @@ def test_transaction_with_context_manager_and_exception(lock_path):
assert vals['exception']
assert not vals['exited_fn']
assert not vals['exception_fn']
+
+
+def test_disable_locking(private_lock_path):
+ """Ensure that locks do no real locking when disabled."""
+ old_value = spack.config.get('config:locks')
+
+ with spack.config.override('config:locks', False):
+ lock = Lock(private_lock_path)
+
+ lock.acquire_read()
+ assert not os.path.exists(private_lock_path)
+
+ lock.acquire_write()
+ assert not os.path.exists(private_lock_path)
+
+ lock.release_write()
+ assert not os.path.exists(private_lock_path)
+
+ lock.release_read()
+ assert not os.path.exists(private_lock_path)
+
+ assert old_value == spack.config.get('config:locks')
+
+
+def test_lock_checks_user(tmpdir):
+ """Ensure lock checks work."""
+ path = str(tmpdir)
+ uid = os.getuid()
+
+ # self-owned, own group
+ tmpdir.chown(uid, uid)
+
+ # safe
+ tmpdir.chmod(0o744)
+ spack.util.lock.check_lock_safety(path)
+
+ # safe
+ tmpdir.chmod(0o774)
+ spack.util.lock.check_lock_safety(path)
+
+ # unsafe
+ tmpdir.chmod(0o777)
+ with pytest.raises(spack.error.SpackError):
+ spack.util.lock.check_lock_safety(path)
+
+ # safe
+ tmpdir.chmod(0o474)
+ spack.util.lock.check_lock_safety(path)
+
+ # safe
+ tmpdir.chmod(0o477)
+ spack.util.lock.check_lock_safety(path)
+
+
+def test_lock_checks_group(tmpdir):
+ path = str(tmpdir)
+ uid = os.getuid()
+
+ id_cmd = which('id')
+ if not id_cmd:
+ pytest.skip("Can't determine user's groups.")
+
+ # find a legal gid to user that is NOT the user's uid
+ gids = [int(gid) for gid in id_cmd('-G', output=str).split(' ')]
+ gid = next((g for g in gids if g != uid), None)
+ if gid is None:
+ pytest.skip("Can't determine user's groups.")
+
+ # self-owned, another group
+ tmpdir.chown(uid, gid)
+
+ # safe
+ tmpdir.chmod(0o744)
+ spack.util.lock.check_lock_safety(path)
+
+ # unsafe
+ tmpdir.chmod(0o774)
+ with pytest.raises(spack.error.SpackError):
+ spack.util.lock.check_lock_safety(path)
+
+ # unsafe
+ tmpdir.chmod(0o777)
+ with pytest.raises(spack.error.SpackError):
+ spack.util.lock.check_lock_safety(path)
+
+ # safe
+ tmpdir.chmod(0o474)
+ spack.util.lock.check_lock_safety(path)
+
+ # safe
+ tmpdir.chmod(0o477)
+ spack.util.lock.check_lock_safety(path)