Source code for fictive.patterns.concurrency

"""
Concurrency Primitives

"""

import contextlib
import functools
import time

from fictive.patterns.decorators import optional_arguments


[docs]class ContextLock(object): """ provides some context-management features for an encapsulated lock """
[docs] def __init__(self, lock): self._lock = lock
[docs] def acquire(self, blocking=True, timeout=-1): """ acquire the lock """ return self._lock.acquire(blocking, timeout)
[docs] def release(self): """ release the lock """ self._lock.release()
def __enter__(self): return self._lock.__enter__() def __exit__(self, exc_type, exc_value, traceback): return self._lock.__exit__(exc_type, exc_value, traceback)
[docs] class LockNotAcquiredError(Exception): """ raised by a `protect`\\ed function when the lock could not be acquired """
@property def protect(self): """ returns a decorator that will acquire the lock before invoking the wrapped function """ @optional_arguments def multimodal_protect(blocking=True, timeout=-1): def protect_wrapper(wrapped): @functools.wraps(wrapped) def acquire_then_execute(*args, **kwargs): if not self.acquire(blocking=blocking, timeout=timeout): raise self.LockNotAcquiredError() try: return wrapped(*args, **kwargs) finally: self.release() return acquire_then_execute return protect_wrapper return multimodal_protect
[docs] @contextlib.contextmanager def acquire_context(self, blocking=True, timeout=-1): """ a context manager that will attempt to acquire the lock NB: if blocking is `False` or timeout is > 0, make sure to check the returned value to determine whteher the lock was actually acquired. If a `LockNotAcquiredError` is raised within the context, it will be handled by the context manager and the context will immediately cleanup and exit:: lock = SpinLock(some_non_blocking_lock) with lock.acquire_context(timeout=5) as acquired: if not aquired: raise lock.LockNotAcquiredError() # lock is acquired do_something() """ acquired = self.acquire(blocking, timeout) try: yield acquired except self.LockNotAcquiredError: pass finally: if acquired: self.release()
[docs]class SpinLock(object): """ Implements a spin lock around a nonblocking lock """ DEFAULT_SPIN_DELAY = 0.001
[docs] def __init__(self, lock, spin_delay=None): # pylint: disable=keyword-arg-before-vararg self._lock = lock self.spin_delay = spin_delay or self.DEFAULT_SPIN_DELAY
[docs] def acquire(self, blocking=True, timeout=-1): """ If `blocking`, keep trying the lock until acquired (or `timeout` is reached) """ if timeout != -1 and not blocking: raise ValueError("can't specify a timeout for a non-blocking call") start = time.time() while timeout < 0 or time.time() - start < timeout: if self._lock.acquire(False): return True if not blocking: return False time.sleep(self.spin_delay) return False
[docs] def release(self): """ release the lock """ self._lock.release()
def __enter__(self): return self._lock.__enter__() def __exit__(self, exc_type, exc_value, traceback): return self._lock.__exit__(exc_type, exc_value, traceback)