From ead8ac58c6ecde1b8bd32e9f651483b57c7e3bd5 Mon Sep 17 00:00:00 2001 From: Todd Gamblin Date: Sat, 24 Oct 2015 19:55:22 -0700 Subject: Working Lock class, now uses POSIX fcntl locks, extensive unit test. - llnl.util.lock now uses fcntl.lockf instead of flock - purported to have more NFS compatibility. - Added an extensive test case for locks. - tests acquiring, releasing, upgrading, timeouts, shared, & exclusive cases. --- lib/spack/llnl/util/lock.py | 167 +++++++++++++------------ lib/spack/spack/test/__init__.py | 3 +- lib/spack/spack/test/lock.py | 264 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 350 insertions(+), 84 deletions(-) create mode 100644 lib/spack/spack/test/lock.py diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py index 3cd02befe5..6e49bf74e6 100644 --- a/lib/spack/llnl/util/lock.py +++ b/lib/spack/llnl/util/lock.py @@ -1,5 +1,5 @@ ############################################################################## -# Copyright (c) 2013, Lawrence Livermore National Security, LLC. +# Copyright (c) 2013-2015, Lawrence Livermore National Security, LLC. # Produced at the Lawrence Livermore National Laboratory. # # This file is part of Spack. @@ -22,134 +22,135 @@ # along with this program; if not, write to the Free Software Foundation, # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ############################################################################## -"""Lock implementation for shared filesystems.""" import os import fcntl import errno import time import socket -# Default timeout for locks. -DEFAULT_TIMEOUT = 60 +# Default timeout in seconds, after which locks will raise exceptions. +_default_timeout = 60 -class _ReadLockContext(object): - """Context manager that takes and releases a read lock. +# Sleep time per iteration in spin loop (in seconds) +_sleep_time = 1e-5 - Arguments are lock and timeout (default 5 minutes) - """ - def __init__(self, lock, timeout=DEFAULT_TIMEOUT): - self._lock = lock - self._timeout = timeout - def __enter__(self): - self._lock.acquire_read(self._timeout) +class Lock(object): + def __init__(self,file_path): + self._file_path = file_path + self._fd = None + self._reads = 0 + self._writes = 0 - def __exit__(self,type,value,traceback): - self._lock.release_read() + def _lock(self, op, timeout): + """This takes a lock using POSIX locks (``fnctl.lockf``). -class _WriteLockContext(object): - """Context manager that takes and releases a write lock. + The lock is implemented as a spin lock using a nonblocking + call to lockf(). - Arguments are lock and timeout (default 5 minutes) - """ - def __init__(self, lock, timeout=DEFAULT_TIMEOUT): - self._lock = lock - self._timeout = timeout + On acquiring an exclusive lock, the lock writes this process's + pid and host to the lock file, in case the holding process + needs to be killed later. - def __enter__(self): - self._lock.acquire_write(self._timeout) + If the lock times out, it raises a ``LockError``. + """ + start_time = time.time() + while (time.time() - start_time) < timeout: + try: + if self._fd is None: + self._fd = os.open(self._file_path, os.O_RDWR) - def __exit__(self,type,value,traceback): - self._lock.release_write() + fcntl.lockf(self._fd, op | fcntl.LOCK_NB) + if op == fcntl.LOCK_EX: + os.write(self._fd, "pid=%s,host=%s" % (os.getpid(), socket.getfqdn())) + return + + except IOError as error: + if error.errno == errno.EAGAIN or error.errno == errno.EACCES: + pass + else: + raise + time.sleep(_sleep_time) + raise LockError("Timed out waiting for lock.") -class Lock(object): - """Distributed file-based lock using ``flock``.""" - def __init__(self, file_path): - self._file_path = file_path - self._fd = os.open(file_path,os.O_RDWR) - self._reads = 0 - self._writes = 0 + def _unlock(self): + """Releases a lock using POSIX locks (``fcntl.lockf``) + Releases the lock regardless of mode. Note that read locks may + be masquerading as write locks, but this removes either. - def write_lock(self, timeout=DEFAULT_TIMEOUT): - """Convenience method that returns a write lock context.""" - return _WriteLockContext(self, timeout) + """ + fcntl.lockf(self._fd,fcntl.LOCK_UN) + os.close(self._fd) + self._fd = None - def read_lock(self, timeout=DEFAULT_TIMEOUT): - """Convenience method that returns a read lock context.""" - return _ReadLockContext(self, timeout) + def acquire_read(self, timeout=_default_timeout): + """Acquires a recursive, shared lock for reading. + Read and write locks can be acquired and released in arbitrary + order, but the POSIX lock is held until all local read and + write locks are released. - def acquire_read(self, timeout): - """ - Implements recursive lock. If held in both read and write mode, - the write lock will be maintained until all locks are released """ if self._reads == 0 and self._writes == 0: self._lock(fcntl.LOCK_SH, timeout) self._reads += 1 - def acquire_write(self, timeout): - """ - Implements recursive lock + def acquire_write(self, timeout=_default_timeout): + """Acquires a recursive, exclusive lock for writing. + + Read and write locks can be acquired and released in arbitrary + order, but the POSIX lock is held until all local read and + write locks are released. """ if self._writes == 0: self._lock(fcntl.LOCK_EX, timeout) self._writes += 1 - def _lock(self, op, timeout): - """ - The timeout is implemented using nonblocking flock() - to avoid using signals for timing - Write locks store pid and host information to the lock file - Read locks do not store data - """ - total_time = 0 - while total_time < timeout: - try: - fcntl.flock(self._fd, op | fcntl.LOCK_NB) - if op == fcntl.LOCK_EX: - with open(self._file_path, 'w') as f: - f.write("pid = " + str(os.getpid()) + ", host = " + socket.getfqdn()) - return - except IOError as error: - if error.errno == errno.EAGAIN or error.errno == EACCES: - pass - else: - raise - time.sleep(0.1) - total_time += 0.1 + def release_read(self): + """Releases a read lock. + Returns True if the last recursive lock was released, False if + there are still outstanding locks. + + Does limited correctness checking: if a read lock is released + when none are held, this will raise an assertion error. - def release_read(self): - """ - Assert there is a lock of the right type to release, recursive lock """ assert self._reads > 0 - if self._reads == 1 and self._writes == 0: - self._unlock() + self._reads -= 1 + if self._reads == 0 and self._writes == 0: + self._unlock() + return True + return False def release_write(self): - """ - Assert there is a lock of the right type to release, recursive lock + """Releases a write lock. + + Returns True if the last recursive lock was released, False if + there are still outstanding locks. + + Does limited correctness checking: if a read lock is released + when none are held, this will raise an assertion error. + """ assert self._writes > 0 - if self._writes == 1 and self._reads == 0: - self._unlock() + self._writes -= 1 + if self._writes == 0 and self._reads == 0: + self._unlock() + return True + return False - def _unlock(self): - """ - Releases the lock regardless of mode. Note that read locks may be - masquerading as write locks at times, but this removes either. - """ - fcntl.flock(self._fd, fcntl.LOCK_UN) +class LockError(Exception): + """Raised when an attempt to acquire a lock times out.""" + pass diff --git a/lib/spack/spack/test/__init__.py b/lib/spack/spack/test/__init__.py index c3b39b76f8..84419781e2 100644 --- a/lib/spack/spack/test/__init__.py +++ b/lib/spack/spack/test/__init__.py @@ -57,6 +57,7 @@ test_names = ['versions', 'optional_deps', 'make_executable', 'configure_guess', + 'lock', 'database'] @@ -77,7 +78,7 @@ def run(names, verbose=False): if test not in test_names: tty.error("%s is not a valid spack test name." % test, "Valid names are:") - colify(test_names, indent=4) + colify(sorted(test_names), indent=4) sys.exit(1) runner = unittest.TextTestRunner(verbosity=verbosity) diff --git a/lib/spack/spack/test/lock.py b/lib/spack/spack/test/lock.py new file mode 100644 index 0000000000..2e7440bbbc --- /dev/null +++ b/lib/spack/spack/test/lock.py @@ -0,0 +1,264 @@ +############################################################################## +# Copyright (c) 2013-2015, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory. +# +# This file is part of Spack. +# Written by Todd Gamblin, tgamblin@llnl.gov, All rights reserved. +# LLNL-CODE-647188 +# +# For details, see https://scalability-llnl.github.io/spack +# Please also see the LICENSE file for our notice and the LGPL. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License (as published by +# the Free Software Foundation) version 2.1 dated February 1999. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the IMPLIED WARRANTY OF +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the terms and +# conditions of the GNU General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +############################################################################## +""" +These tests ensure that our lock works correctly. +""" +import unittest +import os +import tempfile +import shutil +from multiprocessing import Process + +from llnl.util.lock import * +from llnl.util.filesystem import join_path, touch + +from spack.util.multiproc import Barrier + +# This is the longest a failed test will take, as the barriers will +# time out and raise an exception. +barrier_timeout = 5 + + +def order_processes(*functions): + """Order some processes using simple barrier synchronization.""" + b = Barrier(len(functions), timeout=barrier_timeout) + procs = [Process(target=f, args=(b,)) for f in functions] + for p in procs: p.start() + for p in procs: p.join() + + +class LockTest(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.lock_path = join_path(self.tempdir, 'lockfile') + touch(self.lock_path) + + + def tearDown(self): + shutil.rmtree(self.tempdir, ignore_errors=True) + + + # + # Process snippets below can be composed into tests. + # + def acquire_write(self, barrier): + lock = Lock(self.lock_path) + lock.acquire_write() # grab exclusive lock + barrier.wait() + barrier.wait() # hold the lock until exception raises in other procs. + + def acquire_read(self, barrier): + lock = Lock(self.lock_path) + lock.acquire_read() # grab shared lock + barrier.wait() + barrier.wait() # hold the lock until exception raises in other procs. + + def timeout_write(self, barrier): + lock = Lock(self.lock_path) + barrier.wait() # wait for lock acquire in first process + self.assertRaises(LockError, lock.acquire_write, 0.1) + barrier.wait() + + def timeout_read(self, barrier): + lock = Lock(self.lock_path) + barrier.wait() # wait for lock acquire in first process + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() + + + # + # Test that exclusive locks on other processes time out when an + # exclusive lock is held. + # + def test_write_lock_timeout_on_write(self): + order_processes(self.acquire_write, self.timeout_write) + + def test_write_lock_timeout_on_write_2(self): + order_processes(self.acquire_write, self.timeout_write, self.timeout_write) + + def test_write_lock_timeout_on_write_3(self): + order_processes(self.acquire_write, self.timeout_write, self.timeout_write, self.timeout_write) + + + # + # Test that shared locks on other processes time out when an + # exclusive lock is held. + # + def test_read_lock_timeout_on_write(self): + order_processes(self.acquire_write, self.timeout_read) + + def test_read_lock_timeout_on_write_2(self): + order_processes(self.acquire_write, self.timeout_read, self.timeout_read) + + def test_read_lock_timeout_on_write_3(self): + order_processes(self.acquire_write, self.timeout_read, self.timeout_read, self.timeout_read) + + + # + # Test that exclusive locks time out when shared locks are held. + # + def test_write_lock_timeout_on_read(self): + order_processes(self.acquire_read, self.timeout_write) + + def test_write_lock_timeout_on_read_2(self): + order_processes(self.acquire_read, self.timeout_write, self.timeout_write) + + def test_write_lock_timeout_on_read_3(self): + order_processes(self.acquire_read, self.timeout_write, self.timeout_write, self.timeout_write) + + + # + # Test that exclusive locks time while lots of shared locks are held. + # + def test_write_lock_timeout_with_multiple_readers_2_1(self): + order_processes(self.acquire_read, self.acquire_read, self.timeout_write) + + def test_write_lock_timeout_with_multiple_readers_2_2(self): + order_processes(self.acquire_read, self.acquire_read, self.timeout_write, self.timeout_write) + + def test_write_lock_timeout_with_multiple_readers_3_1(self): + order_processes(self.acquire_read, self.acquire_read, self.acquire_read, self.timeout_write) + + def test_write_lock_timeout_with_multiple_readers_3_2(self): + order_processes(self.acquire_read, self.acquire_read, self.acquire_read, self.timeout_write, self.timeout_write) + + + # + # Longer test case that ensures locks are reusable. Ordering is + # enforced by barriers throughout -- steps are shown with numbers. + # + def test_complex_acquire_and_release_chain(self): + def p1(barrier): + lock = Lock(self.lock_path) + + lock.acquire_write() + barrier.wait() # ---------------------------------------- 1 + # others test timeout + barrier.wait() # ---------------------------------------- 2 + lock.release_write() # release and others acquire read + barrier.wait() # ---------------------------------------- 3 + self.assertRaises(LockError, lock.acquire_write, 0.1) + lock.acquire_read() + barrier.wait() # ---------------------------------------- 4 + lock.release_read() + barrier.wait() # ---------------------------------------- 5 + + # p2 upgrades read to write + barrier.wait() # ---------------------------------------- 6 + self.assertRaises(LockError, lock.acquire_write, 0.1) + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() # ---------------------------------------- 7 + # p2 releases write and read + barrier.wait() # ---------------------------------------- 8 + + # p3 acquires read + barrier.wait() # ---------------------------------------- 9 + # p3 upgrades read to write + barrier.wait() # ---------------------------------------- 10 + self.assertRaises(LockError, lock.acquire_write, 0.1) + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() # ---------------------------------------- 11 + # p3 releases locks + barrier.wait() # ---------------------------------------- 12 + lock.acquire_read() + barrier.wait() # ---------------------------------------- 13 + lock.release_read() + + + def p2(barrier): + lock = Lock(self.lock_path) + + # p1 acquires write + barrier.wait() # ---------------------------------------- 1 + self.assertRaises(LockError, lock.acquire_write, 0.1) + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() # ---------------------------------------- 2 + lock.acquire_read() + barrier.wait() # ---------------------------------------- 3 + # p1 tests shared read + barrier.wait() # ---------------------------------------- 4 + # others release reads + barrier.wait() # ---------------------------------------- 5 + + lock.acquire_write() # upgrade read to write + barrier.wait() # ---------------------------------------- 6 + # others test timeout + barrier.wait() # ---------------------------------------- 7 + lock.release_write() # release read AND write (need both) + lock.release_read() + barrier.wait() # ---------------------------------------- 8 + + # p3 acquires read + barrier.wait() # ---------------------------------------- 9 + # p3 upgrades read to write + barrier.wait() # ---------------------------------------- 10 + self.assertRaises(LockError, lock.acquire_write, 0.1) + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() # ---------------------------------------- 11 + # p3 releases locks + barrier.wait() # ---------------------------------------- 12 + lock.acquire_read() + barrier.wait() # ---------------------------------------- 13 + lock.release_read() + + + def p3(barrier): + lock = Lock(self.lock_path) + + # p1 acquires write + barrier.wait() # ---------------------------------------- 1 + self.assertRaises(LockError, lock.acquire_write, 0.1) + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() # ---------------------------------------- 2 + lock.acquire_read() + barrier.wait() # ---------------------------------------- 3 + # p1 tests shared read + barrier.wait() # ---------------------------------------- 4 + lock.release_read() + barrier.wait() # ---------------------------------------- 5 + + # p2 upgrades read to write + barrier.wait() # ---------------------------------------- 6 + self.assertRaises(LockError, lock.acquire_write, 0.1) + self.assertRaises(LockError, lock.acquire_read, 0.1) + barrier.wait() # ---------------------------------------- 7 + # p2 releases write & read + barrier.wait() # ---------------------------------------- 8 + + lock.acquire_read() + barrier.wait() # ---------------------------------------- 9 + lock.acquire_write() + barrier.wait() # ---------------------------------------- 10 + # others test timeout + barrier.wait() # ---------------------------------------- 11 + lock.release_read() # release read AND write in opposite + lock.release_write() # order from before on p2 + barrier.wait() # ---------------------------------------- 12 + lock.acquire_read() + barrier.wait() # ---------------------------------------- 13 + lock.release_read() + + order_processes(p1, p2, p3) -- cgit v1.2.3-70-g09d2