-rw-r--r--  etc/spack/defaults/config.yaml           13
-rw-r--r--  lib/spack/llnl/util/lock.py             204
-rw-r--r--  lib/spack/spack/database.py              59
-rw-r--r--  lib/spack/spack/schema/config.py          7
-rw-r--r--  lib/spack/spack/test/llnl/util/lock.py    7
-rw-r--r--  lib/spack/spack/util/file_cache.py       12
-rw-r--r--  lib/spack/spack/util/lock.py              4
7 files changed, 217 insertions, 89 deletions
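
Taken together, the changes below replace the old fixed 60-second lock timeout and 10-microsecond spin sleep with per-lock default timeouts and a staged polling backoff. A rough usage sketch of the reworked llnl.util.lock API follows; the lock file path is made up for illustration and the import assumes lib/spack is on sys.path.

    from llnl.util.lock import Lock, LockTimeoutError

    lock = Lock('/tmp/example.lock', default_timeout=120)

    lock.acquire_write()       # waits up to 120s, then raises LockTimeoutError
    try:
        pass                   # exclusive critical section
    finally:
        lock.release_write()

    # A per-call timeout overrides the lock's default; None (or 0) means
    # wait indefinitely.
    try:
        lock.acquire_read(timeout=5)
        lock.release_read()
    except LockTimeoutError:
        pass                   # another process held the lock for over 5s
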
diff --git a/etc/spack/defaults/config.yaml b/etc/spack/defaults/config.yaml
index 28628a3d49..4092140e29 100644
--- a/etc/spack/defaults/config.yaml
+++ b/etc/spack/defaults/config.yaml
@@ -94,3 +94,16 @@ config:
 
   # If set to true, spack will use ccache to cache c compiles.
   ccache: false
+
+  # How long to wait to lock the Spack installation database. This lock is used
+  # when spack needs to manage its own package metadata and all operations are
+  # expected to complete within the default time limit. The timeout should
+  # therefore generally be left untouched.
+  db_lock_timeout: 120
+
+  # How long to wait when attempting to modify a package (e.g. to install it).
+  # This value should typically be 'null' (never time out) unless the Spack
+  # instance only ever has a single user at a time, and only if the user
+  # anticipates that a significant delay indicates that the lock attempt will
+  # never succeed.
+  package_lock_timeout: null
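
The two new keys are consumed by the database code further down in this diff. Roughly, and as a condensed restatement of the database.py hunk below rather than new API:

    import spack.config

    _db_lock_timeout = 120   # fallback mirroring the default shipped above

    db_lock_timeout = (
        spack.config.get('config:db_lock_timeout') or _db_lock_timeout)
    # 'null' in YAML arrives as None, which means never time out on
    # package (prefix) locks
    package_lock_timeout = (
        spack.config.get('config:package_lock_timeout') or None)
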
+ """ + num_requests = 0 + stage1, stage2, stage3 = _wait_times or (1e-1, 2e-1, 5e-1) + wait_time = stage1 + while True: + if num_requests >= 60: # 40 * .2 = 8 + wait_time = stage3 + elif num_requests >= 20: # 20 * .1 = 2 + wait_time = stage2 + num_requests += 1 + yield wait_time + + def _lock(self, op, timeout=None): """This takes a lock using POSIX locks (``fcntl.lockf``). The lock is implemented as a spin lock using a nonblocking call @@ -90,64 +118,84 @@ class Lock(object): pid and host to the lock file, in case the holding process needs to be killed later. - If the lock times out, it raises a ``LockError``. + If the lock times out, it raises a ``LockError``. If the lock is + successfully acquired, the total wait time and the number of attempts + is returned. """ assert op in (fcntl.LOCK_SH, fcntl.LOCK_EX) + timeout = timeout or self.default_timeout + + # Create file and parent directories if they don't exist. + if self._file is None: + parent = self._ensure_parent_directory() + + # Open writable files as 'r+' so we can upgrade to write later + os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+' + if os.path.exists(self.path): + if not os.access(self.path, os.W_OK): + if op == fcntl.LOCK_SH: + # can still lock read-only files if we open 'r' + os_mode, fd_mode = os.O_RDONLY, 'r' + else: + raise LockROFileError(self.path) + + elif not os.access(parent, os.W_OK): + raise CantCreateLockError(self.path) + + fd = os.open(self.path, os_mode) + self._file = os.fdopen(fd, fd_mode) + + elif op == fcntl.LOCK_EX and self._file.mode == 'r': + # Attempt to upgrade to write lock w/a read-only file. + # If the file were writable, we'd have opened it 'r+' + raise LockROFileError(self.path) + + poll_intervals = iter(Lock._poll_interval_generator()) start_time = time.time() - while (time.time() - start_time) < timeout: - # Create file and parent directories if they don't exist. - if self._file is None: - parent = self._ensure_parent_directory() - - # Open writable files as 'r+' so we can upgrade to write later - os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+' - if os.path.exists(self.path): - if not os.access(self.path, os.W_OK): - if op == fcntl.LOCK_SH: - # can still lock read-only files if we open 'r' - os_mode, fd_mode = os.O_RDONLY, 'r' - else: - raise LockROFileError(self.path) - - elif not os.access(parent, os.W_OK): - raise CantCreateLockError(self.path) - - fd = os.open(self.path, os_mode) - self._file = os.fdopen(fd, fd_mode) - - elif op == fcntl.LOCK_EX and self._file.mode == 'r': - # Attempt to upgrade to write lock w/a read-only file. - # If the file were writable, we'd have opened it 'r+' - raise LockROFileError(self.path) - - try: - # Try to get the lock (will raise if not available.) 
- fcntl.lockf(self._file, op | fcntl.LOCK_NB, - self._length, self._start, os.SEEK_SET) - - # help for debugging distributed locking - if self.debug: - # All locks read the owner PID and host - self._read_debug_data() - - # Exclusive locks write their PID/host - if op == fcntl.LOCK_EX: - self._write_debug_data() - - return - - except IOError as e: - if e.errno in (errno.EAGAIN, errno.EACCES): - # EAGAIN and EACCES == locked by another process - pass - else: - raise - - time.sleep(_sleep_time) + num_attempts = 0 + while (not timeout) or (time.time() - start_time) < timeout: + num_attempts += 1 + if self._poll_lock(op): + total_wait_time = time.time() - start_time + return total_wait_time, num_attempts + + time.sleep(next(poll_intervals)) + + num_attempts += 1 + if self._poll_lock(op): + total_wait_time = time.time() - start_time + return total_wait_time, num_attempts raise LockTimeoutError("Timed out waiting for lock.") + def _poll_lock(self, op): + """Attempt to acquire the lock in a non-blocking manner. Return whether + the locking attempt succeeds + """ + try: + # Try to get the lock (will raise if not available.) + fcntl.lockf(self._file, op | fcntl.LOCK_NB, + self._length, self._start, os.SEEK_SET) + + # help for debugging distributed locking + if self.debug: + # All locks read the owner PID and host + self._read_debug_data() + + # Exclusive locks write their PID/host + if op == fcntl.LOCK_EX: + self._write_debug_data() + + return True + + except IOError as e: + if e.errno in (errno.EAGAIN, errno.EACCES): + # EAGAIN and EACCES == locked by another process + pass + else: + raise + def _ensure_parent_directory(self): parent = os.path.dirname(self.path) @@ -203,7 +251,7 @@ class Lock(object): self._file.close() self._file = None - def acquire_read(self, timeout=_default_timeout): + def acquire_read(self, timeout=None): """Acquires a recursive, shared lock for reading. Read and write locks can be acquired and released in arbitrary @@ -214,21 +262,22 @@ class Lock(object): the POSIX lock, False if it is a nested transaction. """ + timeout = timeout or self.default_timeout + if self._reads == 0 and self._writes == 0: self._debug( 'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' .format(self)) - self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError. - self._debug( - 'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquired]' - .format(self)) + # can raise LockError. + wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout) + self._acquired_debug('READ LOCK', wait_time, nattempts) self._reads += 1 return True else: self._reads += 1 return False - def acquire_write(self, timeout=_default_timeout): + def acquire_write(self, timeout=None): """Acquires a recursive, exclusive lock for writing. Read and write locks can be acquired and released in arbitrary @@ -239,14 +288,15 @@ class Lock(object): the POSIX lock, False if it is a nested transaction. """ + timeout = timeout or self.default_timeout + if self._writes == 0: self._debug( 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' .format(self)) - self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError. - self._debug( - 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquired]' - .format(self)) + # can raise LockError. 
+ wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout) + self._acquired_debug('WRITE LOCK', wait_time, nattempts) self._writes += 1 return True else: @@ -302,6 +352,18 @@ class Lock(object): def _debug(self, *args): tty.debug(*args) + def _acquired_debug(self, lock_type, wait_time, nattempts): + attempts_format = 'attempt' if nattempts == 1 else 'attempt' + if nattempts > 1: + acquired_attempts_format = ' after {0:0.2f}s and {1:d} {2}'.format( + wait_time, nattempts, attempts_format) + else: + # Dont print anything if we succeeded immediately + acquired_attempts_format = '' + self._debug( + '{0}: {1.path}[{1._start}:{1._length}] [Acquired{2}]' + .format(lock_type, self, acquired_attempts_format)) + class LockTransaction(object): """Simple nested transaction context manager that uses a file lock. @@ -323,7 +385,7 @@ class LockTransaction(object): """ def __init__(self, lock, acquire_fn=None, release_fn=None, - timeout=_default_timeout): + timeout=None): self._lock = lock self._timeout = timeout self._acquire_fn = acquire_fn diff --git a/lib/spack/spack/database.py b/lib/spack/spack/database.py index 62bc61c647..c7a5e21881 100644 --- a/lib/spack/spack/database.py +++ b/lib/spack/spack/database.py @@ -63,7 +63,7 @@ from spack.util.crypto import bit_length from spack.directory_layout import DirectoryLayoutError from spack.error import SpackError from spack.version import Version -from spack.util.lock import Lock, WriteTransaction, ReadTransaction +from spack.util.lock import Lock, WriteTransaction, ReadTransaction, LockError # DB goes in this directory underneath the root @@ -73,7 +73,7 @@ _db_dirname = '.spack-db' _db_version = Version('0.9.3') # Timeout for spack database locks in seconds -_db_lock_timeout = 60 +_db_lock_timeout = 120 # Types of dependencies tracked by the database _tracked_deps = ('link', 'run') @@ -203,19 +203,30 @@ class Database(object): mkdirp(self._db_dir) # initialize rest of state. - self.lock = Lock(self._lock_path) + self.db_lock_timeout = ( + spack.config.get('config:db_lock_timeout') or _db_lock_timeout) + self.package_lock_timeout = ( + spack.config.get('config:package_lock_timeout') or None) + tty.debug('DATABASE LOCK TIMEOUT: {0}s'.format( + str(self.db_lock_timeout))) + timeout_format_str = ('{0}s'.format(str(self.package_lock_timeout)) + if self.package_lock_timeout else 'No timeout') + tty.debug('PACKAGE LOCK TIMEOUT: {0}'.format( + str(timeout_format_str))) + self.lock = Lock(self._lock_path, + default_timeout=self.db_lock_timeout) self._data = {} # whether there was an error at the start of a read transaction self._error = None - def write_transaction(self, timeout=_db_lock_timeout): + def write_transaction(self): """Get a write lock context manager for use in a `with` block.""" - return WriteTransaction(self.lock, self._read, self._write, timeout) + return WriteTransaction(self.lock, self._read, self._write) - def read_transaction(self, timeout=_db_lock_timeout): + def read_transaction(self): """Get a read lock context manager for use in a `with` block.""" - return ReadTransaction(self.lock, self._read, timeout=timeout) + return ReadTransaction(self.lock, self._read) def prefix_lock(self, spec): """Get a lock on a particular spec's installation directory. 
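
The backoff added in Lock._poll_interval_generator above polls every 0.1s for the first 20 attempts, every 0.2s for the next 40, and every 0.5s afterwards. The standalone sketch below restates that logic outside the class and checks the same interval sequence that the new unit test further down asserts; the function name is made up.

    def poll_intervals(wait_times=(1e-1, 2e-1, 5e-1)):
        stage1, stage2, stage3 = wait_times
        num_requests = 0
        wait_time = stage1
        while True:
            if num_requests >= 60:     # past roughly 10s of estimated waiting
                wait_time = stage3
            elif num_requests >= 20:   # past roughly 2s of estimated waiting
                wait_time = stage2
            num_requests += 1
            yield wait_time

    gen = poll_intervals((1, 2, 3))
    first_hundred = [next(gen) for _ in range(100)]
    assert first_hundred == [1] * 20 + [2] * 40 + [3] * 40
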
diff --git a/lib/spack/spack/database.py b/lib/spack/spack/database.py
index 62bc61c647..c7a5e21881 100644
--- a/lib/spack/spack/database.py
+++ b/lib/spack/spack/database.py
@@ -63,7 +63,7 @@ from spack.util.crypto import bit_length
 from spack.directory_layout import DirectoryLayoutError
 from spack.error import SpackError
 from spack.version import Version
-from spack.util.lock import Lock, WriteTransaction, ReadTransaction
+from spack.util.lock import Lock, WriteTransaction, ReadTransaction, LockError
 
 
 # DB goes in this directory underneath the root
@@ -73,7 +73,7 @@ _db_dirname = '.spack-db'
 _db_version = Version('0.9.3')
 
 # Timeout for spack database locks in seconds
-_db_lock_timeout = 60
+_db_lock_timeout = 120
 
 # Types of dependencies tracked by the database
 _tracked_deps = ('link', 'run')
@@ -203,19 +203,30 @@ class Database(object):
             mkdirp(self._db_dir)
 
         # initialize rest of state.
-        self.lock = Lock(self._lock_path)
+        self.db_lock_timeout = (
+            spack.config.get('config:db_lock_timeout') or _db_lock_timeout)
+        self.package_lock_timeout = (
+            spack.config.get('config:package_lock_timeout') or None)
+        tty.debug('DATABASE LOCK TIMEOUT: {0}s'.format(
+            str(self.db_lock_timeout)))
+        timeout_format_str = ('{0}s'.format(str(self.package_lock_timeout))
+                              if self.package_lock_timeout else 'No timeout')
+        tty.debug('PACKAGE LOCK TIMEOUT: {0}'.format(
+            str(timeout_format_str)))
+        self.lock = Lock(self._lock_path,
+                         default_timeout=self.db_lock_timeout)
         self._data = {}
 
         # whether there was an error at the start of a read transaction
         self._error = None
 
-    def write_transaction(self, timeout=_db_lock_timeout):
+    def write_transaction(self):
         """Get a write lock context manager for use in a `with` block."""
-        return WriteTransaction(self.lock, self._read, self._write, timeout)
+        return WriteTransaction(self.lock, self._read, self._write)
 
-    def read_transaction(self, timeout=_db_lock_timeout):
+    def read_transaction(self):
         """Get a read lock context manager for use in a `with` block."""
-        return ReadTransaction(self.lock, self._read, timeout=timeout)
+        return ReadTransaction(self.lock, self._read)
 
     def prefix_lock(self, spec):
         """Get a lock on a particular spec's installation directory.
@@ -236,26 +247,44 @@ class Database(object):
         if prefix not in self._prefix_locks:
             self._prefix_locks[prefix] = Lock(
                 self.prefix_lock_path,
-                spec.dag_hash_bit_prefix(bit_length(sys.maxsize)), 1)
+                start=spec.dag_hash_bit_prefix(bit_length(sys.maxsize)),
+                length=1,
+                default_timeout=self.package_lock_timeout)
 
         return self._prefix_locks[prefix]
 
     @contextlib.contextmanager
     def prefix_read_lock(self, spec):
         prefix_lock = self.prefix_lock(spec)
+        prefix_lock.acquire_read()
+
         try:
-            prefix_lock.acquire_read(60)
             yield self
-        finally:
+        except LockError:
+            # This addresses the case where a nested lock attempt fails inside
+            # of this context manager
+            raise
+        except (Exception, KeyboardInterrupt):
+            prefix_lock.release_read()
+            raise
+        else:
             prefix_lock.release_read()
 
     @contextlib.contextmanager
     def prefix_write_lock(self, spec):
         prefix_lock = self.prefix_lock(spec)
+        prefix_lock.acquire_write()
+
         try:
-            prefix_lock.acquire_write(60)
             yield self
-        finally:
+        except LockError:
+            # This addresses the case where a nested lock attempt fails inside
+            # of this context manager
+            raise
+        except (Exception, KeyboardInterrupt):
+            prefix_lock.release_write()
+            raise
+        else:
             prefix_lock.release_write()
 
     def _write_to_file(self, stream):
@@ -435,7 +464,7 @@ class Database(object):
             self._data = {}
 
             transaction = WriteTransaction(
-                self.lock, _read_suppress_error, self._write, _db_lock_timeout
+                self.lock, _read_suppress_error, self._write
             )
 
             with transaction:
@@ -599,7 +628,7 @@ class Database(object):
             if os.access(self._db_dir, os.R_OK | os.W_OK):
                 # if we can write, then read AND write a JSON file.
                 self._read_from_file(self._old_yaml_index_path, format='yaml')
-                with WriteTransaction(self.lock, timeout=_db_lock_timeout):
+                with WriteTransaction(self.lock):
                     self._write(None, None, None)
             else:
                 # Check for a YAML file if we can't find JSON.
@@ -608,7 +637,7 @@ class Database(object):
         else:
             # The file doesn't exist, try to traverse the directory.
             # reindex() takes its own write lock, so no lock here.
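
The rewritten prefix lock context managers above acquire the lock before entering the try block, release it on success or on unrelated exceptions, and deliberately do not release it when a nested lock attempt raises LockError. With package_lock_timeout left at null they now wait indefinitely instead of the previous hard-coded 60 seconds. Hypothetical usage, assuming db is a spack.database.Database and spec is a concrete spec:

    with db.prefix_write_lock(spec):
        pass   # install or modify the prefix under an exclusive lock

    with db.prefix_read_lock(spec):
        pass   # inspect the installed prefix under a shared lock
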
-            with WriteTransaction(self.lock, timeout=_db_lock_timeout):
+            with WriteTransaction(self.lock):
                 self._write(None, None, None)
 
             self.reindex(spack.store.layout)
diff --git a/lib/spack/spack/schema/config.py b/lib/spack/spack/schema/config.py
index 9773213b7d..ca77a8105b 100644
--- a/lib/spack/spack/schema/config.py
+++ b/lib/spack/spack/schema/config.py
@@ -70,6 +70,13 @@ schema = {
                 'dirty': {'type': 'boolean'},
                 'build_jobs': {'type': 'integer', 'minimum': 1},
                 'ccache': {'type': 'boolean'},
+                'db_lock_timeout': {'type': 'integer', 'minimum': 1},
+                'package_lock_timeout': {
+                    'anyOf': [
+                        {'type': 'integer', 'minimum': 1},
+                        {'type': 'null'}
+                    ],
+                },
             }
         },
     },
diff --git a/lib/spack/spack/test/llnl/util/lock.py b/lib/spack/spack/test/llnl/util/lock.py
index 89434d82f1..c1e6de018e 100644
--- a/lib/spack/spack/test/llnl/util/lock.py
+++ b/lib/spack/spack/test/llnl/util/lock.py
@@ -220,6 +220,13 @@ def lock_path(lock_dir):
     os.unlink(lock_file)
 
 
+def test_poll_interval_generator():
+    interval_iter = iter(
+        lk.Lock._poll_interval_generator(_wait_times=[1, 2, 3]))
+    intervals = list(next(interval_iter) for i in range(100))
+    assert intervals == [1] * 20 + [2] * 40 + [3] * 40
+
+
 def local_multiproc_test(*functions, **kwargs):
     """Order some processes using simple barrier synchronization."""
     b = mp.Barrier(len(functions), timeout=barrier_timeout)
diff --git a/lib/spack/spack/util/file_cache.py b/lib/spack/spack/util/file_cache.py
index 6dce9b9bdc..dd590c646e 100644
--- a/lib/spack/spack/util/file_cache.py
+++ b/lib/spack/spack/util/file_cache.py
@@ -42,17 +42,24 @@ class FileCache(object):
     """
 
-    def __init__(self, root):
+    def __init__(self, root, timeout=120):
         """Create a file cache object.
 
         This will create the cache directory if it does not exist yet.
 
+        Args:
+            root: specifies the root directory where the cache stores files
+
+            timeout: when there is contention among multiple Spack processes
+                for cache files, this specifies how long Spack should wait
+                before assuming that there is a deadlock.
         """
         self.root = root.rstrip(os.path.sep)
         if not os.path.exists(self.root):
             mkdirp(self.root)
 
         self._locks = {}
+        self.lock_timeout = timeout
 
     def destroy(self):
         """Remove all files under the cache root."""
@@ -77,7 +84,8 @@ class FileCache(object):
     def _get_lock(self, key):
         """Create a lock for a key, if necessary, and return a lock object."""
         if key not in self._locks:
-            self._locks[key] = Lock(self._lock_path(key))
+            self._locks[key] = Lock(self._lock_path(key),
+                                    default_timeout=self.lock_timeout)
         return self._locks[key]
 
     def init_entry(self, key):
diff --git a/lib/spack/spack/util/lock.py b/lib/spack/spack/util/lock.py
index a36cf5876d..b01a0951ee 100644
--- a/lib/spack/spack/util/lock.py
+++ b/lib/spack/spack/util/lock.py
@@ -47,7 +47,9 @@ class Lock(llnl.util.lock.Lock):
 
     def _lock(self, op, timeout=0):
         if self._enable:
-            super(Lock, self)._lock(op, timeout)
+            return super(Lock, self)._lock(op, timeout)
+        else:
+            return 0, 0
 
     def _unlock(self):
         """Unlock call that always succeeds."""
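
The FileCache change gives every per-key cache lock the same default timeout, and the spack.util.lock wrapper now returns a (wait time, attempts) tuple of (0, 0) when locks are disabled, so callers see the new return signature either way. A minimal FileCache sketch, with a made-up cache directory and key:

    from spack.util.file_cache import FileCache

    cache = FileCache('/tmp/spack-file-cache', timeout=120)
    cache.init_entry('example.json')   # hypothetical cache key
    # every Lock handed out by _get_lock() is created with
    # default_timeout=120, so contended cache access raises after ~2 minutes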