Source code for fictive.cache.mixins

"""
fictive.cache.mixins

Optional mix in features for an `AbstractCache`

"""

import argparse
import typing
import types

from fictive.patterns.concurrency import ContextLock
from fictive.cache.abstract import AbstractCache


[docs]class RLockingMixin(object): """ Uses a re-entrant lock to protect any mutations to the cached value """ # pylint: disable=too-few-public-methods
[docs] class CacheLockError(AbstractCache.CacheWriteError): """ Raised when a cache lock could not be acquired """
[docs] def __init__( self, *args, reentrant_lock, acquire_args: typing.Iterable = None, acquire_kwargs: typing.Mapping = None, **kwargs): super().__init__(*args, **kwargs) self._ctx_rlock = ContextLock(reentrant_lock) self.bind_acquire_params(*(acquire_args or ()), **(acquire_kwargs or {}))
[docs] def bind_acquire_params(self, *args, **kwargs): """ Bind the parameters that will be used for the lock `acquire` method """ self._acquire_params = argparse.Namespace(args=args, kwargs=kwargs)
@property def acquire_params(self): """ Read-only access """ return self._acquire_params def _dispatch_fetch(self, *args, **kwargs): """ Check whether the cache has been updated while waiting for the lock If a cache value is available, return that value rather than fetch """ params = self.acquire_params with self._ctx_rlock.acquire_context(*params.args, **params.kwargs) as acquired: if not acquired: raise self.CacheLockError() try: return self._dispatch_read(*args, **kwargs) except self.CacheMissError: return super()._dispatch_fetch(*args, **kwargs) def _dispatch_set(self, value, *args, **kwargs): params = self.acquire_params with self._ctx_rlock.acquire_context(*params.args, **params.kwargs) as acquired: if not acquired: raise self.CacheLockError() super()._dispatch_set(value, *args, **kwargs) def _dispatch_delete(self, cache_value, *args, **kwargs): params = self.acquire_params with self._ctx_rlock.acquire_context(*params.args, **params.kwargs) as acquired: if not acquired: raise self.CacheLockError() return super()._dispatch_delete(cache_value, *args, **kwargs)
[docs]class ValidatingMixin(object): """ Stored values are considerd a cache miss if they fail validation Validation is "reconsidered" if a fetch fails """ # pylint: disable=no-init # pylint: disable=too-few-public-methods
[docs] class CacheInvalidError(AbstractCache.CacheMissError): """ raised when a cached value is invalid and should be disregarded """
def _dispatch_fetch(self, *args, **kwargs): """ dispatch a `_fetch_impl` request, then cache and return the result """ try: return super()._dispatch_fetch(*args, **kwargs) except self.CacheFetchError as fetch_error: try: return self._dispatch_read(*args, __fetch_error=True, **kwargs) except self.CacheMissError: raise fetch_error def _dispatch_read(self, *args, **kwargs): """ Treat an invalid value as a cache miss """ value = super(ValidatingMixin, self)._dispatch_read(*args, **kwargs) if not self._dispatch_validate(value, *args, **kwargs): raise self.CacheInvalidError() return value def _dispatch_validate(self, value, *args, **kwargs): return self.validate_impl(value, *args, **kwargs) # pylint: disable=not-callable @property def validate_impl(self): """ allow for overriding `_validate_impl` after initialization """ return self._validate_impl @validate_impl.setter def validate_impl(self, validate_impl): self._validate_impl = types.MethodType(validate_impl, self) @validate_impl.deleter def validate_impl(self): del self._validate_impl def _validate_impl(self, value, *args, **kwargs): """ Subclasses should override this method to check `value`'s validity """ # pylint: disable=no-self-use # pylint: disable=unused-argument # pylint: disable=method-hidden return True
[docs] def __init__(self, *args, validate_impl=None, **kwargs): """ :param validate_impl: callable that determines whether a value is valid """ if validate_impl: self.validate_impl = validate_impl super(ValidatingMixin, self).__init__(*args, **kwargs)