Skip to content
GitHubDiscord

API Reference

Protocol for cache backend implementations.

This interface defines the contract that all cache backend implementations must follow. It provides a unified API for different storage mechanisms (memory, Redis, Memcached, etc.).

Example

class RedisBackend:
async def get(self, key: str) -> Any | None:
data = await self._redis.get(key)
return self._serializer.deserialize(data) if data else None
async def set(self, key: str, value: Any, ttl: int | None = None) -> bool:
data = self._serializer.serialize(value)
await self._redis.set(key, data, ex=ttl)
return True
async def get(key: str) -> Result[Any | None, CacheError]

Get a value from the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to retrieve.
Returns
TypeDescription
Result[Any | None, CacheError]Ok(value) if found, Ok(None) if not found, Err(CacheError) on failure.

Note

The return type is Any by design: the cache is a heterogeneous store and the caller knows the expected type. Use cast(T, result.unwrap()) at call sites for type safety.

async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> Result[None, CacheError]

Set a value in the cache with optional TTL.

Parameters
ParameterTypeDescription
`key`strThe cache key to set.
`value`AnyThe value to cache.
`ttl`int | NoneTime to live in seconds (optional).
Returns
TypeDescription
Result[None, CacheError]Ok(None) if successful, Err(CacheError) on failure.
async def delete(key: str) -> Result[bool, CacheError]

Delete a value from the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to delete.
Returns
TypeDescription
Result[bool, CacheError]Ok(True) if deleted, Ok(False) if not found, Err(CacheError) on failure.
async def delete_many(keys: list[str]) -> Result[int, CacheError]

Delete multiple values from the cache.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to delete.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def delete_pattern(pattern: str) -> Result[int, CacheError]

Delete all keys matching a glob-style pattern.

Parameters
ParameterTypeDescription
`pattern`strGlob pattern (e.g. ``"pet:list:*"``). Supports ``*`` as a wildcard matching any sequence of characters.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def exists(key: str) -> Result[bool, CacheError]

Check if a key exists in the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to check.
Returns
TypeDescription
Result[bool, CacheError]Ok(True) if exists, Ok(False) otherwise, Err(CacheError) on failure.
async def clear() -> Result[None, CacheError]

Clear all values from the cache.

Returns
TypeDescription
Result[None, CacheError]Ok(None) if successful, Err(CacheError) on failure.
async def get_many(keys: list[str]) -> Result[dict[str, Any], CacheError]

Get multiple values from the cache.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to retrieve.
Returns
TypeDescription
Result[dict[str, Any], CacheError]Ok(dict) mapping found keys to values, Err(CacheError) on failure.
async def set_many(
    items: dict[str, Any],
    ttl: int | None = None
) -> Result[None, CacheError]

Set multiple values in the cache.

Parameters
ParameterTypeDescription
`items`dict[str, Any]Dictionary of key-value pairs to cache.
`ttl`int | NoneTime to live in seconds for all items.
Returns
TypeDescription
Result[None, CacheError]Ok(None) if all items set successfully, Err(CacheError) on failure.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a health check on the cache backend.

Returns
TypeDescription
HealthCheckResultStructured HealthCheckResult.

Injectable time source for the framework.
def now() -> datetime

Return the current timezone-aware UTC time.

def monotonic() -> float

Return a monotonic elapsed-time counter.

def timestamp() -> float

Return the current Unix timestamp in UTC seconds.

def time() -> float

Backward-compatible alias for timestamp.


Protocol for configuration access across the framework.

Any configuration object (BaseConfig, LexigramConfig, or custom implementations) can satisfy this protocol by implementing get(), get_section(), and has_section().

Example

config = container.resolve(ConfigProtocol)
db_url = config.get("database.url", "sqlite:///default.db")
logging = config.get_section("logging", LoggingConfig)
property environment() -> Environment

The active deployment environment.

property is_production() -> bool

Return True when the active environment is production.

property is_development() -> bool

Return True when the active environment is development.

property is_testing() -> bool

Return True when the active environment is testing.

property is_staging() -> bool

Return True when the active environment is staging.

property is_debug() -> bool

Return True when debug mode is enabled.

def get(
    key: str,
    default: Any = None
) -> Any

Get a configuration value by dot-notation key.

Parameters
ParameterTypeDescription
`key`strConfiguration key (e.g. ``"app.name"`` or ``"database.url"``).
`default`AnyValue returned when the key is not found.
Returns
TypeDescription
AnyThe configuration value, or *default* if not found.
def get_section(
    name: str,
    model_cls: type[T] | None = None
) -> T | dict[str, Any]

Get a typed configuration section.

Parameters
ParameterTypeDescription
`name`strSection name (e.g. ``"logging"``).
`model_cls`type[T] | NoneOptional model class to coerce the section into.
Returns
TypeDescription
T | dict[str, Any]A model instance when *model_cls* is provided, otherwise a raw dict or the attribute value.
def has_section(name: str) -> bool

Check whether a configuration section exists.

Parameters
ParameterTypeDescription
`name`strSection name to check.
Returns
TypeDescription
boolTrue if the section is present.

Protocol for event bus middleware.

Middleware intercepts event publication, allowing cross-cutting concerns like logging, metrics, or error handling to be applied transparently.

The middleware receives the event and a next_handler coroutine that invokes the next middleware (or the actual handlers). The middleware must call next_handler to continue the chain.

Example

class LoggingMiddleware:
async def __call__(self, event: Any, next_handler: Any) -> None:
logger.info("publishing", event_type=type(event).__name__)
await next_handler(event)
logger.info("published", event_type=type(event).__name__)

Structural protocol for hook registries.

Defines the contract for action hooks (fire-and-forget, error-isolated) and filter hooks (value-pipeline, error-propagating) with priority ordering.

def register_action(
    hook_name: str,
    handler: Callable[Ellipsis, Any],
    priority: int = 100,
    *,
    once: bool = False
) -> None

Register an action handler for the given hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`handler`Callable[Ellipsis, Any]Callable to invoke. Can be sync or async.
`priority`intLower values run first. Defaults to 100.
`once`boolIf True, auto-remove handler after first invocation.
def register_filter(
    hook_name: str,
    handler: Callable[Ellipsis, Any],
    priority: int = 100,
    *,
    once: bool = False
) -> None

Register a filter handler for the given hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`handler`Callable[Ellipsis, Any]Callable that receives and returns a value.
`priority`intLower values run first. Defaults to 100.
`once`boolIf True, auto-remove handler after first invocation.
def unregister_action(
    hook_name: str,
    handler: Callable[Ellipsis, Any]
) -> bool

Remove an action handler. Returns True if found and removed.

def unregister_filter(
    hook_name: str,
    handler: Callable[Ellipsis, Any]
) -> bool

Remove a filter handler. Returns True if found and removed.

async def call_action(
    hook_name: str,
    **kwargs: Any
) -> None

Invoke all action handlers. Errors are isolated and logged.

async def apply_filter(
    hook_name: str,
    value: Any,
    **kwargs: Any
) -> Any

Pass value through all filter handlers. Errors propagate.

def has_action(hook_name: str) -> bool

Check if any action handlers are registered.

def has_filter(hook_name: str) -> bool

Check if any filter handlers are registered.

def clear(hook_name: str | None = None) -> None

Clear handlers for a specific hook, or all hooks.


Injectable identity generator.
property strategy() -> IdStrategy

Return the active ID generation strategy.

def generate() -> str

Generate a new unique identifier.

def generate_for(entity_type: str) -> str

Generate a new identifier for a specific entity type.


Base class for all fluent builders in Lexigram.

Provides foundational support for state management and fluent chaining.

def __init__() -> None

Initialize builder state.

def copy() -> Self

Return a deep copy of the builder state.

Useful for creating slightly different variations of an object without rebuilding from scratch.

def build() -> T

Construct the final object from the builder state.

Returns
TypeDescription
TThe constructed object of type T.

Application lifecycle states.

The composition root.

Usage

async with Application.boot(providers=[MyProvider()]) as app:
# resolve Invoker from the container for entry-point injection
from lexigram.app.invoker import Invoker
invoker = await app.container.resolve(Invoker)
await invoker.invoke(main)

Or for long-running processes

app = Application()
app.add_provider(MyProvider())
from lexigram.app import run_application
await run_application(app)
def __init__(
    name: str = 'lexigram-app',
    config: LexigramConfig | None = None
) -> None
property state() -> AppState

Current application state.

property is_running() -> bool

Check if application is running.

property logger() -> LoggerProtocol

Get the application logger.

property config() -> LexigramConfig

Application configuration.

property container() -> Container

DI container.

property providers() -> list[Provider]

Get all registered providers.

def add_middleware(middleware: MiddlewareProtocol[Any, Any]) -> None

Add a middleware to the application.

def add_provider(provider: Provider) -> None

Add a provider to be orchestrated.

Must be called before start().

def add_providers(providers: list[Provider]) -> None

Add multiple providers. Must be called before start().

def discover_providers(*packages: str) -> None

Discover and add Provider subclasses from packages.

Scans each package recursively for Provider subclasses with a no-argument constructor and registers them.

Also discovers @injectable / @singleton decorated classes and schedules them for auto-registration into the container during boot.

Convention: place providers in <module>/provider.py or <module>/providers/ — any file is scanned.

Parameters
ParameterTypeDescription

Example

app.discover_providers("my_platform.modules", "my_platform.infrastructure")
def add_module(module: type | Any) -> None

Add a module to the application.

Modules organize providers and define visibility boundaries. Must be called before start().

def add_modules(modules: list[type | Any]) -> None

Add multiple modules. Must be called before start().

def discover_modules(
    *,
    entry_point_group: str = 'lexigram.modules',
    directories: list[str | Path] | None = None,
    enabled: list[str] | None = None,
    disabled: list[str] | None = None
) -> None

Discover Module classes and register them with this application.

Scans importlib.metadata entry points and optionally filesystem directories. Each discovered Module is passed to add_module(), ensuring it flows through CompiledModuleGraph during start().

Parameters
ParameterTypeDescription
`entry_point_group`strEntry-point group to scan. Default "lexigram.modules".
`directories`list[str | Path] | NoneExtra filesystem directories to scan.
`enabled`list[str] | NoneAllowlist of module names (empty = all allowed).
`disabled`list[str] | NoneDenylist of module names to skip.

Example

app.discover_modules()
app.discover_modules(directories=["./plugins"], disabled=["dev-only"])
async def start() -> None

Boot all providers and start the application.

Phases: 1. Emit ApplicationStarting event 2. Register all providers with the container 3. Boot all providers (dependency order, parallel where possible) 4. Resolve EventBusProtocol from container for lifecycle events 5. Emit ApplicationStarted event

If any phase fails, already-booted providers are shut down and the application moves to STOPPED.

Raises
ExceptionDescription
RuntimeErrorIf application is not in CREATED state.
async def stop() -> None

Shutdown all providers in reverse order.

Safe to call multiple times. Handles being called from STARTING state (failed boot) as well as RUNNING.

Emits ApplicationStopping before shutdown and ApplicationStopped after completion.

async def health_check(timeout: float = 5.0) -> AggregateHealthResult

Aggregate health check from all providers.

Returns an AggregateHealthResult whose status follows worst-case aggregation across all registered provider health checks.

async def liveness(timeout: float = 5.0) -> AggregateHealthResult

Run liveness checks for the application.

async def readiness(timeout: float = 5.0) -> AggregateHealthResult

Run readiness checks for the application.

async def startup_check(timeout: float = 5.0) -> AggregateHealthResult

Run startup checks for the application.

async def boot(
    cls,
    name: str = 'lexigram-app',
    providers: list[Provider] | None = None,
    modules: list[Any] | None = None,
    config: LexigramConfig | None = None
) -> AsyncIterator[Application]

Create, start, yield, and guarantee shutdown.

Usage

async with Application.boot(providers=[...]) as app:
await app.invoke(main)

Register clock services for the framework.
def __init__(config: ClockConfig | None = None) -> None
async def register(container: ContainerRegistrarProtocol) -> None

Register the default clock implementation.

async def boot(container: ContainerResolverProtocol) -> None

Clock services do not require boot-time work.

async def shutdown() -> None

Clock services do not require shutdown work.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report the clock subsystem as healthy.


Configuration loading, models, and sources DI provider.

This provider handles loading configuration from various sources (YAML, Env) and registering the configuration instance into the DI container.

All config classes are backed by DomainModel (stdlib dataclasses). Environment variable reading is handled by EnvironmentConfigSource, which is always added as a source.

def __init__(
    config_class: type[Any] = LexigramConfig,
    sources: list[Any] | None = None,
    env_prefix: str = '',
    extensions: dict[str, type] | None = None
) -> None
async def register(container: ContainerRegistrarProtocol) -> None

Load configuration and register as a singleton.

async def boot(container: ContainerResolverProtocol) -> None

No boot-time work required for the config module.

async def shutdown() -> None

No resources to release for the config module.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report config loading status.


Configuration loading, parsing, or validation failed.

Raised by:

  • ConfigProvider during initialisation
  • Config loaders when sources are invalid
  • Validation of ConfigProtocol implementations

Dependency injection container.

Manages service registration, resolution, and lifecycle. Delegates to ContainerRegistrarImpl, ContainerResolverImpl, and ContainerValidator for write, read, and validation responsibilities respectively.

Usage

container = Container()
container.singleton(MyService, MyService())
container.transient(IRepo, SqlRepo)
service = await container.resolve(MyService)
def __init__(
    parent: Container | None = None,
    testing_mode: bool = False
) -> None
property registrar() -> ContainerRegistrarImpl

Return the write-only registrar component.

property resolver() -> ContainerResolverImpl

Return the read-only resolver component.

def create_invoker() -> FunctionInvoker

Create a FunctionInvoker bound to this container.

Returns a new invoker that shares this container’s service resolver and type-hint cache, so repeated calls to FunctionInvoker.call benefit from the warm cache without exposing the resolver’s private attributes.

Returns
TypeDescription
FunctionInvokerA FunctionInvoker configured with this container's resolver and hints cache.

Example

invoker = container.create_invoker()
result = await invoker.call(my_function)
def clear() -> None

Clear all registered services.

Raises
ExceptionDescription
ContainerErrorif the container has been frozen.
def transient(
    service_type: type[T],
    factory: ServiceFactory[T],
    validate: bool = True
) -> None

Register a transient service (new instance each resolution).

def singleton(
    service_type: type[T],
    instance: T | None = None,
    *,
    name: str | None = None,
    factory: ServiceFactory[T] | None = None,
    validate: bool = True
) -> None
def singleton(
    service_type: Any,
    instance: Any = None,
    *,
    name: str | None = None,
    factory: Any | None = None,
    validate: bool = True
) -> None
def singleton(
    service_type: Any,
    instance: Any = None,
    *,
    name: str | None = None,
    factory: Any | None = None,
    validate: bool = True
) -> None

Register a singleton service (shared instance).

Pass instance for a pre-built singleton, or factory for lazy creation. Use name for named registrations resolvable via Annotated[T, Named('...')]].

def scoped(
    service_type: type[T],
    factory: ServiceFactory[T],
    validate: bool = True,
    *,
    name: str | None = None
) -> None

Register a scoped service (instance per scope).

def override(
    service_type: type[T],
    instance: T
) -> None

Replace a service registration for testing purposes.

This method is restricted to containers created with testing_mode=True. It works even on frozen containers, intended for test scenarios where you need to swap a real service with a fake or mock.

Parameters
ParameterTypeDescription
`service_type`type[T]The service type to override.
`instance`TThe replacement instance.
Raises
ExceptionDescription
ContainerErrorIf the container is not in testing mode.
ContainerErrorIf the service is not registered.
property is_frozen() -> bool

Return True if this container has been frozen.

Once frozen, no further service registrations are allowed. Use this in tests or startup checks to verify container state.

def freeze(validate: bool = True) -> None

Prevent any further service registrations on this container.

After freezing, calls to transient(), singleton(), scoped() or clear() will raise ContainerError. Freezing is intended to be invoked once the registration phase is complete (e.g. after ProviderOrchestrator.register_all()).

Parameters
ParameterTypeDescription
`validate`boolWhen ``True`` (default), runs pre-flight dependency validation before freezing. Raises ContainerValidationError if any missing dependencies, circular references, or scope violations are detected.
Raises
ExceptionDescription
ContainerValidationErrorIf ``validate=True`` and validation issues are found.
async def resolve(
    service_type: type[T],
    *,
    bypass_visibility: bool = False
) -> T
async def resolve(
    service_type: Any,
    *,
    bypass_visibility: bool = False
) -> Any
async def resolve(
    service_type: Any,
    *,
    bypass_visibility: bool = False
) -> Any

Asynchronously resolve a service by its registered type.

Parameters
ParameterTypeDescription
`service_type`AnyThe service type to resolve.
`bypass_visibility`boolIf True, skip module visibility enforcement. Use only in framework-internal resolution paths.
Returns
TypeDescription
AnyThe resolved service instance.
Raises
ExceptionDescription
ModuleVisibilityErrorIf the current module context does not have visibility over the requested service type.
def resolve_sync(service_type: type[T]) -> T

Synchronously resolve an already-instantiated singleton.

This is intended for use in decorators or other synchronous contexts where async resolution is not possible. It ONLY works for singletons that have already been instantiated (e.g. during boot).

Parameters
ParameterTypeDescription
`service_type`type[T]The service type to resolve.
Returns
TypeDescription
TThe resolved instance.
Raises
ExceptionDescription
UnresolvableDependencyErrorIf not a singleton or not instantiated.
def validate() -> list[str]

Validate the container configuration.

Checks:

  • All registered services have resolvable dependencies (missing dependencies)
  • No circular dependencies across the entire graph
  • No scope violations (singleton depending on scoped/transient)
Returns
TypeDescription
list[str]List of validation issues (empty if valid).
def validate_no_orphans() -> list[OrphanedRegistration]

Find registrations that no other service depends on.

This is a development-time validator to identify dead code services that are registered but never used.

Returns
TypeDescription
list[OrphanedRegistration]List of potentially orphaned registrations.
def has(service_type: object) -> bool

Check if a service is explicitly registered.

async def resolve_optional(service_type: type[T]) -> T | None
async def resolve_optional(service_type: Any) -> Any | None
async def resolve_optional(service_type: Any) -> Any | None

Resolve a service, returning None if not registered.

Provides graceful handling for optional dependencies without requiring callers to catch exceptions.

Parameters
ParameterTypeDescription
`service_type`AnyThe type to resolve.
Returns
TypeDescription
Any | NoneThe resolved instance or None if the service is not registered.
async def resolve_all(service_type: type[T]) -> list[T]
async def resolve_all(service_type: Any) -> list[Any]
async def resolve_all(service_type: Any) -> list[Any]

Resolve all registered implementations that are subtypes of a service type.

Parameters
ParameterTypeDescription
`service_type`AnyThe base type whose implementations to resolve.
Returns
TypeDescription
list[Any]A list of resolved instances for matching registrations.
def is_singleton(service_type: object) -> bool

Check if a service is registered as a singleton.

def dump_registrations() -> list[dict[str, Any]]

Return a JSON-serialisable snapshot of all container registrations.

def dump_dependency_graph() -> dict[str, list[str]]

Return an adjacency map of service → direct dependency names.

def log_registrations() -> None

Log a human-readable table of all container registrations.

def create_scope() -> Scope

Create a request-scoped resolution context.

async def scope() -> AsyncIterator[Scope]

Async context manager for a request-scoped container.

Creates a new Scope and disposes it automatically when the context exits.

Usage

async with container.scope() as scoped:
service = await scoped.resolve(UserService)
Yields
TypeDescription
AsyncIterator[Scope]A fresh Scope instance.
async def call(
    func: Callable[Ellipsis, Awaitable[T] | T],
    *args: Any,
    **kwargs: Any
) -> T

Call a function with dependency injection (Protocol implementation).

async def dispose() -> None

Dispose the container and all internal singletons.

Iterates through all registered singletons and disposes of those that implement AsyncDisposableProtocol or have a close/aclose method.


Fluent API for building DI containers.

This builder provides a clean, declarative way to configure services and their dependencies for testing and programmatic container construction.

For application-level composition, use Application instead — it provides the full lifecycle (register → freeze → validate → boot → shutdown) and module compilation.

Example

container = await (
ContainerBuilder()
.add_singleton(DatabaseConfig, lambda: db_config)
.add_scoped(UserRepository, SQLAlchemyUserRepository)
.add_transient(UserService, UserService)
.build()
)
def __init__() -> None

Initialize the container builder.

def add_singleton(
    interface: type[T],
    implementation: type[T] | Callable[[], T],
    metadata: dict[str, Any] | None = None
) -> Self

Register a singleton service.

Parameters
ParameterTypeDescription
`interface`type[T]The service interface/contract.
`implementation`type[T] | Callable[[], T]The concrete class or factory function.
`metadata`dict[str, Any] | NoneOptional metadata.
Returns
TypeDescription
SelfSelf for method chaining.
def add_instance(
    interface: type[T],
    instance: T
) -> Self

Register a pre-built instance as a singleton.

Parameters
ParameterTypeDescription
`interface`type[T]The service interface/contract.
`instance`TThe pre-built instance.
Returns
TypeDescription
SelfSelf for method chaining.
def add_scoped(
    interface: type[T],
    implementation: type[T] | Callable[Ellipsis, T],
    metadata: dict[str, Any] | None = None
) -> Self

Register a scoped service.

Parameters
ParameterTypeDescription
`interface`type[T]The service interface/contract.
`implementation`type[T] | Callable[Ellipsis, T]The concrete class or factory function.
`metadata`dict[str, Any] | NoneOptional metadata.
Returns
TypeDescription
SelfSelf for method chaining.
def add_transient(
    interface: type[T],
    implementation: type[T] | Callable[Ellipsis, T],
    metadata: dict[str, Any] | None = None
) -> Self

Register a transient service.

Parameters
ParameterTypeDescription
`interface`type[T]The service interface/contract.
`implementation`type[T] | Callable[Ellipsis, T]The concrete class or factory function.
`metadata`dict[str, Any] | NoneOptional metadata.
Returns
TypeDescription
SelfSelf for method chaining.
def add_module(module_or_dynamic: type | DynamicModule) -> Self

Add a module to configure the container.

Accepts module classes (decorated with @module) and DynamicModule instances.

Module providers are registered during build. The builder performs basic module processing (provider extraction) but does NOT run the full ModuleCompiler — it does not validate import/export visibility. For full module validation, use Application.

Parameters
ParameterTypeDescription
`module_or_dynamic`type | DynamicModuleA module class or DynamicModule.
Returns
TypeDescription
SelfSelf for method chaining.
def add_provider(provider: Any) -> Self

Add a provider whose register() will be called during build.

Parameters
ParameterTypeDescription
`provider`AnyA Provider instance.
Returns
TypeDescription
SelfSelf for method chaining.
def disable_validation() -> Self

Disable container validation during build.

Returns
TypeDescription
SelfSelf for method chaining.
async def build() -> ContainerResolverProtocol

Build and validate the container.

Processing order: 1. Extract providers from modules 2. Register module providers 3. Apply direct service registrations 4. Call provider.register() on standalone providers 5. Validate (if enabled)

Returns
TypeDescription
ContainerResolverProtocolA ContainerResolverProtocol for resolving services.
Raises
ExceptionDescription
ContainerBuildErrorIf build or validation fails.

User-facing typed wrapper around an injected ``ContextVarRegistry``.
def __init__(registry: ContextVarRegistry) -> None
property registry() -> ContextVarRegistry

The underlying registry (for advanced use / wiring).

def get(
    key: ContextKey[T],
    default: T | None = None
) -> T | None

Get the current value for a typed key.

def set(
    key: ContextKey[T],
    value: T
) -> contextvars.Token[T | None]

Set a typed value, returning a reset token.

def reset(
    key: ContextKey[T],
    token: contextvars.Token[T | None]
) -> None

Reset a typed value using a token from set.

def register_key(key: ContextKey[Any]) -> None

Register a new context key at runtime.

def get_dynamic(
    key: str,
    default: Any = None
) -> Any

Get a value by string key.

def set_dynamic(
    key: str,
    value: Any
) -> contextvars.Token[Any]

Set a value by string key.

def reset_dynamic(
    key: str,
    token: contextvars.Token[Any]
) -> None

Reset a value by string key.

def snapshot() -> dict[str, Any]

Snapshot of all non-None context values.

def has(key: str) -> bool

Check whether a context key is registered.


Immutable, typed key for context-variable access (pure data).

Provider for **shared low-level Lexigram infrastructure**.

This is a leaf provider (no sub-providers) that registers the primitive cross-cutting services needed by every other provider:

  • Context / ContextVarRegistry — request-scoped context propagation
  • Registry — a named-object registry for ad-hoc service look-up

Do NOT confuse with CoreProvider:

+-----------------------------------+----------------------------------------------------+ | CoreInfrastructureProvider | CoreProvider | +===================================+====================================================+ | Leaf provider, no sub-providers | Aggregating provider, no direct registrations | +-----------------------------------+----------------------------------------------------+ | Registers concrete primitives | Delegates to sub-providers via orchestrator | +-----------------------------------+----------------------------------------------------+ | Used inside the framework | Used by application bootstrap (LexigramApp) | +-----------------------------------+----------------------------------------------------+ | name = "common" | name = "core" | +-----------------------------------+----------------------------------------------------+

async def register(container: ContainerRegistrarProtocol) -> None

Register common utilities with the container.

async def boot(container: ContainerResolverProtocol) -> None

Initialize common services.

async def shutdown() -> None

Clean up common resources.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check health of shared infrastructure.


Kernel provider manifest for a minimal Lexigram application.

Bundles the four kernel framework providers that every Lexigram application requires: configuration, logging, infrastructure primitives, and DI introspection.

For a batteries-included bundle that also provides serialization and concurrency, use StandardModule.

Extension packages add their own providers separately. CoreModule has zero cross-package imports — it references only providers defined within the lexigram core package and contracts from lexigram-contracts.

def build_providers(
    cls,
    config_class: type[Any] | None = None,
    config_sources: list[Any] | None = None,
    env_prefix: str = '',
    overrides: dict[str, Provider] | None = None
) -> list[Provider]

Instantiate and return the core provider list.

Creates a fresh provider instance from each registered core provider class. Use overrides to replace any default provider by its name.

Parameters
ParameterTypeDescription
`config_class`type[Any] | NoneCustom configuration dataclass to use instead of the default LexigramConfig.
`config_sources`list[Any] | NoneExplicit list of ConfigSource instances. When ``None``, ``ConfigProvider`` falls back to its default source chain (environment variables + config files).
`env_prefix`strEnvironment variable prefix forwarded to the ``ConfigProvider``.
`overrides`dict[str, Provider] | NoneMapping of provider **name** → replacement Provider instance. A name matching a core provider replaces that provider; unmatched names are appended to the end of the list.
Returns
TypeDescription
list[Provider]Ordered list of Provider instances ready to pass to Application.boot.

Example

providers = CoreModule.build_providers(
config_class=MyConfig,
overrides={"logging": CustomLoggingProvider()},
)
def configure(
    cls,
    config_class: type[Any] | None = None,
    config_sources: list[Any] | None = None,
    env_prefix: str = '',
    overrides: dict[str, Provider] | None = None
) -> DynamicModule

Create a DynamicModule configured with core provider instances.

Parameters
ParameterTypeDescription
`config_class`type[Any] | NoneCustom configuration dataclass for ``ConfigProvider``.
`config_sources`list[Any] | NoneExplicit config source list for ``ConfigProvider``.
`env_prefix`strEnvironment variable prefix for ``ConfigProvider``.
`overrides`dict[str, Provider] | NoneMapping of provider name -> replacement provider instance.
Returns
TypeDescription
DynamicModuleDynamicModule configured with instantiated core providers and default core exports.

Main **aggregating** provider for Lexigram core services.

CoreProvider is the single entry point for application bootstrap. It does not register services directly — instead it relies on the orchestrator to discover and execute all framework sub-providers in dependency order.

Do NOT confuse with CoreInfrastructureProvider:

+-----------------------------------+----------------------------------------------------+ | CoreProvider | CoreInfrastructureProvider | +===================================+====================================================+ | Aggregating, no registrations | Leaf provider, registers concrete primitives | +-----------------------------------+----------------------------------------------------+ | Used by application bootstrap | Used inside the framework internals | +-----------------------------------+----------------------------------------------------+ | name = "core" | name = "common" | +-----------------------------------+----------------------------------------------------+

def __init__() -> None

Initialize CoreProvider.

async def register(container: ContainerRegistrarProtocol) -> None

Register core services.

Note: Sub-providers are now handled by the orchestrator via the sub_providers property and recursive discovery.

async def boot(container: ContainerResolverProtocol) -> None

Boot core services.

async def shutdown() -> None

Shutdown core services.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

CoreProvider itself is always healthy if it reached boot.

Individual core services (config, logging, etc.) are checked by their own providers via the orchestrator.


Domain model invariant violated or business rule broken.

Raised by:

  • Domain event handlers
  • Aggregate root validations
  • Entity lifecycle violations

Runtime-configured module descriptor.

Returned by factory methods like Module.configure() and Module.scope(). When a DynamicModule appears in an import list or is passed to Application.add_module(), it fully replaces the static @module() metadata for the referenced module class.

Provider entries may be classes (instantiated by the orchestrator) or pre-constructed instances (registered directly)

DynamicModule(
module=CacheModule,
providers=[
CacheProvider(backend="redis"), # instance
MetricsProvider, # class
],
exports=[CacheBackendProtocol],
is_global=True,
)

Identity and Deduplication: The module field determines identity. Two DynamicModule entries referencing the same module class but with different configurations raise ModuleDuplicateError at compile time.

property resolved_name() -> str

Module name: explicit name or module.__name__.


def __init__(error: E) -> None
def is_ok() -> bool
def is_err() -> bool
def unwrap() -> T
def unwrap_err() -> E
def unwrap_or(default: T) -> T
def unwrap_or_else(op: Callable[[E], T]) -> T
def map_sync(op: Callable[[T], U]) -> Result[U, E]
def map_err(op: Callable[[E], F]) -> Result[T, F]
def and_then_sync(op: Callable[[T], Result[U, E]]) -> Result[U, E]
def or_else_sync(op: Callable[[E], Result[T, F]]) -> Result[T, F]
def expect(message: str) -> T
def match(
    ok: Callable[[T], U],
    err: Callable[[E], U]
) -> U
async def map(op: Callable[[T], Awaitable[U]]) -> Result[U, E]
async def async_map(op: Callable[[T], Awaitable[U]]) -> Result[U, E]

Alias for map — exists for backward compatibility.

async def and_then(op: Callable[[T], Awaitable[Result[U, E]]]) -> Result[U, E]
async def or_else(op: Callable[[E], Awaitable[Result[T, F]]]) -> Result[T, F]
def flatten() -> Result[Any, E]
def filter(
    predicate: Callable[[T], bool],
    error: E
) -> Result[T, E]
def ok_or(default: U) -> U

Deterministic clock for tests.
def __init__(initial: datetime | None = None) -> None
def set(dt: datetime) -> None

Set the clock to a specific time.

def freeze(dt: datetime) -> None

Backward-compatible alias for set.

def advance(delta: float | timedelta | Duration) -> None

Advance the clock by a duration.

def tick() -> None

Advance the clock by one second.

def now() -> datetime

Return the current frozen time.

def monotonic() -> float

Return elapsed seconds since the clock was created or set.

def timestamp() -> float

Return the Unix timestamp for the current frozen time.

def time() -> float

Backward-compatible alias for timestamp.


Named priority levels for hook handlers.

Lower values run first. Custom integer values are accepted anywhere a priority is expected — these named levels are conveniences.

Spacing between levels leaves room for application-specific intermediate values (e.g. HookPriority.EARLY + 10).


Named hook registry for framework extensibility.

Thread-safe registry for action hooks (fire-and-forget, error-isolated) and filter hooks (value-pipeline, error-propagating) with priority ordering.

Read operations (has_*, call_action, apply_filter) are lock-free — they snapshot the handler list and iterate the snapshot. Write operations (register_*, unregister_*) acquire the lock.

Parameters
ParameterTypeDescription
`name`Registry name used in structured log output. Defaults to ``"default"``.

Example

hooks = HookRegistry("app")
@hooks.action("before_request", priority=HookPriority.EARLY)
async def log_request(**kwargs: Any) -> None:
logger.info("incoming", path=kwargs["path"])
await hooks.call_action("before_request", path="/api/users")
def __init__(name: str = 'default') -> None
def register_action(
    hook_name: str,
    handler: Callable[Ellipsis, Any],
    priority: int = HookPriority.NORMAL,
    *,
    once: bool = False
) -> None

Register an action handler for the given hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`handler`Callable[Ellipsis, Any]Callable to invoke. Can be sync or async.
`priority`intLower values run first. Defaults to ``HookPriority.NORMAL`` (100).
`once`boolIf True, auto-remove handler after first invocation.
def register_filter(
    hook_name: str,
    handler: Callable[Ellipsis, Any],
    priority: int = HookPriority.NORMAL,
    *,
    once: bool = False
) -> None

Register a filter handler for the given hook.

Filter handlers receive a value as the first positional argument and must return the (possibly transformed) value.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`handler`Callable[Ellipsis, Any]Callable that receives and returns a value.
`priority`intLower values run first. Defaults to ``HookPriority.NORMAL`` (100).
`once`boolIf True, auto-remove handler after first invocation.
def unregister_action(
    hook_name: str,
    handler: Callable[Ellipsis, Any]
) -> bool

Remove an action handler from a hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`handler`Callable[Ellipsis, Any]The handler to remove (identity comparison).
Returns
TypeDescription
boolTrue if the handler was found and removed.
def unregister_filter(
    hook_name: str,
    handler: Callable[Ellipsis, Any]
) -> bool

Remove a filter handler from a hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`handler`Callable[Ellipsis, Any]The handler to remove (identity comparison).
Returns
TypeDescription
boolTrue if the handler was found and removed.
async def call_action(
    hook_name: str,
    **kwargs: Any
) -> None

Invoke all action handlers for the named hook.

Handlers run sequentially in priority order. Errors in individual handlers are logged but do not prevent subsequent handlers from running.

Once-handlers are removed after they fire (even if they raise).

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name. **kwargs: Arguments passed to each handler.
async def apply_filter(
    hook_name: str,
    value: Any,
    **kwargs: Any
) -> Any

Pass a value through all filter handlers for the named hook.

Each handler receives the value (possibly transformed by prior handlers) and must return the new value. Handlers run sequentially in priority order.

Once-handlers are removed after they fire (even if they raise). Errors abort the pipeline and re-raise to the caller.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`value`AnyThe initial value to filter. **kwargs: Additional arguments passed to each handler.
Returns
TypeDescription
AnyThe final filtered value.
def has_action(hook_name: str) -> bool

Check if any action handlers are registered for a hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
Returns
TypeDescription
boolTrue if at least one handler is registered.
def has_filter(hook_name: str) -> bool

Check if any filter handlers are registered for a hook.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
Returns
TypeDescription
boolTrue if at least one handler is registered.
def action_names() -> list[str]

List all registered action hook names.

Returns
TypeDescription
list[str]List of hook names that have at least one action handler.
def filter_names() -> list[str]

List all registered filter hook names.

Returns
TypeDescription
list[str]List of hook names that have at least one filter handler.
def clear(hook_name: str | None = None) -> None

Clear all handlers, or handlers for a specific hook.

Parameters
ParameterTypeDescription
`hook_name`str | NoneIf provided, clear only this hook. Otherwise clear all.
def action(
    hook_name: str,
    *,
    priority: int = HookPriority.NORMAL,
    once: bool = False
) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]

Decorator to register an action handler.

Returns the handler unchanged so it can still be called directly.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`priority`intLower values run first. Defaults to ``HookPriority.NORMAL``.
`once`boolIf True, auto-remove handler after first invocation.
Returns
TypeDescription
Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]Decorator that registers the handler and returns it unchanged.

Example

@hooks.action("before_request", priority=HookPriority.EARLY)
async def log_request(**kwargs: Any) -> None:
logger.info("request", path=kwargs["path"])
def filter(
    hook_name: str,
    *,
    priority: int = HookPriority.NORMAL,
    once: bool = False
) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]

Decorator to register a filter handler.

Returns the handler unchanged so it can still be called directly.

Parameters
ParameterTypeDescription
`hook_name`strThe hook point name.
`priority`intLower values run first. Defaults to ``HookPriority.NORMAL``.
`once`boolIf True, auto-remove handler after first invocation.
Returns
TypeDescription
Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]Decorator that registers the handler and returns it unchanged.

Example

@hooks.filter("transform_response")
async def add_headers(value: Any, **kwargs: Any) -> Any:
value.headers["X-Custom"] = "yes"
return value

Register identity generators for the framework.
def __init__(config: IdentityConfig | None = None) -> None
async def register(container: ContainerRegistrarProtocol) -> None

Register the active generator implementation.

async def boot(container: BootContainerProtocol) -> None

Install the resolved generator into the active holder.

async def shutdown() -> None

Identity services do not require shutdown work.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report the identity subsystem as healthy.


Decorator class to mark a class as injectable for dependency injection.

This decorator marks classes for automatic registration with the dependency injection container. The container can then resolve these services and automatically inject their dependencies.

Parameters
ParameterTypeDescription
`scope`The service scope — transient (new instance each time), singleton (shared instance), or scoped (instance per scope). Defaults to transient.
`name`Optional name for the service registration. If not provided, the class name is used.
def __init__(
    scope: ServiceScope = ServiceScope.TRANSIENT,
    name: str | None = None
) -> None

Initialize the Injectable decorator.

Parameters
ParameterTypeDescription
`scope`ServiceScopeThe service scope for this class.
`name`str | NoneOptional name for the service.

Dependency injection or container resolution failed.

Raised by:

  • Container.resolve() when a service is not registered
  • Provider.register() on configuration errors
  • Module resolution when imports fail
  • Circular dependency detection

Invokes functions with dependency injection and middleware.

This class decouples the invocation logic from the Application class, allowing it to be used in other contexts (like testing or custom runners).

The context dict passed through the middleware pipeline satisfies InvocationContextProtocol — a transport-neutral invocation context that transports can extend with typed attributes in Phase 3.

def __init__(
    container: Container,
    middleware: MiddlewarePipelineProtocol
) -> None
async def invoke(
    func: Callable[Ellipsis, Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Invoke a function with automatic dependency injection and middleware.

The invocation context passed through the middleware pipeline is a dict[str, Any] that structurally satisfies InvocationContextProtocol. Middleware implementations should treat it as opaque and forward it unchanged unless they are deliberately enriching the context.


Structured logging factory and helpers DI provider.
async def register(container: ContainerRegistrarProtocol) -> None

Register logging services.

async def boot(container: ContainerResolverProtocol) -> None

Apply logging configuration from LexigramConfig.

async def shutdown() -> None

No resources to release for the logging module.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check — always healthy (in-process only, no external backend).

Parameters
ParameterTypeDescription
`timeout`floatIgnored for in-process providers.
Returns
TypeDescription
HealthCheckResultAlways HEALTHY — no external backend to check.

Mutable builder for MiddlewarePipeline.

Collect middleware via add (fluent interface supported), then call build to produce the frozen MiddlewarePipeline that is safe to share.

The chain itself is never executed — it is only a construction helper. After build is called the chain remains mutable; further add calls extend the next build.

def __init__(middleware: list[Any] | None = None) -> None

Create a chain, optionally pre-populated with middleware.

Parameters
ParameterTypeDescription
`middleware`list[Any] | NoneInitial list of middleware to seed the chain with.
def add(middleware: Any) -> MiddlewareChain

Append a middleware and return self for chaining.

Parameters
ParameterTypeDescription
`middleware`AnyAn ``async (context, next_handler) -> result`` callable.
Returns
TypeDescription
MiddlewareChain``self`` so calls can be chained fluently.
def build() -> MiddlewarePipeline

Produce an immutable MiddlewarePipeline from the current state.

The returned pipeline is a snapshot — subsequent add calls do not affect already-built pipelines.

Returns
TypeDescription
MiddlewarePipelineA new MiddlewarePipeline containing all added middleware.
def clear() -> None

Remove all middleware from the chain without building.


Middleware chain, pipeline, and registry configuration.

Composable async middleware pipeline, exception filter chain, and request pipeline.

Call configure to configure the middleware subsystem with custom settings.

Usage

from lexigram.middleware.config import MiddlewareConfig
@module(
imports=[MiddlewareModule.configure(MiddlewareConfig(...))]
)
class AppModule(Module):
pass
def configure(
    cls,
    config: Any | None = None
) -> DynamicModule

Create a MiddlewareModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`Any | NoneMiddlewareConfig or ``None`` for framework defaults.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def stub(
    cls,
    config: Any | None = None
) -> DynamicModule

Return a MiddlewareModule with framework defaults for testing.

All middleware uses default configuration. No request pipeline is active — only the DI bindings are registered.

Parameters
ParameterTypeDescription
`config`Any | NoneAccepted for interface compatibility with stub; ignored. Pass MiddlewareConfig to configure instead.
Returns
TypeDescription
DynamicModuleA DynamicModule with default middleware configuration.

Middleware chain, pipeline, and registry DI provider.

On register():

  • Registers MiddlewareChain, ExceptionFilterChain, and MiddlewarePipeline as container singletons.
  • Scans the lexigram.middleware entry-point group and delegates registration to every discovered Provider subclass so that third-party middleware packages integrate automatically.
def __init__() -> None
async def register(container: ContainerRegistrarProtocol) -> None

Register middleware singletons and discover third-party providers.

Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolThe DI container registrar.
async def boot(container: BootContainerProtocol) -> None

Boot discovered providers after middleware chains are initialized.

Parameters
ParameterTypeDescription
`container`BootContainerProtocolThe DI container for boot phase.
async def shutdown() -> None

Shut down discovered providers in reverse order.

Providers are shut down in reverse discovery order to respect dependency cleanup semantics (last booted = first shut down).

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check provider health.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for health check response.
Returns
TypeDescription
HealthCheckResultHealthCheckResult with status and component details.

Named middleware registry with priority ordering.

Inherits from Registry with priority support for ordering middleware by priority. Middleware is stored by name and retrieved in ascending priority order (lower number = earlier execution).

The registry is thread-safe through the base Registry implementation.

Example

registry = MiddlewareRegistry()
registry.register("auth", AuthMiddleware(), priority=10)
registry.register("logging", LoggingMiddleware(), priority=20)
# Returns middleware in priority order (auth first, then logging)
middleware_list = registry.all()
def __init__() -> None

Initialize the middleware registry with priority ordering.

def register(
    name: str,
    middleware: Any,
    *,
    priority: int = 50
) -> None

Register a named middleware.

Parameters
ParameterTypeDescription
`name`strUnique name for this middleware.
`middleware`AnyThe middleware instance.
`priority`intExecution order - lower values run first. Default 50.
Raises
ExceptionDescription
ValueErrorIf a middleware with the same name is already registered.
def unregister(name: str) -> Any

Remove and return a named middleware.

Parameters
ParameterTypeDescription
`name`strThe middleware name to remove.
Returns
TypeDescription
AnyThe removed middleware instance.
Raises
ExceptionDescription
KeyErrorIf no middleware with the given name is registered.
def get(name: str) -> Any

Retrieve a named middleware.

Parameters
ParameterTypeDescription
`name`strThe middleware name to look up.
Returns
TypeDescription
AnyThe middleware instance.
Raises
ExceptionDescription
KeyErrorIf no middleware with the given name is registered.
def all() -> list[Any]

Return all middleware in priority order.

Returns
TypeDescription
list[Any]List of middleware instances sorted by ascending priority.
def has(name: str) -> bool

Check if a middleware with the given name is registered.

def names() -> list[str]

Return a sorted list of registered middleware names.

def build_chain() -> MiddlewareChain

Build a MiddlewareChain from all registered middleware.

Returns middleware in ascending priority order (lower priority number executes first).

Returns
TypeDescription
MiddlewareChainA new MiddlewareChain containing all registered middleware.

Example

registry = MiddlewareRegistry()
registry.register("auth", AuthMiddleware(), priority=10)
registry.register("logging", LoggingMiddleware(), priority=20)
chain = registry.build_chain()
result = await chain.execute(request)
def with_defaults(cls) -> MiddlewareRegistry

Pre-populate with standard infrastructure middleware.


Optional base class for module classes.

Provides ClassVar defaults that the module decorator reads, IDE autocompletion, and the configure() / scope() / stub() factory pattern for creating DynamicModule descriptors.

Inheriting from Module is NOT required — any class decorated with @module() is a valid Lexigram module. This base class is a convenience, not a mandate.

Module instances can also be used directly as class decorators, which enables the root-module composition style familiar from other frameworks:

.. code-block:: python

@Module(imports=[UserModule, ProductModule])
class AppModule(ModuleBase):
pass

ClassVar Inheritance

class BaseInfra(Module):
providers = [LoggingProvider]
@module()
class FullInfra(BaseInfra):
exports = [LoggerProtocol]

Factory Pattern

@module()
class DatabaseModule(Module):
@classmethod
def configure(cls, url: str) -> DynamicModule:
return DynamicModule(
module=cls,
providers=[DatabaseProvider(url=url)],
exports=[DatabaseSession, TransactionManager],
is_global=True,
)
@classmethod
def scope(cls, *models: type) -> DynamicModule:
return DynamicModule(
module=cls,
providers=[ModelRegistryProvider(models)],
)
def __init__(
    name: str | None = None,
    providers: list[type] | None = None,
    imports: list[Any] | None = None,
    exports: list[type] | None = None,
    controllers: list[type] | None = None,
    is_global: bool = False,
    scan: list[str] | None = None
) -> None

Optionally override ClassVar defaults with instance values.

Parameters
ParameterTypeDescription
`name`str | NoneModule name.
`providers`list[type] | NoneProvider classes declared by this module.
`imports`list[Any] | NoneModule classes or DynamicModule instances to import.
`exports`list[type] | NoneContract types to expose to importing modules.
`controllers`list[type] | NoneController classes registered with the web layer.
`is_global`boolWhen ``True``, exports are visible to all modules.
`scan`list[str] | NonePackage paths to scan for ``@injectable`` / ``@singleton`` classes.
def configure(
    cls,
    *args: Any,
    **kwargs: Any
) -> DynamicModule

Configure this module globally. Called once at the app root.

Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleDynamicModule for root import.
Raises
ExceptionDescription
NotImplementedErrorSubclasses must override this method.
def scope(
    cls,
    *providers: type,
    **kwargs: Any
) -> DynamicModule

Register additional providers in a per-feature scope.

Creates an anonymous sub-class token so the compiler treats this as a distinct module (avoids ModuleDuplicateError when both configure() and scope() are imported into the same compiled graph).

Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleDynamicModule with a unique anonymous token as module identity.
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return a test-mode module with in-memory/noop backends.

Parameters
ParameterTypeDescription
`config`AnyOptional test configuration override.
Returns
TypeDescription
DynamicModuleDynamicModule using test providers.
Raises
ExceptionDescription
NotImplementedErrorSubclasses must override this method.
async def on_module_booted(cls) -> None

Called after all providers in this module have been booted.

Override in subclasses to run post-boot initialization logic. The container is fully available at this point.

async def on_module_shutdown(cls) -> None

Called before this module’s providers are shut down.

Override in subclasses to run pre-shutdown cleanup logic.


Optional base class for module classes.

Provides ClassVar defaults that the module decorator reads, IDE autocompletion, and the configure() / scope() / stub() factory pattern for creating DynamicModule descriptors.

Inheriting from Module is NOT required — any class decorated with @module() is a valid Lexigram module. This base class is a convenience, not a mandate.

Module instances can also be used directly as class decorators, which enables the root-module composition style familiar from other frameworks:

.. code-block:: python

@Module(imports=[UserModule, ProductModule])
class AppModule(ModuleBase):
pass

ClassVar Inheritance

class BaseInfra(Module):
providers = [LoggingProvider]
@module()
class FullInfra(BaseInfra):
exports = [LoggerProtocol]

Factory Pattern

@module()
class DatabaseModule(Module):
@classmethod
def configure(cls, url: str) -> DynamicModule:
return DynamicModule(
module=cls,
providers=[DatabaseProvider(url=url)],
exports=[DatabaseSession, TransactionManager],
is_global=True,
)
@classmethod
def scope(cls, *models: type) -> DynamicModule:
return DynamicModule(
module=cls,
providers=[ModelRegistryProvider(models)],
)
def __init__(
    name: str | None = None,
    providers: list[type] | None = None,
    imports: list[Any] | None = None,
    exports: list[type] | None = None,
    controllers: list[type] | None = None,
    is_global: bool = False,
    scan: list[str] | None = None
) -> None

Optionally override ClassVar defaults with instance values.

Parameters
ParameterTypeDescription
`name`str | NoneModule name.
`providers`list[type] | NoneProvider classes declared by this module.
`imports`list[Any] | NoneModule classes or DynamicModule instances to import.
`exports`list[type] | NoneContract types to expose to importing modules.
`controllers`list[type] | NoneController classes registered with the web layer.
`is_global`boolWhen ``True``, exports are visible to all modules.
`scan`list[str] | NonePackage paths to scan for ``@injectable`` / ``@singleton`` classes.
def configure(
    cls,
    *args: Any,
    **kwargs: Any
) -> DynamicModule

Configure this module globally. Called once at the app root.

Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleDynamicModule for root import.
Raises
ExceptionDescription
NotImplementedErrorSubclasses must override this method.
def scope(
    cls,
    *providers: type,
    **kwargs: Any
) -> DynamicModule

Register additional providers in a per-feature scope.

Creates an anonymous sub-class token so the compiler treats this as a distinct module (avoids ModuleDuplicateError when both configure() and scope() are imported into the same compiled graph).

Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleDynamicModule with a unique anonymous token as module identity.
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return a test-mode module with in-memory/noop backends.

Parameters
ParameterTypeDescription
`config`AnyOptional test configuration override.
Returns
TypeDescription
DynamicModuleDynamicModule using test providers.
Raises
ExceptionDescription
NotImplementedErrorSubclasses must override this method.
async def on_module_booted(cls) -> None

Called after all providers in this module have been booted.

Override in subclasses to run post-boot initialization logic. The container is fully available at this point.

async def on_module_shutdown(cls) -> None

Called before this module’s providers are shut down.

Override in subclasses to run pre-shutdown cleanup logic.


def __init__(value: T) -> None
def is_ok() -> bool
def is_err() -> bool
def unwrap() -> T
def unwrap_err() -> E
def unwrap_or(default: T) -> T
def unwrap_or_else(op: Callable[[E], T]) -> T
def map_sync(op: Callable[[T], U]) -> Result[U, E]
def map_err(op: Callable[[E], F]) -> Result[T, F]
def and_then_sync(op: Callable[[T], Result[U, E]]) -> Result[U, E]
def or_else_sync(op: Callable[[E], Result[T, F]]) -> Result[T, F]
def expect(message: str) -> T
def match(
    ok: Callable[[T], U],
    err: Callable[[E], U]
) -> U
async def map(op: Callable[[T], Awaitable[U]]) -> Result[U, E]
async def async_map(op: Callable[[T], Awaitable[U]]) -> Result[U, E]

Alias for map — exists for backward compatibility.

async def and_then(op: Callable[[T], Awaitable[Result[U, E]]]) -> Result[U, E]
async def or_else(op: Callable[[E], Awaitable[Result[T, F]]]) -> Result[T, F]
def flatten() -> Result[Any, E]
def filter(
    predicate: Callable[[T], bool],
    error: E
) -> Result[T, E]
def ok_or(default: U) -> T

Base class for all Lexigram providers.

Providers are part of the composition root. They register bindings and wire the object graph during boot.

def __init__(
    name: str | None = None,
    priority: ProviderPriority | None = None,
    dependencies: tuple[str, Ellipsis] | None = None,
    optional_dependencies: tuple[str, Ellipsis] | None = None,
    boot_timeout: float | None = None,
    required: bool | None = None
) -> None
property state() -> ProviderState
property is_booted() -> bool
property config() -> Any
def config(value: Any) -> None

Allow providers to assign configuration during initialization.

def from_config(
    cls,
    config: Any,
    **context: Any
) -> Self

Create a provider instance from a typed config object.

Subclasses should implement from_config(cls, config: TypedConfig, **context: Any) -> Self with the exact typed config model.

Parameters
ParameterTypeDescription
`config`AnyProvider-specific configuration object. **context: Optional extra keyword arguments (e.g. pre-built dependencies such as a queue or store instance).
Returns
TypeDescription
SelfA new provider instance configured from *config*.
async def register(container: ContainerRegistrarProtocol) -> None

Bind services into the container.

async def boot(container: BootContainerProtocol) -> None

Initialize services and wire dependencies.

async def shutdown() -> None

Tear down resources.

async def on_error(
    error: Exception,
    phase: str
) -> None

Called when boot() or shutdown() raises an exception.

Override to perform cleanup on startup/shutdown failure. This hook allows providers to clean up partial state when lifecycle methods fail.

The default implementation logs the error and does not re-raise.

Parameters
ParameterTypeDescription
`error`ExceptionThe exception that was raised.
`phase`strEither 'boot' or 'shutdown' indicating which phase failed.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Return health status of this provider.


Provider initialization priority.

Providers are booted in ascending order of priority. Lower values run first; the typical ordering is:

  • CRITICAL (0) – absolutely foundational services that others depend on (e.g. configuration, diagnostics).
  • INFRASTRUCTURE (10) – low-level plumbing such as database connections, messaging clients, and cache providers.
  • SECURITY (20) – authentication/authorization infrastructure.
  • NORMAL (30) – everyday domain services with no special ordering.
  • APPLICATION (40) – application-level tools (CLI, admin utilities) that depend on domain services but are not themselves domain logic.
  • DOMAIN (50) – business-logic providers that may depend on earlier layers.
  • PRESENTATION (80) – web/API layers and other entry points.
  • COMMS (90) – outbound communication providers (email, SMS, webhooks) often run late to avoid interfering with core initialization.
  • LOW (100) – lowest-priority, optional providers that can boot last.

Provider lifecycle state.

Unified Registry for managing collections.

Features:

  • Generic type support
  • Factory/lazy registration
  • Lifecycle hooks (on_register, on_unregister)
  • Decorator support
  • Thread-safe (using internally managed lock)
  • Iteration support
  • Priority ordering support

The Registry supports three distinct registration patterns. Choose the one that best fits your use-case:

1. Direct registration — use when the value already exists

registry.register("my_handler", MyHandler())

2. Factory (lazy) registration — use when construction is expensive or must be deferred until the first lookup

registry.register_factory("my_handler", lambda: MyHandler())

The factory is called once on the first get() / resolve() and the result is cached. Subsequent lookups return the cached instance.

3. Decorator registration — syntactic sugar for direct registration, useful when defining a class or function inline

@registry.register("my_handler")
class MyHandler:
...

Calling registry.register(key) with only the key (no value) activates decorator mode, so the decorated object is registered under key and returned unchanged (enabling chaining with other decorators).

For most framework-internal use adopt the decorator pattern when registering at module load time and the factory pattern when registration must be deferred or is conditional on runtime configuration.

def __init__(
    name: str | None = None,
    *,
    allow_overwrite: bool = False,
    priority_key: Callable[[V], int] | None = None
) -> None
property name() -> str

Registry name for identification.

property allow_overwrite() -> bool

Whether overwriting existing keys is allowed.

def freeze() -> None

Prevent further registrations after application boot.

Once frozen, calls to register, register_factory, unregister, and clear will raise RegistryAlreadyExistsError. Read operations (get, resolve, has, etc.) are unaffected.

Call this from Provider.boot() or Application.boot() after all providers have completed their register() phase.

property is_frozen() -> bool

Return whether this registry has been frozen.

def register(
    key: K,
    value: V | None = None,
    *,
    allow_overwrite: bool | None = None
) -> V | Callable[[V], V] | None

Register an item with the given key or use as decorator.

def register_factory(
    key: K,
    factory: Callable[[], V],
    *,
    allow_overwrite: bool | None = None
) -> None

Register a factory function for lazy instantiation.

def unregister(key: K) -> V | None

Unregister and return an item.

def get(key: K) -> V
def get(
    key: K,
    default: V
) -> V
def get(
    key: K,
    default: V | None = None
) -> V | None

Retrieve an item by key.

def resolve(key: K) -> V

Resolve an item, raising if not found.

def get_or_raise(key: K) -> V

Alias for resolve.

def has(key: K) -> bool

Check if key is registered (item or factory).

def keys() -> Iterable[K]

Return all registered keys (items only).

def values() -> Iterable[V]

Return all registered values.

def values_ordered() -> list[V]

Return all registered values sorted by priority.

If a priority_key was provided during initialization, values are sorted by that key in ascending order (lower values first). Otherwise, returns values in insertion order.

Returns
TypeDescription
list[V]List of values sorted by priority, or insertion order if no priority_key is configured.
def items() -> Iterable[tuple[K, V]]

Return all registered items.

def factories() -> Iterable[K]

Return keys with registered factories.

def all_keys() -> set[K]

Return all keys (items + factories).

def clear() -> None

Clear all registered items and factories.

def register_decorator(
    key: K | None = None,
    *,
    allow_overwrite: bool | None = None
) -> Callable[[V], V]

Decorator for easy registration.


Context manager that sets request-scoped variables and restores them on exit.
def __init__(
    registry: ContextVarRegistry,
    *,
    request_id: str | None = None,
    method: str | None = None,
    path: str | None = None,
    correlation_id: str | None = None,
    causation_id: str | None = None,
    user_id: str | None = None,
    tenant_id: str | None = None,
    start_time: float | None = None,
    trace_id: str | None = None,
    span_id: str | None = None,
    trace_flags: str | None = None
) -> None

Base Result type. Not abstract — Ok and Err are the only variants.
def is_ok() -> bool
def is_err() -> bool
def unwrap() -> T
def unwrap_err() -> E
def unwrap_or(default: T) -> T
def unwrap_or_else(op: Callable[[E], T]) -> T
def map_sync(op: Callable[[T], U]) -> Result[U, E]
def map_err(op: Callable[[E], F]) -> Result[T, F]
def and_then_sync(op: Callable[[T], Result[U, E]]) -> Result[U, E]
def or_else_sync(op: Callable[[E], Result[T, F]]) -> Result[T, F]
def expect(message: str) -> T
def match(
    ok: Callable[[T], U],
    err: Callable[[E], U]
) -> U
async def map(op: Callable[[T], Awaitable[U]]) -> Result[U, E]
async def and_then(op: Callable[[T], Awaitable[Result[U, E]]]) -> Result[U, E]
async def or_else(op: Callable[[E], Awaitable[Result[T, F]]]) -> Result[T, F]
def flatten() -> Result[Any, E]
def filter(
    predicate: Callable[[T], bool],
    error: E
) -> Result[T, E]
def ok_or(default: U) -> T | U
def from_exception(
    cls,
    exc: Exception,
    ok_type: type[T] = type(None)
) -> Result[T, Exception]

Wrap a caught exception into an Err result.

def to_optional() -> T | None
def inspect(op: Callable[[T], None]) -> Result[T, E]
def inspect_err(op: Callable[[E], None]) -> Result[T, E]

Pipeline for chaining Result operations with a fluent, type-safe API.

The error type E is preserved through the entire chain so mypy can verify error handling at every step. Use pipeline to start a pipeline from an infallible initial value.

def __init__(initial: Result[T, E]) -> None
def then(func: Callable[[T], Result[U, E]]) -> ResultPipeline[U, E]

Chain a function that returns a Result, preserving the error type.

def map(func: Callable[[T], U]) -> ResultPipeline[U, E]

Apply a function to the value if Ok, preserving the error type.

def recover(func: Callable[[E], Result[T, F]]) -> ResultPipeline[T, F]

Recover from an error by applying a function that may produce a new error type.

def finalize() -> Result[T, E]

Get the final typed Result.


Request-scoped resolution context.

Caches scoped service instances for the lifetime of the scope. Singletons and transients delegate to the root container.

Supports nested scopes (K-02): parent scopes can be specified to create hierarchical scoping where child scopes can access parent scope services.

Usage

async with container.create_scope() as scope:
session = await scope.resolve(DbSession)
# same instance within this scope

Nested scope usage

async with container.create_scope() as parent_scope:
# parent scope
async with parent_scope.create_scope() as child_scope:
# child scope can access parent services
session = await child_scope.resolve(DbSession)
def __init__(
    root: Container,
    parent: Scope | None = None
) -> None
async def resolve(service_type: type[T]) -> T

Resolve a service from this scope.

K-02: Falls back to parent scope if not found in current scope.

def create_scope() -> Scope

K-02: Create a child scope nested within this scope.

The child scope can access services from both the root container and the parent scope.

async def call(
    func: Callable[Ellipsis, Awaitable[T] | T],
    *args: Any,
    **kwargs: Any
) -> T

Call a function with dependency injection (Protocol implementation).

def has(service_type: object) -> bool

Check if a service is registered (delegates to root container).

async def resolve_optional(service_type: type[T]) -> T | None

Resolve a service, returning None if not registered.

async def resolve_all(service_type: type[T]) -> list[T]

Resolve all registered implementations that are subtypes of a service type.

async def dispose() -> None

Dispose all scoped resources.


JSON serialisation or deserialisation failed.

Raised by:

  • JsonSerializer implementations
  • Domain model serialisation
  • Schema validation during decode

UTC system clock implementation.
def now() -> datetime

Return the current timezone-aware UTC datetime.

def monotonic() -> float

Return the monotonic clock value.

def timestamp() -> float

Return the current Unix timestamp.

def time() -> float

Backward-compatible alias for timestamp.


def as_result(*exceptions: type[Exception]) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[Result[T, Exception]]]]

Decorator to capture exceptions and return a Result — async functions (canonical).

For sync functions use as_result_sync.

At least one exception type must be specified to prevent silently wrapping unexpected infrastructure failures.

Parameters
ParameterTypeDescription
Raises
ExceptionDescription
TypeErrorIf called with no exception types.

Example

@as_result(IOError, TimeoutError)
async def fetch(url: str) -> bytes:
return await client.get(url)

def as_result_sync(*exceptions: type[Exception]) -> Callable[[Callable[P, T]], Callable[P, Result[T, Exception]]]

Decorator to capture exceptions and return a Result — sync functions only.

For async functions use as_result (the canonical async variant).

At least one exception type must be specified to prevent silently wrapping unexpected infrastructure failures.

Parameters
ParameterTypeDescription
Raises
ExceptionDescription
TypeErrorIf called with no exception types.

Example

@as_result_sync(ValueError, KeyError)
def parse(data: str) -> int:
return int(data)

def buildable(cls: type[T]) -> type[T]

Class decorator that auto-generates a fluent Builder for the decorated class.

Introspects __init__ to discover parameters and generates with_<param>() setter methods on a dynamically created builder. Required parameters (those without defaults) are validated at build() time and raise TypeError if missing.

Adds a .builder() class method that returns a fresh builder instance each time it is called.

Example

@buildable
class User:
def __init__(self, name: str, email: str, age: int = 0):
self.name = name
self.email = email
self.age = age
user = (
User.builder()
.with_name("John")
.with_email("john@example.com")
.with_age(30)
.build()
)
Parameters
ParameterTypeDescription
`cls`type[T]The class to decorate. Must have a standard ``__init__`` with type-annotated parameters.
Returns
TypeDescription
type[T]The same class with an attached ``.builder()`` static method.

def builder_field(field_or_func: str | Callable[Ellipsis, Any]) -> Callable[Ellipsis, Any]

Decorator or helper to define builder methods.

Can be used as a decorator on a method to mark it as a builder field, or as a metadata marker for automated builder generation.


def collect(results: Iterable[Result[T, E]]) -> Result[list[T], E]

Collect an iterable of Results into a Result of a list.


def create_app(
    name: str = 'lexigram-app',
    config: LexigramConfig | None = None
) -> Application

Create and return a new Lexigram application.

Returns a Application with no providers registered. Call Application.add_provider (or Application.add_providers) before Application.boot to register providers.

For entry-point-based auto-discovery, use lexigram.plugins.discovery.discover_providers from the lexigram-plugins package.

Parameters
ParameterTypeDescription
`name`strHuman-readable application name used in log output. Defaults to ``"lexigram-app"``.
`config`LexigramConfig | NoneApplication configuration. If omitted an empty LexigramConfig is created, which reads values from environment variables via the standard ``LX_*`` prefix.
Returns
TypeDescription
ApplicationA new Application ready for provider registration and boot.

Example

app = create_app(config=LexigramConfig.from_env_profile("production"))
app.add_provider(WebProvider(web_config))
await app.boot()

def create_module(
    name: str | None = None,
    *,
    providers: list[type] | None = None,
    imports: list[type | DynamicModule] | None = None,
    exports: list[type] | None = None,
    controllers: list[type] | None = None,
    is_global: bool = False,
    scan: list[str] | None = None,
    require_stub: bool = False
) -> Any

Create a module via a decorator — convenience alias for module.


def inject(func_or_cls: Any) -> Any

Decorator to enable automatic dependency injection.

Applied to a class: marks the class for constructor injection. Applied to an async function: wraps it to resolve missing arguments from the current DI container when invoked via container.call().

Parameters
ParameterTypeDescription
`func_or_cls`AnyThe class or async function to decorate.
Returns
TypeDescription
AnyThe decorated class or function with automatic DI support.
Raises
ExceptionDescription
TypeErrorIf applied to a synchronous (non-async) function.

def injectable(
    scope_or_cls: ServiceScope | type[T] = ServiceScope.TRANSIENT,
    name: str | None = None,
    *,
    scope: ServiceScope | None = None
) -> Callable[[type[T]], type[T]] | type[T]

Decorator to mark a class as injectable.

Parameters
ParameterTypeDescription
`scope_or_cls`ServiceScope | type[T]The service scope or the class if used without parens.
`name`str | NoneOptional name for the service registration.
`scope`ServiceScope | NoneExplicit scope (keyword-only alternative to scope_or_cls).
Returns
TypeDescription
Callable[[type[T]], type[T]] | type[T]The decorator function or the decorated class.

def module(
    cls: type[T],
    *,
    name: str | None = ...,
    providers: list[Any] | None = ...,
    imports: list[type | DynamicModule] | None = ...,
    exports: list[type] | None = ...,
    controllers: list[type] | None = ...,
    is_global: bool = ...,
    scan: list[str] | None = ...,
    require_stub: bool = ...
) -> type[T]

def partition(results: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]

Partition results into successes and failures.


def provide(func: Callable[Ellipsis, Any]) -> type[FunctionProvider]

def request_context(
    registry: ContextVarRegistry,
    *,
    request_id: str | None = None,
    method: str | None = None,
    path: str | None = None,
    correlation_id: str | None = None,
    causation_id: str | None = None,
    user_id: str | None = None,
    tenant_id: str | None = None
) -> Generator[RequestContext, None, None]

Functional context manager for request-scoped context

with request_scope(registry, request_id=“abc”) as req_ctx: …


async def run_application(app: Application) -> None

Run an application with signal handling.

Starts the application, waits for SIGINT/SIGTERM, then performs a graceful shutdown. Handles :exc:KeyboardInterrupt and :exc:SystemExit silently so the process terminates cleanly.

Parameters
ParameterTypeDescription
`app`ApplicationThe Application instance to run.

def scoped(cls: type[T]) -> type[T]

Decorator to register a class as a scoped service.

A scoped service creates one instance per scope (e.g., per request).

Parameters
ParameterTypeDescription
`cls`type[T]The class to register as scoped.
Returns
TypeDescription
type[T]The decorated class marked for scoped registration.

def singleton(cls: type[T]) -> type[T]

Decorator to register a class as a singleton service.

A singleton service is created once and shared across all resolutions.

Parameters
ParameterTypeDescription
`cls`type[T]The class to register as a singleton.
Returns
TypeDescription
type[T]The decorated class marked for singleton registration.

def transient(cls: type[T]) -> type[T]

Decorator to register a class as a transient service.

A transient service creates a new instance each time it is resolved.

Parameters
ParameterTypeDescription
`cls`type[T]The class to register as transient.
Returns
TypeDescription
type[T]The decorated class marked for transient registration.

def try_catch(
    catch: tuple[type[Exception], Ellipsis],
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs
) -> Awaitable[Result[T, Exception]]

Execute an async function and return its result as Ok, or Err if a listed exception occurs.

This is the canonical (async-first) variant. Callers must declare which exception types are expected failures; unexpected exceptions propagate normally.

For sync functions use try_catch_sync.

Parameters
ParameterTypeDescription
`catch`tuple[type[Exception], Ellipsis]A tuple of exception types to catch and wrap in ``Err``.
`func`Callable[P, Awaitable[T]]The async callable to invoke. *args: Positional arguments forwarded to *func*. **kwargs: Keyword arguments forwarded to *func*.

Example

result = await try_catch((IOError, TimeoutError), fetch, url)

def try_catch_sync(
    catch: tuple[type[Exception], Ellipsis],
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs
) -> Result[T, Exception]

Execute a sync function and return its result as Ok, or Err if a listed exception occurs.

Unlike a bare try/except Exception, callers must explicitly declare which exception types are expected failures. Unexpected exceptions propagate normally.

For async functions use try_catch (the canonical async variant).

Parameters
ParameterTypeDescription
`catch`tuple[type[Exception], Ellipsis]A tuple of exception types to catch and wrap in ``Err``.
`func`Callable[P, T]The callable to invoke. *args: Positional arguments forwarded to *func*. **kwargs: Keyword arguments forwarded to *func*.

Example

result = try_catch_sync((ValueError, KeyError), parse_int, "42")

Base container error.
def __init__(
    message: str = 'Container error',
    **kwargs: Any
) -> None

Business/domain-level errors (validation failures, not found, auth).
def __init__(
    message: str = 'Domain error',
    **kwargs: Any
) -> None

System-level failures (DB down, network timeouts).
def __init__(
    message: str = 'Infrastructure error',
    **kwargs: Any
) -> None

Root exception for the entire Lexigram ecosystem.

All Lexigram exceptions inherit from this class. This ensures that isinstance checks work identically across core, events, web, and all subpackages.

Attributes: code: Machine-readable error code (e.g., ‘LEX_CONTAINER_001’) message: Human-readable error message details: Additional context dictionary cause: Original exception that triggered this one hint: Optional suggestion for fixing the error

def __init__(
    message: str | None = None,
    details: dict[str, Any] | None = None,
    cause: BaseException | None = None,
    hint: str | None = None,
    **kwargs: Any
) -> None
property docs_url() -> str

Return the documentation URL for this error code.

def to_dict() -> dict[str, Any]

Serialize exception to dictionary for logging/API responses.

def with_details(**kwargs: Any) -> Self

Return self with additional detail key-value pairs merged in.

Mutates and returns self to preserve the exception subtype.

def with_hint(hint: str) -> Self

Return self with a human-readable hint attached.

def with_cause(cause: BaseException) -> Self

Return self with __cause__ set to cause.

def format() -> str

Format for developer-friendly console output.

Returns a multi-line string containing the error class name, code, message, details, hint, and cause chain when present.


Base exception for all lexigram domain errors.

Extends LexigramError so that isinstance(exc, LexigramError) holds for every framework exception, enabling unified handling across all packages.

Use for catchable, domain-level failures such as configuration validation, dependency-injection resolution, input validation, and domain-model invariant violations. For infrastructure failures (database, network, I/O), raise the exception directly without wrapping it in this hierarchy.

Example

try:
service = await container.resolve(MyService)
except LexigramException as e:
logger.error("resolution_failed", error=str(e))
raise

Module configuration or lifecycle error.
def __init__(
    message: str = 'Module error',
    **kwargs: Any
) -> None

Resource not found error (domain-level).
def __init__(
    message: str = 'Not found',
    **kwargs: Any
) -> None

Provider configuration or lifecycle error.
def __init__(
    message: str = 'Provider error',
    **kwargs: Any
) -> None