| Package | Required | Purpose |
|---|---|---|
| lexigram | Yes | Core framework |
| lexigram-contracts | Yes | Protocol definitions |

External calls (APIs, databases, services) fail intermittently, slow down, or go down entirely. Without protection, a single failing dependency cascades — consuming threads, exhausting connections, and degrading your entire application.

lexigram-resilience provides proven fault-tolerance patterns to make your application resilient to failure.

Each pattern addresses a specific failure mode:

| Pattern | Failure Mode | Effect |
|---|---|---|
| Circuit Breaker | Cascading failures | Stops calls when a service is down. Raises CircuitOpenError. |
| Retry | Transient errors | Re-executes with exponential backoff and jitter. |
| Bulkhead | Resource exhaustion | Isolates calls into semaphore-guarded pools. |
| Timeout | Slow responses | Cancels operations that exceed a budget. Raises ResilienceTimeoutError. |
| Rate Limiter | Overload | Limits request rate with a token-bucket algorithm. |
| Throttle | Burst traffic | Smooths request flow over a sliding window. |

Patterns compose via ResiliencePipeline. You can wire the pipeline through DI to avoid hardcoding any resilience logic in your services.

The circuit breaker tracks failures per named breaker. When failure_threshold is reached, the circuit opens and subsequent calls fail fast with CircuitOpenError. After recovery_timeout seconds the circuit transitions to half-open and lets a single probe call through. If the probe succeeds the circuit closes; if it fails the circuit reopens.

from lexigram.resilience import CircuitBreaker, CircuitBreakerConfig
from lexigram.resilience.circuit import CircuitState

config = CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0)
breaker = CircuitBreaker(config)

async with breaker.protect():
    result = await risky_api_call()

# Or use the decorator with a registry:
from lexigram.resilience import circuit_breaker, CircuitBreakerRegistry

registry = CircuitBreakerRegistry()

@circuit_breaker("api", registry)
async def call_api() -> dict:
    ...
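To make the closed → open → half-open cycle concrete, here is a minimal, standard-library sketch of the state machine. It is not the lexigram implementation: `SketchBreaker`, `State`, and the injectable `clock` argument are hypothetical names used only for illustration, and the real breaker may count failures differently.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Toy circuit-breaker state machine (illustrative only)."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0
        self.state = State.CLOSED

    def allow(self) -> bool:
        """Return True if a call may proceed right now."""
        if self.state is State.OPEN:
            if self._clock() - self._opened_at >= self.recovery_timeout:
                self.state = State.HALF_OPEN  # let exactly one probe through
                return True
            return False  # still open: fail fast
        if self.state is State.HALF_OPEN:
            return False  # a probe is already in flight
        return True

    def record_success(self) -> None:
        """A successful call (or probe) closes the circuit."""
        self._failures = 0
        self.state = State.CLOSED

    def record_failure(self) -> None:
        """A failed probe, or too many failures, opens the circuit."""
        self._failures += 1
        if self.state is State.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = State.OPEN
            self._opened_at = self._clock()
```

A caller would check `allow()` before each call and report the outcome with `record_success()` or `record_failure()`. Passing a fake `clock` makes the recovery window testable without sleeping.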

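Retry re-executes a failed call with configurable backoff. Retryable exceptions are specified via retry_on; abort_on lists errors that must never be retried. Jitter randomizes the delays so that many clients do not retry in lockstep (the thundering-herd problem).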

from lexigram.resilience import retry, RetryConfig
from lexigram.contracts.exceptions import DomainError

cfg = RetryConfig(max_attempts=3, base_delay=1.0, jitter=True)

@retry(cfg)
async def fetch_payment(id: str) -> dict:
    ...

Use abort_on to skip retries for non-retryable errors:

cfg = RetryConfig(
    max_attempts=3,
    abort_on=(ValueError, PermissionError),
)

A bulkhead limits concurrency with a semaphore. Once max_concurrent calls are in flight, further calls wait in a queue of up to queue_size entries; when the queue is also full, they are rejected.

from lexigram.resilience import bulkhead, BulkheadConfig

@bulkhead(BulkheadConfig(max_concurrent=5, queue_size=50))
async def db_query(query: str) -> list[dict]:
    ...
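The queue-or-reject behaviour can be illustrated with a plain `asyncio.Semaphore`. The sketch below is an assumption about how such a bulkhead could work, not the library's implementation; `SketchBulkhead` and `BulkheadFull` are hypothetical names.

```python
import asyncio


class BulkheadFull(Exception):
    """Raised when both the execution slots and the wait queue are full."""


class SketchBulkhead:
    def __init__(self, max_concurrent: int, queue_size: int) -> None:
        self._slots = asyncio.Semaphore(max_concurrent)
        self._queue_size = queue_size
        self._waiting = 0  # callers currently parked on the semaphore

    async def run(self, func, *args):
        # Reject only when every slot is busy AND the queue is at capacity.
        if self._slots.locked() and self._waiting >= self._queue_size:
            raise BulkheadFull("no free slot and queue is full")
        self._waiting += 1
        try:
            await self._slots.acquire()  # waits here if all slots are busy
        finally:
            self._waiting -= 1
        try:
            return await func(*args)
        finally:
            self._slots.release()  # always free the slot, even on error
```

Giving each dependency its own instance (one for the database, one for an external API) is what provides the isolation: a backlog in one pool cannot consume the other's slots.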

A timeout cancels an operation that exceeds its time budget and raises ResilienceTimeoutError. It is built on asyncio.timeout.

from lexigram.resilience import with_timeout, TimeoutConfig

@with_timeout(TimeoutConfig(timeout=5.0))
async def fetch_data() -> dict:
    ...

ResiliencePipeline combines multiple patterns in a configurable order. The default order is bulkhead → circuit breaker → retry → timeout.

from lexigram.resilience import ResiliencePipeline
from lexigram.contracts.infra.resilience import (
    RetryConfig, CircuitBreakerConfig, TimeoutConfig,
)

pipeline = ResiliencePipeline(
    retry_config=RetryConfig(max_attempts=3),
    circuit_config=CircuitBreakerConfig(failure_threshold=5),
    timeout_config=TimeoutConfig(timeout=10.0),
    order=["circuit_breaker", "retry", "timeout"],  # no bulkhead
)

result = await pipeline.execute(my_function, arg1, arg2)
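Why the order matters is easiest to see with a toy composition. The sketch below assumes that the first entry in the order list is the outermost wrapper; the guide does not state this explicitly, so treat it as an assumption. `compose` and `tracing_layer` are hypothetical helpers that only record when each layer is entered and left.

```python
import asyncio
from typing import Awaitable, Callable

Call = Callable[[], Awaitable[object]]
Layer = Callable[[Call], Call]


def compose(layers: list[Layer], call: Call) -> Call:
    """Wrap `call` so that layers[0] is outermost and layers[-1] is innermost."""
    for layer in reversed(layers):
        call = layer(call)
    return call


def tracing_layer(name: str, log: list[str]) -> Layer:
    """Hypothetical layer that records entry and exit instead of adding resilience."""
    def layer(inner: Call) -> Call:
        async def wrapped():
            log.append(f"enter {name}")
            try:
                return await inner()
            finally:
                log.append(f"exit {name}")
        return wrapped
    return layer
```

Under that assumption, an order of circuit breaker → retry → timeout means the timeout applies to each individual attempt, while the circuit breaker only sees the final outcome after all retries. Reversing retry and timeout would instead give one budget shared by all attempts.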

Accept ResiliencePipelineFactoryProtocol | None in your service constructors for testable resilience:

from lexigram.contracts.infra.resilience import ResiliencePipelineFactoryProtocol

class PaymentService:
    def __init__(self, pipeline_factory: ResiliencePipelineFactoryProtocol | None = None) -> None:
        self._factory = pipeline_factory

Token-bucket rate limiter that blocks until a token is available:

from lexigram.resilience import RateLimiter

limiter = RateLimiter(rate=100)  # 100 tokens/sec

async def handle_request() -> None:
    await limiter.acquire()
    ...

Sliding-window throttler with configurable max requests per window:

from lexigram.resilience import throttle, get_throttle_stats

@throttle(max_requests=100, window_seconds=60.0)
async def api_handler() -> dict:
    ...
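A sliding window differs from a token bucket in that it counts the requests actually seen in the last window_seconds. The sketch below keeps one timestamp per admitted request; it is a simplified illustration with hypothetical names (`SketchSlidingWindow`, `clock`), and the library's throttle may use a different bookkeeping scheme.

```python
import time
from collections import deque


class SketchSlidingWindow:
    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._stamps: deque[float] = deque()  # admission times, oldest first

    def allow(self) -> bool:
        """Admit the request if fewer than max_requests fall inside the window."""
        now = self._clock()
        # Drop timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.window_seconds:
            self._stamps.popleft()
        if len(self._stamps) < self.max_requests:
            self._stamps.append(now)
            return True
        return False
```

Memory use grows with max_requests because every admitted request is remembered until it leaves the window, which is the usual trade-off for an exact sliding count.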

The idempotent decorator guarantees at-most-once execution. Results are cached in a pluggable store (in-memory, Redis, SQL), so a repeated call returns the stored result instead of running again.

from lexigram.resilience import idempotent, InMemoryIdempotencyStore

store = InMemoryIdempotencyStore()

@idempotent(store, ttl=3600.0)
async def create_order(order_id: str) -> dict:
    ...
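The caching behaviour behind idempotent execution can be sketched in a few lines. This is not the library's store or decorator: `SketchStore` and `sketch_idempotent` are hypothetical, and the sketch assumes that the function's single argument serves as the idempotency key, which may not match how lexigram derives keys.

```python
import asyncio
import time


class SketchStore:
    """In-memory map of key -> (expires_at, result); illustrative only."""

    def __init__(self, clock=time.monotonic):
        self._data = {}
        self._clock = clock

    def get(self, key):
        entry = self._data.get(key)
        if entry is None or entry[0] <= self._clock():
            self._data.pop(key, None)  # missing or expired
            return None
        return entry

    def put(self, key, result, ttl):
        self._data[key] = (self._clock() + ttl, result)


def sketch_idempotent(store, ttl):
    def decorator(func):
        async def wrapper(key):
            hit = store.get(key)
            if hit is not None:
                return hit[1]  # replay the stored result, skip execution
            result = await func(key)
            store.put(key, result, ttl)
            return result
        return wrapper
    return decorator
```

Note that this toy version does not guard against two concurrent calls with the same key both missing the cache. A production store would need an atomic reserve-then-fill step (for example a conditional insert) to keep the at-most-once guarantee under concurrency.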
  • Put a circuit breaker on every downstream dependency. Protect each outbound HTTP call, database query, and queue publish with a named breaker.
  • Retry transient failures only. Use abort_on to exclude auth and validation errors from retry.
  • Bulkhead resource pools. Isolate database connections from API calls so a slow API does not starve the database.
  • Set timeouts everywhere. A call without a timeout can hold its thread or connection indefinitely.
  • Wire via DI. Accept ResiliencePipelineFactoryProtocol | None in constructors so that tests can inject a mock that disables resilience.