summary refs log tree commit diff
path: root/lib/spack/llnl/util/lock.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/spack/llnl/util/lock.py')
-rw-r--r-- lib/spack/llnl/util/lock.py 210
1 files changed, 187 insertions, 23 deletions
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py
index 479a1b0167..2e44a94798 100644
--- a/lib/spack/llnl/util/lock.py
+++ b/lib/spack/llnl/util/lock.py
@@ -28,6 +28,13 @@ import errno
import time
import socket
+import llnl.util.tty as tty
+
+
+__all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction',
+ 'LockError']
+
+
# Default timeout in seconds, after which locks will raise exceptions.
_default_timeout = 60
@@ -36,34 +43,88 @@ _sleep_time = 1e-5
class Lock(object):
- def __init__(self,file_path):
- self._file_path = file_path
- self._fd = None
+ """This is an implementation of a filesystem lock using Python's lockf.
+
+ In Python, `lockf` actually calls `fcntl`, so this should work with
+ any filesystem implementation that supports locking through the fcntl
+ calls. This includes distributed filesystems like Lustre (when flock
+ is enabled) and recent NFS versions.
+ """
+
+ def __init__(self, path, start=0, length=0):
+ """Construct a new lock on the file at ``path``.
+
+ By default, the lock applies to the whole file. Optionally,
+ caller can specify a byte range beginning ``start`` bytes from
+ the start of the file and extending ``length`` bytes from there.
+
+ This exposes a subset of fcntl locking functionality. It does
+ not currently expose the ``whence`` parameter -- ``whence`` is
+ always os.SEEK_SET and ``start`` is always evaluated from the
+ beginning of the file.
+ """
+ self.path = path
+ self._file = None
self._reads = 0
self._writes = 0
+ # byte range parameters
+ self._start = start
+ self._length = length
- def _lock(self, op, timeout):
+ # PID and host of lock holder
+ self.pid = self.old_pid = None
+ self.host = self.old_host = None
+
+ def _lock(self, op, timeout=_default_timeout):
"""This takes a lock using POSIX locks (``fnctl.lockf``).
- The lock is implemented as a spin lock using a nonblocking
- call to lockf().
+ The lock is implemented as a spin lock using a nonblocking call
+ to lockf().
On acquiring an exclusive lock, the lock writes this process's
- pid and host to the lock file, in case the holding process
- needs to be killed later.
+ pid and host to the lock file, in case the holding process needs
+ to be killed later.
If the lock times out, it raises a ``LockError``.
"""
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
- if self._fd is None:
- self._fd = os.open(self._file_path, os.O_RDWR)
-
- fcntl.lockf(self._fd, op | fcntl.LOCK_NB)
+ # If we could write the file, we'd have opened it 'r+'.
+ # Raise an error when we attempt to upgrade to a write lock.
+ if op == fcntl.LOCK_EX:
+ if self._file and self._file.mode == 'r':
+ raise LockError(
+ "Can't take exclusive lock on read-only file: %s"
+ % self.path)
+
+ # Create file and parent directories if they don't exist.
+ if self._file is None:
+ self._ensure_parent_directory()
+
+ # Prefer to open 'r+' to allow upgrading to write
+ # lock later if possible. Open read-only if we can't
+ # write the lock file at all.
+ os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+'
+ if os.path.exists(self.path) and not os.access(
+ self.path, os.W_OK):
+ os_mode, fd_mode = os.O_RDONLY, 'r'
+
+ fd = os.open(self.path, os_mode)
+ self._file = os.fdopen(fd, fd_mode)
+
+ # Try to get the lock (will raise if not available.)
+ fcntl.lockf(self._file, op | fcntl.LOCK_NB,
+ self._length, self._start, os.SEEK_SET)
+
+ # All locks read the owner PID and host
+ self._read_lock_data()
+
+ # Exclusive locks write their PID/host
if op == fcntl.LOCK_EX:
- os.write(self._fd, "pid=%s,host=%s" % (os.getpid(), socket.getfqdn()))
+ self._write_lock_data()
+
return
except IOError as error:
@@ -75,6 +136,39 @@ class Lock(object):
raise LockError("Timed out waiting for lock.")
+ def _ensure_parent_directory(self):
+ parent = os.path.dirname(self.path)
+ try:
+ os.makedirs(parent)
+ return True
+ except OSError as e:
+ # makedirs can fail when the directory already exists.
+ if not (e.errno == errno.EEXIST and os.path.isdir(parent) or
+ e.errno == errno.EISDIR):
+ raise
+
+ def _read_lock_data(self):
+ """Read PID and host data out of the file if it is there."""
+ line = self._file.read()
+ if line:
+ pid, host = line.strip().split(',')
+ _, _, self.pid = pid.rpartition('=')
+ _, _, self.host = host.rpartition('=')
+
+ def _write_lock_data(self):
+ """Write PID and host data to the file, recording old values."""
+ self.old_pid = self.pid
+ self.old_host = self.host
+
+ self.pid = os.getpid()
+ self.host = socket.getfqdn()
+
+ # write pid, host to disk to sync over FS
+ self._file.seek(0)
+ self._file.write("pid=%s,host=%s" % (self.pid, self.host))
+ self._file.truncate()
+ self._file.flush()
+ os.fsync(self._file.fileno())
def _unlock(self):
"""Releases a lock using POSIX locks (``fcntl.lockf``)
@@ -83,10 +177,10 @@ class Lock(object):
be masquerading as write locks, but this removes either.
"""
- fcntl.lockf(self._fd,fcntl.LOCK_UN)
- os.close(self._fd)
- self._fd = None
-
+ fcntl.lockf(self._file, fcntl.LOCK_UN,
+ self._length, self._start, os.SEEK_SET)
+ self._file.close()
+ self._file = None
def acquire_read(self, timeout=_default_timeout):
"""Acquires a recursive, shared lock for reading.
@@ -100,14 +194,15 @@ class Lock(object):
"""
if self._reads == 0 and self._writes == 0:
- self._lock(fcntl.LOCK_SH, timeout) # can raise LockError.
+ tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
+ .format(self))
+ self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError.
self._reads += 1
return True
else:
self._reads += 1
return False
-
def acquire_write(self, timeout=_default_timeout):
"""Acquires a recursive, exclusive lock for writing.
@@ -120,14 +215,16 @@ class Lock(object):
"""
if self._writes == 0:
- self._lock(fcntl.LOCK_EX, timeout) # can raise LockError.
+ tty.debug(
+ 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
+ .format(self))
+ self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError.
self._writes += 1
return True
else:
self._writes += 1
return False
-
def release_read(self):
"""Releases a read lock.
@@ -141,6 +238,8 @@ class Lock(object):
assert self._reads > 0
if self._reads == 1 and self._writes == 0:
+ tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Released]'
+ .format(self))
self._unlock() # can raise LockError.
self._reads -= 1
return True
@@ -148,7 +247,6 @@ class Lock(object):
self._reads -= 1
return False
-
def release_write(self):
"""Releases a write lock.
@@ -162,6 +260,8 @@ class Lock(object):
assert self._writes > 0
if self._writes == 1 and self._reads == 0:
+ tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]'
+ .format(self))
self._unlock() # can raise LockError.
self._writes -= 1
return True
@@ -170,6 +270,70 @@ class Lock(object):
return False
+class LockTransaction(object):
+ """Simple nested transaction context manager that uses a file lock.
+
+ This class can trigger actions when the lock is acquired for the
+ first time and released for the last.
+
+ If the acquire_fn returns a value, it is used as the return value for
+ __enter__, allowing it to be passed as the `as` argument of a `with`
+ statement.
+
+ If acquire_fn returns a context manager, *its* `__enter__` function will be
+ called in `__enter__` after acquire_fn, and its `__exit__` function will be
+ called before `release_fn` in `__exit__`, allowing you to nest a context
+ manager to be used along with the lock.
+
+ Timeout for lock is customizable.
+
+ """
+
+ def __init__(self, lock, acquire_fn=None, release_fn=None,
+ timeout=_default_timeout):
+ self._lock = lock
+ self._timeout = timeout
+ self._acquire_fn = acquire_fn
+ self._release_fn = release_fn
+ self._as = None
+
+ def __enter__(self):
+ if self._enter() and self._acquire_fn:
+ self._as = self._acquire_fn()
+ if hasattr(self._as, '__enter__'):
+ return self._as.__enter__()
+ else:
+ return self._as
+
+ def __exit__(self, type, value, traceback):
+ suppress = False
+ if self._exit():
+ if self._as and hasattr(self._as, '__exit__'):
+ if self._as.__exit__(type, value, traceback):
+ suppress = True
+ if self._release_fn:
+ if self._release_fn(type, value, traceback):
+ suppress = True
+ return suppress
+
+
+class ReadTransaction(LockTransaction):
+
+ def _enter(self):
+ return self._lock.acquire_read(self._timeout)
+
+ def _exit(self):
+ return self._lock.release_read()
+
+
+class WriteTransaction(LockTransaction):
+
+ def _enter(self):
+ return self._lock.acquire_write(self._timeout)
+
+ def _exit(self):
+ return self._lock.release_write()
+
+
class LockError(Exception):
"""Raised when an attempt to acquire a lock times out."""
- pass