API Reference
Protocols
Section titled “Protocols”BulkheadProtocol
Section titled “BulkheadProtocol”Protocol for bulkhead implementations.
CircuitBreakerProtocol
Section titled “CircuitBreakerProtocol”Protocol for circuit breaker implementations.
Get current circuit state (closed, open, half_open).
Execute function with circuit breaker protection.
Return an async context manager that protects a code block.
Raises CircuitOpenError when the circuit is open, and records success or failure based on the outcome of the protected block.
Reset circuit to closed state.
Force circuit to open state.
CircuitBreakerRegistryProtocol
Section titled “CircuitBreakerRegistryProtocol”Protocol for circuit breaker registries.
def get(name: str) -> CircuitBreakerProtocol | None
Get circuit breaker by name.
async def get_or_create( name: str, config: CircuitBreakerConfig | None = None ) -> CircuitBreakerProtocol
Get or create circuit breaker by name.
List all circuit breakers.
IdempotencyStoreProtocol
Section titled “IdempotencyStoreProtocol”Protocol for storing and checking idempotency keys.
async def get(key: str) -> Result[Any | None, IdempotencyError]
Retrieve a stored result by idempotency key.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| Type | Description |
|---|---|
| Result[Any | None, IdempotencyError] | ``Ok(value)`` when a cached result exists. ``Ok(None)`` when the key is not present or expired. ``Err(IdempotencyError)`` on store failures. |
async def get_record(key: str) -> Result[IdempotencyRecord | None, IdempotencyError]
Retrieve the full idempotency record, including metadata.
async def set( key: str, value: Any, ttl: float | None = None ) -> Result[None, IdempotencyError]
Store a result with an optional time-to-live.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| `value` | Any | The result to store. |
| `ttl` | float | None | Time-to-live in seconds, or ``None`` for no expiry. |
async def delete(key: str) -> Result[None, IdempotencyError]
Remove an idempotency record by key.
async def acquire( key: str, ttl: int ) -> Result[bool, IdempotencyError]
Atomically claim an idempotency key if it is not already held.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to acquire. |
| `ttl` | int | Time-to-live in seconds for the claimed key. |
| Type | Description |
|---|---|
| Result[bool, IdempotencyError] | ``Ok(True)`` if this caller should proceed. ``Ok(False)`` if the key is already claimed. ``Err(IdempotencyError)`` on store failures. |
ResiliencePipelineProtocol
Section titled “ResiliencePipelineProtocol”Protocol for resilience pipeline that combines multiple patterns.
def add(pattern: Any) -> ResiliencePipelineProtocol
Add a resilience pattern to the pipeline.
Execute function through the resilience pipeline.
RetryPolicyProtocol
Section titled “RetryPolicyProtocol”Protocol for retry policy implementations.
ThrottlerProtocol
Section titled “ThrottlerProtocol”Protocol for throttler implementations.
Acquire permission to proceed.
Try to acquire permission. Returns True if successful.
Get throttling statistics.
Classes
Section titled “Classes”Bulkhead
Section titled “Bulkhead”Bulkhead pattern for limiting concurrent executions.
def __init__( config: BulkheadConfig, metrics_collector: MetricsRecorderProtocol | None = None ) -> None
Execute a function within the bulkhead.
Alias for execute — call a function within the bulkhead.
Call a sync function within the bulkhead using a blocking wait.
FIX MAJ-2: Uses a dedicated threading.Semaphore for sync safety.
BulkheadConfig
Section titled “BulkheadConfig”Configuration for bulkhead isolation.
CircuitBreaker
Section titled “CircuitBreaker”Circuit breaker implementation with async support.
The circuit breaker prevents cascading failures by tracking the health of remote service calls. It has three states:
- CLOSED: Normal operation. Requests pass through.
- OPEN: Service is failing. Requests fail immediately without calling the remote service.
- HALF_OPEN: After recovery_timeout, limited requests are allowed through to test if the service has recovered.
The circuit opens when the failure threshold is exceeded and closes after the success threshold is reached in half-open state.
Construction
Section titled “Construction”Two equivalent construction styles are supported:
Config-first (preferred for production code and DI-wired providers): Construct a CircuitBreakerConfig up-front, name it, and share it across multiple breakers or register it with a provider. All individual parameters are ignored when config is supplied.
from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig
cfg = CircuitBreakerConfig( name="external-api", failure_threshold=5, recovery_timeout=60.0,)breaker = CircuitBreaker(config=cfg)Using the config object (recommended for production)
from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig
config = CircuitBreakerConfig( failure_threshold=3, recovery_timeout=30.0, success_threshold=2,)breaker = CircuitBreaker(config=config)Attributes: state: Current state of the circuit breaker (CLOSED, OPEN, HALF_OPEN). metrics: Current metrics including success/failure counts.
Example
Using as a decorator
from lexigram.resilience import circuit_breaker
@circuit_breaker(name="external-api", failure_threshold=5)async def call_external_api(): response = await http_client.get("/api/data") return response.json()Using directly
from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig
config = CircuitBreakerConfig(failure_threshold=3, recovery_timeout=30.0)breaker = CircuitBreaker(config=config)
async with breaker: result = await risky_operation()
# Or with fallbackasync def fallback(): return cached_data
breaker = CircuitBreaker(config=CircuitBreakerConfig(), fallback=fallback)result = await breaker.execute(risky_operation)def __init__( config: CircuitBreakerConfig, fallback: Callable[Ellipsis, Any] | None = None, metrics_collector: MetricsRecorderProtocol | None = None, backend: Any | None = None ) -> None
Initialize the circuit breaker.
| Parameter | Type | Description |
|---|---|---|
| `config` | CircuitBreakerConfig | A CircuitBreakerConfig object (required). |
| `fallback` | Callable[Ellipsis, Any] | None | Optional callable to execute when the circuit is open. |
| `metrics_collector` | MetricsRecorderProtocol | None | Optional metrics collector. |
| `backend` | Any | None | Optional distributed state backend. |
property state() -> CircuitState
Get current circuit state.
Get circuit breaker metrics.
Get metrics as a dictionary for backward compatibility.
Execute a callable through the circuit breaker.
Primary async call interface — accepts both sync and async callables and routes them through the internal circuit-state machine.
Note: typing is widened to Any to accommodate async callables
(Callable[..., Awaitable[T]]) alongside plain sync ones.
Alias for execute — satisfies CircuitBreakerProtocol.call.
Context manager to protect a code block.
Execute a sync function with circuit breaker protection.
Manually reset the circuit breaker to closed state.
Manually force the circuit breaker to open state.
def sensitive(cls) -> CircuitBreaker
Trip quickly — for critical dependencies.
Opens after 3 consecutive failures or a 30 % failure rate. Waits only 30 s before probing recovery, and requires just 1 probe success to close again. Use for primary databases, auth services, or any dependency whose failure should be surfaced to callers as soon as possible.
def tolerant(cls) -> CircuitBreaker
Trip slowly — for non-critical or high-volume dependencies.
Opens only after 10 consecutive failures or a 70 % failure rate. Waits 120 s before probing recovery, and requires 5 consecutive probe successes before closing. Use for analytics sinks, notification services, or any call whose failures should not immediately surface to callers.
CircuitBreakerConfig
Section titled “CircuitBreakerConfig”Configuration for circuit breaker.
Attributes:
failure_threshold: Number of failures before opening the circuit.
recovery_timeout: Seconds in open state before attempting half-open.
expected_exception: Exception types that count as failures.
success_threshold: Consecutive successes required to close from half-open.
timeout: Per-call timeout in seconds.
name: Human-readable identifier for this breaker instance.
sliding_window_seconds: Window for computing the failure rate.
failure_rate_threshold: Fraction of calls that must fail to trip.
backend: State store backend for distributed circuit breaker coordination.
"memory" (default) — in-process state, no coordination.
"redis" — uses a CacheBackendProtocol
resolved from the DI container.
"consul" — uses Consul KV for cross-datacenter coordination.
CircuitBreakerRegistry
Section titled “CircuitBreakerRegistry”Registry for managing named circuit breakers.
Initialize the circuit breaker registry.
Creates an empty registry for managing named circuit breakers.
Set metrics collector for all breakers in registry.
Set distributed state backend for all current and future breakers.
| Parameter | Type | Description |
|---|---|---|
| `backend` | Any | A CircuitBreakerBackend implementation enabling distributed state sharing across processes. |
def get(name: str) -> CircuitBreaker | None
Get circuit breaker by name.
async def get_or_create( name: str, config: CircuitBreakerConfig | None = None ) -> CircuitBreaker
Get or create a named circuit breaker.
Reset a named circuit breaker to closed state.
Force a named circuit breaker to open state.
Get metrics for a named circuit breaker.
List all circuit breakers with their state and metrics.
Clean up all circuit breakers.
CircuitClosedEvent
Section titled “CircuitClosedEvent”Circuit breaker closed and restored to normal operation.
CircuitClosedHook
Section titled “CircuitClosedHook”Payload fired when a circuit breaker transitions back to the CLOSED state.
Attributes: circuit_name: Name of the circuit breaker that closed.
CircuitOpenedEvent
Section titled “CircuitOpenedEvent”Circuit breaker opened due to threshold breach.
CircuitOpenedHook
Section titled “CircuitOpenedHook”Payload fired when a circuit breaker transitions to the OPEN state.
Attributes: circuit_name: Name of the circuit breaker that opened.
CircuitState
Section titled “CircuitState”Circuit breaker states.
DatabaseIdempotencyStore
Section titled “DatabaseIdempotencyStore”SQL-backed idempotency store using ``DatabaseProviderProtocol``.
Provides at-most-once execution guarantees backed by a relational database, suitable for deployments that do not have Redis but already run a SQL database.
The idempotency table is created automatically on first use (lazy initialisation). For production workloads consider creating the table via your migration pipeline using the DDL shown in the module docstring.
| Parameter | Type | Description |
|---|---|---|
| `db` | Database provider resolved from the container. | |
| `table` | Name of the idempotency table. Override if the default name clashes with an existing table in your schema. |
Create a database-backed idempotency store.
| Parameter | Type | Description |
|---|---|---|
| `db` | DatabaseProviderProtocol | Database provider resolved from the container. |
| `table` | str | Name of the idempotency table. |
Retrieve a stored result by idempotency key.
Expired records are treated as non-existent.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| Type | Description |
|---|---|
| Any | None | The stored result, or ``None`` if not found or expired. |
Store a result with an optional TTL.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| `value` | Any | The result to store (must be JSON-serialisable). |
| `ttl` | float | None | Time-to-live in seconds, or ``None`` for no expiry. |
Remove an idempotency record by key.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to remove. |
Atomically claim an idempotency key using INSERT ON CONFLICT DO NOTHING.
Issues an INSERT … ON CONFLICT (key) DO NOTHING which is atomic on
all major RDBMS (PostgreSQL, SQLite, MySQL). Returns True only if
a new row was inserted, meaning this caller won the race to claim the
key. Returns False if another caller already holds the key.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to acquire. |
| `ttl` | int | Time-to-live in seconds for the claimed key. |
| Type | Description |
|---|---|
| bool | ``True`` if the key was freshly claimed; ``False`` if already held. |
Remove all records whose TTL has elapsed.
Call this periodically (e.g. from a scheduled task) to keep the table from growing unboundedly.
Remove all expired records and return the count purged.
| Type | Description |
|---|---|
| int | Number of expired rows removed (best-effort; may be ``0`` if the driver does not surface affected-row counts). |
DistributedCircuitBreakerBackend
Section titled “DistributedCircuitBreakerBackend”Distributed circuit breaker backend backed by a lexigram.contracts.StateStoreProtocol.
Stores serialized CircuitBreakerState as JSON so that all
processes sharing the same StateStoreProtocol see a consistent view of
each circuit breaker’s state.
Example
from lexigram.contracts import StateStoreProtocolfrom lexigram.resilience.circuit.backend import DistributedCircuitBreakerBackend
backend = DistributedCircuitBreakerBackend(store=state_store)breaker = CircuitBreaker(name="payments", backend=backend)def __init__( store: StateStoreProtocol, ttl: int = 3600 ) -> None
Initialise the backend.
| Parameter | Type | Description |
|---|---|---|
| `store` | StateStoreProtocol | A StateStoreProtocol implementation (e.g. the Redis-backed state store from ``lexigram-sql``). |
| `ttl` | int | Time-to-live (seconds) for each stored state entry. Defaults to 1 hour, ensuring stale entries are eventually cleaned up even if delete_state is never called. |
Fetch circuit breaker state from the distributed store.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Circuit breaker name. |
| Type | Description |
|---|---|
| CircuitBreakerState | None | Deserialized state, or ``None`` if not found. |
Persist circuit breaker state to the distributed store.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Circuit breaker name. |
| `state` | CircuitBreakerState | State to store. |
Remove circuit breaker state from the distributed store.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Circuit breaker name. |
DistributedRateLimiter
Section titled “DistributedRateLimiter”Sliding-window rate limiter that persists state in a StateStoreProtocol.
The window is defined by the most recent period seconds. Each
successful acquisition records a timestamp; stale entries (older than
period) are discarded on every read so the effective count always
reflects only the live window.
| Parameter | Type | Description |
|---|---|---|
| `name` | Logical name for this limiter; used as part of the cache key. | |
| `max_calls` | Maximum number of acquisitions permitted within ``period``. | |
| `period` | Window size in seconds (float). | |
| `store` | A StateStoreProtocol implementation used to persist window timestamps across replicas. |
def __init__( name: str, max_calls: int, period: float, store: StateStoreProtocol ) -> None
Attempt to consume one token without blocking.
| Type | Description |
|---|---|
| bool | ``True`` if the acquisition succeeded (count was under the limit); ``False`` if the limit is currently exhausted. |
Block until one token is available and consume it.
Polls try_acquire using an exponential back-off that is
capped at half the window period so callers do not spin
excessively during sustained overload.
Return current in-process limiter statistics.
| Type | Description |
|---|---|
| dict[str, Any] | A plain mapping with keys ``total_requests``, ``allowed_requests``, ``denied_requests``, ``name``, ``max_calls``, and ``period``. |
Note Statistics are local to the current process replica and are not aggregated across the distributed fleet.
DurableIdempotencyProvider
Section titled “DurableIdempotencyProvider”DI provider that replaces the in-memory store with a Redis-backed store.
Registers RedisIdempotencyStore as the IdempotencyStoreProtocol factory, requiring a CacheBackendProtocol to be resolvable from the container.
This provider declares a dependency on ‘idempotency’ so the core provider runs first.
async def register(container: ContainerRegistrarProtocol) -> None
Override IdempotencyStoreProtocol with a RedisIdempotencyStore factory.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | The DI container registrar. |
async def boot(container: ContainerResolverProtocol) -> None
Verify that CacheBackendProtocol is available in the container.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerResolverProtocol | The DI container resolver. |
| Exception | Description |
|---|---|
| AppStartupError | When CacheBackendProtocol cannot be resolved. |
No-op shutdown for the Redis-backed provider.
IdempotencyConfig
Section titled “IdempotencyConfig”Idempotency subsystem configuration.
Controls both in-memory store behaviour (TTL, capacity, cleanup) and backing-store key conventions (prefix, max length).
Attributes: ttl: Time-to-live for cached results in seconds. max_entries: Maximum number of in-memory entries before FIFO eviction. cleanup_interval: Seconds between background expired-record sweeps. auto_cleanup: Whether to start the background cleanup task on init. key_prefix: Prefix prepended to every key in backing stores (e.g. Redis). max_key_length: Maximum allowed length for an idempotency key.
IdempotencyKeyHitEvent
Section titled “IdempotencyKeyHitEvent”Idempotency key cache hit detected.
IdempotencyMiddleware
Section titled “IdempotencyMiddleware”Framework-agnostic idempotency deduplication middleware.
Reads the Idempotency-Key HTTP header from an incoming request. When
the key has been seen before (and the stored result has not expired), the
original response is returned immediately without calling the downstream
handler. When the key is new the handler is invoked, and its response is
stored against the key for future duplicate requests.
Typical usage
store = InMemoryIdempotencyStore()middleware = IdempotencyMiddleware(store=store)
# In a simplified ASGI handler or test:response = await middleware.process( headers={"idempotency-key": "req-abc-123"}, handler=my_handler,)| Parameter | Type | Description |
|---|---|---|
| `store` | Backend used to persist idempotency records. | |
| `config` | Idempotency configuration (TTL, key prefix, etc.). | |
| `header_name` | HTTP header name to inspect (default: ``idempotency-key``). |
def __init__( store: IdempotencyStoreProtocol, config: IdempotencyConfig | None = None, header_name: str = _IDEMPOTENCY_KEY_HEADER ) -> None
Run the handler with idempotency deduplication.
When an Idempotency-Key header is present:
- Cache hit — deserialise and return the stored response without invoking handler.
- Cache miss — call handler, serialise the result, persist it, and return the result.
When the header is absent the handler is called directly (pass-through).
| Parameter | Type | Description |
|---|---|---|
| `headers` | dict[str, str] | Lower-cased request headers. |
| `handler` | Any | Async callable that produces the response. Called with ``(headers, *args, **kwargs)`` when no cache hit. *args: Forwarded to *handler* on cache miss. **kwargs: Forwarded to *handler* on cache miss. |
| Type | Description |
|---|---|
| Any | The response — either the freshly-computed one or the replayed stored result. |
IdempotencyModule
Section titled “IdempotencyModule”Self-contained module that wires up the idempotency store.
Usage
app = App(modules=[IdempotencyModule.configure()])Or with a custom config
app = App(modules=[IdempotencyModule.configure(IdempotencyConfig(ttl=300))])def configure( cls, config: IdempotencyConfig | None = None ) -> DynamicModule
Build a DynamicModule for the idempotency subsystem.
| Parameter | Type | Description |
|---|---|---|
| `config` | IdempotencyConfig | None | Optional idempotency configuration. When ``None`` the provider uses its defaults. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule that registers IdempotencyProvider and exports IdempotencyStoreProtocol. |
| Exception | Description |
|---|---|
| TypeError | When *config* is not ``None`` and not an IdempotencyConfig. |
IdempotencyProvider
Section titled “IdempotencyProvider”DI provider for the in-memory idempotency store.
Registers InMemoryIdempotencyStore as the IdempotencyStoreProtocol singleton with optional capacity and cleanup configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | Idempotency configuration. Defaults to IdempotencyConfig(). |
def __init__(config: IdempotencyConfig | None = None) -> None
Create the IdempotencyProvider.
| Parameter | Type | Description |
|---|---|---|
| `config` | IdempotencyConfig | None | Idempotency configuration. Defaults to IdempotencyConfig(). |
async def register(container: ContainerRegistrarProtocol) -> None
Bind IdempotencyStoreProtocol to an InMemoryIdempotencyStore singleton.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | The DI container registrar. |
async def boot(container: ContainerResolverProtocol) -> None
No boot-time work required for the in-memory store.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerResolverProtocol | The DI container resolver. |
Stop the background cleanup task if running.
IdempotencyRecord
Section titled “IdempotencyRecord”Immutable snapshot of an idempotency entry.
IdempotencyResult
Section titled “IdempotencyResult”Result of processing an idempotency-keyed request.
Attributes: is_duplicate: True if this request was already processed. key: The idempotency key. from_cache: True if the response was served from cache.
IdempotencyStatus
Section titled “IdempotencyStatus”Lifecycle status for idempotency records.
InMemoryCircuitBreakerBackend
Section titled “InMemoryCircuitBreakerBackend”In-memory backend for circuit breaker state.
Suitable for single-process deployments or testing. Note: Does not work across multiple processes/containers.
Get circuit breaker state by name.
Set circuit breaker state.
Delete circuit breaker state.
InMemoryIdempotencyStore
Section titled “InMemoryIdempotencyStore”In-memory implementation of IdempotencyStoreProtocol with TTL-based expiry.
Stores results in a dictionary keyed by idempotency key, with optional expiry timestamps. Suitable for development and testing.
Create an in-memory idempotency store.
| Parameter | Type | Description |
|---|---|---|
| `auto_cleanup_interval` | float | None | Seconds between automatic purges of expired entries. When set, a background task starts immediately. |
| `max_entries` | int | None | Maximum number of entries the store will hold. When the cap is reached, the *oldest* (first-inserted) unexpired entry is evicted before the new one is stored. ``None`` means the store grows without bound. |
Retrieve a stored result, returning None if expired or missing.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| Type | Description |
|---|---|
| Any | None | The stored result, or None if not found or expired. |
Store a result with optional TTL.
If max_entries was configured and storing key would exceed the cap, the oldest entry is evicted first (FIFO order). Updating an existing key never triggers eviction.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| `value` | Any | The result value to cache. |
| `ttl` | float | None | Time-to-live in seconds, or None for no expiry. |
Remove a stored idempotency record.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to remove. |
Atomically claim an idempotency key if not already held.
Uses a per-key asyncio.Lock to guarantee in-process atomicity.
Returns True if this coroutine was the first to claim the key and
should proceed with execution; False if the key already existed.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to acquire. |
| `ttl` | int | Time-to-live in seconds for the claimed key. |
| Type | Description |
|---|---|
| bool | ``True`` if the key was freshly claimed; ``False`` if already held. |
Remove all stored idempotency records.
async def get_record(key: str) -> IdempotencyRecord | None
Retrieve the full idempotency record for key, or None.
Returns an IdempotencyRecord
with a snapshot of the stored entry’s value, creation time, expiry,
and status. Returns None when the key is absent or expired.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to look up. |
| Type | Description |
|---|---|
| IdempotencyRecord | None | An ``IdempotencyRecord`` if the key exists and has not expired, otherwise ``None``. |
Enable periodic expiry of old entries.
| Parameter | Type | Description |
|---|---|---|
| `interval` | float | None | Optional override for the cleanup interval in seconds. If not provided, the value passed to the constructor is used. |
Cancel the background cleanup task.
Return the number of stored idempotency records (including expired).
| Type | Description |
|---|---|
| int | The total entry count. |
Check whether a non-expired entry exists for the given key.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| Type | Description |
|---|---|
| bool | True if the key is present and not expired. |
Remove all expired entries and return how many were purged.
| Type | Description |
|---|---|
| int | The number of entries removed. |
Return basic statistics about the store.
| Type | Description |
|---|---|
| dict[str, int] | A dict with ``total`` (all entries), ``active`` (non-expired), and ``expired`` counts. |
RateLimiter
Section titled “RateLimiter”Rate limiter using token bucket algorithm.
Acquire permission to proceed, blocking if rate exceeded.
Try to acquire permission without blocking.
Get rate limiter statistics as a plain mapping.
| Type | Description |
|---|---|
| Dictionary with counters | total_requests, allowed_requests, denied_requests, total_wait_time. |
RedisIdempotencyStore
Section titled “RedisIdempotencyStore”Idempotency store backed by a ``CacheBackendProtocol`` (e.g. Redis).
Delegates all storage operations to the provided CacheBackendProtocol,
enabling distributed deduplication across multiple application instances.
The backend is injected by the DI container, keeping this class fully
decoupled from any specific cache implementation.
| Parameter | Type | Description |
|---|---|---|
| `cache` | The cache backend used for key storage. | |
| `key_prefix` | Optional prefix applied to all idempotency keys to avoid collisions with other cache consumers. |
Create a Redis-backed idempotency store.
| Parameter | Type | Description |
|---|---|---|
| `cache` | CacheBackendProtocol | The cache backend resolved from the container. |
| `key_prefix` | str | Prefix prepended to every key in the cache. |
Retrieve a stored result by idempotency key.
Returns None for both missing keys and keys claimed but not yet
resolved (sentinel "__pending__" entries).
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| Type | Description |
|---|---|
| Any | None | The stored result, or ``None`` if not found or still pending. |
Retrieve the full idempotency record, including metadata.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| Type | Description |
|---|---|
| Any | Result containing IdempotencyRecord or None if missing. |
Store a result with an optional TTL.
The float TTL from the IdempotencyStoreProtocol protocol is rounded
up to the nearest second for the underlying cache backend.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key. |
| `value` | Any | The result to store. |
| `ttl` | float | None | Time-to-live in seconds, or ``None`` for no expiry. |
Remove an idempotency record by key.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to remove. |
Atomically claim an idempotency key using cache SET-NX semantics.
Stores a "__pending__" sentinel so that cross-process races can be
detected via get. CacheBackendProtocol implementations that route
to Redis will naturally provide atomic SET key NX EX ttl semantics
when the backing library (e.g. aioredis) supports it.
Note
If the CacheBackendProtocol implementation does not support atomic NX,
a narrow TOCTOU window remains. Use a CacheBackendProtocol backed by
Redis with NX support for fully distributed safety.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The idempotency key to acquire. |
| `ttl` | int | Time-to-live in seconds for the claimed key. |
| Type | Description |
|---|---|
| bool | ``True`` if the key was freshly claimed; ``False`` if already held. |
No-op for Redis-backed store — Redis TTLs expire entries natively.
Redis automatically evicts keys whose TTL has elapsed, so explicit cleanup is unnecessary. This method exists for API parity with the in-memory store.
| Type | Description |
|---|---|
| int | Always ``0`` since no active eviction is performed. |
ResilienceModule
Section titled “ResilienceModule”Circuit breakers, retry policies, bulkhead isolation, throttling, and rate limiting.
Call configure to configure the resilience subsystem with custom settings.
Usage
from lexigram.resilience.config import ResilienceConfig
@module( imports=[ResilienceModule.configure(ResilienceConfig(...))])class AppModule(Module): passdef configure( cls, config: Any | None = None ) -> DynamicModule
Create a ResilienceModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | Any | None | ResilienceConfig or ``None`` for framework defaults. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def stub(cls) -> DynamicModule
Return a ResilienceModule with framework defaults for testing.
All circuit breakers and retry policies use default configuration. Suitable for unit and integration tests.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule with default resilience configuration. |
ResiliencePipeline
Section titled “ResiliencePipeline”Pipeline combining multiple resilience patterns.
def __init__( retry_config: RetryConfig | None = None, circuit_config: CircuitBreakerConfig | None = None, bulkhead_config: BulkheadConfig | None = None, timeout_config: TimeoutConfig | None = None, *, order: list[str] | tuple[str, Ellipsis] | None = None ) -> None
Initialize a resilience pipeline.
| Parameter | Type | Description |
|---|---|---|
| `retry_config` | RetryConfig | None | Configuration for retry behaviour. When provided, a RetryPolicy will be created and applied when ``execute`` is invoked. |
| `circuit_config` | CircuitBreakerConfig | None | Configuration for the circuit breaker. When provided, a CircuitBreaker will be created. |
| `bulkhead_config` | BulkheadConfig | None | Configuration for the bulkhead limiter. When provided, a Bulkhead will be created. |
| `timeout_config` | TimeoutConfig | None | Optional timeout configuration. If set and the ``timeout`` step appears in ``order``, the wrapped function will execute within a timeout_context. |
| `order` | list[str] | tuple[str, Ellipsis] | None | Optional sequence defining the execution order of the resilience steps. Valid names are ``"bulkhead"``, ``"circuit_breaker"``, ``"retry"`` and ``"timeout"``. The first element in the list is the outermost wrapper. If ``None`` (the default) the canonical order ``["bulkhead", "circuit_breaker", "retry", "timeout"]`` is used. This parameter enables scenarios such as applying timeout *outside* of retries or placing the bulkhead at any position in the pipeline. |
def add(pattern: Any) -> ResiliencePipeline
Add a resilience pattern to the pipeline.
Note: This is currently a type-hint compatibility method to satisfy the ResiliencePipelineProtocol. Parameters should ideally be set via constructor for full pipeline ordering.
Execute function with all configured resilience patterns.
ResilienceProvider
Section titled “ResilienceProvider”Resilience patterns DI provider.
Registers circuit breaker, retry, bulkhead, throttle, and rate-limiter infrastructure into the container as singletons.
async def register(container: ContainerRegistrarProtocol) -> None
Register resilience services and configurations.
Initialize resilience patterns on application boot.
Cleanup resilience patterns on shutdown.
Report resilience health.
ResilienceStatus
Section titled “ResilienceStatus”Status enum for Resilience.
RetryAttemptedHook
Section titled “RetryAttemptedHook”Payload fired on each retry attempt after an operation failure.
Attributes: operation: Name or label of the operation being retried. attempt: Attempt number (1-based).
RetryConfig
Section titled “RetryConfig”Configuration for retry policy.
RetryExhaustedEvent
Section titled “RetryExhaustedEvent”Retry strategy exhausted all attempts.
RetryPolicy
Section titled “RetryPolicy”Policy wrapper for retries, used by the resilience pipeline.
Execute an async function with retries.
def aggressive(cls) -> RetryPolicy
Retry quickly — for fast, idempotent operations.
5 attempts with a short 100 ms base delay, 1.5× backoff, and a 2 s cap. Use for lightweight read operations, DNS lookups, or any call that is cheap to repeat and very unlikely to have side-effects.
def conservative(cls) -> RetryPolicy
Retry slowly — for expensive or non-idempotent operations.
3 attempts with a 2 s base delay, 2× backoff, and a 30 s cap. Use for database writes, payment APIs, or any call where rapid retrying could worsen the situation.
SlidingWindowLimiter
Section titled “SlidingWindowLimiter”Sliding window rate limiter for precise request tracking.
Acquire permission to proceed, blocking if window is full.
Try to acquire permission without blocking.
Get rate limiter statistics.
Get current number of requests in the window.
Throttler
Section titled “Throttler”Class-based throttler for more control over rate limiting.
Provides a Throttler instance that can be used to throttle multiple functions or to dynamically control throttling behavior.
Example
Using Throttler class
throttler = Throttler(calls=5, period=1.0)
@throttler.throttleasync def limited_func(): ...
# Or use directlyawait throttler.acquire()def __init__( calls: int, period: float, *, burst: int | None = None, strategy: str = 'token_bucket', metrics: MetricsRecorderProtocol | None = None ) -> None
Initialize the throttler.
| Parameter | Type | Description |
|---|---|---|
| `calls` | int | Maximum number of calls allowed within the period. |
| `period` | float | Time period in seconds. |
| `burst` | int | None | Maximum burst size for token bucket strategy. |
| `strategy` | str | Rate limiting strategy - "token_bucket" or "sliding_window". |
| `metrics` | MetricsRecorderProtocol | None | Optional MetricsRecorderProtocol for emitting ``throttle.allowed`` and ``throttle.denied`` counters. When provided, every acquire and try_acquire call records its outcome so telemetry dashboards can track throttle activity without polling get_stats. |
Decorator to throttle a function using this Throttler.
| Parameter | Type | Description |
|---|---|---|
| `func` | Callable[Ellipsis, Any] | The function to throttle. |
| Type | Description |
|---|---|
| Callable[Ellipsis, Any] | The wrapped function with throttling applied. |
Example
throttler = Throttler(calls=10, period=1.0)
@throttler.throttle … async def limited_api_call(): … return await fetch_data()
Acquire permission to proceed, blocking if rate exceeded.
Emits throttle.allowed via the MetricsRecorderProtocol
when one was provided at construction time.
Example
throttler = Throttler(calls=10, period=1.0) await throttler.acquire()
Can proceed with operation
Section titled “Can proceed with operation”
Try to acquire permission without blocking.
Emits throttle.allowed or throttle.denied via the
MetricsRecorderProtocol when one
was provided at construction time.
| Type | Description |
|---|---|
| bool | True if permission was granted, False if rate limit exceeded. |
Example
throttler = Throttler(calls=10, period=1.0) if await throttler.try_acquire(): … # Proceed with operation
Get throttler statistics.
| Type | Description |
|---|---|
| dict | Dictionary with throttle statistics. |
Example
from lexigram.logging import get_logger logger = get_logger(name) throttler = Throttler(calls=10, period=1.0) stats = throttler.get_stats() logger.info(“stats”, allowed=stats[‘allowed_requests’])
TimeoutConfig
Section titled “TimeoutConfig”Configuration for timeout handling.
TimeoutManager
Section titled “TimeoutManager”Named timeout registry with per-operation configuration.
A central registry that maps operation names to timeout durations. Unknown operations fall back to the default timeout. Use configure to set per-operation overrides before running operations.
Example
manager = TimeoutManager(default_seconds=10.0)manager.configure("send_email", seconds=5.0)manager.configure("run_report", seconds=120.0)
result = await manager.run("send_email", send_email(user))Initialise the manager with a default timeout.
| Parameter | Type | Description |
|---|---|---|
| `default_seconds` | float | Fallback timeout (in seconds) applied to any operation that has not been given an explicit override. Must be positive. |
| Exception | Description |
|---|---|
| ValueError | If *default_seconds* is not positive. |
Register a per-operation timeout override.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | A string key identifying the operation. |
| `seconds` | float | Timeout duration in seconds. Must be positive. |
| Exception | Description |
|---|---|
| ValueError | If *seconds* is not positive. |
Return the effective timeout for operation.
Returns the per-operation override if one has been configured, otherwise the manager’s default.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | The operation name to look up. |
| Type | Description |
|---|---|
| float | Timeout duration in seconds. |
Execute coro under the timeout configured for operation.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | The operation name used to look up the timeout. |
| `coro` | Awaitable[T] | The awaitable to run. |
| Type | Description |
|---|---|
| T | The value produced by *coro*. |
| Exception | Description |
|---|---|
| ResilienceTimeoutError | If *coro* does not complete within the configured timeout. |
Functions
Section titled “Functions”bulkhead
Section titled “bulkhead”
def bulkhead(config: BulkheadConfig) -> Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]]
Decorator to apply bulkhead pattern.
Limits the number of concurrent executions of the decorated function. Auto-detects if the decorated function is sync or async and handles accordingly.
| Parameter | Type | Description |
|---|---|---|
| `config` | BulkheadConfig | Bulkhead configuration specifying concurrency limits. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]] | A decorator that wraps the function with bulkhead protection. |
Example
from lexigram.resilience import bulkhead, BulkheadConfig
config = BulkheadConfig(max_concurrent=5) @bulkhead(config) async def api_call(): …
circuit_breaker
Section titled “circuit_breaker”
def circuit_breaker( name: str, registry: CircuitBreakerRegistry, config: CircuitBreakerConfig | None = None ) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Awaitable[T]]]
Decorator to apply circuit breaker protection to an async function.
The registry must be provided explicitly to avoid global state.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Name of the circuit breaker (must match registry). |
| `registry` | CircuitBreakerRegistry | CircuitBreakerRegistry instance for managing breakers. |
| `config` | CircuitBreakerConfig | None | Optional configuration for the circuit breaker. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Awaitable[T]]] | A decorator that wraps the async function with circuit breaker protection. |
Example
from lexigram.resilience import circuit_breaker, CircuitBreakerRegistry
registry = CircuitBreakerRegistry() @circuit_breaker(“api”, registry) async def api_call(): …
circuit_breaker_sync
Section titled “circuit_breaker_sync”
def circuit_breaker_sync( name: str, registry: CircuitBreakerRegistry, config: CircuitBreakerConfig | None = None ) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, T]]
Decorator to apply circuit breaker protection to a sync function.
The registry must be provided explicitly to avoid global state.
Note
This is for sync functions. For async functions, use circuit_breaker.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Name of the circuit breaker (must match registry). |
| `registry` | CircuitBreakerRegistry | CircuitBreakerRegistry instance for managing breakers. |
| `config` | CircuitBreakerConfig | None | Optional configuration for the circuit breaker. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, T]] | A decorator that wraps the sync function with circuit breaker protection. |
get_throttle_stats
Section titled “get_throttle_stats”
Get throttle statistics for a throttled function.
Resolves statistics from the limiter attached to the function wrapper.
idempotent
Section titled “idempotent”
def idempotent( store: IdempotencyStoreProtocol, *, key_func: Callable[Ellipsis, str] | None = None, ttl: float | None = 3600.0 ) -> Callable[Ellipsis, Any]
Decorator that ensures an async function executes at most once per key.
The idempotency key is derived from function arguments by default, or from a custom key_func. Results are cached in the provided store.
| Parameter | Type | Description |
|---|---|---|
| `store` | IdempotencyStoreProtocol | The IdempotencyStoreProtocol backend for caching results. |
| `key_func` | Callable[Ellipsis, str] | None | Optional callable that generates a key string from (*args, **kwargs). If None, a deterministic hash of the arguments is used. |
| `ttl` | float | None | Time-to-live in seconds for cached results. Defaults to 3600 (1 hour). |
| Type | Description |
|---|---|
| Callable[Ellipsis, Any] | A decorator that wraps async functions with idempotency logic. |
Example
from lexigram.resilience import idempotent, InMemoryIdempotencyStore
store = InMemoryIdempotencyStore()
@idempotent(store, ttl=60.0) async def create_order(order_id: str, amount: float) -> dict: return {“id”: order_id, “amount”: amount}
def retry( func_or_config: Callable[Ellipsis, Awaitable[T] | T] | RetryConfig, *args: Any, config: RetryConfig | None = None, **kwargs: Any ) -> Any
Unified retry API for sync and async callables.
Decorator mode (pass RetryConfig first)
config = RetryConfig(max_attempts=3)
@retry(config)async def fetch_data() -> bytes: ...Direct-call mode
result = await retry(fetch_data, config=config)| Parameter | Type | Description |
|---|---|---|
| `func_or_config` | Callable[Ellipsis, Awaitable[T] | T] | RetryConfig | A RetryConfig (decorator mode) or a callable to invoke with retries (direct-call mode). *args: Positional arguments forwarded to the callable in direct-call mode. |
| `config` | RetryConfig | None | Required in direct-call mode. Ignored in decorator mode. **kwargs: Keyword arguments forwarded to the callable in direct-call mode. |
| Type | Description |
|---|---|
| In decorator mode | a decorator wrapping a callable with retry logic. In direct-call mode: the return value of the callable. |
throttle
Section titled “throttle”
def throttle( calls: int, period: float, *, burst: int | None = None, strategy: str = 'token_bucket', key: str | None = None ) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]
Decorator to throttle function calls.
Resolves the ThrottleRegistry from the DI container context.
with_timeout
Section titled “with_timeout”
def with_timeout(config: TimeoutConfig) -> Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]]
Decorator to apply timeout to a function.
Auto-detects if the decorated function is sync or async and handles accordingly.
| Parameter | Type | Description |
|---|---|---|
| `config` | TimeoutConfig | Timeout configuration specifying duration and behavior. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]] | A decorator that wraps the function with a timeout guard. |
Example
from lexigram.resilience import with_timeout, TimeoutConfig
config = TimeoutConfig(timeout=5.0) @with_timeout(config) async def slow_operation(): …
Exceptions
Section titled “Exceptions”BulkheadError
Section titled “BulkheadError”Bulkhead/rejection error.
BulkheadRejectedError
Section titled “BulkheadRejectedError”Bulkhead rejected due to capacity limits.
CircuitBreakerError
Section titled “CircuitBreakerError”Circuit breaker error.
CircuitOpenError
Section titled “CircuitOpenError”Circuit breaker is open.
DuplicateRequestError
Section titled “DuplicateRequestError”Raised when an idempotent key is reused before completion.
IdempotencyBackendError
Section titled “IdempotencyBackendError”Raised when a Redis or database backend connection fails.
This exception is raised when the underlying storage backend (Redis, PostgreSQL, etc.) is unavailable or returns an unexpected error during an idempotency store operation.
Attributes:
_code: Machine-readable error code LEX_ERR_IDEM_005.
IdempotencyConfigurationError
Section titled “IdempotencyConfigurationError”Raised when the idempotency store is misconfigured.
This exception is raised when required configuration values are missing, invalid, or mutually incompatible — for example a negative TTL, an unknown backend type, or a missing connection URL.
Attributes:
_code: Machine-readable error code LEX_ERR_IDEM_006.
IdempotencyConflictError
Section titled “IdempotencyConflictError”Raised when an idempotency key is already claimed (PENDING).
IdempotencyError
Section titled “IdempotencyError”Base exception for idempotency flows.
IdempotencyStoreError
Section titled “IdempotencyStoreError”Raised on unexpected failures inside the idempotency store.
ResilienceError
Section titled “ResilienceError”Base resilience error.
ResilienceTimeoutError
Section titled “ResilienceTimeoutError”Resilience operation timed out.
Inherits from both ResilienceError and built-in TimeoutError so that
isinstance(e, TimeoutError) is True, enabling compatibility with
asyncio timeout handling.
RetryError
Section titled “RetryError”Retry operation error.
RetryExhaustedError
Section titled “RetryExhaustedError”All retry attempts exhausted.