summaryrefslogtreecommitdiff
path: root/lib/spack/llnl/util/lock.py
diff options
context:
space:
mode:
authorTamara Dahlgren <35777542+tldahlgren@users.noreply.github.com>2020-02-19 00:04:22 -0800
committerGitHub <noreply@github.com>2020-02-19 00:04:22 -0800
commitf2aca86502eded1500489cd13799d8826e6fc9d2 (patch)
treeb6420e0db1471646d0f2cb94d138d1056478fd50 /lib/spack/llnl/util/lock.py
parent2f4881d582181b275d13ad2098a3c89b563e9f97 (diff)
downloadspack-f2aca86502eded1500489cd13799d8826e6fc9d2.tar.gz
spack-f2aca86502eded1500489cd13799d8826e6fc9d2.tar.bz2
spack-f2aca86502eded1500489cd13799d8826e6fc9d2.tar.xz
spack-f2aca86502eded1500489cd13799d8826e6fc9d2.zip
Distributed builds (#13100)
Fixes #9394 Closes #13217. ## Background Spack provides the ability to enable/disable parallel builds through two options: package `parallel` and configuration `build_jobs`. This PR changes the algorithm to allow multiple, simultaneous processes to coordinate the installation of the same spec (and specs with overlapping dependencies). The `parallel` (boolean) property sets the default for its package though the value can be overridden in the `install` method. Spack's current parallel builds are limited to build tools supporting `jobs` arguments (e.g., `Makefiles`). The number of jobs actually used is calculated as `min(config:build_jobs, # cores, 16)`, which can be overridden in the package or on the command line (i.e., `spack install -j <# jobs>`). This PR adds support for distributed (single- and multi-node) parallel builds. The goals of this work include improving the efficiency of installing packages with many dependencies and reducing the repetition associated with concurrent installations of (dependency) packages. ## Approach ### File System Locks Coordination between concurrent installs of overlapping packages to a Spack instance is accomplished through bottom-up dependency DAG processing and file system locks. The runs can be a combination of interactive and batch processes affecting the same file system. Exclusive prefix locks are required to install a package while shared prefix locks are required to check if the package is installed. Failures are communicated through a separate exclusive prefix failure lock, for concurrent processes, combined with a persistent store, for separate, related build processes. The resulting file contains the failing spec to facilitate manual debugging. ### Priority Queue Management of dependency builds changed from reliance on recursion to use of a priority queue where the priority of a spec is based on the number of its remaining uninstalled dependencies. 
Using a queue required a change to dependency build exception handling with the most visible issue being that the `install` method *must* install something in the prefix. Consequently, packages can no longer get away with an install method consisting of `pass`, for example. ## Caveats - This still only parallelizes a single-rooted build. Multi-rooted installs (e.g., for environments) are TBD in a future PR. Tasks: - [x] Adjust package lock timeout to correspond to value used in the demo - [x] Adjust database lock timeout to reduce contention on startup of concurrent `spack install <spec>` calls - [x] Replace (test) package's `install: pass` methods with file creation since post-install `sanity_check_prefix` will otherwise error out with `Install failed .. Nothing was installed!` - [x] Resolve remaining existing test failures - [x] Respond to alalazo's initial feedback - [x] Remove `bin/demo-locks.py` - [x] Add new tests to address new coverage issues - [x] Replace built-in package's `def install(..): pass` with a method that "installs" something (i.e., only `apple-libunwind`) - [x] Increase code coverage
Diffstat (limited to 'lib/spack/llnl/util/lock.py')
-rw-r--r--lib/spack/llnl/util/lock.py261
1 files changed, 218 insertions, 43 deletions
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py
index 63f970c98b..b295341d48 100644
--- a/lib/spack/llnl/util/lock.py
+++ b/lib/spack/llnl/util/lock.py
@@ -8,14 +8,32 @@ import fcntl
import errno
import time
import socket
+from datetime import datetime
import llnl.util.tty as tty
+import spack.util.string
__all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction',
'LockError', 'LockTimeoutError',
'LockPermissionError', 'LockROFileError', 'CantCreateLockError']
+#: Mapping of supported locks to description
+lock_type = {fcntl.LOCK_SH: 'read', fcntl.LOCK_EX: 'write'}
+
+#: A useful replacement for functions that should return True when not provided
+#: for example.
+true_fn = lambda: True
+
+
+def _attempts_str(wait_time, nattempts):
+ # Don't print anything if we succeeded on the first try
+ if nattempts <= 1:
+ return ''
+
+ attempts = spack.util.string.plural(nattempts, 'attempt')
+ return ' after {0:0.2f}s and {1}'.format(wait_time, attempts)
+
class Lock(object):
"""This is an implementation of a filesystem lock using Python's lockf.
@@ -31,8 +49,8 @@ class Lock(object):
maintain multiple locks on the same file.
"""
- def __init__(self, path, start=0, length=0, debug=False,
- default_timeout=None):
+ def __init__(self, path, start=0, length=0, default_timeout=None,
+ debug=False, desc=''):
"""Construct a new lock on the file at ``path``.
By default, the lock applies to the whole file. Optionally,
@@ -43,6 +61,16 @@ class Lock(object):
not currently expose the ``whence`` parameter -- ``whence`` is
always ``os.SEEK_SET`` and ``start`` is always evaluated from the
beginning of the file.
+
+ Args:
+ path (str): path to the lock
+ start (int): optional byte offset at which the lock starts
+ length (int): optional number of bytes to lock
+ default_timeout (int): number of seconds to wait for lock attempts,
+ where None means to wait indefinitely
+ debug (bool): debug mode specific to locking
+ desc (str): optional debug message lock description, which is
+ helpful for distinguishing between different Spack locks.
"""
self.path = path
self._file = None
@@ -56,6 +84,9 @@ class Lock(object):
# enable debug mode
self.debug = debug
+ # optional debug description
+ self.desc = ' ({0})'.format(desc) if desc else ''
+
# If the user doesn't set a default timeout, or if they choose
# None, 0, etc. then lock attempts will not time out (unless the
# user sets a timeout for each attempt)
@@ -89,6 +120,20 @@ class Lock(object):
num_requests += 1
yield wait_time
+ def __repr__(self):
+ """Formal representation of the lock."""
+ rep = '{0}('.format(self.__class__.__name__)
+ for attr, value in self.__dict__.items():
+ rep += '{0}={1}, '.format(attr, value.__repr__())
+ return '{0})'.format(rep.strip(', '))
+
+ def __str__(self):
+ """Readable string (with key fields) of the lock."""
+ location = '{0}[{1}:{2}]'.format(self.path, self._start, self._length)
+ timeout = 'timeout={0}'.format(self.default_timeout)
+ activity = '#reads={0}, #writes={1}'.format(self._reads, self._writes)
+ return '({0}, {1}, {2})'.format(location, timeout, activity)
+
def _lock(self, op, timeout=None):
"""This takes a lock using POSIX locks (``fcntl.lockf``).
@@ -99,8 +144,9 @@ class Lock(object):
successfully acquired, the total wait time and the number of attempts
is returned.
"""
- assert op in (fcntl.LOCK_SH, fcntl.LOCK_EX)
+ assert op in lock_type
+ self._log_acquiring('{0} LOCK'.format(lock_type[op].upper()))
timeout = timeout or self.default_timeout
# Create file and parent directories if they don't exist.
@@ -128,6 +174,9 @@ class Lock(object):
# If the file were writable, we'd have opened it 'r+'
raise LockROFileError(self.path)
+ tty.debug("{0} locking [{1}:{2}]: timeout {3} sec"
+ .format(lock_type[op], self._start, self._length, timeout))
+
poll_intervals = iter(Lock._poll_interval_generator())
start_time = time.time()
num_attempts = 0
@@ -139,17 +188,21 @@ class Lock(object):
time.sleep(next(poll_intervals))
+ # TBD: Is an extra attempt after timeout needed/appropriate?
num_attempts += 1
if self._poll_lock(op):
total_wait_time = time.time() - start_time
return total_wait_time, num_attempts
- raise LockTimeoutError("Timed out waiting for lock.")
+ raise LockTimeoutError("Timed out waiting for a {0} lock."
+ .format(lock_type[op]))
def _poll_lock(self, op):
"""Attempt to acquire the lock in a non-blocking manner. Return whether
the locking attempt succeeds
"""
+ assert op in lock_type
+
try:
# Try to get the lock (will raise if not available.)
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
@@ -159,6 +212,9 @@ class Lock(object):
if self.debug:
# All locks read the owner PID and host
self._read_debug_data()
+ tty.debug('{0} locked {1} [{2}:{3}] (owner={4})'
+ .format(lock_type[op], self.path,
+ self._start, self._length, self.pid))
# Exclusive locks write their PID/host
if op == fcntl.LOCK_EX:
@@ -167,12 +223,12 @@ class Lock(object):
return True
except IOError as e:
- if e.errno in (errno.EAGAIN, errno.EACCES):
- # EAGAIN and EACCES == locked by another process
- pass
- else:
+ # EAGAIN and EACCES == locked by another process (so try again)
+ if e.errno not in (errno.EAGAIN, errno.EACCES):
raise
+ return False
+
def _ensure_parent_directory(self):
parent = os.path.dirname(self.path)
@@ -227,6 +283,8 @@ class Lock(object):
self._length, self._start, os.SEEK_SET)
self._file.close()
self._file = None
+ self._reads = 0
+ self._writes = 0
def acquire_read(self, timeout=None):
"""Acquires a recursive, shared lock for reading.
@@ -242,15 +300,14 @@ class Lock(object):
timeout = timeout or self.default_timeout
if self._reads == 0 and self._writes == 0:
- self._debug(
- 'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
- .format(self))
# can raise LockError.
wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
- self._acquired_debug('READ LOCK', wait_time, nattempts)
self._reads += 1
+ # Log if acquired, which includes counts when verbose
+ self._log_acquired('READ LOCK', wait_time, nattempts)
return True
else:
+ # Increment the read count for nested lock tracking
self._reads += 1
return False
@@ -268,13 +325,11 @@ class Lock(object):
timeout = timeout or self.default_timeout
if self._writes == 0:
- self._debug(
- 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
- .format(self))
# can raise LockError.
wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
- self._acquired_debug('WRITE LOCK', wait_time, nattempts)
self._writes += 1
+ # Log if acquired, which includes counts when verbose
+ self._log_acquired('WRITE LOCK', wait_time, nattempts)
# return True only if we weren't nested in a read lock.
# TODO: we may need to return two values: whether we got
@@ -282,9 +337,65 @@ class Lock(object):
# write lock for the first time. Now it returns the latter.
return self._reads == 0
else:
+ # Increment the write count for nested lock tracking
self._writes += 1
return False
+ def is_write_locked(self):
+ """Check if the file is write locked
+
+ Return:
+ (bool): ``True`` if the path is write locked, otherwise, ``False``
+ """
+ try:
+ self.acquire_read()
+
+ # If we have a read lock then no other process has a write lock.
+ self.release_read()
+ except LockTimeoutError:
+ # Another process is holding a write lock on the file
+ return True
+
+ return False
+
+ def downgrade_write_to_read(self, timeout=None):
+ """
+ Downgrade from an exclusive write lock to a shared read.
+
+ Raises:
+ LockDowngradeError: if this is an attempt at a nested transaction
+ """
+ timeout = timeout or self.default_timeout
+
+ if self._writes == 1 and self._reads == 0:
+ self._log_downgrading()
+ # can raise LockError.
+ wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
+ self._reads = 1
+ self._writes = 0
+ self._log_downgraded(wait_time, nattempts)
+ else:
+ raise LockDowngradeError(self.path)
+
+ def upgrade_read_to_write(self, timeout=None):
+ """
+ Attempts to upgrade from a shared read lock to an exclusive write.
+
+ Raises:
+ LockUpgradeError: if this is an attempt at a nested transaction
+ """
+ timeout = timeout or self.default_timeout
+
+ if self._reads == 1 and self._writes == 0:
+ self._log_upgrading()
+ # can raise LockError.
+ wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
+ self._reads = 0
+ self._writes = 1
+ self._log_upgraded(wait_time, nattempts)
+ else:
+ raise LockUpgradeError(self.path)
+
def release_read(self, release_fn=None):
"""Releases a read lock.
@@ -305,17 +416,17 @@ class Lock(object):
"""
assert self._reads > 0
+ locktype = 'READ LOCK'
if self._reads == 1 and self._writes == 0:
- self._debug(
- 'READ LOCK: {0.path}[{0._start}:{0._length}] [Released]'
- .format(self))
+ self._log_releasing(locktype)
- result = True
- if release_fn is not None:
- result = release_fn()
+ # we need to call release_fn before releasing the lock
+ release_fn = release_fn or true_fn
+ result = release_fn()
self._unlock() # can raise LockError.
- self._reads -= 1
+ self._reads = 0
+ self._log_released(locktype)
return result
else:
self._reads -= 1
@@ -339,45 +450,91 @@ class Lock(object):
"""
assert self._writes > 0
+ release_fn = release_fn or true_fn
+ locktype = 'WRITE LOCK'
if self._writes == 1 and self._reads == 0:
- self._debug(
- 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]'
- .format(self))
+ self._log_releasing(locktype)
# we need to call release_fn before releasing the lock
- result = True
- if release_fn is not None:
- result = release_fn()
+ result = release_fn()
self._unlock() # can raise LockError.
- self._writes -= 1
+ self._writes = 0
+ self._log_released(locktype)
return result
-
else:
self._writes -= 1
# when the last *write* is released, we call release_fn here
# instead of immediately before releasing the lock.
if self._writes == 0:
- return release_fn() if release_fn is not None else True
+ return release_fn()
else:
return False
def _debug(self, *args):
tty.debug(*args)
- def _acquired_debug(self, lock_type, wait_time, nattempts):
- attempts_format = 'attempt' if nattempts == 1 else 'attempt'
- if nattempts > 1:
- acquired_attempts_format = ' after {0:0.2f}s and {1:d} {2}'.format(
- wait_time, nattempts, attempts_format)
- else:
- # Dont print anything if we succeeded immediately
- acquired_attempts_format = ''
- self._debug(
- '{0}: {1.path}[{1._start}:{1._length}] [Acquired{2}]'
- .format(lock_type, self, acquired_attempts_format))
+ def _get_counts_desc(self):
+ return '(reads {0}, writes {1})'.format(self._reads, self._writes) \
+ if tty.is_verbose() else ''
+
+ def _log_acquired(self, locktype, wait_time, nattempts):
+ attempts_part = _attempts_str(wait_time, nattempts)
+ now = datetime.now()
+ desc = 'Acquired at %s' % now.strftime("%H:%M:%S.%f")
+ self._debug(self._status_msg(locktype, '{0}{1}'.
+ format(desc, attempts_part)))
+
+ def _log_acquiring(self, locktype):
+ self._debug2(self._status_msg(locktype, 'Acquiring'))
+
+ def _log_downgraded(self, wait_time, nattempts):
+ attempts_part = _attempts_str(wait_time, nattempts)
+ now = datetime.now()
+ desc = 'Downgraded at %s' % now.strftime("%H:%M:%S.%f")
+ self._debug(self._status_msg('READ LOCK', '{0}{1}'
+ .format(desc, attempts_part)))
+
+ def _log_downgrading(self):
+ self._debug2(self._status_msg('WRITE LOCK', 'Downgrading'))
+
+ def _log_released(self, locktype):
+ now = datetime.now()
+ desc = 'Released at %s' % now.strftime("%H:%M:%S.%f")
+ self._debug(self._status_msg(locktype, desc))
+
+ def _log_releasing(self, locktype):
+ self._debug2(self._status_msg(locktype, 'Releasing'))
+
+ def _log_upgraded(self, wait_time, nattempts):
+ attempts_part = _attempts_str(wait_time, nattempts)
+ now = datetime.now()
+ desc = 'Upgraded at %s' % now.strftime("%H:%M:%S.%f")
+ self._debug(self._status_msg('WRITE LOCK', '{0}{1}'.
+ format(desc, attempts_part)))
+
+ def _log_upgrading(self):
+ self._debug2(self._status_msg('READ LOCK', 'Upgrading'))
+
+ def _status_msg(self, locktype, status):
+ status_desc = '[{0}] {1}'.format(status, self._get_counts_desc())
+ return '{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}'.format(
+ locktype, self, status_desc)
+
+ def _debug2(self, *args):
+ # TODO: Easy place to make a single, temporary change to the
+ # TODO: debug level associated with the more detailed messages.
+ # TODO:
+ # TODO: Someday it would be great if we could switch this to
+ # TODO: another level, perhaps _between_ debug and verbose, or
+ # TODO: some other form of filtering so the first level of
+ # TODO: debugging doesn't have to generate these messages. Using
+ # TODO: verbose here did not work as expected because tests like
+ # TODO: test_spec_json will write the verbose messages to the
+ # TODO: output that is used to check test correctness.
+ tty.debug(*args)
class LockTransaction(object):
@@ -462,10 +619,28 @@ class LockError(Exception):
"""Raised for any errors related to locks."""
+class LockDowngradeError(LockError):
+ """Raised when unable to downgrade from a write to a read lock."""
+ def __init__(self, path):
+ msg = "Cannot downgrade lock from write to read on file: %s" % path
+ super(LockDowngradeError, self).__init__(msg)
+
+
+class LockLimitError(LockError):
+ """Raised when exceed maximum attempts to acquire a lock."""
+
+
class LockTimeoutError(LockError):
"""Raised when an attempt to acquire a lock times out."""
+class LockUpgradeError(LockError):
+ """Raised when unable to upgrade from a read to a write lock."""
+ def __init__(self, path):
+ msg = "Cannot upgrade lock from read to write on file: %s" % path
+ super(LockUpgradeError, self).__init__(msg)
+
+
class LockPermissionError(LockError):
"""Raised when there are permission issues with a lock."""