From f406fcb843edcea359e3fb6103eed560dadc1355 Mon Sep 17 00:00:00 2001 From: Gregory Becker Date: Mon, 31 Aug 2015 09:38:38 -0700 Subject: Fixed several issues from code review Most importantly wrote the Lock, Read_Lock_Instance, and Write_Lock_Instance classes in lock.py Updated the locking in database.py TODO: Lock on larger areas --- lib/spack/llnl/util/lock.py | 136 ++++++++++++++++++++++++++++++++++++++++++++ lib/spack/spack/database.py | 130 ++++++++++++++++-------------------------- 2 files changed, 184 insertions(+), 82 deletions(-) create mode 100644 lib/spack/llnl/util/lock.py (limited to 'lib') diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py new file mode 100644 index 0000000000..05641475ed --- /dev/null +++ b/lib/spack/llnl/util/lock.py @@ -0,0 +1,136 @@ +############################################################################## +# Copyright (c) 2013, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory. +# +# This file is part of Spack. +# Written by Todd Gamblin, tgamblin@llnl.gov, All rights reserved. +# LLNL-CODE-647188 +# +# For details, see https://scalability-llnl.github.io/spack +# Please also see the LICENSE file for our notice and the LGPL. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License (as published by +# the Free Software Foundation) version 2.1 dated February 1999. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the IMPLIED WARRANTY OF +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the terms and +# conditions of the GNU General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +############################################################################## +import os +import fcntl +import errno +import time +import socket + + +class Read_Lock_Instance(object): + """ + A context manager for getting shared access to the object lock + Arguments are lock and timeout (default 5 minutes) + """ + def __init__(self,lock,timeout = 300): + self._lock = lock + self._timeout = timeout + def __enter__(self): + self._lock.acquire_read(self._timeout) + def __exit__(self,type,value,traceback): + self._lock.release_read() + + +class Write_Lock_Instance(object): + """ + A context manager for getting exclusive access to the object lock + Arguments are lock and timeout (default 5 minutes) + """ + def __init__(self,lock,timeout = 300): + self._lock = lock + self._timeout = timeout + def __enter__(self): + self._lock.acquire_write(self._timeout) + def __exit__(self,type,value,traceback): + self._lock.release_write() + + +class Lock(object): + def __init__(self,file_path): + self._file_path = file_path + self._fd = os.open(file_path,os.O_RDWR) + self._reads = 0 + self._writes = 0 + + + def acquire_read(self,timeout): + """ + Implements recursive lock. If held in both read and write mode, + the write lock will be maintained until all locks are released + """ + if self._reads == 0 and self._writes == 0: + self._lock(fcntl.LOCK_SH,timeout) + self._reads += 1 + + + def acquire_write(self,timeout): + """ + Implements recursive lock + """ + if self._writes == 0: + self._lock(fcntl.LOCK_EX,timeout) + self._writes += 1 + + + def _lock(self,op,timeout): + """ + The timeout is implemented using nonblocking flock() + to avoid using signals for timing + Write locks store pid and host information to the lock file + Read locks do not store data + """ + total_time = 0 + while total_time < timeout: + try: + fcntl.flock(self._fd, op | fcntl.LOCK_NB) + if op == fcntl.LOCK_EX: + with open(self._file_path,'w') as f: + f.write("pid = "+str(os.getpid())+", host = "+socket.getfqdn()) + return + except IOError as error: + if error.errno == errno.EAGAIN or error.errno == EACCES: + pass + else: + raise + time.sleep(0.1) + total_time += 0.1 + + + def release_read(self): + """ + Assert there is a lock of the right type to release, recursive lock + """ + assert self._reads > 0 + if self._reads == 1 and self._writes == 0: + self._unlock() + self._reads -= 1 + + + def release_write(self): + """ + Assert there is a lock of the right type to release, recursive lock + """ + assert self._writes > 0 + if self._writes == 1 and self._reads == 0: + self._unlock() + self._writes -= 1 + + + def _unlock(self): + """ + Releases the lock regardless of mode. Note that read locks may be + masquerading as write locks at times, but this removes either. + """ + fcntl.flock(self._fd,fcntl.LOCK_UN) diff --git a/lib/spack/spack/database.py b/lib/spack/spack/database.py index 5e8bb172b8..6a429b68a9 100644 --- a/lib/spack/spack/database.py +++ b/lib/spack/spack/database.py @@ -38,6 +38,7 @@ from external.yaml.error import MarkedYAMLError import llnl.util.tty as tty from llnl.util.filesystem import join_path from llnl.util.lang import * +from llnl.util.lock import * import spack.error import spack.spec @@ -58,7 +59,7 @@ def _autospec(function): class Database(object): - def __init__(self,root,file_name="specDB.yaml"): + def __init__(self,root,file_name="_index.yaml"): """ Create an empty Database Location defaults to root/specDB.yaml @@ -67,28 +68,31 @@ class Database(object): path: the path to the install of that package dep_hash: a hash of the dependence DAG for that package """ - self.root = root - self.file_name = file_name - self.file_path = join_path(self.root,self.file_name) + self._root = root + self._file_name = file_name + self._file_path = join_path(self._root,self._file_name) - self.lock_name = "db_lock" - self.lock_path = join_path(self.root,self.lock_name) + self._lock_name = "_db_lock" + self._lock_path = join_path(self._root,self._lock_name) + if not os.path.exists(self._lock_path): + open(self._lock_path,'w').close() + self.lock = Lock(self._lock_path) - self.data = [] - self.last_write_time = 0 + self._data = [] + self._last_write_time = 0 - def from_yaml(self,stream): + def _read_from_yaml(self,stream): """ Fill database from YAML, do not maintain old data - Translate the spec portions from node-dict form to spec from + Translate the spec portions from node-dict form to spec form """ try: file = yaml.load(stream) except MarkedYAMLError, e: raise SpackYAMLError("error parsing YAML database:", str(e)) - if file==None: + if file is None: return data = {} @@ -104,51 +108,34 @@ class Database(object): for idx in sph['deps']: sph['spec'].dependencies[data[idx]['spec'].name] = data[idx]['spec'] - self.data = data.values() + self._data = data.values() def read_database(self): """ Re-read Database from the data in the set location If the cache is fresh, return immediately. - Implemented with mkdir locking for the database file. """ if not self.is_dirty(): return - lock=0 - while lock==0: - try: - os.mkdir(self.lock_path) - lock=1 - except OSError as err: - pass - - #The try statement ensures that a failure won't leave the - #database locked to other processes. - try: - if os.path.isfile(self.file_path): - with open(self.file_path,'r') as f: - self.from_yaml(f) - else: + if os.path.isfile(self._file_path): + with open(self._file_path,'r') as f: + self._read_from_yaml(f) + else: #The file doesn't exist, construct empty data. - self.data = [] - except: - os.rmdir(self.lock_path) - raise - - os.rmdir(self.lock_path) + self._data = [] - def write_database_to_yaml(self,stream): + def _write_database_to_yaml(self,stream): """ Replace each spec with its dict-node form Then stream all data to YAML """ node_list = [] - spec_list = [sph['spec'] for sph in self.data] + spec_list = [sph['spec'] for sph in self._data] - for sph in self.data: + for sph in self._data: node = {} deps = [] for name,spec in sph['spec'].dependencies.items(): @@ -167,46 +154,23 @@ class Database(object): def write(self): """ Write the database to the standard location - Implements mkdir locking for the database file + Everywhere that the database is written it is read + within the same lock, so there is no need to refresh + the database within write() """ - lock=0 - while lock==0: - try: - os.mkdir(self.lock_path) - lock=1 - except OSError as err: - pass - - #The try statement ensures that a failure won't leave the - #database locked to other processes. - try: - with open(self.file_path,'w') as f: - self.last_write_time = int(time.time()) - self.write_database_to_yaml(f) - except: - os.rmdir(self.lock_path) - raise - - os.rmdir(self.lock_path) - - - def get_index_of(self, spec): - """ - Returns the index of a spec in the database - If unable to find the spec it returns -1 - """ - for index, sph in enumerate(self.data): - if sph['spec'] == spec: - return index - return -1 - + temp_name = os.getpid() + socket.getfqdn() + ".temp" + temp_file = path.join(self._root,temp_name) + with open(self.temp_path,'w') as f: + self._last_write_time = int(time.time()) + self._write_database_to_yaml(f) + os.rename(temp_name,self._file_path) def is_dirty(self): """ Returns true iff the database file exists and was most recently written to by another spack instance. """ - return (os.path.isfile(self.file_path) and (os.path.getmtime(self.file_path) > self.last_write_time)) + return (os.path.isfile(self._file_path) and (os.path.getmtime(self._file_path) > self._last_write_time)) # @_autospec @@ -215,16 +179,15 @@ class Database(object): Add the specified entry as a dict Write the database back to memory """ - self.read_database() - sph = {} sph['spec']=spec sph['path']=path sph['hash']=spec.dag_hash() - self.data.append(sph) - - self.write() + with Write_Lock_Instance(self.lock,60): + self.read_database() + self._data.append(sph) + self.write() @_autospec @@ -234,13 +197,15 @@ class Database(object): Searches for and removes the specified spec Writes the database back to memory """ - self.read_database() + with Write_Lock_Instance(self.lock,60): + self.read_database() - for sp in self.data: - if sp['hash'] == spec.dag_hash() and sp['spec'] == spec: - self.data.remove(sp) + for sp in self._data: + #Not sure the hash comparison is necessary + if sp['hash'] == spec.dag_hash() and sp['spec'] == spec: + self._data.remove(sp) - self.write() + self.write() @_autospec @@ -272,10 +237,11 @@ class Database(object): Read installed package names from the database and return their specs """ - self.read_database() + with Read_Lock_Instance(self.lock,60): + self.read_database() installed = [] - for sph in self.data: + for sph in self._data: installed.append(sph['spec']) return installed -- cgit v1.2.3-70-g09d2