summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorTodd Gamblin <tgamblin@llnl.gov>2017-07-03 17:30:18 -0700
committerTodd Gamblin <tgamblin@llnl.gov>2017-07-04 11:41:37 -0700
commitb4d1654e68880a6c917f8c8e07e6ac2edb6d70ea (patch)
treee0e3eef5406db20f1be34f7a43a35d694c5b7177 /lib
parentbd7a591df17c38d6ce9f51e092364a98ed4c1d7b (diff)
downloadspack-b4d1654e68880a6c917f8c8e07e6ac2edb6d70ea.tar.gz
spack-b4d1654e68880a6c917f8c8e07e6ac2edb6d70ea.tar.bz2
spack-b4d1654e68880a6c917f8c8e07e6ac2edb6d70ea.tar.xz
spack-b4d1654e68880a6c917f8c8e07e6ac2edb6d70ea.zip
Parametrized lock test and make it work with MPI
- Lock test can be run either as a node-local test or as an MPI test. - Lock test is now parametrized by filesystem, so you can test the locking capabilities of your NFS, Lustre, or GPFS filesystem. See docs for details.
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/llnl/util/lock.py9
-rw-r--r--lib/spack/spack/test/lock.py292
2 files changed, 255 insertions, 46 deletions
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py
index 2cab436e2d..55837c371e 100644
--- a/lib/spack/llnl/util/lock.py
+++ b/lib/spack/llnl/util/lock.py
@@ -127,8 +127,9 @@ class Lock(object):
return
- except IOError as error:
- if error.errno == errno.EAGAIN or error.errno == errno.EACCES:
+ except IOError as e:
+ if e.errno in (errno.EAGAIN, errno.EACCES):
+ # EAGAIN and EACCES == locked by another process
pass
else:
raise
@@ -197,6 +198,8 @@ class Lock(object):
tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
.format(self))
self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError.
+ tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquired]'
+ .format(self))
self._reads += 1
return True
else:
@@ -219,6 +222,8 @@ class Lock(object):
'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
.format(self))
self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError.
+ tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquired]'
+ .format(self))
self._writes += 1
return True
else:
diff --git a/lib/spack/spack/test/lock.py b/lib/spack/spack/test/lock.py
index 7cf4571016..347b72b575 100644
--- a/lib/spack/spack/test/lock.py
+++ b/lib/spack/spack/test/lock.py
@@ -22,37 +22,178 @@
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##############################################################################
-"""
-These tests ensure that our lock works correctly.
+"""These tests ensure that our lock works correctly.
+
+This can be run in two ways.
+
+First, it can be run as a node-local test, with a typical invocation like
+this::
+
+ spack test lock
+
+You can *also* run it as an MPI program, which allows you to test locks
+across nodes. So, e.g., you can run the test like this::
+
+ mpirun -n 7 spack test lock
+
+And it will test locking correctness among MPI processes. Ideally, you
+want the MPI processes to span across multiple nodes, so, e.g., for SLURM
+you might do this::
+
+ srun -N 7 -n 7 -m cyclic spack test lock
+
+You can use this to test whether your shared filesystem properly supports
+POSIX reader-writer locking with byte ranges through fcntl.
+
+If you want to test on multiple filesystems, you can modify the
+``locations`` list below. By default it looks like this::
+
+ locations = [
+ tempfile.gettempdir(), # standard tmp directory (potentially local)
+ '/nfs/tmp2/%u', # NFS tmp mount
+ '/p/lscratch*/%u' # Lustre scratch mount
+ ]
+
+Add names and paths for your preferred filesystem mounts to test on them;
+the tests are parametrized to run on all the filesystems listed in this
+dict. Note that 'tmp' will be skipped for MPI testing, as it is often a
+node-local filesystem, and multi-node tests will fail if the locks aren't
+actually on a shared filesystem.
+
"""
import os
import shutil
-import functools
import tempfile
import traceback
+import glob
+import getpass
+from contextlib import contextmanager
from multiprocessing import Process
import pytest
-from llnl.util.filesystem import join_path, touch, mkdirp
+from llnl.util.filesystem import join_path, touch
from llnl.util.lock import *
from spack.util.multiproc import Barrier
-# This is the longest a failed test will take, as the barriers will
-# time out and raise an exception.
+#
+# This test can be run with MPI. MPI is "enabled" if we can import
+# mpi4py and the number of total MPI processes is greater than 1.
+# Otherwise it just runs as a node-local test.
+#
+# NOTE: MPI mode is different from node-local mode in that node-local
+# mode will spawn its own test processes, while MPI mode assumes you've
+# run this script as a SPMD application. In MPI mode, no additional
+# processes are spawned, and you need to ensure that you mpirun the
+# script with enough processes for all the multiproc_test cases below.
+#
+# If you don't run with enough processes, tests that require more
+# processes than you currently have will be skipped.
+#
+mpi = False
+comm = None
+try:
+ from mpi4py import MPI
+ comm = MPI.COMM_WORLD
+ if comm.size > 1:
+ mpi = True
+except:
+ pass
+
+
+"""This is a list of filesystem locations to test locks in. Paths are
+expanded so that %u is replaced with the current username. '~' is also
+legal and will be expanded to the user's home directory.
+
+Tests are skipped for directories that don't exist, so you'll need to
+update this with the locations of NFS, Lustre, and other mounts on your
+system.
+"""
+locations = [
+ tempfile.gettempdir(),
+ os.path.join('/nfs/tmp2/', getpass.getuser()),
+ os.path.join('/p/lscratch*/', getpass.getuser()),
+]
+
+"""This is the longest a failed multiproc test will take.
+Barriers will time out and raise an exception after this interval.
+In MPI mode, barriers don't time out (they hang). See mpi_multiproc_test.
+"""
barrier_timeout = 5
+"""This is the lock timeout for expected failures.
+This may need to be higher for some filesystems."""
+lock_fail_timeout = 0.1
+
+
+@contextmanager
+def read_only(path):
+ orginal_mode = os.stat(path).st_mode
+ os.chmod(path, 0o444)
+ yield
+ os.chmod(path, orginal_mode)
+
+
+@pytest.fixture(scope='session', params=locations)
+def lock_test_directory(request):
+ """This fixture causes tests to be executed for many different mounts.
+
+ See the ``locations`` dict above for details.
+ """
+ return request.param
+
+
+@pytest.fixture(scope='session')
+def lock_dir(lock_test_directory):
+ parent = next((p for p in glob.glob(lock_test_directory)
+ if os.path.exists(p) and os.access(p, os.W_OK)), None)
+ if not parent:
+ # Skip filesystems that don't exist or aren't writable
+ pytest.skip("requires filesystem: '%s'" % lock_test_directory)
+ elif mpi and parent == tempfile.gettempdir():
+ # Skip local tmp test for MPI runs
+ pytest.skip("skipping local tmp directory for MPI test.")
+
+ tempdir = None
+ if not mpi or comm.rank == 0:
+ tempdir = tempfile.mkdtemp(dir=parent)
+ if mpi:
+ tempdir = comm.bcast(tempdir)
+
+ yield tempdir
+
+ if mpi:
+ # rank 0 may get here before others, in which case it'll try to
+ # remove the directory while other processes try to re-create the
+ # lock. This will give errno 39: directory not empty. Use a
+ # barrier to ensure everyone is done first.
+ comm.barrier()
+
+ if not mpi or comm.rank == 0:
+ shutil.rmtree(tempdir)
+
+
+@pytest.fixture
+def private_lock_path(lock_dir):
+ """In MPI mode, this is a private lock for each rank in a multiproc test.
+
+ For other modes, it is the same as a shared lock.
+ """
+ lock_file = join_path(lock_dir, 'lockfile')
+ if mpi:
+ lock_file += '.%s' % comm.rank
+ yield lock_file
+
-@pytest.fixture()
-def lock_path():
- tempdir = tempfile.mkdtemp()
- lock_file = join_path(tempdir, 'lockfile')
+@pytest.fixture
+def lock_path(lock_dir):
+ """This lock is shared among all processes in a multiproc test."""
+ lock_file = join_path(lock_dir, 'lockfile')
yield lock_file
- shutil.rmtree(tempdir)
-def multiproc_test(*functions):
+def local_multiproc_test(*functions):
"""Order some processes using simple barrier synchronization."""
b = Barrier(len(functions), timeout=barrier_timeout)
procs = [Process(target=f, args=(b,)) for f in functions]
@@ -65,6 +206,52 @@ def multiproc_test(*functions):
assert p.exitcode == 0
+def mpi_multiproc_test(*functions):
+ """SPMD version of multiproc test.
+
+ This needs to be run like so:
+
+ srun spack test lock
+
+ Each process executes its corresponding function. This is different
+ from ``multiproc_test`` above, which spawns the processes. This will
+ skip tests if there are too few processes to run them.
+ """
+ procs = len(functions)
+ if procs > comm.size:
+ pytest.skip("requires at least %d MPI processes" % procs)
+
+ comm.Barrier() # barrier before each MPI test
+
+ include = comm.rank < len(functions)
+ subcomm = comm.Split(include)
+
+ class subcomm_barrier(object):
+ """Stand-in for multiproc barrier for MPI-parallel jobs."""
+ def wait(self):
+ subcomm.Barrier()
+
+ if include:
+ try:
+ functions[subcomm.rank](subcomm_barrier())
+ except:
+ # aborting is the best we can do for MPI tests without
+ # hanging, since we're using MPI barriers. This will fail
+ # early and it loses the nice pytest output, but at least it
+ # gets use a stacktrace on the processes that failed.
+ traceback.print_exc()
+ comm.Abort()
+ subcomm.Free()
+
+ comm.Barrier() # barrier after each MPI test.
+
+
+"""``multiproc_test()`` should be called by tests below.
+``multiproc_test()`` will work for either MPI runs or for local runs.
+"""
+multiproc_test = mpi_multiproc_test if mpi else local_multiproc_test
+
+
#
# Process snippets below can be composed into tests.
#
@@ -91,7 +278,7 @@ def timeout_write(lock_path, start=0, length=0):
lock = Lock(lock_path, start, length)
barrier.wait() # wait for lock acquire in first process
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
barrier.wait()
return fn
@@ -101,7 +288,7 @@ def timeout_read(lock_path, start=0, length=0):
lock = Lock(lock_path, start, length)
barrier.wait() # wait for lock acquire in first process
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait()
return fn
@@ -111,7 +298,9 @@ def timeout_read(lock_path, start=0, length=0):
# exclusive lock is held.
#
def test_write_lock_timeout_on_write(lock_path):
- multiproc_test(acquire_write(lock_path), timeout_write(lock_path))
+ multiproc_test(
+ acquire_write(lock_path),
+ timeout_write(lock_path))
def test_write_lock_timeout_on_write_2(lock_path):
@@ -258,7 +447,8 @@ def test_write_lock_timeout_on_read_ranges_3(lock_path):
def test_write_lock_timeout_on_read_ranges_4(lock_path):
multiproc_test(
acquire_read(lock_path, 0, 64),
- timeout_write(lock_path, 10, 1), timeout_write(lock_path, 32, 1))
+ timeout_write(lock_path, 10, 1),
+ timeout_write(lock_path, 32, 1))
def test_write_lock_timeout_on_read_ranges_5(lock_path):
@@ -268,6 +458,7 @@ def test_write_lock_timeout_on_read_ranges_5(lock_path):
timeout_write(lock_path, 127, 1),
timeout_write(lock_path, 90, 10))
+
#
# Test that exclusive locks time while lots of shared locks are held.
#
@@ -339,12 +530,19 @@ def test_write_lock_timeout_with_multiple_readers_3_2_ranges(lock_path):
#
# Test that read can be upgraded to write.
#
-def test_upgrade_read_to_write(lock_path):
+def test_upgrade_read_to_write(private_lock_path):
+ """Test that a read lock can be upgraded to a write lock.
+
+ Note that to upgrade a read lock to a write lock, you have the be the
+ only holder of a read lock. Client code needs to coordinate that for
+ shared locks. For this test, we use a private lock just to test that an
+ upgrade is possible.
+ """
# ensure lock file exists the first time, so we open it read-only
# to begin wtih.
- touch(lock_path)
+ touch(private_lock_path)
- lock = Lock(lock_path)
+ lock = Lock(private_lock_path)
assert lock._reads == 0
assert lock._writes == 0
@@ -368,26 +566,28 @@ def test_upgrade_read_to_write(lock_path):
assert lock._writes == 0
assert lock._file is None
+
#
# Test that read-only file can be read-locked but not write-locked.
#
-def test_upgrade_read_to_write_fails_with_readonly_file(lock_path):
+def test_upgrade_read_to_write_fails_with_readonly_file(private_lock_path):
# ensure lock file exists the first time, so we open it read-only
# to begin wtih.
- touch(lock_path)
- os.chmod(lock_path, 0o444)
+ touch(private_lock_path)
- lock = Lock(lock_path)
- assert lock._reads == 0
- assert lock._writes == 0
+ with read_only(private_lock_path):
+ lock = Lock(private_lock_path)
+ assert lock._reads == 0
+ assert lock._writes == 0
- lock.acquire_read()
- assert lock._reads == 1
- assert lock._writes == 0
- assert lock._file.mode == 'r'
+ lock.acquire_read()
+ assert lock._reads == 1
+ assert lock._writes == 0
+ assert lock._file.mode == 'r'
+
+ with pytest.raises(LockError):
+ lock.acquire_write()
- with pytest.raises(LockError):
- lock.acquire_write()
#
# Longer test case that ensures locks are reusable. Ordering is
@@ -404,7 +604,7 @@ def test_complex_acquire_and_release_chain(lock_path):
lock.release_write() # release and others acquire read
barrier.wait() # ---------------------------------------- 3
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
lock.acquire_read()
barrier.wait() # ---------------------------------------- 4
lock.release_read()
@@ -413,9 +613,9 @@ def test_complex_acquire_and_release_chain(lock_path):
# p2 upgrades read to write
barrier.wait() # ---------------------------------------- 6
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait() # ---------------------------------------- 7
# p2 releases write and read
barrier.wait() # ---------------------------------------- 8
@@ -425,9 +625,9 @@ def test_complex_acquire_and_release_chain(lock_path):
# p3 upgrades read to write
barrier.wait() # ---------------------------------------- 10
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait() # ---------------------------------------- 11
# p3 releases locks
barrier.wait() # ---------------------------------------- 12
@@ -441,9 +641,9 @@ def test_complex_acquire_and_release_chain(lock_path):
# p1 acquires write
barrier.wait() # ---------------------------------------- 1
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait() # ---------------------------------------- 2
lock.acquire_read()
barrier.wait() # ---------------------------------------- 3
@@ -465,9 +665,9 @@ def test_complex_acquire_and_release_chain(lock_path):
# p3 upgrades read to write
barrier.wait() # ---------------------------------------- 10
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait() # ---------------------------------------- 11
# p3 releases locks
barrier.wait() # ---------------------------------------- 12
@@ -481,9 +681,9 @@ def test_complex_acquire_and_release_chain(lock_path):
# p1 acquires write
barrier.wait() # ---------------------------------------- 1
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait() # ---------------------------------------- 2
lock.acquire_read()
barrier.wait() # ---------------------------------------- 3
@@ -495,9 +695,9 @@ def test_complex_acquire_and_release_chain(lock_path):
# p2 upgrades read to write
barrier.wait() # ---------------------------------------- 6
with pytest.raises(LockError):
- lock.acquire_write(0.1)
+ lock.acquire_write(lock_fail_timeout)
with pytest.raises(LockError):
- lock.acquire_read(0.1)
+ lock.acquire_read(lock_fail_timeout)
barrier.wait() # ---------------------------------------- 7
# p2 releases write & read
barrier.wait() # ---------------------------------------- 8
@@ -517,6 +717,7 @@ def test_complex_acquire_and_release_chain(lock_path):
multiproc_test(p1, p2, p3)
+
def test_transaction(lock_path):
def enter_fn():
vals['entered'] = True
@@ -542,6 +743,7 @@ def test_transaction(lock_path):
assert vals['exited']
assert not vals['exception']
+
def test_transaction_with_exception(lock_path):
def enter_fn():
vals['entered'] = True
@@ -574,6 +776,7 @@ def test_transaction_with_exception(lock_path):
assert vals['exited']
assert vals['exception']
+
def test_transaction_with_context_manager(lock_path):
class TestContextManager(object):
@@ -634,6 +837,7 @@ def test_transaction_with_context_manager(lock_path):
assert not vals['exited_fn']
assert not vals['exception_fn']
+
def test_transaction_with_context_manager_and_exception(lock_path):
class TestContextManager(object):
def __enter__(self):