diff options
author | Tamara Dahlgren <35777542+tldahlgren@users.noreply.github.com> | 2020-02-19 00:04:22 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-19 00:04:22 -0800 |
commit | f2aca86502eded1500489cd13799d8826e6fc9d2 (patch) | |
tree | b6420e0db1471646d0f2cb94d138d1056478fd50 /lib/spack/llnl/util/lock.py | |
parent | 2f4881d582181b275d13ad2098a3c89b563e9f97 (diff) | |
download | spack-f2aca86502eded1500489cd13799d8826e6fc9d2.tar.gz spack-f2aca86502eded1500489cd13799d8826e6fc9d2.tar.bz2 spack-f2aca86502eded1500489cd13799d8826e6fc9d2.tar.xz spack-f2aca86502eded1500489cd13799d8826e6fc9d2.zip |
Distributed builds (#13100)
Fixes #9394
Closes #13217.
## Background
Spack provides the ability to enable/disable parallel builds through two options: package `parallel` and configuration `build_jobs`. This PR changes the algorithm to allow multiple, simultaneous processes to coordinate the installation of the same spec (and specs with overlapping dependencies).
The `parallel` (boolean) property sets the default for its package though the value can be overridden in the `install` method.
Spack's current parallel builds are limited to build tools supporting `jobs` arguments (e.g., `Makefiles`). The number of jobs actually used is calculated as `min(config:build_jobs, # cores, 16)`, which can be overridden in the package or on the command line (i.e., `spack install -j <# jobs>`).
This PR adds support for distributed (single- and multi-node) parallel builds. The goals of this work include improving the efficiency of installing packages with many dependencies and reducing the repetition associated with concurrent installations of (dependency) packages.
## Approach
### File System Locks
Coordination between concurrent installs of overlapping packages to a Spack instance is accomplished through bottom-up dependency DAG processing and file system locks. The runs can be a combination of interactive and batch processes affecting the same file system. Exclusive prefix locks are required to install a package while shared prefix locks are required to check if the package is installed.
Failures are communicated through a separate exclusive prefix failure lock, for concurrent processes, combined with a persistent store, for separate, related build processes. The resulting file contains the failing spec to facilitate manual debugging.
### Priority Queue
Management of dependency builds changed from reliance on recursion to use of a priority queue where the priority of a spec is based on the number of its remaining uninstalled dependencies.
Using a queue required a change to dependency build exception handling with the most visible issue being that the `install` method *must* install something in the prefix. Consequently, packages can no longer get away with an install method consisting of `pass`, for example.
## Caveats
- This still only parallelizes a single-rooted build. Multi-rooted installs (e.g., for environments) are TBD in a future PR.
Tasks:
- [x] Adjust package lock timeout to correspond to value used in the demo
- [x] Adjust database lock timeout to reduce contention on startup of concurrent
`spack install <spec>` calls
- [x] Replace (test) package's `install: pass` methods with file creation since post-install
`sanity_check_prefix` will otherwise error out with `Install failed .. Nothing was installed!`
- [x] Resolve remaining existing test failures
- [x] Respond to alalazo's initial feedback
- [x] Remove `bin/demo-locks.py`
- [x] Add new tests to address new coverage issues
- [x] Replace built-in package's `def install(..): pass` to "install" something
(i.e., only `apple-libunwind`)
- [x] Increase code coverage
Diffstat (limited to 'lib/spack/llnl/util/lock.py')
-rw-r--r-- | lib/spack/llnl/util/lock.py | 261 |
1 files changed, 218 insertions, 43 deletions
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py index 63f970c98b..b295341d48 100644 --- a/lib/spack/llnl/util/lock.py +++ b/lib/spack/llnl/util/lock.py @@ -8,14 +8,32 @@ import fcntl import errno import time import socket +from datetime import datetime import llnl.util.tty as tty +import spack.util.string __all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction', 'LockError', 'LockTimeoutError', 'LockPermissionError', 'LockROFileError', 'CantCreateLockError'] +#: Mapping of supported locks to description +lock_type = {fcntl.LOCK_SH: 'read', fcntl.LOCK_EX: 'write'} + +#: A useful replacement for functions that should return True when not provided +#: for example. +true_fn = lambda: True + + +def _attempts_str(wait_time, nattempts): + # Don't print anything if we succeeded on the first try + if nattempts <= 1: + return '' + + attempts = spack.util.string.plural(nattempts, 'attempt') + return ' after {0:0.2f}s and {1}'.format(wait_time, attempts) + class Lock(object): """This is an implementation of a filesystem lock using Python's lockf. @@ -31,8 +49,8 @@ class Lock(object): maintain multiple locks on the same file. """ - def __init__(self, path, start=0, length=0, debug=False, - default_timeout=None): + def __init__(self, path, start=0, length=0, default_timeout=None, + debug=False, desc=''): """Construct a new lock on the file at ``path``. By default, the lock applies to the whole file. Optionally, @@ -43,6 +61,16 @@ class Lock(object): not currently expose the ``whence`` parameter -- ``whence`` is always ``os.SEEK_SET`` and ``start`` is always evaluated from the beginning of the file. 
+ + Args: + path (str): path to the lock + start (int): optional byte offset at which the lock starts + length (int): optional number of bytes to lock + default_timeout (int): number of seconds to wait for lock attempts, + where None means to wait indefinitely + debug (bool): debug mode specific to locking + desc (str): optional debug message lock description, which is + helpful for distinguishing between different Spack locks. """ self.path = path self._file = None @@ -56,6 +84,9 @@ class Lock(object): # enable debug mode self.debug = debug + # optional debug description + self.desc = ' ({0})'.format(desc) if desc else '' + # If the user doesn't set a default timeout, or if they choose # None, 0, etc. then lock attempts will not time out (unless the # user sets a timeout for each attempt) @@ -89,6 +120,20 @@ class Lock(object): num_requests += 1 yield wait_time + def __repr__(self): + """Formal representation of the lock.""" + rep = '{0}('.format(self.__class__.__name__) + for attr, value in self.__dict__.items(): + rep += '{0}={1}, '.format(attr, value.__repr__()) + return '{0})'.format(rep.strip(', ')) + + def __str__(self): + """Readable string (with key fields) of the lock.""" + location = '{0}[{1}:{2}]'.format(self.path, self._start, self._length) + timeout = 'timeout={0}'.format(self.default_timeout) + activity = '#reads={0}, #writes={1}'.format(self._reads, self._writes) + return '({0}, {1}, {2})'.format(location, timeout, activity) + def _lock(self, op, timeout=None): """This takes a lock using POSIX locks (``fcntl.lockf``). @@ -99,8 +144,9 @@ class Lock(object): successfully acquired, the total wait time and the number of attempts is returned. """ - assert op in (fcntl.LOCK_SH, fcntl.LOCK_EX) + assert op in lock_type + self._log_acquiring('{0} LOCK'.format(lock_type[op].upper())) timeout = timeout or self.default_timeout # Create file and parent directories if they don't exist. 
@@ -128,6 +174,9 @@ class Lock(object): # If the file were writable, we'd have opened it 'r+' raise LockROFileError(self.path) + tty.debug("{0} locking [{1}:{2}]: timeout {3} sec" + .format(lock_type[op], self._start, self._length, timeout)) + poll_intervals = iter(Lock._poll_interval_generator()) start_time = time.time() num_attempts = 0 @@ -139,17 +188,21 @@ class Lock(object): time.sleep(next(poll_intervals)) + # TBD: Is an extra attempt after timeout needed/appropriate? num_attempts += 1 if self._poll_lock(op): total_wait_time = time.time() - start_time return total_wait_time, num_attempts - raise LockTimeoutError("Timed out waiting for lock.") + raise LockTimeoutError("Timed out waiting for a {0} lock." + .format(lock_type[op])) def _poll_lock(self, op): """Attempt to acquire the lock in a non-blocking manner. Return whether the locking attempt succeeds """ + assert op in lock_type + try: # Try to get the lock (will raise if not available.) fcntl.lockf(self._file, op | fcntl.LOCK_NB, @@ -159,6 +212,9 @@ class Lock(object): if self.debug: # All locks read the owner PID and host self._read_debug_data() + tty.debug('{0} locked {1} [{2}:{3}] (owner={4})' + .format(lock_type[op], self.path, + self._start, self._length, self.pid)) # Exclusive locks write their PID/host if op == fcntl.LOCK_EX: @@ -167,12 +223,12 @@ class Lock(object): return True except IOError as e: - if e.errno in (errno.EAGAIN, errno.EACCES): - # EAGAIN and EACCES == locked by another process - pass - else: + # EAGAIN and EACCES == locked by another process (so try again) + if e.errno not in (errno.EAGAIN, errno.EACCES): raise + return False + def _ensure_parent_directory(self): parent = os.path.dirname(self.path) @@ -227,6 +283,8 @@ class Lock(object): self._length, self._start, os.SEEK_SET) self._file.close() self._file = None + self._reads = 0 + self._writes = 0 def acquire_read(self, timeout=None): """Acquires a recursive, shared lock for reading. 
@@ -242,15 +300,14 @@ class Lock(object): timeout = timeout or self.default_timeout if self._reads == 0 and self._writes == 0: - self._debug( - 'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' - .format(self)) # can raise LockError. wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout) - self._acquired_debug('READ LOCK', wait_time, nattempts) self._reads += 1 + # Log if acquired, which includes counts when verbose + self._log_acquired('READ LOCK', wait_time, nattempts) return True else: + # Increment the read count for nested lock tracking self._reads += 1 return False @@ -268,13 +325,11 @@ class Lock(object): timeout = timeout or self.default_timeout if self._writes == 0: - self._debug( - 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' - .format(self)) # can raise LockError. wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout) - self._acquired_debug('WRITE LOCK', wait_time, nattempts) self._writes += 1 + # Log if acquired, which includes counts when verbose + self._log_acquired('WRITE LOCK', wait_time, nattempts) # return True only if we weren't nested in a read lock. # TODO: we may need to return two values: whether we got @@ -282,9 +337,65 @@ class Lock(object): # write lock for the first time. Now it returns the latter. return self._reads == 0 else: + # Increment the write count for nested lock tracking self._writes += 1 return False + def is_write_locked(self): + """Check if the file is write locked + + Return: + (bool): ``True`` if the path is write locked, otherwise, ``False`` + """ + try: + self.acquire_read() + + # If we have a read lock then no other process has a write lock. + self.release_read() + except LockTimeoutError: + # Another process is holding a write lock on the file + return True + + return False + + def downgrade_write_to_read(self, timeout=None): + """ + Downgrade from an exclusive write lock to a shared read. 
+ + Raises: + LockDowngradeError: if this is an attempt at a nested transaction + """ + timeout = timeout or self.default_timeout + + if self._writes == 1 and self._reads == 0: + self._log_downgrading() + # can raise LockError. + wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout) + self._reads = 1 + self._writes = 0 + self._log_downgraded(wait_time, nattempts) + else: + raise LockDowngradeError(self.path) + + def upgrade_read_to_write(self, timeout=None): + """ + Attempts to upgrade from a shared read lock to an exclusive write. + + Raises: + LockUpgradeError: if this is an attempt at a nested transaction + """ + timeout = timeout or self.default_timeout + + if self._reads == 1 and self._writes == 0: + self._log_upgrading() + # can raise LockError. + wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout) + self._reads = 0 + self._writes = 1 + self._log_upgraded(wait_time, nattempts) + else: + raise LockUpgradeError(self.path) + def release_read(self, release_fn=None): """Releases a read lock. @@ -305,17 +416,17 @@ class Lock(object): """ assert self._reads > 0 + locktype = 'READ LOCK' if self._reads == 1 and self._writes == 0: - self._debug( - 'READ LOCK: {0.path}[{0._start}:{0._length}] [Released]' - .format(self)) + self._log_releasing(locktype) - result = True - if release_fn is not None: - result = release_fn() + # we need to call release_fn before releasing the lock + release_fn = release_fn or true_fn + result = release_fn() self._unlock() # can raise LockError. 
- self._reads -= 1 + self._reads = 0 + self._log_released(locktype) return result else: self._reads -= 1 @@ -339,45 +450,91 @@ class Lock(object): """ assert self._writes > 0 + release_fn = release_fn or true_fn + locktype = 'WRITE LOCK' if self._writes == 1 and self._reads == 0: - self._debug( - 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]' - .format(self)) + self._log_releasing(locktype) # we need to call release_fn before releasing the lock - result = True - if release_fn is not None: - result = release_fn() + result = release_fn() self._unlock() # can raise LockError. - self._writes -= 1 + self._writes = 0 + self._log_released(locktype) return result - else: self._writes -= 1 # when the last *write* is released, we call release_fn here # instead of immediately before releasing the lock. if self._writes == 0: - return release_fn() if release_fn is not None else True + return release_fn() else: return False def _debug(self, *args): tty.debug(*args) - def _acquired_debug(self, lock_type, wait_time, nattempts): - attempts_format = 'attempt' if nattempts == 1 else 'attempt' - if nattempts > 1: - acquired_attempts_format = ' after {0:0.2f}s and {1:d} {2}'.format( - wait_time, nattempts, attempts_format) - else: - # Dont print anything if we succeeded immediately - acquired_attempts_format = '' - self._debug( - '{0}: {1.path}[{1._start}:{1._length}] [Acquired{2}]' - .format(lock_type, self, acquired_attempts_format)) + def _get_counts_desc(self): + return '(reads {0}, writes {1})'.format(self._reads, self._writes) \ + if tty.is_verbose() else '' + + def _log_acquired(self, locktype, wait_time, nattempts): + attempts_part = _attempts_str(wait_time, nattempts) + now = datetime.now() + desc = 'Acquired at %s' % now.strftime("%H:%M:%S.%f") + self._debug(self._status_msg(locktype, '{0}{1}'. 
+ format(desc, attempts_part))) + + def _log_acquiring(self, locktype): + self._debug2(self._status_msg(locktype, 'Acquiring')) + + def _log_downgraded(self, wait_time, nattempts): + attempts_part = _attempts_str(wait_time, nattempts) + now = datetime.now() + desc = 'Downgraded at %s' % now.strftime("%H:%M:%S.%f") + self._debug(self._status_msg('READ LOCK', '{0}{1}' + .format(desc, attempts_part))) + + def _log_downgrading(self): + self._debug2(self._status_msg('WRITE LOCK', 'Downgrading')) + + def _log_released(self, locktype): + now = datetime.now() + desc = 'Released at %s' % now.strftime("%H:%M:%S.%f") + self._debug(self._status_msg(locktype, desc)) + + def _log_releasing(self, locktype): + self._debug2(self._status_msg(locktype, 'Releasing')) + + def _log_upgraded(self, wait_time, nattempts): + attempts_part = _attempts_str(wait_time, nattempts) + now = datetime.now() + desc = 'Upgraded at %s' % now.strftime("%H:%M:%S.%f") + self._debug(self._status_msg('WRITE LOCK', '{0}{1}'. + format(desc, attempts_part))) + + def _log_upgrading(self): + self._debug2(self._status_msg('READ LOCK', 'Upgrading')) + + def _status_msg(self, locktype, status): + status_desc = '[{0}] {1}'.format(status, self._get_counts_desc()) + return '{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}'.format( + locktype, self, status_desc) + + def _debug2(self, *args): + # TODO: Easy place to make a single, temporary change to the + # TODO: debug level associated with the more detailed messages. + # TODO: + # TODO: Someday it would be great if we could switch this to + # TODO: another level, perhaps _between_ debug and verbose, or + # TODO: some other form of filtering so the first level of + # TODO: debugging doesn't have to generate these messages. Using + # TODO: verbose here did not work as expected because tests like + # TODO: test_spec_json will write the verbose messages to the + # TODO: output that is used to check test correctness. 
+ tty.debug(*args) class LockTransaction(object): @@ -462,10 +619,28 @@ class LockError(Exception): """Raised for any errors related to locks.""" +class LockDowngradeError(LockError): + """Raised when unable to downgrade from a write to a read lock.""" + def __init__(self, path): + msg = "Cannot downgrade lock from write to read on file: %s" % path + super(LockDowngradeError, self).__init__(msg) + + +class LockLimitError(LockError): + """Raised when exceed maximum attempts to acquire a lock.""" + + class LockTimeoutError(LockError): """Raised when an attempt to acquire a lock times out.""" +class LockUpgradeError(LockError): + """Raised when unable to upgrade from a read to a write lock.""" + def __init__(self, path): + msg = "Cannot upgrade lock from read to write on file: %s" % path + super(LockUpgradeError, self).__init__(msg) + + class LockPermissionError(LockError): """Raised when there are permission issues with a lock.""" |