diff options
Diffstat (limited to 'lib/spack/llnl/util/lock.py')
-rw-r--r-- | lib/spack/llnl/util/lock.py | 210 |
1 files changed, 187 insertions, 23 deletions
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py index 479a1b0167..2e44a94798 100644 --- a/lib/spack/llnl/util/lock.py +++ b/lib/spack/llnl/util/lock.py @@ -28,6 +28,13 @@ import errno import time import socket +import llnl.util.tty as tty + + +__all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction', + 'LockError'] + + # Default timeout in seconds, after which locks will raise exceptions. _default_timeout = 60 @@ -36,34 +43,88 @@ _sleep_time = 1e-5 class Lock(object): - def __init__(self,file_path): - self._file_path = file_path - self._fd = None + """This is an implementation of a filesystem lock using Python's lockf. + + In Python, `lockf` actually calls `fcntl`, so this should work with + any filesystem implementation that supports locking through the fcntl + calls. This includes distributed filesystems like Lustre (when flock + is enabled) and recent NFS versions. + """ + + def __init__(self, path, start=0, length=0): + """Construct a new lock on the file at ``path``. + + By default, the lock applies to the whole file. Optionally, + caller can specify a byte range beginning ``start`` bytes from + the start of the file and extending ``length`` bytes from there. + + This exposes a subset of fcntl locking functionality. It does + not currently expose the ``whence`` parameter -- ``whence`` is + always os.SEEK_SET and ``start`` is always evaluated from the + beginning of the file. + """ + self.path = path + self._file = None self._reads = 0 self._writes = 0 + # byte range parameters + self._start = start + self._length = length - def _lock(self, op, timeout): + # PID and host of lock holder + self.pid = self.old_pid = None + self.host = self.old_host = None + + def _lock(self, op, timeout=_default_timeout): """This takes a lock using POSIX locks (``fnctl.lockf``). - The lock is implemented as a spin lock using a nonblocking - call to lockf(). + The lock is implemented as a spin lock using a nonblocking call + to lockf(). 
On acquiring an exclusive lock, the lock writes this process's - pid and host to the lock file, in case the holding process - needs to be killed later. + pid and host to the lock file, in case the holding process needs + to be killed later. If the lock times out, it raises a ``LockError``. """ start_time = time.time() while (time.time() - start_time) < timeout: try: - if self._fd is None: - self._fd = os.open(self._file_path, os.O_RDWR) - - fcntl.lockf(self._fd, op | fcntl.LOCK_NB) + # If we could write the file, we'd have opened it 'r+'. + # Raise an error when we attempt to upgrade to a write lock. + if op == fcntl.LOCK_EX: + if self._file and self._file.mode == 'r': + raise LockError( + "Can't take exclusive lock on read-only file: %s" + % self.path) + + # Create file and parent directories if they don't exist. + if self._file is None: + self._ensure_parent_directory() + + # Prefer to open 'r+' to allow upgrading to write + # lock later if possible. Open read-only if we can't + # write the lock file at all. + os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+' + if os.path.exists(self.path) and not os.access( + self.path, os.W_OK): + os_mode, fd_mode = os.O_RDONLY, 'r' + + fd = os.open(self.path, os_mode) + self._file = os.fdopen(fd, fd_mode) + + # Try to get the lock (will raise if not available.) + fcntl.lockf(self._file, op | fcntl.LOCK_NB, + self._length, self._start, os.SEEK_SET) + + # All locks read the owner PID and host + self._read_lock_data() + + # Exclusive locks write their PID/host if op == fcntl.LOCK_EX: - os.write(self._fd, "pid=%s,host=%s" % (os.getpid(), socket.getfqdn())) + self._write_lock_data() + return except IOError as error: @@ -75,6 +136,39 @@ class Lock(object): raise LockError("Timed out waiting for lock.") + def _ensure_parent_directory(self): + parent = os.path.dirname(self.path) + try: + os.makedirs(parent) + return True + except OSError as e: + # makedirs can fail when directory already exists. 
+ if not (e.errno == errno.EEXIST and os.path.isdir(parent) or + e.errno == errno.EISDIR): + raise + + def _read_lock_data(self): + """Read PID and host data out of the file if it is there.""" + line = self._file.read() + if line: + pid, host = line.strip().split(',') + _, _, self.pid = pid.rpartition('=') + _, _, self.host = host.rpartition('=') + + def _write_lock_data(self): + """Write PID and host data to the file, recording old values.""" + self.old_pid = self.pid + self.old_host = self.host + + self.pid = os.getpid() + self.host = socket.getfqdn() + + # write pid, host to disk to sync over FS + self._file.seek(0) + self._file.write("pid=%s,host=%s" % (self.pid, self.host)) + self._file.truncate() + self._file.flush() + os.fsync(self._file.fileno()) def _unlock(self): """Releases a lock using POSIX locks (``fcntl.lockf``) @@ -83,10 +177,10 @@ class Lock(object): be masquerading as write locks, but this removes either. """ - fcntl.lockf(self._fd,fcntl.LOCK_UN) - os.close(self._fd) - self._fd = None - + fcntl.lockf(self._file, fcntl.LOCK_UN, + self._length, self._start, os.SEEK_SET) + self._file.close() + self._file = None def acquire_read(self, timeout=_default_timeout): """Acquires a recursive, shared lock for reading. @@ -100,14 +194,15 @@ class Lock(object): """ if self._reads == 0 and self._writes == 0: - self._lock(fcntl.LOCK_SH, timeout) # can raise LockError. + tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' + .format(self)) + self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError. self._reads += 1 return True else: self._reads += 1 return False - def acquire_write(self, timeout=_default_timeout): """Acquires a recursive, exclusive lock for writing. @@ -120,14 +215,16 @@ class Lock(object): """ if self._writes == 0: - self._lock(fcntl.LOCK_EX, timeout) # can raise LockError. 
+ tty.debug( + 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' + .format(self)) + self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError. self._writes += 1 return True else: self._writes += 1 return False - def release_read(self): """Releases a read lock. @@ -141,6 +238,8 @@ class Lock(object): assert self._reads > 0 if self._reads == 1 and self._writes == 0: + tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Released]' + .format(self)) self._unlock() # can raise LockError. self._reads -= 1 return True @@ -148,7 +247,6 @@ class Lock(object): self._reads -= 1 return False - def release_write(self): """Releases a write lock. @@ -162,6 +260,8 @@ class Lock(object): assert self._writes > 0 if self._writes == 1 and self._reads == 0: + tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]' + .format(self)) self._unlock() # can raise LockError. self._writes -= 1 return True @@ -170,6 +270,70 @@ class Lock(object): return False +class LockTransaction(object): + """Simple nested transaction context manager that uses a file lock. + + This class can trigger actions when the lock is acquired for the + first time and released for the last. + + If the acquire_fn returns a value, it is used as the return value for + __enter__, allowing it to be passed as the `as` argument of a `with` + statement. + + If acquire_fn returns a context manager, *its* `__enter__` function will be + called in `__enter__` after acquire_fn, and its `__exit__` function will be + called before `release_fn` in `__exit__`, allowing you to nest a context + manager to be used along with the lock. + + Timeout for lock is customizable. 
+ + """ + + def __init__(self, lock, acquire_fn=None, release_fn=None, + timeout=_default_timeout): + self._lock = lock + self._timeout = timeout + self._acquire_fn = acquire_fn + self._release_fn = release_fn + self._as = None + + def __enter__(self): + if self._enter() and self._acquire_fn: + self._as = self._acquire_fn() + if hasattr(self._as, '__enter__'): + return self._as.__enter__() + else: + return self._as + + def __exit__(self, type, value, traceback): + suppress = False + if self._exit(): + if self._as and hasattr(self._as, '__exit__'): + if self._as.__exit__(type, value, traceback): + suppress = True + if self._release_fn: + if self._release_fn(type, value, traceback): + suppress = True + return suppress + + +class ReadTransaction(LockTransaction): + + def _enter(self): + return self._lock.acquire_read(self._timeout) + + def _exit(self): + return self._lock.release_read() + + +class WriteTransaction(LockTransaction): + + def _enter(self): + return self._lock.acquire_write(self._timeout) + + def _exit(self): + return self._lock.release_write() + + class LockError(Exception): """Raised when an attempt to acquire a lock times out.""" - pass |