
API Reference

Protocol for cache status handlers.

Implement this protocol to create custom handlers for cache status events. Handlers are registered with the cache status registry and called when corresponding cache operations occur.

def can_handle(status: CacheStatus) -> bool

Check if this handler can handle the status.

Parameters
- `status` (CacheStatus): The cache status to check.

Returns
- bool: True if this handler can handle the status.

async def record(
    metrics: CacheMetrics,
    count: int
) -> None

Record metrics for the status.

Parameters
- `metrics` (CacheMetrics): The metrics object to update.
- `count` (int): The number of operations to record.
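As an illustration, a handler for hit events might look like the sketch below. The `CacheStatus` enum, `CacheMetrics` dataclass, and the `HitHandler` name are simplified stand-ins for this example, not the library's actual definitions.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


class CacheStatus(Enum):  # simplified stand-in for the library's CacheStatus
    HIT = "hit"
    MISS = "miss"


@dataclass
class CacheMetrics:  # simplified stand-in exposing a counter to update
    hits: int = 0


class HitHandler:  # hypothetical handler satisfying the protocol
    def can_handle(self, status: CacheStatus) -> bool:
        # Only claim statuses this handler knows how to record.
        return status is CacheStatus.HIT

    async def record(self, metrics: CacheMetrics, count: int) -> None:
        # Update the metrics object for `count` operations.
        metrics.hits += count


async def main() -> None:
    handler = HitHandler()
    metrics = CacheMetrics()
    if handler.can_handle(CacheStatus.HIT):
        await handler.record(metrics, count=3)
    print(metrics.hits)  # 3


asyncio.run(main())
```

A registry would typically iterate its handlers, calling `record` on the first one whose `can_handle` returns True.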

Distributed lock with automatic TTL renewal.

Prevents lock expiration during long operations by automatically extending the TTL in the background.

def __init__(
    redis: Any,
    key: str,
    lock_id: str,
    ttl: int,
    auto_renew: bool = True,
    renew_interval: int | None = None
) -> None

Initialize auto-renewing lock.

Parameters
- `redis` (Any): Redis client.
- `key` (str): Lock key.
- `lock_id` (str): Unique lock identifier (owner ID).
- `ttl` (int): Lock TTL in seconds.
- `auto_renew` (bool): Enable automatic renewal.
- `renew_interval` (int | None): Renewal interval in seconds (default: ttl / 3).
async def start_auto_renewal() -> None

Start background task to auto-renew lock.

async def stop_auto_renewal() -> None

Stop background renewal task.

async def release() -> None

Release lock and stop auto-renewal.


Supported cache backend types.

Unified (flattened-union) configuration for any single cache backend.

This class uses the flattened-union pattern: all backend-specific fields live in one dataclass and are partitioned by the type field. Fields that do not belong to the active backend type are silently ignored by the respective backend implementation.

Prefer this class when loading backend configuration from environment variables or a flat config file (e.g. TOML/YAML with no nested sections), where per-backend types cannot be selected at parse time.

For strongly-typed, up-front configuration prefer the dedicated classes: MemoryBackendConfig, RedisBackendConfig, or MemcachedBackendConfig.

Field applicability by backend type:

| Field | memory | redis | memcached |
|---|---|---|---|
| name, type, default, enabled, default_ttl, key_prefix, enable_metrics, health_check_interval | ✓ | ✓ | ✓ |
| max_size, cleanup_interval | ✓ | | |
| redis_host/port/db/password/url/ssl/pool_size | | ✓ | |
| memcached_host/port/servers | | | ✓ |

Attributes
- name: Unique name for this backend.
- type: Backend type (memory, redis, memcached).
- default: Whether this is the default backend.
- enabled: Whether this backend is enabled.
- default_ttl: Default TTL in seconds.
- key_prefix: Key prefix for namespace isolation.
- enable_metrics: Enable metrics collection.
- health_check_interval: Health check interval in seconds.

def build_redis_url() -> CacheBackendConfig

Build Redis URL from components if not provided.

def build_memcached_servers() -> CacheBackendConfig

Build Memcached server list if not provided.


Top-level configuration for Lexigram Cache.

Attributes
- name: Configuration name (default: "cache").
- version: Config version.
- enabled: Whether cache is enabled.
- backends: List of cache backend configurations.
- service: Cache service settings.
- environment: Environment name.
- debug: Debug mode.

def validate_production_security() -> CacheConfig

Block insecure cache configurations in production.

def validate_default_backend(
    cls,
    v: list[CacheBackendConfig]
) -> list[CacheBackendConfig]

Ensure exactly one default backend is specified.

def validate_unique_names(
    cls,
    v: list[CacheBackendConfig]
) -> list[CacheBackendConfig]

Ensure backend names are unique.
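The intent of these two validators can be sketched as plain functions over a minimal config record. The `Backend` dataclass below is an illustrative stand-in for CacheBackendConfig, not the library's class.

```python
from dataclasses import dataclass


@dataclass
class Backend:  # minimal stand-in for CacheBackendConfig
    name: str
    default: bool = False


def validate_default_backend(backends: list[Backend]) -> list[Backend]:
    # Exactly one backend must be flagged as the default.
    defaults = [b for b in backends if b.default]
    if len(defaults) != 1:
        raise ValueError(f"expected exactly one default backend, got {len(defaults)}")
    return backends


def validate_unique_names(backends: list[Backend]) -> list[Backend]:
    # Names must be unique so lookup by name is unambiguous.
    names = [b.name for b in backends]
    if len(names) != len(set(names)):
        raise ValueError("backend names must be unique")
    return backends


ok = [Backend("mem", default=True), Backend("redis")]
validate_default_backend(validate_unique_names(ok))  # passes without raising
```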

def get_default_backend() -> CacheBackendConfig | None

Get the default backend configuration.

Returns
- CacheBackendConfig | None: The first backend marked as default, or ``None`` if none exists.

def get_backend(name: str) -> CacheBackendConfig | None

Get a backend configuration by name.

Parameters
- `name` (str): The backend name to look up.

Returns
- CacheBackendConfig | None: Matching CacheBackendConfig, or ``None`` if not found.

def get_provider_class(cls) -> type

Return the provider class for this config.

Returns
- type: The CacheProvider class.

Cache backend connection established.

Payload fired when the cache backend establishes its connection.

Attributes
- backend: Identifier of the backend that connected (e.g. "redis").


Advanced caching decorator with multiple strategies.

Provides a class-based decorator with more configuration options and caching strategies.

def __init__(
    service: CacheService,
    strategy: str = 'remember',
    key_prefix: str = '',
    ttl: int | None = None,
    backend: str | None = None,
    protect: bool = True,
    condition: Callable[[Any], bool] | None = None
)

Initialize the cache decorator.

Parameters
- `service` (CacheService): CacheService instance.
- `strategy` (str): Caching strategy ("remember", "cache", "conditional").
- `key_prefix` (str): Prefix for cache keys.
- `ttl` (int | None): Time-to-live in seconds.
- `backend` (str | None): Backend name.
- `protect` (bool): Enable stampede protection.
- `condition` (Callable[[Any], bool] | None): Condition function for conditional caching.

Payload fired when the cache backend disconnects or is shut down.

Attributes
- backend: Identifier of the backend that disconnected.


Cache entry with metadata for stampede protection.

Attributes
- value: Cached value.
- cached_at: When the value was cached.
- expires_at: When the value expires.

property is_expired() -> bool

Check if entry has expired.


Payload fired when a cache entry is evicted (TTL expiry or explicit removal).

Attributes
- key: The cache key that was evicted.
- backend: Identifier of the backend that performed the eviction.


Cache entry was evicted.

property is_healthy() -> bool
def to_dict() -> dict[str, Any]

Cache lookup resulted in a hit.

Represents a cached item with metadata.
property is_expired() -> bool
property expires_at() -> datetime | None
def touch() -> None

Cache performance metrics with atomic updates.
async def record(
    status: CacheStatus,
    latency: float = 0.0,
    count: int = 1
) -> None

Atomic record helper.

async def record_hit(count: int = 1) -> None

Convenience method to record hits.

async def record_miss(count: int = 1) -> None

Convenience method to record misses.

async def record_set(count: int = 1) -> None

Convenience method to record sets.

async def record_delete(count: int = 1) -> None

Convenience method to record deletes.

async def record_error(count: int = 1) -> None

Convenience method to record errors.

property hit_rate() -> float
async def to_dict() -> dict[str, Any]

Cache lookup resulted in a miss.

Redis and in-memory cache backends with stampede protection.

Call configure to register a configured CacheProvider and expose CacheBackendProtocol for injection.

Usage

    from lexigram.cache.config import CacheConfig

    @module(
        imports=[CacheModule.configure(CacheConfig(...))]
    )
    class AppModule(Module):
        pass

def configure(
    cls,
    config: Any | None = None
) -> DynamicModule

Create a CacheModule with explicit configuration.

Parameters
- `config` (Any | None): CacheConfig or ``None`` for framework defaults.

Returns
- DynamicModule: A DynamicModule descriptor.

def stub(cls) -> DynamicModule

Return an in-memory CacheModule for unit testing.

Uses an in-memory cache backend with no external Redis connection. SemanticCacheProtocol is not registered (not available without embedding client and vector index).

Returns
- DynamicModule: A DynamicModule backed by in-memory cache storage.

Internal configuration for cache operations. Shared across backends.

Attributes
- default_ttl: Default time-to-live in seconds.
- max_memory: Maximum memory usage in bytes.
- key_prefix: Prefix for all cache keys.
- enable_metrics: Whether to enable metrics collection.
- serializer_type: Type of serializer to use (json, pickle, msgpack).

def make_key(key: str) -> str

Create a prefixed cache key.

Parameters
- `key` (str): The raw cache key.

Returns
- str: Key with the configured prefix applied, or the original key if no prefix is set.

def strip_prefix(prefixed_key: str) -> str

Remove prefix from a cache key.

Parameters
- `prefixed_key` (str): A key that may carry the configured prefix.

Returns
- str: The key with the prefix removed, or the original key unchanged.

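A minimal sketch of the prefixing behaviour described above; the standalone function form (with an explicit `prefix` argument instead of configured state) is this sketch's simplification.

```python
def make_key(key: str, prefix: str = "") -> str:
    # Apply the configured prefix; leave the key untouched when no prefix is set.
    return f"{prefix}{key}" if prefix else key


def strip_prefix(prefixed_key: str, prefix: str = "") -> str:
    # Remove the prefix when present; otherwise return the key unchanged.
    if prefix and prefixed_key.startswith(prefix):
        return prefixed_key[len(prefix):]
    return prefixed_key


print(make_key("user:1", "app:"))          # app:user:1
print(strip_prefix("app:user:1", "app:"))  # user:1
print(strip_prefix("user:1", "app:"))      # user:1
```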

Lexigram Cache provider for lexigram integration.

Integrates cache services with lexigram’s provider system, managing lifecycle, configuration, and service registration.

def __init__(config: CacheConfig | None = None) -> None

Initialize the cache provider.

Parameters
- `config` (CacheConfig | None): Optional cache configuration. When provided, the provider is configured immediately (equivalent to calling configure).

def from_config(
    cls,
    config: CacheConfig,
    **context: Any
) -> CacheProvider

Create a CacheProvider from a CacheConfig.

The provider is created and immediately configured with the config dict.

def configure(config: dict[str, Any] | CacheConfig) -> None

Configure the provider with cache settings.

async def register(container: ContainerRegistrarProtocol) -> None

Register cache services with the container.

Only binds factories — no I/O. All real initialization happens in boot().

async def boot(container: ContainerResolverProtocol) -> None

Start the cache provider.

All real I/O initialization (backend connections, protection setup, service wiring) happens here, after the container is frozen.

Parameters
- `container` (ContainerResolverProtocol): The DI container provided by the framework.

def observe_repository(
    cache_repository: Any,
    repository: Any
) -> None

Queue cache repository observation wiring for provider boot.

Parameters
- `cache_repository` (Any): Cache repository that exposes ``observe()``.
- `repository` (Any): Source repository whose mutations should invalidate the cache.

async def shutdown() -> None

Shutdown the cache provider and cleanup resources.

property cache() -> CacheService

Get the default cache service.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check provider health.

def get_service(backend_name: str | None = None) -> CacheService

Get a cache service by backend name.

Parameters
- `backend_name` (str | None): Name of the backend; uses the default if None.

Returns
- CacheService: CacheService instance.

Raises
- ValueError: If the backend is not found.

def get_backend(backend_name: str | None = None) -> Any

Get a backend instance by name.

Parameters
- `backend_name` (str | None): Name of the backend; uses the default if None.

Returns
- Any: Backend instance.

Raises
- ValueError: If the backend is not found.

def get_default_service() -> CacheService

Get the default cache service.

Returns
- CacheService: Default CacheService instance.
async def get_health_status() -> HealthCheckResult

Return comprehensive health status of all cache services and backends.

Returns
- HealthCheckResult: Structured HealthCheckResult.
async def get_metrics() -> dict[str, Any]

Return comprehensive metrics from all registered cache services.

Returns
- dict[str, Any]: Metrics dictionary with per-service statistics.

Result of a cache operation.
property is_hit() -> bool
property is_miss() -> bool
def hit(
    cls,
    value: T,
    key: str | None = None,
    latency: float | None = None,
    backend: str | None = None
) -> CacheResult[T]
def miss(
    cls,
    key: str | None = None,
    latency: float | None = None
) -> CacheResult[T]

High-level cache service with advanced features.

Provides a unified interface for caching operations across backend implementations (memory, Redis, Memcached). Stampede protection, metrics tracking, tag-based invalidation, and batch operations are composed in via mixin inheritance.

Attributes
- provider: The cache backend provider.
- config: Configuration for cache operations.

def __init__(
    provider: CacheProvider | None = None,
    config: CacheOperationConfig | None = None,
    protection: StampedeProtectedCache | None = None,
    backend: CacheBackendProtocol | None = None,
    ctx: Context | None = None,
    max_tags: int = 10000
)

Initialize the cache service.

Parameters
- `provider` (CacheProvider | None): Cache provider for backend management (optional).
- `config` (CacheOperationConfig | None): Service-level configuration.
- `protection` (StampedeProtectedCache | None): Cache stampede protection instance.
- `backend` (CacheBackendProtocol | None): Direct backend instance (optional, for standalone usage).
- `ctx` (Context | None): Optional context for request-scoped cache keys.
- `max_tags` (int): Maximum number of tags tracked in the in-memory index. When the cap is reached, the oldest tag (by insertion/access order) is evicted. Defaults to 10,000.
async def close() -> None

Close the cache service, releasing any background resources.

async def get(
    key: str,
    backend: str | None = None,
    default: Any = None,
    request_scoped: bool = False
) -> Any

Get a value from cache with error handling.

Parameters
- `key` (str): Cache key.
- `backend` (str | None): Backend name (uses default if None).
- `default` (Any): Default value if the key is not found.
- `request_scoped` (bool): Whether to use a request-scoped cache key.

Returns
- Any: Cached value or default.

async def set(
    key: str,
    value: Any,
    ttl: int | None = None,
    backend: str | None = None
) -> bool

Set a value in cache with error handling.

Parameters
- `key` (str): Cache key.
- `value` (Any): Value to cache.
- `ttl` (int | None): Time-to-live in seconds.
- `backend` (str | None): Backend name (uses default if None).

Returns
- bool: True if successful, False otherwise.

async def delete(
    key: str,
    backend: str | None = None
) -> bool

Delete a value from cache.

Parameters
- `key` (str): Cache key.
- `backend` (str | None): Backend name (uses default if None).

Returns
- bool: True if successful, False otherwise.

async def delete_pattern(
    pattern: str,
    backend: str | None = None
) -> int

Delete all keys matching a glob-style pattern.

Delegates to the backend’s delete_pattern implementation. Redis uses SCAN + DEL; Memory iterates its in-process store. The namespace prefix (if any) is applied by the backend itself.

Parameters
- `pattern` (str): Glob pattern, e.g. ``"pet:list:*"``.
- `backend` (str | None): Named backend to use (uses default if ``None``).

Returns
- int: Number of keys deleted, or 0 on error.

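For the in-memory case, the glob matching can be illustrated with stdlib fnmatch over a plain dict; this is a sketch of the semantics, not the backend's actual code.

```python
from fnmatch import fnmatchcase


def delete_pattern(store: dict[str, object], pattern: str) -> int:
    # Collect matching keys first, then delete, to avoid mutating while iterating.
    victims = [k for k in store if fnmatchcase(k, pattern)]
    for k in victims:
        del store[k]
    return len(victims)


store = {"pet:list:1": "a", "pet:list:2": "b", "pet:detail:1": "c"}
print(delete_pattern(store, "pet:list:*"))  # 2
print(sorted(store))                        # ['pet:detail:1']
```

The Redis path described above (SCAN + DEL) follows the same collect-then-delete shape, but iterates the keyspace in cursor batches server-side.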
async def exists(
    key: str,
    backend: str | None = None
) -> bool

Check if a key exists in cache.

Parameters
- `key` (str): Cache key.
- `backend` (str | None): Backend name (uses default if None).

Returns
- bool: True if the key exists, False otherwise.

async def clear(backend: str | None = None) -> bool

Clear all values from cache.

Parameters
- `backend` (str | None): Backend name (uses default if None).

Returns
- bool: True if successful, False otherwise.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Get comprehensive health status of the cache service.

Returns
- HealthCheckResult: Structured health check result.
def get_metrics() -> dict[str, int]

Get service performance metrics.

Returns
- dict[str, int]: Dictionary of metrics.
def reset_metrics() -> None

Reset performance metrics.

async def get_typed(
    key: str,
    type_: type[_T],
    backend: str | None = None,
    default: _T | None = None
) -> _T | None

Get a value from cache and cast it to the requested type.

Unlike get, which returns Any, this method provides compile-time type safety. The stored value is returned as-is if it is already an instance of type_; otherwise the raw value is passed to type_(**raw) (for dict-like values) or type_(raw) as a fallback.

Parameters
- `key` (str): Cache key.
- `type_` (type[_T]): The expected Python type of the cached value.
- `backend` (str | None): Backend name (uses default if ``None``).
- `default` (_T | None): Value returned when the key is absent or deserialisation fails.

Returns
- _T | None: The cached value cast to *type_*, or *default*.

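The casting rule described above can be reproduced in isolation; `cast_cached` and the `User` dataclass are names invented for this sketch.

```python
from dataclasses import dataclass
from typing import Any, TypeVar

_T = TypeVar("_T")


def cast_cached(raw: Any, type_: type[_T]) -> _T:
    # Already the right type: return as-is.
    if isinstance(raw, type_):
        return raw
    # Dict-like values are expanded into the constructor; otherwise pass directly.
    if isinstance(raw, dict):
        return type_(**raw)
    return type_(raw)


@dataclass
class User:
    id: int
    name: str


print(cast_cached({"id": 1, "name": "ada"}, User))  # User(id=1, name='ada')
print(cast_cached("42", int))                       # 42
```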

Configuration for the cache service layer.

Controls service-level behavior and features like stampede protection, metrics, and circuit breaker.

Attributes
- enable_protection: Enable cache stampede protection.
- enable_metrics: Enable service-level metrics.
- enable_health_checks: Enable health checks.
- protection_lock_ttl: Lock TTL for stampede protection.
- protection_max_wait: Max wait time for locks.
- protection_retry_interval: Retry interval for locks.
- default_backend: Name of the default backend.
- circuit_breaker_enabled: Enable circuit breaker pattern.
- circuit_breaker_threshold: Failure threshold for circuit breaker.
- default_serializer: Default serializer type.


def to_dict() -> dict[str, Any]

Status values for cache operations.

Serializer implementing AsyncStringSerializerProtocol, with automatic compression for large values.

Compresses values above size threshold to save memory.

def __init__(
    compression_threshold: int = const.DEFAULT_COMPRESSION_THRESHOLD,
    compression_level: int = const.DEFAULT_COMPRESSION_LEVEL
) -> None

Initialize serializer.

Parameters
- `compression_threshold` (int): Compress values larger than this size in bytes.
- `compression_level` (int): Compression level (1-9; higher = more compression).
async def serialize(value: Any) -> str

Serialize and optionally compress value.

Parameters
- `value` (Any): Value to serialize.

Returns
- str: Serialized string (possibly compressed).

async def deserialize(value: str) -> Any

Deserialize and decompress if needed.

Parameters
- `value` (str): Serialized string.

Returns
- Any: Original value.

Raises
- SerializationError: If deserialization fails.
def get_stats() -> dict[str, Any]

Get serializer statistics.

Returns
- dict[str, Any]: Stats dict.
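The threshold-based scheme can be sketched with stdlib json and zlib. The `r:`/`z:` markers used to tag raw versus compressed payloads are an assumption of this sketch, not the library's wire format.

```python
import base64
import json
import zlib


def serialize(value: object, threshold: int = 64, level: int = 6) -> str:
    raw = json.dumps(value)
    if len(raw.encode()) <= threshold:
        return "r:" + raw  # small value: stored uncompressed
    packed = zlib.compress(raw.encode(), level)
    return "z:" + base64.b64encode(packed).decode()  # large value: compressed


def deserialize(blob: str) -> object:
    tag, body = blob[:2], blob[2:]
    if tag == "z:":
        body = zlib.decompress(base64.b64decode(body)).decode()
    return json.loads(body)


small = {"a": 1}
big = {"text": "spam" * 100}
assert deserialize(serialize(small)) == small
assert serialize(big).startswith("z:")  # crossed the threshold, so compressed
assert deserialize(serialize(big)) == big
```

Base64 keeps the compressed bytes representable as a string, which matters for string-oriented stores; a bytes-native backend could skip that step.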

Cost-aware cache hit/miss decision function.

Implements a simple heuristic that balances accuracy (similarity score) against the economic cost of invoking the API. When API costs are high, lower similarity thresholds become acceptable to save money. When costs are low, higher similarity thresholds are enforced.

The decision is based on:

  • similarity: Cosine similarity [0, 1]. Closeness to 1 = high accuracy.
  • api_cost_per_1k_tokens: USD cost per 1000 tokens.
  • expected_tokens: Expected token count for a fresh API call.
def __init__(accuracy_weight: float = 0.7) -> None

Initialize the cost-aware decision function.

Parameters
- `accuracy_weight` (float): Weight (0 to 1) assigned to the accuracy mismatch penalty. Higher values prioritize accuracy over cost. Defaults to 0.7.
def should_use_cache(
    similarity: float,
    api_cost_per_1k_tokens: float,
    expected_tokens: int
) -> bool

Decide whether to use a cached response.

Decision logic:

  • Compute mismatch_penalty = (1 - similarity) * accuracy_weight
  • Compute cost_incentive = min(estimated_cost * (1 - accuracy_weight), 1.0)
  • Return mismatch_penalty < cost_incentive

When accuracy_weight=0.7 and similarity=0.95:

  • mismatch_penalty = 0.05 * 0.7 = 0.035
  • If estimated_cost >= 0.035/0.3 = 0.117 USD, cache is used.
  • For gpt-4 (~$0.03 per 1k tokens), expected_tokens=100 → ~$0.003, cache is skipped (cost too low to accept small mismatch).
  • For gpt-4 (~$0.03 per 1k tokens), expected_tokens=5000 → ~$0.15, cache is used (cost high enough to accept 5% mismatch).
Parameters
- `similarity` (float): Cosine similarity in [0, 1].
- `api_cost_per_1k_tokens` (float): Estimated USD cost per 1000 tokens.
- `expected_tokens` (int): Expected token count for a fresh call.

Returns
- bool: True if the cached response should be used, False if a fresh API call should be made.

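The decision rule reduces to a few lines; the function below is a direct transcription of the documented heuristic, not the library's source.

```python
def should_use_cache(similarity: float, api_cost_per_1k_tokens: float,
                     expected_tokens: int, accuracy_weight: float = 0.7) -> bool:
    # Penalty grows as similarity falls; incentive grows with the avoided API cost.
    estimated_cost = api_cost_per_1k_tokens * expected_tokens / 1000
    mismatch_penalty = (1 - similarity) * accuracy_weight
    cost_incentive = min(estimated_cost * (1 - accuracy_weight), 1.0)
    return mismatch_penalty < cost_incentive


# The two worked examples from the docstring above:
print(should_use_cache(0.95, 0.03, 100))   # False: ~$0.003 call, not worth a 5% mismatch
print(should_use_cache(0.95, 0.03, 5000))  # True: ~$0.15 call justifies the cache hit
```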

Lock ownership information for distributed locks.

Loader for cache configuration from various sources.
def from_env(prefix: str = const.ENV_PREFIX) -> CacheConfig

Load CacheConfig from environment variables.

Parameters
- `prefix` (str): Environment variable prefix.

Returns
- CacheConfig: Populated CacheConfig instance.

def from_dict(config_dict: dict[str, Any]) -> CacheConfig

Load CacheConfig from a dictionary.

Parameters
- `config_dict` (dict[str, Any]): Configuration dictionary.

Returns
- CacheConfig: Populated CacheConfig instance.

def from_yaml(file_path: str) -> CacheConfig

Load CacheConfig from a YAML file.

Parameters
- `file_path` (str): Path to the YAML configuration file.

Returns
- CacheConfig: Populated CacheConfig instance.

def from_json(file_path: str) -> CacheConfig

Load CacheConfig from a JSON file.

Parameters
- `file_path` (str): Path to the JSON configuration file.

Returns
- CacheConfig: Populated CacheConfig instance.


FAISS-backed in-memory vector index for semantic cache.

Uses IndexFlatIP (inner product on L2-normalized vectors = cosine similarity). Suitable for single-server deployments with up to ~1M entries. Implements VectorIndexProtocol.

All vectors must be L2-normalized (unit length) before storage.

def __init__(
    embedding_dim: int = 384,
    max_entries: int = 100000
) -> None

Initialize the FAISS vector index.

Parameters
- `embedding_dim` (int): Dimension of embedding vectors. Defaults to 384 (common for sentence-transformers models such as all-MiniLM-L6-v2).
- `max_entries` (int): Maximum number of entries before a warning is logged. Defaults to 100,000.

Raises
- ImportError: If the faiss package is not installed.
- ValueError: If embedding_dim <= 0 or max_entries <= 0.
async def search(
    embedding: list[float],
    k: int = 1
) -> list[tuple[str, float]]

Search for the top-k most similar embeddings.

Parameters
- `embedding` (list[float]): Query embedding vector (must be L2-normalized).
- `k` (int): Number of results to return.

Returns
- list[tuple[str, float]]: List of (cache_key, similarity_score) tuples, sorted by similarity_score descending. Empty list if the index is empty.

async def add(
    key: str,
    embedding: list[float]
) -> None

Add an embedding to the index.

If the key already exists, it is not duplicated (early return). If the index reaches max_entries, a warning is logged but the entry is still added (LRU eviction can be a future enhancement).

Parameters
- `key` (str): Cache key identifier for the embedding.
- `embedding` (list[float]): Embedding vector (must be L2-normalized).
async def remove(key: str) -> bool

Remove an entry from the index by cache key.

FAISS IndexFlatIP does not support deletion. We use a soft-delete approach by marking the slot with None. Removed entries are skipped in search results.

Parameters
- `key` (str): Cache key identifier to remove.

Returns
- bool: True if the key was found and removed, False if not found.

property size() -> int

Number of entries currently indexed (excluding deleted entries).

Returns
- int: Non-negative integer.
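The search semantics (inner product on unit vectors equals cosine similarity) can be illustrated without faiss; this brute-force sketch uses names of its own and is not the index's implementation.

```python
import math


def normalize(v: list[float]) -> list[float]:
    # Scale to unit length so the inner product becomes cosine similarity.
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def search(index: dict[str, list[float]], query: list[float], k: int = 1):
    # Score every stored vector by inner product with the normalized query.
    q = normalize(query)
    scored = [(key, sum(a * b for a, b in zip(q, vec))) for key, vec in index.items()]
    scored.sort(key=lambda kv: kv[1], reverse=True)
    return scored[:k]


index = {
    "greeting": normalize([1.0, 0.1, 0.0]),
    "farewell": normalize([0.0, 1.0, 0.2]),
}
print(search(index, [0.9, 0.2, 0.0], k=1))  # top hit is "greeting"
```

FAISS's IndexFlatIP performs this same exhaustive inner-product scan, just vectorized in C++, which is why normalization before storage is required for cosine semantics.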

Unified health status.

High-performance JSON-based serializer with enhanced type support.

Uses orjson (5-10x faster than stdlib json) for serialization with automatic fallback to stdlib json if orjson is unavailable.

Supports serialization of:

  • Basic Python types (str, int, float, bool, None)
  • Collections (list, dict, tuple, set)
  • datetime and date objects (via orjson native support)
  • UUID objects (via orjson native support)
  • Custom objects with a ``__dict__`` attribute

Performance:

  • With orjson: ~5-10x faster than stdlib json
  • Native support for datetime, UUID, dataclasses
  • Optimized for Redis/Memcached serialization workloads

This class implements the AsyncStringSerializerProtocol protocol.

async def serialize(value: Any) -> str

Serialize a Python object to JSON string using orjson (5-10x faster).

async def deserialize(value: str) -> Any

Deserialize a JSON string back to a Python object.


Configuration for Memcached cache backend.

Attributes
- name: Backend name.
- default: Whether this is the default backend.
- enabled: Whether this backend is enabled.
- host: Memcached host.
- port: Memcached port.
- servers: List of server addresses (host:port).
- default_ttl: Default TTL in seconds.
- key_prefix: Key prefix for namespace isolation.
- connect_timeout: Connection timeout in seconds.
- timeout: Operation timeout in seconds.
- max_pool_size: Maximum connection pool size.

property backend_type() -> BackendType

Get the backend type.

property server_list() -> list[str]

Get the server list.


Memcached cache backend implementation.

This backend provides Memcached-based distributed caching with TTL support. Unlike memory and Redis backends, this is implemented directly since the core infrastructure layer doesn’t provide a Memcached driver.

def __init__(
    servers: list[str],
    config: CacheOperationConfig | None = None
)

Initialize the Memcached cache backend.

Parameters
- `servers` (list[str]): List of Memcached server addresses (e.g., ["localhost:11211"]).
- `config` (CacheOperationConfig | None): Cache configuration. If None, uses the default configuration.

Raises
- ImportError: If pymemcache is not installed.
async def get(key: str) -> Any | None

Get a value from the cache.

Parameters
- `key` (str): The cache key to retrieve.

Returns
- Any | None: The cached value if found and not expired, None otherwise.

async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> bool

Set a value in the cache with optional TTL.

Parameters
- `key` (str): The cache key to set.
- `value` (Any): The value to cache.
- `ttl` (int | None): Time to live in seconds. If None, uses the default TTL.

Returns
- bool: True if successful, False otherwise.

async def delete(key: str) -> bool

Delete a value from the cache.

Parameters
- `key` (str): The cache key to delete.

Returns
- bool: True if the key was deleted, False if it didn't exist or an error occurred.

async def exists(key: str) -> bool

Check if a key exists in the cache.

Parameters
- `key` (str): The cache key to check.

Returns
- bool: True if the key exists and is not expired, False otherwise.

async def clear() -> bool

Clear all values from the cache.

Note: This operation is not supported by the Memcached protocol, which provides no way to clear all keys.

Returns
- bool: False (operation not supported).
async def get_many(keys: list[str]) -> dict[str, Any]

Get multiple values from the cache.

Parameters
- `keys` (list[str]): List of cache keys to retrieve.

Returns
- dict[str, Any]: Dictionary mapping keys to their values (missing keys omitted).

async def set_many(
    items: dict[str, Any],
    ttl: int | None = None
) -> bool

Set multiple values in the cache.

Parameters
- `items` (dict[str, Any]): Dictionary of key-value pairs to cache.
- `ttl` (int | None): Time to live in seconds for all items.

Returns
- bool: True if all items were set successfully, False otherwise.

async def delete_many(keys: list[str]) -> bool

Delete multiple values from the cache.

Parameters
- `keys` (list[str]): List of cache keys to delete.

Returns
- bool: True if all keys were deleted successfully, False otherwise.

async def delete_pattern(pattern: str) -> int

Delete keys matching a pattern.

Memcached does not natively support key enumeration or pattern matching. This method logs a warning and returns 0. If pattern-based invalidation is required, use Redis or the in-memory backend instead.

Parameters
- `pattern` (str): Glob pattern (not supported by Memcached).

Returns
- int: Always 0; Memcached does not support key enumeration.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a health check on the cache backend.

Returns
- HealthCheckResult: Structured health check result.

Configuration for in-memory cache backend.

Attributes
- name: Backend name.
- default: Whether this is the default backend.
- enabled: Whether this backend is enabled.
- default_ttl: Default TTL in seconds.
- max_size: Maximum number of entries.
- cleanup_interval: Cleanup interval in seconds.
- key_prefix: Key prefix for namespace isolation.

property backend_type() -> BackendType

Get the backend type.


Memory cache backend using lexigram-components MemoryStateStore.

This backend provides in-memory caching with TTL support by wrapping the standardized MemoryStateStore from lexigram-components.

def __init__(
    config: CacheOperationConfig | None = None,
    max_size: int | None = None,
    hooks: HookRegistryProtocol | None = None
) -> None

Initialize the memory cache backend.

Parameters
- `config` (CacheOperationConfig | None): Cache configuration. If None, uses the default configuration.
- `max_size` (int | None): Maximum number of cache entries. When the store is full, the least-recently-used entry is evicted. ``None`` disables eviction (use only for tests/development).
- `hooks` (HookRegistryProtocol | None): Optional hook registry for lifecycle emission.
async def get(key: str) -> Result[Any | None, CacheError]

Get a value from the cache.

Parameters
- `key` (str): The cache key to retrieve.

Returns
- Result[Any | None, CacheError]: Ok(value) if found, Ok(None) if not found, Err(CacheError) on failure.

async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> Result[None, CacheError]

Set a value in the cache with optional TTL.

Parameters
- `key` (str): The cache key to set.
- `value` (Any): The value to cache.
- `ttl` (int | None): Time to live in seconds. If None, uses the default TTL.

Returns
- Result[None, CacheError]: Ok(None) if successful, Err(CacheError) on failure.

async def delete(key: str) -> Result[bool, CacheError]

Delete a value from the cache.

Parameters
- `key` (str): The cache key to delete.

Returns
- Result[bool, CacheError]: Ok(True) if deleted, Ok(False) if not found, Err(CacheError) on failure.

async def exists(key: str) -> Result[bool, CacheError]

Check if a key exists in the cache.

Parameters
- `key` (str): The cache key to check.

Returns
- Result[bool, CacheError]: Ok(True) if the key exists, Ok(False) otherwise, Err(CacheError) on failure.

async def clear() -> Result[None, CacheError]

Clear all values from the cache.

Returns
- Result[None, CacheError]: Ok(None) if successful, Err(CacheError) on failure.
async def get_many(keys: list[str]) -> Result[dict[str, Any], CacheError]

Get multiple values from the cache.

Parameters
- `keys` (list[str]): List of cache keys to retrieve.

Returns
- Result[dict[str, Any], CacheError]: Ok(dict) mapping found keys to values, Err(CacheError) on failure.

async def set_many(
    items: dict[str, Any],
    ttl: int | None = None
) -> Result[None, CacheError]

Set multiple values in the cache.

Parameters
ParameterTypeDescription
`items`dict[str, Any]Dictionary of key-value pairs to cache.
`ttl`int | NoneTime to live in seconds for all items.
Returns
TypeDescription
Result[None, CacheError]Ok(None) if all items set successfully, Err(CacheError) on failure.
async def delete_many(keys: list[str]) -> Result[int, CacheError]

Delete multiple values from the cache.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to delete.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def delete_pattern(pattern: str) -> Result[int, CacheError]

Delete all keys matching a glob-style pattern.

Iterates over the in-memory store’s known keys (including the configured prefix) and deletes those that match pattern.

Parameters
ParameterTypeDescription
`pattern`strGlob pattern, e.g. ``"pet:list:*"``.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
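The glob matching this method performs can be sketched with ``fnmatch`` from the standard library; a plain dict stands in for the in-memory store, and the ``Result`` wrapper is omitted for brevity:

```python
import fnmatch

def delete_pattern(store: dict, pattern: str) -> int:
    """Delete every key matching a glob pattern and return the count."""
    matched = [k for k in store if fnmatch.fnmatchcase(k, pattern)]
    for key in matched:
        del store[key]
    return len(matched)

store = {"pet:list:1": "a", "pet:list:2": "b", "pet:get:1": "c"}
deleted = delete_pattern(store, "pet:list:*")
# deleted == 2; only "pet:get:1" remains
```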
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a health check on the cache backend.

Returns
TypeDescription
HealthCheckResultStructured health check result.

HMAC-SHA256-signed pickle serializer for Python object caching.

Every value is signed before storage and the signature is verified before deserialization. An invalid or missing signature raises CacheSerializationError — the payload is never passed to pickle.loads.

Supports key rotation: pass a list of keys so that data signed with an older key can still be verified during a rotation window. The first key in the list is always used for signing; all keys are tried for verification. Use rotate_key to prepend a new active key.

Advantages:

  • Preserves all Python object types and relationships.
  • HMAC authentication prevents RCE via cache poisoning.

Disadvantages:

  • Python-specific (not compatible with other languages).
  • Breaking format change from unsigned pickle storage.
  • Version compatibility issues across Python versions.

Only use in Python-only environments where the HMAC key is kept secret.

Parameters
ParameterTypeDescription
`hmac_key`Secret key(s) used for HMAC-SHA256 signing, minimum 32 bytes each. Pass a ``str`` or ``bytes`` value for a single key, or a ``list[bytes]`` for multi-key rotation support. The first key in the list is the current signing key. Generate one with ``secrets.token_bytes(32)``.
`protocol`Pickle protocol version (default: highest available).
Raises
ExceptionDescription
ValueErrorIf any key is shorter than 32 bytes, or if the key list is empty.
def __init__(
    hmac_key: bytes | str | list[bytes],
    protocol: int = pickle.HIGHEST_PROTOCOL
) -> None
def rotate_key(new_key: bytes) -> None

Prepend new_key as the active signing key.

After rotation the serializer signs new entries with new_key while still accepting entries signed with any previously registered key. Remove old keys from the list once all cache entries have expired or been re-signed.

Parameters
ParameterTypeDescription
`new_key`bytesNew signing key, minimum 32 bytes.
Raises
ExceptionDescription
ValueErrorIf *new_key* is shorter than 32 bytes.
async def serialize(value: Any) -> str

Pickle value, sign it, and return a ``<hmac>:<hex>`` string.

async def deserialize(value: str) -> Any

Verify the HMAC signature and unpickle the payload.
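The sign-then-verify flow, including the multi-key rotation window, can be sketched with the standard library. The ``<hmac>:<hex>`` wire format matches the docstrings above; everything else here (function names, the error type) is illustrative, not the library's actual implementation:

```python
import hashlib
import hmac
import pickle

class SignedPickleError(Exception):
    """Stand-in for CacheSerializationError."""

def serialize(value, keys: list[bytes]) -> str:
    # Always sign with the first (current) key.
    payload = pickle.dumps(value)
    sig = hmac.new(keys[0], payload, hashlib.sha256).hexdigest()
    return f"{sig}:{payload.hex()}"

def deserialize(blob: str, keys: list[bytes]):
    # Verify against every registered key BEFORE touching pickle.loads.
    sig, _, payload_hex = blob.partition(":")
    payload = bytes.fromhex(payload_hex)
    for key in keys:
        expected = hmac.new(key, payload, hashlib.sha256).hexdigest()
        if hmac.compare_digest(sig, expected):
            return pickle.loads(payload)
    raise SignedPickleError("invalid or missing signature")

old_key, new_key = b"0" * 32, b"1" * 32
blob = serialize({"id": 7}, [old_key])
# After a rotate_key-style rotation, the old key still verifies:
assert deserialize(blob, [new_key, old_key]) == {"id": 7}
```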


Configuration for Redis cache backend.

Attributes:

  • name: Backend name.
  • default: Whether this is the default backend.
  • enabled: Whether this backend is enabled.
  • host: Redis host.
  • port: Redis port.
  • db: Redis database number.
  • password: Redis password (optional).
  • url: Full Redis URL (overrides host/port/db).
  • default_ttl: Default TTL in seconds.
  • key_prefix: Key prefix for namespace isolation.
  • ssl: Whether to use SSL/TLS.
  • connection_pool_size: Connection pool size.
  • socket_timeout: Socket timeout in seconds.
  • retry_on_timeout: Whether to retry on timeout.

property backend_type() -> BackendType

Get the backend type.

property redis_url() -> str

Get the full Redis URL.


Redis cache backend using StateStoreProtocol protocol.

This backend provides Redis-based distributed caching with TTL support by wrapping the standardized StateStoreProtocol protocol.

def __init__(
    store: StateStoreProtocol,
    config: CacheOperationConfig | None = None,
    hooks: HookRegistryProtocol | None = None
)

Initialize the Redis cache backend.

Parameters
ParameterTypeDescription
`store`StateStoreProtocolStateStoreProtocol implementation for key-value operations.
`config`CacheOperationConfig | NoneCache configuration. If None, uses default configuration.
`hooks`HookRegistryProtocol | NoneOptional hook registry for lifecycle emission.
async def get(key: str) -> Result[Any | None, CacheError]

Get a value from the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to retrieve.
Returns
TypeDescription
Result[Any | None, CacheError]Ok(value) if found, Ok(None) if not found, Err(CacheError) on failure.
async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> Result[None, CacheError]

Set a value in the cache with optional TTL.

Parameters
ParameterTypeDescription
`key`strThe cache key to set.
`value`AnyThe value to cache.
`ttl`int | NoneTime to live in seconds. If None, uses default TTL.
Returns
TypeDescription
Result[None, CacheError]Ok(None) if successful, Err(CacheError) on failure.
async def delete(key: str) -> Result[bool, CacheError]

Delete a value from the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to delete.
Returns
TypeDescription
Result[bool, CacheError]Ok(True) if deleted, Ok(False) if not found, Err(CacheError) on failure.
async def exists(key: str) -> Result[bool, CacheError]

Check if a key exists in the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to check.
Returns
TypeDescription
Result[bool, CacheError]Ok(True) if exists, Ok(False) otherwise, Err(CacheError) on failure.
async def clear() -> Result[None, CacheError]

Clear all values from the cache.

Uses pattern-based deletion to clear only prefixed keys. This is safer than FLUSHDB as it only clears cache keys.

Returns
TypeDescription
Result[None, CacheError]Ok(None) if successful, Err(CacheError) on failure.
async def get_many(keys: list[str]) -> Result[dict[str, Any], CacheError]

Get multiple values from the cache.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to retrieve.
Returns
TypeDescription
Result[dict[str, Any], CacheError]Ok(dict) mapping found keys to values, Err(CacheError) on failure.
async def set_many(
    items: dict[str, Any],
    ttl: int | None = None
) -> Result[None, CacheError]

Set multiple values in the cache using pipeline.

Parameters
ParameterTypeDescription
`items`dict[str, Any]Dictionary of key-value pairs to cache.
`ttl`int | NoneTime to live in seconds for all items.
Returns
TypeDescription
Result[None, CacheError]Ok(None) if all items set successfully, Err(CacheError) on failure.
async def delete_many(keys: list[str]) -> Result[int, CacheError]

Delete multiple values from the cache using pipeline.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to delete.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def delete_pattern(pattern: str) -> Result[int, CacheError]

Delete all Redis keys matching a glob-style pattern.

Uses SCAN (non-blocking) to find matching keys, then DEL or a pipeline to remove them. The configured key prefix is prepended to pattern automatically.

Parameters
ParameterTypeDescription
`pattern`strGlob pattern, e.g. ``"pet:list:*"``.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a health check on the cache backend.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Shared Redis connection logic.
def __init__(
    url: str = '',
    prefix: str = '',
    client: Any | None = None
)

Redis implementation of LockStore with owner validation.

Locks are stored as the value of the key and contain a lock_id of the form <owner>:<uuid>. Release and extend operations are executed via Lua scripts that compare-and-delete or compare-and-expire atomically.

def __init__(
    url: str = '',
    prefix: str = '',
    client: Any | None = None
)
async def acquire(
    resource: str,
    ttl: int = 30,
    owner: str = ...
) -> str | None

Acquire a lock.

Parameters
ParameterTypeDescription
`resource`strThe resource to lock.
`ttl`intLock TTL in seconds.
`owner`strOwner identifier (required).
Returns
TypeDescription
str | Nonelock_id (str) on success, None on failure.
async def release(
    resource: str,
    lock_id: str
) -> None

Release a lock only if lock_id matches the current owner.

Raises
ExceptionDescription
LockNotHeldErrorif the lock does not exist
LockOwnershipErrorif the lock is held by someone else
async def extend(
    resource: str,
    lock_id: str,
    ttl: int
) -> None

Extend TTL of a lock only if lock_id matches the current owner.

Raises
ExceptionDescription
LockNotHeldErrorif the lock does not exist
LockOwnershipErrorif the lock is held by someone else
async def is_locked(key: str) -> bool

Check if a key is locked.

async def locked(
    resource: str,
    ttl: int = 30,
    owner: str | None = None
) -> Any

Async context manager that acquires a lock and releases it on exit.

Yields the lock_id string.
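The compare-and-delete semantics of release() (executed atomically via a Lua script in the real store) can be illustrated with a plain dict standing in for Redis; the exception classes here are stand-ins for the library's own:

```python
class LockNotHeldError(Exception):
    """Stand-in: the lock does not exist."""

class LockOwnershipError(Exception):
    """Stand-in: the lock is held by someone else."""

def release(locks: dict[str, str], resource: str, lock_id: str) -> None:
    """Delete the lock only when the stored lock_id matches the caller's."""
    current = locks.get(resource)
    if current is None:
        raise LockNotHeldError(resource)
    if current != lock_id:
        raise LockOwnershipError(resource)
    del locks[resource]

locks = {"orders": "worker-1:abc"}
release(locks, "orders", "worker-1:abc")  # succeeds, lock removed
```

In the real backend this check-then-delete must run as a single Lua script, because a plain GET followed by DEL could delete a lock that expired and was re-acquired between the two commands.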

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check the health of the lock store.


Redis implementation of SecretStore with optional at-rest encryption.
Parameters
ParameterTypeDescription
`url`Redis connection URL (e.g. ``redis://localhost:6379/0``).
`prefix`Optional string prefix applied to every key stored in Redis.
`encryption_key`Optional Fernet symmetric encryption key (32-byte URL-safe base64-encoded bytes or string). When provided, all secret values are encrypted before writing and decrypted on read. Generate one with ``base64.urlsafe_b64encode(secrets.token_bytes(32))``.
def __init__(
    url: str = '',
    prefix: str = '',
    encryption_key: bytes | str | None = None,
    client: Any | None = None
)
async def get_secret(key: str) -> str | None

Get a secret by name, decrypting if encryption is configured.

async def set_secret(
    key: str,
    value: str
) -> None

Set a secret, encrypting if encryption is configured.

async def delete_secret(key: str) -> None

Delete a secret.

async def list_secrets(prefix: str | None = None) -> list[str]

List secrets with optional prefix.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check the health of the secret store.
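Generating a valid ``encryption_key`` needs only the standard library, per the parameter docs above:

```python
import base64
import secrets

# 32 random bytes, URL-safe base64-encoded: a Fernet-compatible key.
key = base64.urlsafe_b64encode(secrets.token_bytes(32))
```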


Redis implementation of StateStoreProtocol.
async def get(key: str) -> Any | None

Get a value by key.

async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> None

Set a value with optional TTL.

async def delete(key: str) -> None

Delete a value by key.

async def get_bulk(keys: list[str]) -> dict[str, Any]

Get multiple values by keys.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check the health of the state store.


Three-tier semantic cache: exact hash → vector similarity → miss.

Implements SemanticCacheProtocol. Caches LLM responses using a three-tier lookup strategy:

Tier 1 (Exact Hash): Normalizes and hashes the query to look for exact matches in the cache backend. Fast, deterministic.

Tier 2 (Vector Similarity): If Tier 1 misses, embeds the query and searches the vector index for similar cached queries. Returns the cached response if similarity >= similarity_threshold.

Tier 3 (Cache Miss): If both tiers miss, returns None for the caller to invoke the LLM.

The store() method populates both Tier 1 and Tier 2. The invalidate() method removes from both.
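The three-tier flow can be sketched end to end with in-memory stand-ins. The normalization, hashing, and similarity functions below are illustrative assumptions, not the library's actual implementation:

```python
import hashlib

def normalize(query: str) -> str:
    # Assumed normalization: lowercase, collapse whitespace.
    return " ".join(query.lower().split())

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = sum(x * x for x in a) ** 0.5
    nb = sum(y * y for y in b) ** 0.5
    return dot / (na * nb) if na and nb else 0.0

def lookup(query, embed, exact, vectors, threshold=0.95):
    # Tier 1: exact match on the hash of the normalized query.
    digest = hashlib.sha256(normalize(query).encode()).hexdigest()
    if digest in exact:
        return exact[digest]
    # Tier 2: nearest cached embedding at or above the threshold.
    qvec = embed(query)
    scored = [(cosine(vec, qvec), response) for vec, response in vectors]
    if scored:
        best_score, best_response = max(scored)
        if best_score >= threshold:
            return best_response
    # Tier 3: miss -- the caller invokes the LLM.
    return None

# Toy embedding client and pre-populated tiers:
embeddings = {"what is redis": [1.0, 0.0], "what's redis?": [0.98, 0.2]}
embed = lambda q: embeddings.get(normalize(q), [0.0, 1.0])
exact = {hashlib.sha256(b"what is redis").hexdigest(): "Redis is a data store."}
vectors = [([1.0, 0.0], "Redis is a data store.")]

hit = lookup("What  IS  Redis", embed, exact, vectors)  # Tier 1 hit
```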

def __init__(
    cache_backend: CacheBackendProtocol,
    embedding_client: EmbeddingClientProtocol,
    vector_index: VectorIndexProtocol,
    similarity_threshold: float = 0.95,
    cache_ttl: int | None = None
) -> None

Initialize the semantic cache store.

Parameters
ParameterTypeDescription
`cache_backend`CacheBackendProtocolBackend for exact hash storage (e.g., Redis, in-memory cache). Implements CacheBackendProtocol.
`embedding_client`EmbeddingClientProtocolLLM embedding service. Implements EmbeddingClientProtocol.
`vector_index`VectorIndexProtocolVector similarity search index. Implements VectorIndexProtocol.
`similarity_threshold`floatMinimum cosine similarity to accept a Tier 2 cache hit. Must be in [0, 1]. Defaults to 0.95.
`cache_ttl`int | NoneOptional TTL in seconds for cache entries. Passed to cache_backend.set(). Defaults to None (no expiry).
Raises
ExceptionDescription
ValueErrorIf similarity_threshold is not in [0, 1].
async def lookup(query: str) -> str | None

Look up a query in the cache.

Checks Tier 1 (exact hash) then Tier 2 (vector similarity).

Parameters
ParameterTypeDescription
`query`strThe user query string.
Returns
TypeDescription
str | NoneCached response string, or None on cache miss.
async def store(
    query: str,
    response: str,
    model: str
) -> None

Store a query-response pair in both tiers.

Stores in Tier 1 (hash key) and Tier 2 (vector index).

Parameters
ParameterTypeDescription
`query`strThe user query string.
`response`strThe LLM response to cache.
`model`strThe model that produced the response (for logging/tracking).
async def invalidate(query: str) -> bool

Invalidate a cached entry by query.

Removes from both Tier 1 (cache backend) and Tier 2 (vector index).

Parameters
ParameterTypeDescription
`query`strThe user query string to invalidate.
Returns
TypeDescription
boolTrue if the entry was found and removed, False otherwise.


def cache(
    service: CacheService,
    key_prefix: str = 'cache',
    ttl: int | None = None,
    backend: str | None = None,
    protect: bool = True
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to cache function results with custom key generation.

Parameters
ParameterTypeDescription
`service`CacheServiceCacheService instance
`key_prefix`strPrefix for cache keys
`ttl`int | NoneTime-to-live in seconds
`backend`str | NoneBackend name
`protect`boolEnable stampede protection
Returns
TypeDescription
Callable[[Callable[..., Any]], Callable[..., Any]]Decorator function

Example

@cache(cache_service, key_prefix="api", ttl=300)
async def get_user_data(user_id: int):
    return await fetch_from_database(user_id)


def cache_in_request(func: _F) -> _F

Cache the result of an async function for the duration of the current request.

The cache key is derived from the function’s qualified name and all positional and keyword arguments. Arguments must support a stable repr() so that logically equal calls produce the same key.

  • Same request, same args → cached value returned; wrapped coroutine is not awaited again.
  • Same request, different args → each distinct arg combination gets its own cache entry.
  • Different requests → each request has an isolated contextvars.ContextVar context; they never share entries.
Parameters
ParameterTypeDescription
`func`_FThe async callable to wrap. Must be a coroutine function.
Returns
TypeDescription
_FA coroutine function with identical signature that transparently caches results within the current request context.

Example

@cache_in_request
async def get_user_permissions(user_id: str) -> list[str]:
    return await db.fetch_permissions(user_id)

def cacheable(
    ttl: int | None = 300,
    key_prefix: str = '',
    skip_on_error: bool = True
) -> Callable[[F], F]

Cache the return value of an async method using the injected CacheBackend.

The decorator resolves the cache backend from self._cache on the target instance. The cache key is derived from the method name, prefix, and a SHA-256 hash of the serialized arguments.

Supports methods that return Result[T, E]:

  • Ok(value) results are cached; the inner value is unwrapped, serialized, and re-wrapped in Ok on retrieval.
  • Err(...) results are never cached — errors propagate on every call.

Supports domain model return values (or Result-wrapped domain models):

  • Domain models are serialized to a type-tagged JSON envelope and reconstructed via model_validate on retrieval, preserving type identity across the JSON round-trip.
Parameters
ParameterTypeDescription
`ttl`int | NoneTime-to-live in seconds. ``None`` disables expiry.
`key_prefix`strOptional prefix prepended to the generated cache key.
`skip_on_error`boolWhen ``True``, cache misses on backend errors gracefully fall through to the original method. When ``False``, errors propagate.
Returns
TypeDescription
Callable[[F], F]Decorator that wraps an async method with cache-aside logic.

Example

class UserService:
    def __init__(self, cache: CacheBackendProtocol) -> None:
        self._cache = cache

    @cacheable(ttl=60, key_prefix="users")
    async def get_user(self, user_id: str) -> Result[User, DomainError]:
        return await self._repo.find(user_id)
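The key derivation described above (method name, prefix, SHA-256 hash of the serialized arguments) can be sketched as follows; the exact serialization the library uses is an assumption here (repr-based):

```python
import hashlib

def cache_key(method_name: str, key_prefix: str, *args, **kwargs) -> str:
    """Hypothetical key builder: prefix + method name + argument digest."""
    payload = repr((args, sorted(kwargs.items()))).encode()
    digest = hashlib.sha256(payload).hexdigest()
    prefix = f"{key_prefix}:" if key_prefix else ""
    return f"{prefix}{method_name}:{digest}"

k1 = cache_key("get_user", "users", "user-42")
k2 = cache_key("get_user", "users", "user-42")
# identical arguments always hash to the same key
```

Hashing the arguments keeps keys a fixed length regardless of argument size, at the cost of keys that are opaque when inspecting the backend by hand.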

def clear_request_cache() -> None

Reset the current request’s cache to an empty state.

Call this at the end of a request (e.g. in ASGI middleware teardown) if you need an explicit eviction guarantee rather than relying on the context being discarded by the ASGI server.


def conditional_cache(
    service: CacheService,
    condition_func: Callable[[Any], bool],
    key_prefix: str = 'cond',
    ttl: int | None = None,
    backend: str | None = None,
    protect: bool = True
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to cache function results only when condition is met.

Parameters
ParameterTypeDescription
`service`CacheServiceCacheService instance
`condition_func`Callable[[Any], bool]Function that returns True if result should be cached
`key_prefix`strPrefix for cache keys
`ttl`int | NoneTime-to-live in seconds
`backend`str | NoneBackend name
`protect`boolEnable stampede protection
Returns
TypeDescription
Callable[[Callable[..., Any]], Callable[..., Any]]Decorator function

Example

@conditional_cache(
    cache_service,
    lambda result: result is not None,
    ttl=300,
)
async def search_database(query: str):
    return await db.search(query)


def get_request_cache() -> dict[str, Any]

Return the current request’s cache dict, creating it if needed.

The first call within a new request context allocates a fresh dict and stores it via ContextVar.set so that all subsequent calls in the same context share it.

Returns
TypeDescription
dict[str, Any]The mutable cache dict for this request context.

def invalidate_cache(
    service: CacheService,
    key_pattern: str,
    backend: str | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to invalidate cache keys after function execution.

Parameters
ParameterTypeDescription
`service`CacheServiceCacheService instance
`key_pattern`strPattern for keys to invalidate (supports wildcards)
`backend`str | NoneBackend name
Returns
TypeDescription
Callable[[Callable[..., Any]], Callable[..., Any]]Decorator function

Example

@invalidate_cache(cache_service, "user:*")
async def update_user(user_id: int, data: dict):
    return await db.update_user(user_id, data)


def remember(
    service: CacheService,
    ttl: int | None = None,
    backend: str | None = None,
    protect: bool = True
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to remember function results (memoization).

Uses automatic key generation based on function signature.

Parameters
ParameterTypeDescription
`service`CacheServiceCacheService instance
`ttl`int | NoneTime-to-live in seconds
`backend`str | NoneBackend name
`protect`boolEnable stampede protection
Returns
TypeDescription
Callable[[Callable[..., Any]], Callable[..., Any]]Decorator function

Example

@remember(cache_service, ttl=600)
async def expensive_computation(x: int, y: int):
    return await slow_calculation(x, y)


Raised when the cache backend (Redis/Memcached) fails.

Raised when the cache is at capacity.

Raised when cache configuration is invalid.
def __init__(
    message: str = const.ERROR_MSG_CACHE_CONFIGURATION,
    setting: str | None = None,
    value: Any | None = None,
    **kwargs
)

Raised when the cache connection fails.

Base exception for cache errors.

Raised when cache invalidation fails.
def __init__(
    message: str = const.ERROR_MSG_CACHE_INVALIDATION,
    keys: list[str] | None = None,
    tag: str | None = None,
    pattern: str | None = None,
    **kwargs
)

Raised when a cache key is invalid or for key-specific errors.

Raised when serialization/deserialization fails.

Raised when cache stampede protection fails.
def __init__(
    message: str = const.ERROR_MSG_CACHE_STAMPEDE,
    lock_holder: str | None = None,
    **kwargs
)

Raised when a cache operation times out.
def __init__(
    message: str = const.ERROR_MSG_CACHE_TIMEOUT,
    key: str | None = None,
    backend: str | None = None,
    timeout_seconds: float | None = None,
    **kwargs
)