"""
Concurrency Primitives
"""
import contextlib
import functools
import time
from fictive.patterns.decorators import optional_arguments
class ContextLock(object):
    """
    Wraps a lock object and layers context-management helpers on top of it.

    ``acquire`` and ``release`` are forwarded to the wrapped lock, which is
    called as ``acquire(blocking, timeout)`` and ``release()``.  Using an
    instance directly in a ``with`` statement is forwarded to the wrapped
    lock's own ``__enter__``/``__exit__``, so the wrapped lock must implement
    them for that usage to work.
    """

    def __init__(self, lock):
        """
        :param lock: the lock object to encapsulate
        """
        self._lock = lock

    def acquire(self, blocking=True, timeout=-1):
        """
        Acquire the wrapped lock.

        :param blocking: forwarded positionally to the wrapped lock's ``acquire``
        :param timeout: forwarded positionally to the wrapped lock's ``acquire``
        :returns: whatever the wrapped lock's ``acquire`` returns
        """
        return self._lock.acquire(blocking, timeout)

    def release(self):
        """
        Release the wrapped lock.
        """
        self._lock.release()

    def __enter__(self):
        # Delegated as-is: the wrapped lock decides how entering behaves.
        return self._lock.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        # Delegated as-is, including the wrapped lock's decision on whether
        # to suppress the in-flight exception.
        return self._lock.__exit__(exc_type, exc_value, traceback)

    class LockNotAcquiredError(Exception):
        """
        Raised by a ``protect``-decorated function when the lock could not
        be acquired.
        """

    @property
    def protect(self):
        """
        Return a decorator that acquires the lock before invoking the wrapped
        function and releases it afterwards (even if the function raises).

        The decorator accepts ``blocking`` and ``timeout``, which are passed
        to :meth:`acquire` on every call of the decorated function.  If
        :meth:`acquire` returns a falsy value the decorated function is not
        called and :class:`LockNotAcquiredError` is raised instead.

        NOTE(review): ``optional_arguments`` comes from
        ``fictive.patterns.decorators``; it presumably allows both the bare
        ``@lock.protect`` and the ``@lock.protect(timeout=5)`` forms --
        confirm against that helper.
        """
        @optional_arguments
        def multimodal_protect(blocking=True, timeout=-1):
            def protect_wrapper(wrapped):
                @functools.wraps(wrapped)
                def acquire_then_execute(*args, **kwargs):
                    if not self.acquire(blocking=blocking, timeout=timeout):
                        raise self.LockNotAcquiredError()
                    try:
                        return wrapped(*args, **kwargs)
                    finally:
                        # Only reached once the lock is held, so the release
                        # is always balanced with the acquire above.
                        self.release()
                return acquire_then_execute
            return protect_wrapper
        return multimodal_protect

    @contextlib.contextmanager
    def acquire_context(self, blocking=True, timeout=-1):
        """
        A context manager that attempts to acquire the lock and yields the
        result of that attempt.

        NB: if `blocking` is ``False`` or `timeout` is > 0, make sure to
        check the yielded value to determine whether the lock was actually
        acquired.  If a `LockNotAcquiredError` is raised within the context,
        it is swallowed by the context manager and the context immediately
        cleans up and exits::

            lock = ContextLock(SpinLock(some_non_blocking_lock))
            with lock.acquire_context(timeout=5) as acquired:
                if not acquired:
                    raise lock.LockNotAcquiredError()
                # lock is acquired
                do_something()

        Any other exception raised inside the context propagates after the
        lock has been released.

        :param blocking: forwarded to :meth:`acquire`
        :param timeout: forwarded to :meth:`acquire`
        """
        acquired = self.acquire(blocking, timeout)
        try:
            yield acquired
        except self.LockNotAcquiredError:
            # Deliberate: lets callers bail out of the block (see example).
            pass
        finally:
            # Never release a lock this context did not obtain.
            if acquired:
                self.release()
class SpinLock(object):
    """
    Implements a spin lock around a nonblocking lock.

    The wrapped lock only needs ``acquire(False)`` (a single non-blocking
    attempt returning a truthy value on success) and ``release()``; all
    waiting is done here by polling with a short sleep between attempts.
    """
    # Seconds slept between two acquisition attempts when none is given.
    DEFAULT_SPIN_DELAY = 0.001

    def __init__(self, lock, spin_delay=None):
        """
        :param lock: the non-blocking lock to poll
        :param spin_delay: seconds to sleep between attempts; any falsy value
            (``None`` or ``0``) selects ``DEFAULT_SPIN_DELAY``
        """
        self._lock = lock
        self.spin_delay = spin_delay or self.DEFAULT_SPIN_DELAY

    def acquire(self, blocking=True, timeout=-1):
        """
        Try to acquire the wrapped lock, polling it if `blocking`.

        The lock is always attempted at least once, so ``timeout=0`` behaves
        as a single non-blocking attempt.

        :param blocking: when false, make exactly one attempt and return
        :param timeout: maximum number of seconds to keep polling; any
            negative value (conventionally ``-1``) means poll indefinitely
        :returns: ``True`` if the lock was acquired, ``False`` otherwise
        :raises ValueError: if a timeout is given for a non-blocking call
        """
        if timeout != -1 and not blocking:
            raise ValueError("can't specify a timeout for a non-blocking call")
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = None if timeout < 0 else time.monotonic() + timeout
        while True:
            if self._lock.acquire(False):
                return True
            if not blocking:
                return False
            if deadline is None:
                time.sleep(self.spin_delay)
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Do not sleep past the deadline.
            time.sleep(min(self.spin_delay, remaining))

    def release(self):
        """
        Release the wrapped lock.
        """
        self._lock.release()

    def __enter__(self):
        # Go through acquire() so that ``with`` spins like an explicit
        # blocking acquire instead of relying on the wrapped lock to block.
        return self.acquire()

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so exceptions raised in the block propagate.
        self.release()