summaryrefslogtreecommitdiff
path: root/lib/spack/llnl
diff options
context:
space:
mode:
authorMassimiliano Culpo <massimiliano.culpo@gmail.com>2023-07-19 11:23:08 +0200
committerGitHub <noreply@github.com>2023-07-19 11:23:08 +0200
commitf34c93c5f80725a98ebff3ae15aab22630675726 (patch)
tree7f8227dc6d91eed197f27ccadf1a0e1f52618cab /lib/spack/llnl
parenta7f2abf9242ad321b410cc2f43b394856facba23 (diff)
downloadspack-f34c93c5f80725a98ebff3ae15aab22630675726.tar.gz
spack-f34c93c5f80725a98ebff3ae15aab22630675726.tar.bz2
spack-f34c93c5f80725a98ebff3ae15aab22630675726.tar.xz
spack-f34c93c5f80725a98ebff3ae15aab22630675726.zip
llnl.util.lock: add type-hints (#38977)
Also uppercase global variables in the module
Diffstat (limited to 'lib/spack/llnl')
-rw-r--r--lib/spack/llnl/util/lock.py169
1 files changed, 105 insertions, 64 deletions
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py
index 2b9d2dfbf2..a533c57176 100644
--- a/lib/spack/llnl/util/lock.py
+++ b/lib/spack/llnl/util/lock.py
@@ -9,9 +9,10 @@ import socket
import sys
import time
from datetime import datetime
+from types import TracebackType
+from typing import IO, Any, Callable, ContextManager, Dict, Generator, Optional, Tuple, Type, Union
-import llnl.util.tty as tty
-from llnl.util.lang import pretty_seconds
+from llnl.util import lang, tty
import spack.util.string
@@ -34,9 +35,12 @@ __all__ = [
]
-#: A useful replacement for functions that should return True when not provided
-#: for example.
-true_fn = lambda: True
+ReleaseFnType = Optional[Callable[[], bool]]
+
+
+def true_fn() -> bool:
+ """A function that always returns True."""
+ return True
class OpenFile:
@@ -48,7 +52,7 @@ class OpenFile:
file descriptors as well in the future.
"""
- def __init__(self, fh):
+ def __init__(self, fh: IO) -> None:
self.fh = fh
self.refs = 0
@@ -78,11 +82,11 @@ class OpenFileTracker:
work in Python and assume the GIL.
"""
- def __init__(self):
+ def __init__(self) -> None:
"""Create a new ``OpenFileTracker``."""
- self._descriptors = {}
+ self._descriptors: Dict[Any, OpenFile] = {}
- def get_fh(self, path):
+ def get_fh(self, path: str) -> IO:
"""Get a filehandle for a lockfile.
This routine will open writable files for read/write even if you're asking
@@ -90,7 +94,7 @@ class OpenFileTracker:
(write) lock later if requested.
Arguments:
- path (str): path to lock file we want a filehandle for
+ path: path to lock file we want a filehandle for
"""
# Open writable files as 'r+' so we can upgrade to write later
os_mode, fh_mode = (os.O_RDWR | os.O_CREAT), "r+"
@@ -157,7 +161,7 @@ class OpenFileTracker:
#: Open file descriptors for locks in this process. Used to prevent one process
#: from opening the sam file many times for different byte range locks
-file_tracker = OpenFileTracker()
+FILE_TRACKER = OpenFileTracker()
def _attempts_str(wait_time, nattempts):
@@ -166,7 +170,7 @@ def _attempts_str(wait_time, nattempts):
return ""
attempts = spack.util.string.plural(nattempts, "attempt")
- return " after {} and {}".format(pretty_seconds(wait_time), attempts)
+ return " after {} and {}".format(lang.pretty_seconds(wait_time), attempts)
class LockType:
@@ -188,7 +192,7 @@ class LockType:
return lock
@staticmethod
- def is_valid(op):
+ def is_valid(op: int) -> bool:
return op == LockType.READ or op == LockType.WRITE
@@ -207,7 +211,15 @@ class Lock:
overlapping byte ranges in the same file).
"""
- def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, desc=""):
+ def __init__(
+ self,
+ path: str,
+ start: int = 0,
+ length: int = 0,
+ default_timeout: Optional[float] = None,
+ debug: bool = False,
+ desc: str = "",
+ ) -> None:
"""Construct a new lock on the file at ``path``.
By default, the lock applies to the whole file. Optionally,
@@ -220,17 +232,17 @@ class Lock:
beginning of the file.
Args:
- path (str): path to the lock
- start (int): optional byte offset at which the lock starts
- length (int): optional number of bytes to lock
- default_timeout (int): number of seconds to wait for lock attempts,
+ path: path to the lock
+ start: optional byte offset at which the lock starts
+ length: optional number of bytes to lock
+ default_timeout: seconds to wait for lock attempts,
where None means to wait indefinitely
- debug (bool): debug mode specific to locking
- desc (str): optional debug message lock description, which is
+ debug: debug mode specific to locking
+ desc: optional debug message lock description, which is
helpful for distinguishing between different Spack locks.
"""
self.path = path
- self._file = None
+ self._file: Optional[IO] = None
self._reads = 0
self._writes = 0
@@ -242,7 +254,7 @@ class Lock:
self.debug = debug
# optional debug description
- self.desc = " ({0})".format(desc) if desc else ""
+ self.desc = f" ({desc})" if desc else ""
# If the user doesn't set a default timeout, or if they choose
# None, 0, etc. then lock attempts will not time out (unless the
@@ -250,11 +262,15 @@ class Lock:
self.default_timeout = default_timeout or None
# PID and host of lock holder (only used in debug mode)
- self.pid = self.old_pid = None
- self.host = self.old_host = None
+ self.pid: Optional[int] = None
+ self.old_pid: Optional[int] = None
+ self.host: Optional[str] = None
+ self.old_host: Optional[str] = None
@staticmethod
- def _poll_interval_generator(_wait_times=None):
+ def _poll_interval_generator(
+ _wait_times: Optional[Tuple[float, float, float]] = None
+ ) -> Generator[float, None, None]:
"""This implements a backoff scheme for polling a contended resource
by suggesting a succession of wait times between polls.
@@ -277,21 +293,21 @@ class Lock:
num_requests += 1
yield wait_time
- def __repr__(self):
+ def __repr__(self) -> str:
"""Formal representation of the lock."""
rep = "{0}(".format(self.__class__.__name__)
for attr, value in self.__dict__.items():
rep += "{0}={1}, ".format(attr, value.__repr__())
return "{0})".format(rep.strip(", "))
- def __str__(self):
+ def __str__(self) -> str:
"""Readable string (with key fields) of the lock."""
location = "{0}[{1}:{2}]".format(self.path, self._start, self._length)
timeout = "timeout={0}".format(self.default_timeout)
activity = "#reads={0}, #writes={1}".format(self._reads, self._writes)
return "({0}, {1}, {2})".format(location, timeout, activity)
- def _lock(self, op, timeout=None):
+ def _lock(self, op: int, timeout: Optional[float] = None) -> Tuple[float, int]:
"""This takes a lock using POSIX locks (``fcntl.lockf``).
The lock is implemented as a spin lock using a nonblocking call
@@ -310,7 +326,7 @@ class Lock:
# Create file and parent directories if they don't exist.
if self._file is None:
self._ensure_parent_directory()
- self._file = file_tracker.get_fh(self.path)
+ self._file = FILE_TRACKER.get_fh(self.path)
if LockType.to_module(op) == fcntl.LOCK_EX and self._file.mode == "r":
# Attempt to upgrade to write lock w/a read-only file.
@@ -319,7 +335,7 @@ class Lock:
self._log_debug(
"{} locking [{}:{}]: timeout {}".format(
- op_str.lower(), self._start, self._length, pretty_seconds(timeout or 0)
+ op_str.lower(), self._start, self._length, lang.pretty_seconds(timeout or 0)
)
)
@@ -343,15 +359,20 @@ class Lock:
total_wait_time = time.time() - start_time
raise LockTimeoutError(op_str.lower(), self.path, total_wait_time, num_attempts)
- def _poll_lock(self, op):
+ def _poll_lock(self, op: int) -> bool:
"""Attempt to acquire the lock in a non-blocking manner. Return whether
the locking attempt succeeds
"""
+ assert self._file is not None, "cannot poll a lock without the file being set"
module_op = LockType.to_module(op)
try:
# Try to get the lock (will raise if not available.)
fcntl.lockf(
- self._file, module_op | fcntl.LOCK_NB, self._length, self._start, os.SEEK_SET
+ self._file.fileno(),
+ module_op | fcntl.LOCK_NB,
+ self._length,
+ self._start,
+ os.SEEK_SET,
)
# help for debugging distributed locking
@@ -377,7 +398,7 @@ class Lock:
return False
- def _ensure_parent_directory(self):
+ def _ensure_parent_directory(self) -> str:
parent = os.path.dirname(self.path)
# relative paths to lockfiles in the current directory have no parent
@@ -396,20 +417,22 @@ class Lock:
raise
return parent
- def _read_log_debug_data(self):
+ def _read_log_debug_data(self) -> None:
"""Read PID and host data out of the file if it is there."""
+ assert self._file is not None, "cannot read debug log without the file being set"
self.old_pid = self.pid
self.old_host = self.host
line = self._file.read()
if line:
pid, host = line.strip().split(",")
- _, _, self.pid = pid.rpartition("=")
+ _, _, pid = pid.rpartition("=")
_, _, self.host = host.rpartition("=")
- self.pid = int(self.pid)
+ self.pid = int(pid)
- def _write_log_debug_data(self):
+ def _write_log_debug_data(self) -> None:
"""Write PID and host data to the file, recording old values."""
+ assert self._file is not None, "cannot write debug log without the file being set"
self.old_pid = self.pid
self.old_host = self.host
@@ -423,20 +446,21 @@ class Lock:
self._file.flush()
os.fsync(self._file.fileno())
- def _unlock(self):
+ def _unlock(self) -> None:
"""Releases a lock using POSIX locks (``fcntl.lockf``)
Releases the lock regardless of mode. Note that read locks may
be masquerading as write locks, but this removes either.
"""
- fcntl.lockf(self._file, fcntl.LOCK_UN, self._length, self._start, os.SEEK_SET)
- file_tracker.release_by_fh(self._file)
+ assert self._file is not None, "cannot unlock without the file being set"
+ fcntl.lockf(self._file.fileno(), fcntl.LOCK_UN, self._length, self._start, os.SEEK_SET)
+ FILE_TRACKER.release_by_fh(self._file)
self._file = None
self._reads = 0
self._writes = 0
- def acquire_read(self, timeout=None):
+ def acquire_read(self, timeout: Optional[float] = None) -> bool:
"""Acquires a recursive, shared lock for reading.
Read and write locks can be acquired and released in arbitrary
@@ -461,7 +485,7 @@ class Lock:
self._reads += 1
return False
- def acquire_write(self, timeout=None):
+ def acquire_write(self, timeout: Optional[float] = None) -> bool:
"""Acquires a recursive, exclusive lock for writing.
Read and write locks can be acquired and released in arbitrary
@@ -491,7 +515,7 @@ class Lock:
self._writes += 1
return False
- def is_write_locked(self):
+ def is_write_locked(self) -> bool:
"""Check if the file is write locked
Return:
@@ -508,7 +532,7 @@ class Lock:
return False
- def downgrade_write_to_read(self, timeout=None):
+ def downgrade_write_to_read(self, timeout: Optional[float] = None) -> None:
"""
Downgrade from an exclusive write lock to a shared read.
@@ -527,7 +551,7 @@ class Lock:
else:
raise LockDowngradeError(self.path)
- def upgrade_read_to_write(self, timeout=None):
+ def upgrade_read_to_write(self, timeout: Optional[float] = None) -> None:
"""
Attempts to upgrade from a shared read lock to an exclusive write.
@@ -546,7 +570,7 @@ class Lock:
else:
raise LockUpgradeError(self.path)
- def release_read(self, release_fn=None):
+ def release_read(self, release_fn: ReleaseFnType = None) -> bool:
"""Releases a read lock.
Arguments:
@@ -582,7 +606,7 @@ class Lock:
self._reads -= 1
return False
- def release_write(self, release_fn=None):
+ def release_write(self, release_fn: ReleaseFnType = None) -> bool:
"""Releases a write lock.
Arguments:
@@ -623,58 +647,58 @@ class Lock:
else:
return False
- def cleanup(self):
+ def cleanup(self) -> None:
if self._reads == 0 and self._writes == 0:
os.unlink(self.path)
else:
raise LockError("Attempting to cleanup active lock.")
- def _get_counts_desc(self):
+ def _get_counts_desc(self) -> str:
return (
"(reads {0}, writes {1})".format(self._reads, self._writes) if tty.is_verbose() else ""
)
- def _log_acquired(self, locktype, wait_time, nattempts):
+ def _log_acquired(self, locktype, wait_time, nattempts) -> None:
attempts_part = _attempts_str(wait_time, nattempts)
now = datetime.now()
desc = "Acquired at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg(locktype, "{0}{1}".format(desc, attempts_part)))
- def _log_acquiring(self, locktype):
+ def _log_acquiring(self, locktype) -> None:
self._log_debug(self._status_msg(locktype, "Acquiring"), level=3)
- def _log_debug(self, *args, **kwargs):
+ def _log_debug(self, *args, **kwargs) -> None:
"""Output lock debug messages."""
kwargs["level"] = kwargs.get("level", 2)
tty.debug(*args, **kwargs)
- def _log_downgraded(self, wait_time, nattempts):
+ def _log_downgraded(self, wait_time, nattempts) -> None:
attempts_part = _attempts_str(wait_time, nattempts)
now = datetime.now()
desc = "Downgraded at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg("READ LOCK", "{0}{1}".format(desc, attempts_part)))
- def _log_downgrading(self):
+ def _log_downgrading(self) -> None:
self._log_debug(self._status_msg("WRITE LOCK", "Downgrading"), level=3)
- def _log_released(self, locktype):
+ def _log_released(self, locktype) -> None:
now = datetime.now()
desc = "Released at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg(locktype, desc))
- def _log_releasing(self, locktype):
+ def _log_releasing(self, locktype) -> None:
self._log_debug(self._status_msg(locktype, "Releasing"), level=3)
- def _log_upgraded(self, wait_time, nattempts):
+ def _log_upgraded(self, wait_time, nattempts) -> None:
attempts_part = _attempts_str(wait_time, nattempts)
now = datetime.now()
desc = "Upgraded at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg("WRITE LOCK", "{0}{1}".format(desc, attempts_part)))
- def _log_upgrading(self):
+ def _log_upgrading(self) -> None:
self._log_debug(self._status_msg("READ LOCK", "Upgrading"), level=3)
- def _status_msg(self, locktype, status):
+ def _status_msg(self, locktype: str, status: str) -> str:
status_desc = "[{0}] {1}".format(status, self._get_counts_desc())
return "{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}".format(
locktype, self, status_desc
@@ -709,7 +733,13 @@ class LockTransaction:
"""
- def __init__(self, lock, acquire=None, release=None, timeout=None):
+ def __init__(
+ self,
+ lock: Lock,
+ acquire: Union[ReleaseFnType, ContextManager] = None,
+ release: Union[ReleaseFnType, ContextManager] = None,
+ timeout: Optional[float] = None,
+ ) -> None:
self._lock = lock
self._timeout = timeout
self._acquire_fn = acquire
@@ -724,15 +754,20 @@ class LockTransaction:
else:
return self._as
- def __exit__(self, type, value, traceback):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> bool:
suppress = False
def release_fn():
if self._release_fn is not None:
- return self._release_fn(type, value, traceback)
+ return self._release_fn(exc_type, exc_value, traceback)
if self._as and hasattr(self._as, "__exit__"):
- if self._as.__exit__(type, value, traceback):
+ if self._as.__exit__(exc_type, exc_value, traceback):
suppress = True
if self._exit(release_fn):
@@ -740,6 +775,12 @@ class LockTransaction:
return suppress
+ def _enter(self) -> bool:
+ return NotImplemented
+
+ def _exit(self, release_fn: ReleaseFnType) -> bool:
+ return NotImplemented
+
class ReadTransaction(LockTransaction):
"""LockTransaction context manager that does a read and releases it."""
@@ -785,7 +826,7 @@ class LockTimeoutError(LockError):
super().__init__(
fmt.format(
lock_type,
- pretty_seconds(time),
+ lang.pretty_seconds(time),
attempts,
"attempt" if attempts == 1 else "attempts",
path,