API Reference

Protocol for bulkhead implementations.

Protocol for circuit breaker implementations.
property state() -> str

Get current circuit state (closed, open, half_open).

async def call(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> Any

Execute function with circuit breaker protection.

def protect() -> AbstractAsyncContextManager[None]

Return an async context manager that protects a code block.

Raises CircuitOpenError when the circuit is open, and records success or failure based on the outcome of the protected block.

def reset() -> None

Reset circuit to closed state.

def force_open() -> None

Force circuit to open state.


Protocol for circuit breaker registries.
def get(name: str) -> CircuitBreakerProtocol | None

Get circuit breaker by name.

async def get_or_create(
    name: str,
    config: CircuitBreakerConfig | None = None
) -> CircuitBreakerProtocol

Get or create circuit breaker by name.

def list_breakers() -> dict[str, dict[str, Any]]

List all circuit breakers.


Protocol for storing and checking idempotency keys.
async def get(key: str) -> Result[Any | None, IdempotencyError]

Retrieve a stored result by idempotency key.

Parameters
  • `key` (str): The idempotency key.
Returns
  • Result[Any | None, IdempotencyError]: `Ok(value)` when a cached result exists. `Ok(None)` when the key is not present or expired. `Err(IdempotencyError)` on store failures.
async def get_record(key: str) -> Result[IdempotencyRecord | None, IdempotencyError]

Retrieve the full idempotency record, including metadata.

async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> Result[None, IdempotencyError]

Store a result with an optional time-to-live.

Parameters
  • `key` (str): The idempotency key.
  • `value` (Any): The result to store.
  • `ttl` (float | None): Time-to-live in seconds, or `None` for no expiry.
async def delete(key: str) -> Result[None, IdempotencyError]

Remove an idempotency record by key.

async def acquire(
    key: str,
    ttl: int
) -> Result[bool, IdempotencyError]

Atomically claim an idempotency key if it is not already held.

Parameters
  • `key` (str): The idempotency key to acquire.
  • `ttl` (int): Time-to-live in seconds for the claimed key.
Returns
  • Result[bool, IdempotencyError]: `Ok(True)` if this caller should proceed. `Ok(False)` if the key is already claimed. `Err(IdempotencyError)` on store failures.

Protocol for resilience pipeline that combines multiple patterns.
def add(pattern: Any) -> ResiliencePipelineProtocol

Add a resilience pattern to the pipeline.

async def execute(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> Any

Execute function through the resilience pipeline.


Protocol for retry policy implementations.
async def execute(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> Any

Execute function with retry logic.


Protocol for throttler implementations.
async def acquire() -> None

Acquire permission to proceed.

async def try_acquire() -> bool

Try to acquire permission. Returns True if successful.

def get_stats() -> dict[str, Any]

Get throttling statistics.


Bulkhead pattern for limiting concurrent executions.
def __init__(
    config: BulkheadConfig,
    metrics_collector: MetricsRecorderProtocol | None = None
) -> None
async def execute(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute a function within the bulkhead.

async def call(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Alias for execute — call a function within the bulkhead.

def execute_sync(
    func: Callable[..., T],
    *args: Any,
    **kwargs: Any
) -> T

Call a sync function within the bulkhead using a blocking wait.

Uses a dedicated threading.Semaphore for sync safety.
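
As a rough illustration of capping concurrent executions, the sketch below limits in-flight coroutines with an asyncio.Semaphore. TinyBulkhead and its max_concurrent parameter are hypothetical names chosen for this example; they are not part of the documented API, and the real Bulkhead may be implemented differently.

```python
import asyncio


class TinyBulkhead:
    """Illustrative only: caps concurrent executions with a semaphore."""

    def __init__(self, max_concurrent: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)
        self.active = 0
        self.peak = 0

    async def execute(self, func, *args, **kwargs):
        async with self._sem:  # callers wait here while the bulkhead is full
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await func(*args, **kwargs)
            finally:
                self.active -= 1


async def demo() -> int:
    bulkhead = TinyBulkhead(max_concurrent=2)

    async def work(i: int) -> int:
        await asyncio.sleep(0.01)
        return i

    # Six calls are submitted, but at most two run at the same time.
    await asyncio.gather(*(bulkhead.execute(work, i) for i in range(6)))
    return bulkhead.peak
```

A synchronous variant would presumably swap asyncio.Semaphore for threading.Semaphore, in line with the note on execute_sync.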


Configuration for bulkhead isolation.

Circuit breaker implementation with async support.

The circuit breaker prevents cascading failures by tracking the health of remote service calls. It has three states:

  • CLOSED: Normal operation. Requests pass through.
  • OPEN: Service is failing. Requests fail immediately without calling the remote service.
  • HALF_OPEN: After recovery_timeout, limited requests are allowed through to test if the service has recovered.

The circuit opens when the failure threshold is exceeded and closes after the success threshold is reached in half-open state.
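
The three states and their transitions can be sketched as a small state machine. This is a minimal sketch, not the library's implementation: TinyBreaker, its method names, and the injectable clock are hypothetical, and it assumes the circuit trips once the consecutive failure count reaches failure_threshold.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class TinyBreaker:
    """Illustrative state machine only; not the library's CircuitBreaker."""

    def __init__(self, failure_threshold=3, recovery_timeout=30.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._clock = clock
        self._state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def state(self):
        # OPEN lazily becomes HALF_OPEN once recovery_timeout has elapsed.
        if (self._state is State.OPEN
                and self._clock() - self._opened_at >= self.recovery_timeout):
            self._state = State.HALF_OPEN
            self._successes = 0
        return self._state

    def allow_request(self):
        return self.state is not State.OPEN

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self._state = State.CLOSED
                self._failures = 0
        else:
            self._failures = 0  # a success resets the consecutive-failure count

    def record_failure(self):
        if self.state is State.HALF_OPEN:
            self._trip()  # a failed probe re-opens the circuit immediately
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self):
        self._state = State.OPEN
        self._opened_at = self._clock()
```

Passing the clock in as a parameter keeps the sketch testable without real waiting.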

Breakers are constructed config-first (preferred for production code and DI-wired providers): construct a CircuitBreakerConfig up-front, name it, and share it across multiple breakers or register it with a provider. The constructor takes config as a required argument; individual parameters such as failure_threshold can instead be passed to the circuit_breaker decorator shown under Example.

from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig

cfg = CircuitBreakerConfig(
    name="external-api",
    failure_threshold=5,
    recovery_timeout=60.0,
)
breaker = CircuitBreaker(config=cfg)

Using the config object (recommended for production)

from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig

config = CircuitBreakerConfig(
    failure_threshold=3,
    recovery_timeout=30.0,
    success_threshold=2,
)
breaker = CircuitBreaker(config=config)

Attributes:
  • state: Current state of the circuit breaker (CLOSED, OPEN, HALF_OPEN).
  • metrics: Current metrics including success/failure counts.

Example

Using as a decorator

from lexigram.resilience import circuit_breaker

@circuit_breaker(name="external-api", failure_threshold=5)
async def call_external_api():
    # http_client is assumed to be an async HTTP client already in scope
    response = await http_client.get("/api/data")
    return response.json()

Using directly

from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig

config = CircuitBreakerConfig(failure_threshold=3, recovery_timeout=30.0)
breaker = CircuitBreaker(config=config)

async with breaker:
    result = await risky_operation()

# Or with fallback
async def fallback():
    return cached_data

breaker = CircuitBreaker(config=CircuitBreakerConfig(), fallback=fallback)
result = await breaker.execute(risky_operation)
def __init__(
    config: CircuitBreakerConfig,
    fallback: Callable[..., Any] | None = None,
    metrics_collector: MetricsRecorderProtocol | None = None,
    backend: Any | None = None
) -> None

Initialize the circuit breaker.

Parameters
  • `config` (CircuitBreakerConfig): A CircuitBreakerConfig object (required).
  • `fallback` (Callable[..., Any] | None): Optional callable to execute when the circuit is open.
  • `metrics_collector` (MetricsRecorderProtocol | None): Optional metrics collector.
  • `backend` (Any | None): Optional distributed state backend.
property state() -> CircuitState

Get current circuit state.

property metrics() -> CircuitBreakerMetrics

Get circuit breaker metrics.

def get_metrics() -> dict[str, Any]

Get metrics as a dictionary for backward compatibility.

async def execute(
    func: Callable[..., Any],
    *args: Any,
    **kwargs: Any
) -> Any

Execute a callable through the circuit breaker.

Primary async call interface — accepts both sync and async callables and routes them through the internal circuit-state machine.

Note: typing is widened to Any to accommodate async callables (Callable[..., Awaitable[T]]) alongside plain sync ones.

async def call(
    func: Callable[..., Any],
    *args: Any,
    **kwargs: Any
) -> Any

Alias for execute — satisfies CircuitBreakerProtocol.call.

async def protect() -> AsyncGenerator[Any, None]

Context manager to protect a code block.

def execute_sync(
    func: Callable[..., T],
    *args: Any,
    **kwargs: Any
) -> T

Execute a sync function with circuit breaker protection.

def reset() -> None

Manually reset the circuit breaker to closed state.

def force_open() -> None

Manually force the circuit breaker to open state.

def sensitive(cls) -> CircuitBreaker

Trip quickly — for critical dependencies.

Opens after 3 consecutive failures or a 30 % failure rate. Waits only 30 s before probing recovery, and requires just 1 probe success to close again. Use for primary databases, auth services, or any dependency whose failure should be surfaced to callers as soon as possible.

def tolerant(cls) -> CircuitBreaker

Trip slowly — for non-critical or high-volume dependencies.

Opens only after 10 consecutive failures or a 70 % failure rate. Waits 120 s before probing recovery, and requires 5 consecutive probe successes before closing. Use for analytics sinks, notification services, or any call whose failures should not immediately surface to callers.


Configuration for circuit breaker.

Attributes:
  • failure_threshold: Number of failures before opening the circuit.
  • recovery_timeout: Seconds in open state before attempting half-open.
  • expected_exception: Exception types that count as failures.
  • success_threshold: Consecutive successes required to close from half-open.
  • timeout: Per-call timeout in seconds.
  • name: Human-readable identifier for this breaker instance.
  • sliding_window_seconds: Window for computing the failure rate.
  • failure_rate_threshold: Fraction of calls that must fail to trip.
  • backend: State store backend for distributed circuit breaker coordination. One of:
      • "memory" (default): in-process state, no coordination.
      • "redis": uses a CacheBackendProtocol resolved from the DI container.
      • "consul": uses Consul KV for cross-datacenter coordination.


Registry for managing named circuit breakers.
def __init__() -> None

Initialize the circuit breaker registry.

Creates an empty registry for managing named circuit breakers.

def set_metrics_collector(collector: MetricsRecorderProtocol) -> None

Set metrics collector for all breakers in registry.

def set_backend(backend: Any) -> None

Set distributed state backend for all current and future breakers.

Parameters
  • `backend` (Any): A CircuitBreakerBackend implementation enabling distributed state sharing across processes.
def get(name: str) -> CircuitBreaker | None

Get circuit breaker by name.

async def get_or_create(
    name: str,
    config: CircuitBreakerConfig | None = None
) -> CircuitBreaker

Get or create a named circuit breaker.

def reset_breaker(name: str) -> None

Reset a named circuit breaker to closed state.

def force_open_breaker(name: str) -> None

Force a named circuit breaker to open state.

def get_breaker_metrics(name: str) -> CircuitBreakerMetrics | None

Get metrics for a named circuit breaker.

def list_breakers() -> dict[str, dict[str, Any]]

List all circuit breakers with their state and metrics.

def cleanup() -> None

Clean up all circuit breakers.


Circuit breaker closed and restored to normal operation.

Payload fired when a circuit breaker transitions back to the CLOSED state.

Attributes: circuit_name: Name of the circuit breaker that closed.


Circuit breaker opened due to threshold breach.

Payload fired when a circuit breaker transitions to the OPEN state.

Attributes: circuit_name: Name of the circuit breaker that opened.


Circuit breaker states.

SQL-backed idempotency store using ``DatabaseProviderProtocol``.

Provides at-most-once execution guarantees backed by a relational database, suitable for deployments that do not have Redis but already run a SQL database.

The idempotency table is created automatically on first use (lazy initialisation). For production workloads consider creating the table via your migration pipeline using the DDL shown in the module docstring.

Parameters
  • `db`: Database provider resolved from the container.
  • `table`: Name of the idempotency table. Override if the default name clashes with an existing table in your schema.
def __init__(
    db: DatabaseProviderProtocol,
    table: str = 'idempotency_keys'
) -> None

Create a database-backed idempotency store.

Parameters
  • `db` (DatabaseProviderProtocol): Database provider resolved from the container.
  • `table` (str): Name of the idempotency table.
async def get(key: str) -> Any | None

Retrieve a stored result by idempotency key.

Expired records are treated as non-existent.

Parameters
  • `key` (str): The idempotency key.
Returns
  • Any | None: The stored result, or `None` if not found or expired.
async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> None

Store a result with an optional TTL.

Parameters
  • `key` (str): The idempotency key.
  • `value` (Any): The result to store (must be JSON-serialisable).
  • `ttl` (float | None): Time-to-live in seconds, or `None` for no expiry.
async def delete(key: str) -> None

Remove an idempotency record by key.

Parameters
  • `key` (str): The idempotency key to remove.
async def acquire(
    key: str,
    ttl: int
) -> bool

Atomically claim an idempotency key using INSERT ON CONFLICT DO NOTHING.

Issues an INSERT … ON CONFLICT (key) DO NOTHING, which is atomic on PostgreSQL and SQLite. MySQL does not accept the ON CONFLICT clause; its closest equivalent is INSERT IGNORE. Returns True only if a new row was inserted, meaning this caller won the race to claim the key. Returns False if another caller already holds the key.

Parameters
  • `key` (str): The idempotency key to acquire.
  • `ttl` (int): Time-to-live in seconds for the claimed key.
Returns
  • bool: `True` if the key was freshly claimed; `False` if already held.
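
The claim-by-insert idea can be tried out with the standard-library sqlite3 module. This is a sketch under assumptions: the three-column schema is invented for the example (the library's real DDL lives in its module docstring), and expired rows are not reclaimed here.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
# Hypothetical schema for illustration only.
conn.execute(
    "CREATE TABLE idempotency_keys ("
    " key TEXT PRIMARY KEY, value TEXT, expires_at REAL)"
)


def acquire(key: str, ttl: int) -> bool:
    """Return True only if this call inserted the row (won the race)."""
    cur = conn.execute(
        "INSERT INTO idempotency_keys (key, value, expires_at) "
        "VALUES (?, NULL, ?) ON CONFLICT (key) DO NOTHING",
        (key, time.time() + ttl),
    )
    conn.commit()
    # rowcount is 1 when a row was inserted and 0 when the conflict
    # clause turned the statement into a no-op.
    return cur.rowcount == 1
```

Because the primary-key constraint decides the winner inside the database, two callers racing on the same key cannot both see a successful insert.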
async def purge_expired() -> None

Remove all records whose TTL has elapsed.

Call this periodically (e.g. from a scheduled task) to keep the table from growing unboundedly.

async def cleanup_expired() -> int

Remove all expired records and return the count purged.

Returns
  • int: Number of expired rows removed (best-effort; may be `0` if the driver does not surface affected-row counts).

Distributed circuit breaker backend backed by a lexigram.contracts.StateStoreProtocol.

Stores serialized CircuitBreakerState as JSON so that all processes sharing the same StateStoreProtocol see a consistent view of each circuit breaker’s state.

Example

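from lexigram.contracts import StateStoreProtocol
from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig
from lexigram.resilience.circuit.backend import DistributedCircuitBreakerBackend

# state_store: any StateStoreProtocol implementation, assumed to be in scope
backend = DistributedCircuitBreakerBackend(store=state_store)
breaker = CircuitBreaker(config=CircuitBreakerConfig(name="payments"), backend=backend)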
def __init__(
    store: StateStoreProtocol,
    ttl: int = 3600
) -> None

Initialise the backend.

Parameters
  • `store` (StateStoreProtocol): A StateStoreProtocol implementation (e.g. the Redis-backed state store from `lexigram-sql`).
  • `ttl` (int): Time-to-live (seconds) for each stored state entry. Defaults to 1 hour, ensuring stale entries are eventually cleaned up even if delete_state is never called.
async def get_state(name: str) -> CircuitBreakerState | None

Fetch circuit breaker state from the distributed store.

Parameters
  • `name` (str): Circuit breaker name.
Returns
  • CircuitBreakerState | None: Deserialized state, or `None` if not found.
async def set_state(
    name: str,
    state: CircuitBreakerState
) -> None

Persist circuit breaker state to the distributed store.

Parameters
  • `name` (str): Circuit breaker name.
  • `state` (CircuitBreakerState): State to store.
async def delete_state(name: str) -> None

Remove circuit breaker state from the distributed store.

Parameters
  • `name` (str): Circuit breaker name.

Sliding-window rate limiter that persists state in a StateStoreProtocol.

The window is defined by the most recent period seconds. Each successful acquisition records a timestamp; stale entries (older than period) are discarded on every read so the effective count always reflects only the live window.
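
The sliding-window bookkeeping described above can be sketched in-process as follows. TinySlidingWindow is a hypothetical stand-in: it keeps timestamps in a local deque, whereas the documented class persists them in a StateStoreProtocol, and the injectable clock exists only to make the example deterministic.

```python
import time
from collections import deque


class TinySlidingWindow:
    """In-process stand-in; the documented class keeps timestamps in a store."""

    def __init__(self, max_calls: int, period: float, clock=time.monotonic):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock
        self._stamps = deque()

    def try_acquire(self) -> bool:
        now = self._clock()
        # Discard timestamps older than the window before counting.
        while self._stamps and now - self._stamps[0] > self.period:
            self._stamps.popleft()
        if len(self._stamps) < self.max_calls:
            self._stamps.append(now)  # record this successful acquisition
            return True
        return False
```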

Parameters
  • `name`: Logical name for this limiter; used as part of the cache key.
  • `max_calls`: Maximum number of acquisitions permitted within `period`.
  • `period`: Window size in seconds (float).
  • `store`: A StateStoreProtocol implementation used to persist window timestamps across replicas.
def __init__(
    name: str,
    max_calls: int,
    period: float,
    store: StateStoreProtocol
) -> None
async def try_acquire() -> bool

Attempt to consume one token without blocking.

Returns
  • bool: `True` if the acquisition succeeded (count was under the limit); `False` if the limit is currently exhausted.
async def acquire() -> None

Block until one token is available and consume it.

Polls try_acquire using an exponential back-off that is capped at half the window period so callers do not spin excessively during sustained overload.
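
A capped exponential back-off of this kind might produce poll delays like the ones below. The initial delay and growth factor are assumptions made for the example; the source only states that the delay is capped at half the window period.

```python
import itertools


def backoff_delays(period: float, initial: float = 0.01, factor: float = 2.0):
    """Yield poll delays that grow geometrically, capped at period / 2."""
    cap = period / 2
    delay = min(initial, cap)
    while True:
        yield delay
        delay = min(delay * factor, cap)


# First five delays for a one-second window, starting at 0.1 s.
delays = list(itertools.islice(backoff_delays(period=1.0, initial=0.1), 5))
# delays == [0.1, 0.2, 0.4, 0.5, 0.5]
```

An acquire loop would sleep for each yielded delay between try_acquire attempts until one succeeds.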

def get_stats() -> dict[str, Any]

Return current in-process limiter statistics.

Returns
  • dict[str, Any]: A plain mapping with keys `total_requests`, `allowed_requests`, `denied_requests`, `name`, `max_calls`, and `period`.

Note: Statistics are local to the current process replica and are not aggregated across the distributed fleet.


DI provider that replaces the in-memory store with a Redis-backed store.

Registers RedisIdempotencyStore as the IdempotencyStoreProtocol factory, requiring a CacheBackendProtocol to be resolvable from the container.

This provider declares a dependency on ‘idempotency’ so the core provider runs first.

async def register(container: ContainerRegistrarProtocol) -> None

Override IdempotencyStoreProtocol with a RedisIdempotencyStore factory.

Parameters
  • `container` (ContainerRegistrarProtocol): The DI container registrar.
async def boot(container: ContainerResolverProtocol) -> None

Verify that CacheBackendProtocol is available in the container.

Parameters
  • `container` (ContainerResolverProtocol): The DI container resolver.
Raises
  • AppStartupError: When CacheBackendProtocol cannot be resolved.
async def shutdown() -> None

No-op shutdown for the Redis-backed provider.


Idempotency subsystem configuration.

Controls both in-memory store behaviour (TTL, capacity, cleanup) and backing-store key conventions (prefix, max length).

Attributes:
  • ttl: Time-to-live for cached results in seconds.
  • max_entries: Maximum number of in-memory entries before FIFO eviction.
  • cleanup_interval: Seconds between background expired-record sweeps.
  • auto_cleanup: Whether to start the background cleanup task on init.
  • key_prefix: Prefix prepended to every key in backing stores (e.g. Redis).
  • max_key_length: Maximum allowed length for an idempotency key.


Idempotency key cache hit detected.

Framework-agnostic idempotency deduplication middleware.

Reads the Idempotency-Key HTTP header from an incoming request. When the key has been seen before (and the stored result has not expired), the original response is returned immediately without calling the downstream handler. When the key is new the handler is invoked, and its response is stored against the key for future duplicate requests.

Typical usage

store = InMemoryIdempotencyStore()
middleware = IdempotencyMiddleware(store=store)

# In a simplified ASGI handler or test:
response = await middleware.process(
    headers={"idempotency-key": "req-abc-123"},
    handler=my_handler,
)
Parameters
  • `store`: Backend used to persist idempotency records.
  • `config`: Idempotency configuration (TTL, key prefix, etc.).
  • `header_name`: HTTP header name to inspect (default: `idempotency-key`).
def __init__(
    store: IdempotencyStoreProtocol,
    config: IdempotencyConfig | None = None,
    header_name: str = _IDEMPOTENCY_KEY_HEADER
) -> None
async def process(
    headers: dict[str, str],
    handler: Any,
    *args: Any,
    **kwargs: Any
) -> Any

Run the handler with idempotency deduplication.

When an Idempotency-Key header is present:

  • Cache hit — deserialise and return the stored response without invoking handler.
  • Cache miss — call handler, serialise the result, persist it, and return the result.

When the header is absent the handler is called directly (pass-through).
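
The hit, miss, and pass-through branches can be sketched as a free function. DictStore and this process function are hypothetical illustrations, and JSON is only an assumed serialisation format; the real middleware may serialise responses differently and also applies TTLs and key prefixes.

```python
import asyncio
import json


class DictStore:
    """Hypothetical minimal store: get/set only, no TTL handling."""

    def __init__(self):
        self._data = {}

    async def get(self, key):
        return self._data.get(key)

    async def set(self, key, value, ttl=None):
        self._data[key] = value


async def process(store, headers, handler, *args, **kwargs):
    key = headers.get("idempotency-key")
    if key is None:
        return await handler(headers, *args, **kwargs)  # pass-through
    cached = await store.get(key)
    if cached is not None:
        return json.loads(cached)  # cache hit: replay the stored response
    result = await handler(headers, *args, **kwargs)  # cache miss
    await store.set(key, json.dumps(result))
    return result


async def demo():
    calls = []

    async def handler(headers):
        calls.append(1)
        return {"status": 201}

    store = DictStore()
    hdrs = {"idempotency-key": "req-abc-123"}
    first = await process(store, hdrs, handler)
    second = await process(store, hdrs, handler)
    return first, second, len(calls)
```

In the demo the handler runs once; the second call with the same key is answered from the store.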

Parameters
  • `headers` (dict[str, str]): Lower-cased request headers.
  • `handler` (Any): Async callable that produces the response. Called with `(headers, *args, **kwargs)` when there is no cache hit.
  • `*args` (Any): Forwarded to `handler` on cache miss.
  • `**kwargs` (Any): Forwarded to `handler` on cache miss.
Returns
  • Any: The response, either the freshly computed one or the replayed stored result.

Self-contained module that wires up the idempotency store.

Usage

app = App(modules=[IdempotencyModule.configure()])

Or with a custom config

app = App(modules=[IdempotencyModule.configure(IdempotencyConfig(ttl=300))])
def configure(
    cls,
    config: IdempotencyConfig | None = None
) -> DynamicModule

Build a DynamicModule for the idempotency subsystem.

Parameters
  • `config` (IdempotencyConfig | None): Optional idempotency configuration. When `None` the provider uses its defaults.
Returns
  • DynamicModule: A DynamicModule that registers IdempotencyProvider and exports IdempotencyStoreProtocol.
Raises
  • TypeError: When `config` is not `None` and not an IdempotencyConfig.

DI provider for the in-memory idempotency store.

Registers InMemoryIdempotencyStore as the IdempotencyStoreProtocol singleton with optional capacity and cleanup configuration.

Parameters
  • `config`: Idempotency configuration. Defaults to IdempotencyConfig().
def __init__(config: IdempotencyConfig | None = None) -> None

Create the IdempotencyProvider.

Parameters
  • `config` (IdempotencyConfig | None): Idempotency configuration. Defaults to IdempotencyConfig().
async def register(container: ContainerRegistrarProtocol) -> None

Bind IdempotencyStoreProtocol to an InMemoryIdempotencyStore singleton.

Parameters
  • `container` (ContainerRegistrarProtocol): The DI container registrar.
async def boot(container: ContainerResolverProtocol) -> None

No boot-time work required for the in-memory store.

Parameters
  • `container` (ContainerResolverProtocol): The DI container resolver.
async def shutdown() -> None

Stop the background cleanup task if running.


Immutable snapshot of an idempotency entry.

Result of processing an idempotency-keyed request.

Attributes:
  • is_duplicate: True if this request was already processed.
  • key: The idempotency key.
  • from_cache: True if the response was served from cache.


Lifecycle status for idempotency records.

In-memory backend for circuit breaker state.

Suitable for single-process deployments or testing. Note: Does not work across multiple processes/containers.

def __init__() -> None
async def get_state(name: str) -> CircuitBreakerState | None

Get circuit breaker state by name.

async def set_state(
    name: str,
    state: CircuitBreakerState
) -> None

Set circuit breaker state.

async def delete_state(name: str) -> None

Delete circuit breaker state.


In-memory implementation of IdempotencyStoreProtocol with TTL-based expiry.

Stores results in a dictionary keyed by idempotency key, with optional expiry timestamps. Suitable for development and testing.

def __init__(
    auto_cleanup_interval: float | None = None,
    max_entries: int | None = None
) -> None

Create an in-memory idempotency store.

Parameters
  • `auto_cleanup_interval` (float | None): Seconds between automatic purges of expired entries. When set, a background task starts immediately.
  • `max_entries` (int | None): Maximum number of entries the store will hold. When the cap is reached, the oldest (first-inserted) unexpired entry is evicted before the new one is stored. `None` means the store grows without bound.
async def get(key: str) -> Any | None

Retrieve a stored result, returning None if expired or missing.

Parameters
  • `key` (str): The idempotency key.
Returns
  • Any | None: The stored result, or None if not found or expired.
async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> None

Store a result with optional TTL.

If max_entries was configured and storing key would exceed the cap, the oldest entry is evicted first (FIFO order). Updating an existing key never triggers eviction.
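
The FIFO rule can be illustrated with a plain dict, which preserves insertion order. FifoCache is a hypothetical name for this sketch; it ignores TTLs and is not the library's store.

```python
class FifoCache:
    """Illustrative FIFO-capped mapping; not the library's store."""

    def __init__(self, max_entries: int) -> None:
        self.max_entries = max_entries
        self._data = {}  # dicts preserve insertion order (Python 3.7+)

    def set(self, key, value) -> None:
        # Evict only when a new key would push the store past its cap.
        if key not in self._data and len(self._data) >= self.max_entries:
            oldest = next(iter(self._data))  # first-inserted key
            del self._data[oldest]
        self._data[key] = value

    def keys(self):
        return list(self._data)
```

Overwriting an existing key keeps its original position, so an update neither evicts anything nor makes the entry "newer" for eviction purposes.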

Parameters
  • `key` (str): The idempotency key.
  • `value` (Any): The result value to cache.
  • `ttl` (float | None): Time-to-live in seconds, or None for no expiry.
async def delete(key: str) -> None

Remove a stored idempotency record.

Parameters
  • `key` (str): The idempotency key to remove.
async def acquire(
    key: str,
    ttl: int
) -> bool

Atomically claim an idempotency key if not already held.

Uses a per-key asyncio.Lock to guarantee in-process atomicity. Returns True if this coroutine was the first to claim the key and should proceed with execution; False if the key already existed.

Parameters
  • `key` (str): The idempotency key to acquire.
  • `ttl` (int): Time-to-live in seconds for the claimed key.
Returns
  • bool: `True` if the key was freshly claimed; `False` if already held.
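
The per-key locking can be sketched as below. TinyClaims is a hypothetical class for illustration; it omits TTL handling, and the asyncio.sleep(0) is there only to force a task switch inside the critical section so the effect of the lock is visible.

```python
import asyncio
from collections import defaultdict


class TinyClaims:
    """Illustrative only: first caller per key wins, guarded by a per-key lock."""

    def __init__(self) -> None:
        self._locks = defaultdict(asyncio.Lock)  # one lock per key, made lazily
        self._claimed = set()

    async def acquire(self, key: str) -> bool:
        async with self._locks[key]:
            if key in self._claimed:
                return False
            await asyncio.sleep(0)  # yield to other tasks while holding the lock
            self._claimed.add(key)
            return True


async def demo():
    claims = TinyClaims()
    # Five coroutines race for the same key; exactly one should win.
    return await asyncio.gather(*(claims.acquire("k") for _ in range(5)))
```

Without the lock, the yield between the check and the add would let several coroutines all observe the key as unclaimed.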
def clear() -> None

Remove all stored idempotency records.

async def get_record(key: str) -> IdempotencyRecord | None

Retrieve the full idempotency record for key, or None.

Returns an IdempotencyRecord with a snapshot of the stored entry’s value, creation time, expiry, and status. Returns None when the key is absent or expired.

Parameters
  • `key` (str): The idempotency key to look up.
Returns
  • IdempotencyRecord | None: An `IdempotencyRecord` if the key exists and has not expired, otherwise `None`.
async def start_auto_cleanup(interval: float | None = None) -> None

Enable periodic expiry of old entries.

Parameters
  • `interval` (float | None): Optional override for the cleanup interval in seconds. If not provided, the value passed to the constructor is used.
async def stop_auto_cleanup() -> None

Cancel the background cleanup task.

property size() -> int

Return the number of stored idempotency records (including expired).

Returns
  • int: The total entry count.
async def has(key: str) -> bool

Check whether a non-expired entry exists for the given key.

Parameters
  • `key` (str): The idempotency key.
Returns
  • bool: True if the key is present and not expired.
async def cleanup_expired() -> int

Remove all expired entries and return how many were purged.

Returns
  • int: The number of entries removed.
def get_stats() -> dict[str, int]

Return basic statistics about the store.

Returns
  • dict[str, int]: A dict with `total` (all entries), `active` (non-expired), and `expired` counts.

Rate limiter using token bucket algorithm.
def __init__(
    rate: int,
    per: float = 1.0,
    burst: int | None = None
) -> None
async def acquire() -> None

Acquire permission to proceed, blocking if rate exceeded.

async def try_acquire() -> bool

Try to acquire permission without blocking.

def get_stats() -> dict[str, Any]

Get rate limiter statistics as a plain mapping.

Returns
  • dict[str, Any]: Dictionary with the counters `total_requests`, `allowed_requests`, `denied_requests`, and `total_wait_time`.
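
For readers unfamiliar with the token bucket algorithm, here is a minimal non-blocking sketch. TinyTokenBucket is hypothetical; it assumes rate tokens are added every per seconds and that burst defaults to rate when omitted, neither of which is confirmed by this reference.

```python
import time


class TinyTokenBucket:
    """Illustrative token bucket; not the library's rate limiter."""

    def __init__(self, rate, per=1.0, burst=None, clock=time.monotonic):
        # Assumption: the bucket capacity defaults to `rate` when no burst is given.
        self.capacity = burst if burst is not None else rate
        self._refill_per_sec = rate / per
        self._tokens = float(self.capacity)
        self._clock = clock
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        refill = (now - self._last) * self._refill_per_sec
        self._tokens = min(self.capacity, self._tokens + refill)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

A blocking acquire could be layered on top by sleeping until at least one token has been refilled.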

Idempotency store backed by a ``CacheBackendProtocol`` (e.g. Redis).

Delegates all storage operations to the provided CacheBackendProtocol, enabling distributed deduplication across multiple application instances. The backend is injected by the DI container, keeping this class fully decoupled from any specific cache implementation.

Parameters
  • `cache`: The cache backend used for key storage.
  • `key_prefix`: Optional prefix applied to all idempotency keys to avoid collisions with other cache consumers.
def __init__(
    cache: CacheBackendProtocol,
    key_prefix: str = 'idempotency:'
) -> None

Create a Redis-backed idempotency store.

Parameters
  • `cache` (CacheBackendProtocol): The cache backend resolved from the container.
  • `key_prefix` (str): Prefix prepended to every key in the cache.
async def get(key: str) -> Any | None

Retrieve a stored result by idempotency key.

Returns None for both missing keys and keys claimed but not yet resolved (sentinel "__pending__" entries).

Parameters
  • `key` (str): The idempotency key.
Returns
  • Any | None: The stored result, or `None` if not found or still pending.
async def get_record(key: str) -> Any

Retrieve the full idempotency record, including metadata.

Parameters
  • `key` (str): The idempotency key.
Returns
  • Any: Result containing IdempotencyRecord or None if missing.
async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> None

Store a result with an optional TTL.

The float TTL accepted by IdempotencyStoreProtocol is rounded up to the nearest second for the underlying cache backend.

Parameters
  • `key` (str): The idempotency key.
  • `value` (Any): The result to store.
  • `ttl` (float | None): Time-to-live in seconds, or `None` for no expiry.
async def delete(key: str) -> None

Remove an idempotency record by key.

Parameters
  • `key` (str): The idempotency key to remove.
async def acquire(
    key: str,
    ttl: int
) -> bool

Atomically claim an idempotency key using cache SET-NX semantics.

Stores a "__pending__" sentinel so that cross-process races can be detected via get. CacheBackendProtocol implementations that route to Redis will naturally provide atomic SET key NX EX ttl semantics when the backing library (e.g. aioredis) supports it.

Note

If the CacheBackendProtocol implementation does not support atomic NX, a narrow TOCTOU window remains. Use a CacheBackendProtocol backed by Redis with NX support for fully distributed safety.

Parameters
  • `key` (str): The idempotency key to acquire.
  • `ttl` (int): Time-to-live in seconds for the claimed key.
Returns
  • bool: `True` if the key was freshly claimed; `False` if already held.
async def cleanup_expired() -> int

No-op for Redis-backed store — Redis TTLs expire entries natively.

Redis automatically evicts keys whose TTL has elapsed, so explicit cleanup is unnecessary. This method exists for API parity with the in-memory store.

Returns
  • int: Always `0` since no active eviction is performed.

Circuit breakers, retry policies, bulkhead isolation, throttling, and rate limiting.

Call configure to set up the resilience subsystem with custom settings.

Usage

from lexigram.resilience.config import ResilienceConfig

@module(
    imports=[ResilienceModule.configure(ResilienceConfig(...))]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: Any | None = None
) -> DynamicModule

Create a ResilienceModule with explicit configuration.

Parameters
  • `config` (Any | None): ResilienceConfig or `None` for framework defaults.
Returns
  • DynamicModule: A DynamicModule descriptor.
def stub(cls) -> DynamicModule

Return a ResilienceModule with framework defaults for testing.

All circuit breakers and retry policies use default configuration. Suitable for unit and integration tests.

Returns
  • DynamicModule: A DynamicModule with default resilience configuration.

Pipeline combining multiple resilience patterns.
def __init__(
    retry_config: RetryConfig | None = None,
    circuit_config: CircuitBreakerConfig | None = None,
    bulkhead_config: BulkheadConfig | None = None,
    timeout_config: TimeoutConfig | None = None,
    *,
    order: list[str] | tuple[str, ...] | None = None
) -> None

Initialize a resilience pipeline.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `retry_config` | RetryConfig \| None | Configuration for retry behaviour. When provided, a RetryPolicy will be created and applied when ``execute`` is invoked. |
| `circuit_config` | CircuitBreakerConfig \| None | Configuration for the circuit breaker. When provided, a CircuitBreaker will be created. |
| `bulkhead_config` | BulkheadConfig \| None | Configuration for the bulkhead limiter. When provided, a Bulkhead will be created. |
| `timeout_config` | TimeoutConfig \| None | Optional timeout configuration. If set and the ``timeout`` step appears in ``order``, the wrapped function will execute within a timeout_context. |
| `order` | list[str] \| tuple[str, Ellipsis] \| None | Optional sequence defining the execution order of the resilience steps. Valid names are ``"bulkhead"``, ``"circuit_breaker"``, ``"retry"`` and ``"timeout"``. The first element in the list is the outermost wrapper. If ``None`` (the default) the canonical order ``["bulkhead", "circuit_breaker", "retry", "timeout"]`` is used. This parameter enables scenarios such as applying timeout *outside* of retries or placing the bulkhead at any position in the pipeline. |

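
The ordering rule can be pictured with a short standalone sketch. It does not use lexigram itself: `make_step`, `compose` and `run` are hypothetical helpers that only illustrate what "the first element is the outermost wrapper" means, assuming each step wraps the next one the way a decorator would.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncFn = Callable[[], Awaitable[Any]]
Step = Callable[[AsyncFn], AsyncFn]


def make_step(name: str, trace: list[str]) -> Step:
    """Build a hypothetical wrapper that records when it is entered and exited."""
    def step(inner: AsyncFn) -> AsyncFn:
        async def wrapped() -> Any:
            trace.append(f"enter {name}")
            try:
                return await inner()
            finally:
                trace.append(f"exit {name}")
        return wrapped
    return step


def compose(order: list[str], steps: dict[str, Step], func: AsyncFn) -> AsyncFn:
    """Wrap func so that order[0] ends up as the outermost layer."""
    wrapped = func
    for name in reversed(order):  # the innermost step is applied first
        wrapped = steps[name](wrapped)
    return wrapped


async def operation() -> str:
    return "ok"


def run(order: list[str]) -> list[str]:
    """Run the operation through the given order and return the call trace."""
    trace: list[str] = []
    steps = {name: make_step(name, trace) for name in order}
    asyncio.run(compose(order, steps, operation)())
    return trace
```

With the canonical order, `run(["bulkhead", "circuit_breaker", "retry", "timeout"])` records `enter bulkhead` first and `exit bulkhead` last, since the bulkhead wraps every other step. Passing `["timeout", "retry"]` instead puts the timeout around the whole retry loop in this sketch.
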
def add(pattern: Any) -> ResiliencePipeline

Add a resilience pattern to the pipeline.

Note: This method currently exists for type-hint compatibility with ResiliencePipelineProtocol. Prefer passing configurations to the constructor so that the pipeline ordering applies in full.

async def execute(
    func: Callable[Ellipsis, Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute function with all configured resilience patterns.


Resilience patterns DI provider.

Registers circuit breaker, retry, bulkhead, throttle, and rate-limiter infrastructure into the container as singletons.

def __init__() -> None
async def register(container: ContainerRegistrarProtocol) -> None

Register resilience services and configurations.

async def boot(container: BootContainerProtocol) -> None

Initialize resilience patterns on application boot.

async def shutdown() -> None

Cleanup resilience patterns on shutdown.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report resilience health.


Status enum for Resilience.

Payload fired on each retry attempt after an operation failure.

Attributes

operation: Name or label of the operation being retried.
attempt: Attempt number (1-based).


Configuration for retry policy.

Retry strategy exhausted all attempts.

Policy wrapper for retries, used by the resilience pipeline.
def __init__(config: RetryConfig) -> None
async def execute(
    func: Callable[Ellipsis, Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute an async function with retries.

def aggressive(cls) -> RetryPolicy

Retry quickly — for fast, idempotent operations.

5 attempts with a short 100 ms base delay, 1.5× backoff, and a 2 s cap. Use for lightweight read operations, DNS lookups, or any call that is cheap to repeat and very unlikely to have side-effects.

def conservative(cls) -> RetryPolicy

Retry slowly — for expensive or non-idempotent operations.

3 attempts with a 2 s base delay, 2× backoff, and a 30 s cap. Use for database writes, payment APIs, or any call where rapid retrying could worsen the situation.
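
To compare the two presets, the sketch below computes the waits between attempts. It assumes plain exponential backoff, `min(cap, base * factor**n)` with no jitter, which is a common formula but is not confirmed by this reference; `backoff_delays` is a hypothetical helper, not part of the library.

```python
def backoff_delays(attempts: int, base: float, factor: float, cap: float) -> list[float]:
    """Seconds slept between attempts, assuming capped exponential backoff without jitter."""
    # N attempts are separated by N - 1 waits.
    return [round(min(cap, base * factor**n), 4) for n in range(attempts - 1)]


# Numbers taken from the preset descriptions above.
aggressive = backoff_delays(attempts=5, base=0.1, factor=1.5, cap=2.0)
conservative = backoff_delays(attempts=3, base=2.0, factor=2.0, cap=30.0)

print(aggressive)    # [0.1, 0.15, 0.225, 0.3375]
print(conservative)  # [2.0, 4.0]
```

Under this assumed formula, neither preset reaches its cap within its attempt budget: the aggressive preset waits under a second in total, while the conservative one waits six seconds.
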


Sliding window rate limiter for precise request tracking.
def __init__(
    window_size: float,
    max_requests: int
) -> None
async def acquire() -> None

Acquire permission to proceed, blocking if window is full.

async def try_acquire() -> bool

Try to acquire permission without blocking.

def get_stats() -> RateLimiterStats

Get rate limiter statistics.

property current_requests() -> int

Get current number of requests in the window.
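
A sliding-window limiter of this kind is commonly built as a timestamp log. The following is a minimal sketch of that idea, not the library's implementation: `SlidingWindowSketch` and its injectable `clock` argument are hypothetical, and the blocking `acquire` variant is left out.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowSketch:
    """Hypothetical sliding-window log: one timestamp per admitted request."""

    def __init__(self, window_size: float, max_requests: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_size = window_size
        self.max_requests = max_requests
        self._clock = clock  # injectable so the behaviour can be tested without sleeping
        self._stamps: deque[float] = deque()

    def _evict(self, now: float) -> None:
        # Drop timestamps that have slid out of the window.
        while self._stamps and now - self._stamps[0] >= self.window_size:
            self._stamps.popleft()

    def try_acquire(self) -> bool:
        """Admit the request if fewer than max_requests fall inside the window."""
        now = self._clock()
        self._evict(now)
        if len(self._stamps) < self.max_requests:
            self._stamps.append(now)
            return True
        return False

    @property
    def current_requests(self) -> int:
        self._evict(self._clock())
        return len(self._stamps)
```

Because every admitted request keeps its own timestamp, the count is exact at any instant, at the cost of memory proportional to `max_requests`.
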


Class-based throttler for more control over rate limiting.

Provides a Throttler instance that can be used to throttle multiple functions or to dynamically control throttling behavior.

Example

Using Throttler class

throttler = Throttler(calls=5, period=1.0)

@throttler.throttle
async def limited_func():
    ...

# Or use directly
await throttler.acquire()

def __init__(
    calls: int,
    period: float,
    *,
    burst: int | None = None,
    strategy: str = 'token_bucket',
    metrics: MetricsRecorderProtocol | None = None
) -> None

Initialize the throttler.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `calls` | int | Maximum number of calls allowed within the period. |
| `period` | float | Time period in seconds. |
| `burst` | int \| None | Maximum burst size for token bucket strategy. |
| `strategy` | str | Rate limiting strategy: "token_bucket" or "sliding_window". |
| `metrics` | MetricsRecorderProtocol \| None | Optional MetricsRecorderProtocol for emitting ``throttle.allowed`` and ``throttle.denied`` counters. When provided, every acquire and try_acquire call records its outcome so telemetry dashboards can track throttle activity without polling get_stats. |

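
For readers unfamiliar with the token bucket strategy, here is a minimal sketch of how `calls`, `period` and `burst` could interact. It assumes the bucket refills at `calls / period` tokens per second and holds at most `burst` tokens (falling back to `calls`); those semantics, the `TokenBucketSketch` class and its `clock` argument are assumptions for illustration, not the library's documented behaviour.

```python
import time
from typing import Callable, Optional


class TokenBucketSketch:
    """Hypothetical token bucket: each call spends one token, tokens refill over time."""

    def __init__(self, calls: int, period: float, burst: Optional[int] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = calls / period  # tokens added per second (assumed)
        self.capacity = float(burst if burst is not None else calls)
        self._tokens = self.capacity  # the bucket starts full
        self._clock = clock
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        # Refill for the elapsed time, never beyond capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

The practical difference from a sliding window is that a full bucket lets a short burst through immediately, after which calls are admitted at the steady refill rate.
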
def throttle(func: Callable[Ellipsis, Any]) -> Callable[Ellipsis, Any]

Decorator to throttle a function using this Throttler.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `func` | Callable[Ellipsis, Any] | The function to throttle. |

Returns

| Type | Description |
| --- | --- |
| Callable[Ellipsis, Any] | The wrapped function with throttling applied. |

Example

throttler = Throttler(calls=10, period=1.0)

@throttler.throttle
async def limited_api_call():
    return await fetch_data()

async def acquire() -> None

Acquire permission to proceed, blocking if rate exceeded.

Emits throttle.allowed via the MetricsRecorderProtocol when one was provided at construction time.

Example

throttler = Throttler(calls=10, period=1.0)
await throttler.acquire()

async def try_acquire() -> bool

Try to acquire permission without blocking.

Emits throttle.allowed or throttle.denied via the MetricsRecorderProtocol when one was provided at construction time.

Returns

| Type | Description |
| --- | --- |
| bool | True if permission was granted, False if rate limit exceeded. |

Example

throttler = Throttler(calls=10, period=1.0)
if await throttler.try_acquire():
    # Proceed with operation
    ...

def get_stats() -> dict

Get throttler statistics.

Returns

| Type | Description |
| --- | --- |
| dict | Dictionary with throttle statistics. |

Example

from lexigram.logging import get_logger

logger = get_logger(__name__)
throttler = Throttler(calls=10, period=1.0)
stats = throttler.get_stats()
logger.info("stats", allowed=stats['allowed_requests'])


Configuration for timeout handling.

Named timeout registry with per-operation configuration.

A central registry that maps operation names to timeout durations. Unknown operations fall back to the default timeout. Use configure to set per-operation overrides before running operations.

Example

manager = TimeoutManager(default_seconds=10.0)
manager.configure("send_email", seconds=5.0)
manager.configure("run_report", seconds=120.0)
result = await manager.run("send_email", send_email(user))
def __init__(default_seconds: float = 30.0) -> None

Initialise the manager with a default timeout.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `default_seconds` | float | Fallback timeout (in seconds) applied to any operation that has not been given an explicit override. Must be positive. |

Raises

| Exception | Description |
| --- | --- |
| ValueError | If *default_seconds* is not positive. |

def configure(
    operation: str,
    seconds: float
) -> None

Register a per-operation timeout override.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | str | A string key identifying the operation. |
| `seconds` | float | Timeout duration in seconds. Must be positive. |

Raises

| Exception | Description |
| --- | --- |
| ValueError | If *seconds* is not positive. |

def get_timeout(operation: str) -> float

Return the effective timeout for operation.

Returns the per-operation override if one has been configured, otherwise the manager’s default.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | str | The operation name to look up. |

Returns

| Type | Description |
| --- | --- |
| float | Timeout duration in seconds. |

async def run(
    operation: str,
    coro: Awaitable[T]
) -> T

Execute coro under the timeout configured for operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | str | The operation name used to look up the timeout. |
| `coro` | Awaitable[T] | The awaitable to run. |

Returns

| Type | Description |
| --- | --- |
| T | The value produced by *coro*. |

Raises

| Exception | Description |
| --- | --- |
| ResilienceTimeoutError | If *coro* does not complete within the configured timeout. |
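
The registry behaviour described above (per-operation overrides, a default fallback, validation of positive values) can be approximated with the standard library alone. The sketch below is an illustration built on `asyncio.wait_for`, not the library's code: `TimeoutRegistrySketch` is a hypothetical name, and it lets the standard timeout exception propagate where the real manager is documented to raise ResilienceTimeoutError.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


class TimeoutRegistrySketch:
    """Hypothetical stand-in mapping operation names to timeout durations."""

    def __init__(self, default_seconds: float = 30.0) -> None:
        if default_seconds <= 0:
            raise ValueError("default_seconds must be positive")
        self._default = default_seconds
        self._overrides: dict[str, float] = {}

    def configure(self, operation: str, seconds: float) -> None:
        if seconds <= 0:
            raise ValueError("seconds must be positive")
        self._overrides[operation] = seconds

    def get_timeout(self, operation: str) -> float:
        # Unknown operations fall back to the default.
        return self._overrides.get(operation, self._default)

    async def run(self, operation: str, coro: Awaitable[T]) -> T:
        # asyncio.wait_for cancels the awaitable and raises a timeout error on expiry.
        return await asyncio.wait_for(coro, timeout=self.get_timeout(operation))
```

Keeping the durations in one registry means call sites only name the operation, so a timeout can be tuned in one place without touching the code that runs it.
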

def bulkhead(config: BulkheadConfig) -> Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]]

Decorator to apply bulkhead pattern.

Limits the number of concurrent executions of the decorated function. Auto-detects if the decorated function is sync or async and handles accordingly.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | BulkheadConfig | Bulkhead configuration specifying concurrency limits. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]] | A decorator that wraps the function with bulkhead protection. |

Example

from lexigram.resilience import bulkhead, BulkheadConfig

config = BulkheadConfig(max_concurrent=5)

@bulkhead(config)
async def api_call():
    ...
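
The concurrency limit at the heart of a bulkhead can be sketched with an `asyncio.Semaphore`. This is an illustration only: `BulkheadSketch` and `demo` are hypothetical, and the sketch makes excess callers wait, whereas the library may reject them instead (its error types mention rejection on capacity limits).

```python
import asyncio
from typing import Any, Awaitable, Callable


class BulkheadSketch:
    """Hypothetical limiter: at most max_concurrent calls run at once."""

    def __init__(self, max_concurrent: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.active = 0
        self.peak = 0  # highest concurrency observed, kept for demonstration

    async def run(self, func: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any) -> Any:
        async with self._semaphore:  # excess callers wait here
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await func(*args, **kwargs)
            finally:
                self.active -= 1


async def demo() -> tuple[list[int], int]:
    """Submit six jobs through a bulkhead of two and report the peak concurrency."""
    bulkhead = BulkheadSketch(max_concurrent=2)  # created inside the running loop

    async def job(i: int) -> int:
        await asyncio.sleep(0.01)
        return i

    results = await asyncio.gather(*(bulkhead.run(job, i) for i in range(6)))
    return list(results), bulkhead.peak
```

Running `asyncio.run(demo())` completes all six jobs while never letting more than two execute at the same time.
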


def circuit_breaker(
    name: str,
    registry: CircuitBreakerRegistry,
    config: CircuitBreakerConfig | None = None
) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Awaitable[T]]]

Decorator to apply circuit breaker protection to an async function.

The registry must be provided explicitly to avoid global state.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `name` | str | Name of the circuit breaker (must match registry). |
| `registry` | CircuitBreakerRegistry | CircuitBreakerRegistry instance for managing breakers. |
| `config` | CircuitBreakerConfig \| None | Optional configuration for the circuit breaker. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Awaitable[T]]] | A decorator that wraps the async function with circuit breaker protection. |

Example

from lexigram.resilience import circuit_breaker, CircuitBreakerRegistry

registry = CircuitBreakerRegistry()

@circuit_breaker("api", registry)
async def api_call():
    ...
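
The closed, open and half_open states that a breaker moves through can be sketched as a small state machine. Everything below is illustrative: `CircuitBreakerSketch`, `CircuitOpenSketchError`, and the `failure_threshold` and `recovery_timeout` parameters are hypothetical names, and the sketch is synchronous for brevity.

```python
import time
from typing import Any, Callable


class CircuitOpenSketchError(Exception):
    """Stand-in for an error raised while the circuit is open."""


class CircuitBreakerSketch:
    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold  # hypothetical parameter
        self.recovery_timeout = recovery_timeout    # hypothetical parameter
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0
        self._state = "closed"

    @property
    def state(self) -> str:
        # An open circuit becomes half_open once the recovery timeout has elapsed.
        if self._state == "open" and self._clock() - self._opened_at >= self.recovery_timeout:
            self._state = "half_open"
        return self._state

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenSketchError("circuit is open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self._state == "half_open" or self._failures >= self.failure_threshold:
                self._state = "open"
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self._failures = 0
        self._state = "closed"
        return result
```

While open, the sketch fails fast without invoking the protected function, which is the property that shields a struggling dependency from further load.
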


def circuit_breaker_sync(
    name: str,
    registry: CircuitBreakerRegistry,
    config: CircuitBreakerConfig | None = None
) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, T]]

Decorator to apply circuit breaker protection to a sync function.

The registry must be provided explicitly to avoid global state.

Note

This is for sync functions. For async functions, use circuit_breaker.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `name` | str | Name of the circuit breaker (must match registry). |
| `registry` | CircuitBreakerRegistry | CircuitBreakerRegistry instance for managing breakers. |
| `config` | CircuitBreakerConfig \| None | Optional configuration for the circuit breaker. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, T]] | A decorator that wraps the sync function with circuit breaker protection. |

def get_throttle_stats(func: Callable[Ellipsis, Any]) -> dict[str, float | int] | None

Get throttle statistics for a throttled function.

Resolves statistics from the limiter attached to the function wrapper.


def idempotent(
    store: IdempotencyStoreProtocol,
    *,
    key_func: Callable[Ellipsis, str] | None = None,
    ttl: float | None = 3600.0
) -> Callable[Ellipsis, Any]

Decorator that ensures an async function executes at most once per key.

The idempotency key is derived from function arguments by default, or from a custom key_func. Results are cached in the provided store.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `store` | IdempotencyStoreProtocol | The IdempotencyStoreProtocol backend for caching results. |
| `key_func` | Callable[Ellipsis, str] \| None | Optional callable that generates a key string from `(*args, **kwargs)`. If None, a deterministic hash of the arguments is used. |
| `ttl` | float \| None | Time-to-live in seconds for cached results. Defaults to 3600 (1 hour). |

Returns

| Type | Description |
| --- | --- |
| Callable[Ellipsis, Any] | A decorator that wraps async functions with idempotency logic. |

Example

from lexigram.resilience import idempotent, InMemoryIdempotencyStore

store = InMemoryIdempotencyStore()

@idempotent(store, ttl=60.0)
async def create_order(order_id: str, amount: float) -> dict:
    return {"id": order_id, "amount": amount}
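
How a "deterministic hash of the arguments" can drive at-most-once execution is sketched below. The helper names `default_key` and `idempotent_sketch` are hypothetical, a plain dict stands in for the store, and the hashing scheme (JSON plus SHA-256) is an assumption; TTL handling and in-flight (pending) keys are omitted.

```python
import asyncio
import functools
import hashlib
import json
from typing import Any, Awaitable, Callable


def default_key(func_name: str, args: tuple, kwargs: dict) -> str:
    """Hash the call so that equal arguments always yield the same key."""
    # sort_keys makes the key independent of keyword order; repr covers non-JSON values.
    payload = json.dumps([func_name, args, kwargs], sort_keys=True, default=repr)
    return hashlib.sha256(payload.encode()).hexdigest()


def idempotent_sketch(cache: dict) -> Callable:
    """Decorator factory: run the function once per key, then replay the result."""
    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            key = default_key(func.__qualname__, args, kwargs)
            if key in cache:
                return cache[key]  # replay the stored result, skip execution
            result = await func(*args, **kwargs)
            cache[key] = result
            return result
        return wrapper
    return decorator
```

Including the function name in the key keeps two different functions with identical arguments from sharing a cached result.
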


def retry(
    func_or_config: Callable[Ellipsis, Awaitable[T] | T] | RetryConfig,
    *args: Any,
    config: RetryConfig | None = None,
    **kwargs: Any
) -> Any

Unified retry API for sync and async callables.

Decorator mode (pass RetryConfig first)

config = RetryConfig(max_attempts=3)
@retry(config)
async def fetch_data() -> bytes: ...

Direct-call mode

result = await retry(fetch_data, config=config)
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `func_or_config` | Callable[Ellipsis, Awaitable[T] \| T] \| RetryConfig | A RetryConfig (decorator mode) or a callable to invoke with retries (direct-call mode). |
| `*args` | Any | Positional arguments forwarded to the callable in direct-call mode. |
| `config` | RetryConfig \| None | Required in direct-call mode. Ignored in decorator mode. |
| `**kwargs` | Any | Keyword arguments forwarded to the callable in direct-call mode. |

Returns

| Type | Description |
| --- | --- |
| Any | In decorator mode: a decorator wrapping a callable with retry logic. In direct-call mode: the return value of the callable. |
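
One way a single entry point can serve both modes is to dispatch on the type of the first argument. The sketch below shows that dispatch for async callables only; `RetryConfigSketch`, `retry_sketch` and `_run` are hypothetical names, delays between attempts are omitted, and nothing here is taken from the library's source.

```python
import asyncio
import functools
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class RetryConfigSketch:
    max_attempts: int = 3


async def _run(func: Callable[..., Awaitable[Any]], config: RetryConfigSketch,
               args: tuple, kwargs: dict) -> Any:
    """Call func up to max_attempts times, re-raising the last error."""
    if config.max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for _ in range(config.max_attempts):
        try:
            return await func(*args, **kwargs)
        except Exception as exc:  # a real policy would filter retryable errors
            last_error = exc
    assert last_error is not None
    raise last_error


def retry_sketch(func_or_config: Any, *args: Any,
                 config: Optional[RetryConfigSketch] = None, **kwargs: Any) -> Any:
    if isinstance(func_or_config, RetryConfigSketch):
        # Decorator mode: the first argument is the configuration.
        decorator_config = func_or_config

        def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
            @functools.wraps(func)
            async def wrapper(*a: Any, **kw: Any) -> Any:
                return await _run(func, decorator_config, a, kw)
            return wrapper
        return decorator
    # Direct-call mode: the first argument is the callable itself.
    if config is None:
        raise TypeError("config is required in direct-call mode")
    return _run(func_or_config, config, args, kwargs)
```

In this sketch, `await retry_sketch(fetch, config=cfg)` and `@retry_sketch(cfg)` share the same retry loop, so the two modes cannot drift apart in behaviour.
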

def throttle(
    calls: int,
    period: float,
    *,
    burst: int | None = None,
    strategy: str = 'token_bucket',
    key: str | None = None
) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]

Decorator to throttle function calls.

Resolves the ThrottleRegistry from the DI container context.


def with_timeout(config: TimeoutConfig) -> Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]]

Decorator to apply timeout to a function.

Auto-detects if the decorated function is sync or async and handles accordingly.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | TimeoutConfig | Timeout configuration specifying duration and behavior. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[Ellipsis, T]], Callable[Ellipsis, T]] | A decorator that wraps the function with a timeout guard. |

Example

from lexigram.resilience import with_timeout, TimeoutConfig

config = TimeoutConfig(timeout=5.0)

@with_timeout(config)
async def slow_operation():
    ...


Bulkhead/rejection error.
def __init__(
    message: str = 'Bulkhead rejected',
    **kwargs: Any
) -> None

Bulkhead rejected due to capacity limits.
def __init__(
    message: str = 'Bulkhead capacity exceeded',
    **kwargs: Any
) -> None

Circuit breaker error.
def __init__(
    message: str = 'Circuit breaker error',
    **kwargs: Any
) -> None

Circuit breaker is open.
def __init__(
    message: str = 'Circuit breaker is open',
    **kwargs: Any
) -> None

Raised when an idempotent key is reused before completion.
def __init__(
    key: str,
    message: str | None = None
) -> None

Raised when a Redis or database backend connection fails.

This exception is raised when the underlying storage backend (Redis, PostgreSQL, etc.) is unavailable or returns an unexpected error during an idempotency store operation.

Attributes: _code: Machine-readable error code LEX_ERR_IDEM_005.


Raised when the idempotency store is misconfigured.

This exception is raised when required configuration values are missing, invalid, or mutually incompatible — for example a negative TTL, an unknown backend type, or a missing connection URL.

Attributes: _code: Machine-readable error code LEX_ERR_IDEM_006.


Raised when an idempotency key is already claimed (PENDING).

Base exception for idempotency flows.

Raised on unexpected failures inside the idempotency store.

Base resilience error.
def __init__(
    message: str = 'Resilience error',
    **kwargs: Any
) -> None

Resilience operation timed out.

Inherits from both ResilienceError and built-in TimeoutError so that isinstance(e, TimeoutError) is True, enabling compatibility with asyncio timeout handling.

def __init__(
    message: str = 'Operation timed out',
    **kwargs: Any
) -> None
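
The dual inheritance described above can be demonstrated with stand-in classes. `ResilienceErrorSketch`, `ResilienceTimeoutSketch` and `classify` are hypothetical names used only to show why a single `isinstance(e, TimeoutError)` check is enough for callers.

```python
class ResilienceErrorSketch(Exception):
    """Stand-in for a library-wide base error."""


class ResilienceTimeoutSketch(ResilienceErrorSketch, TimeoutError):
    """Stand-in for a timeout error that is also a built-in TimeoutError."""

    def __init__(self, message: str = "Operation timed out") -> None:
        super().__init__(message)


def classify(exc: BaseException) -> str:
    """Generic handlers can treat the error as an ordinary timeout."""
    if isinstance(exc, TimeoutError):
        return "timeout"
    if isinstance(exc, ResilienceErrorSketch):
        return "resilience"
    return "other"
```

Code written against the built-in `TimeoutError` keeps working unchanged, while code that catches the library's base error still sees timeouts as part of that hierarchy.
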

Retry operation error.
def __init__(
    message: str = 'Retry error',
    **kwargs: Any
) -> None

All retry attempts exhausted.
def __init__(
    message: str = 'All retry attempts exhausted',
    attempts: int = 0,
    last_error: BaseException | None = None,
    **kwargs: Any
) -> None