Skip to content
GitHubDiscord

API Reference

Protocol for AI provider implementations.
async def chat(
    messages: list[ChatMessageProtocol],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any
) -> CompletionProtocol

Chat with optional tool calling.

Parameters
ParameterTypeDescription
`messages`list[ChatMessageProtocol]List of chat messages.
`tools`list[dict[str, Any]] | NoneOptional list of JSON Schema tool definitions. Intentionally ``list[dict[str, Any]]`` — tool schemas are provider-specific JSON and their shape is not constrained here. **kwargs: Provider-specific options.
Returns
TypeDescription
CompletionProtocolCompletion with optional tool calls.

AI-domain graph node protocol for workflow execution.

A workflow node represents a single unit of work in a directed graph. Nodes receive the current shared state, perform their work, and return a state update dict that is merged into the global state.

property name() -> str

Unique node identifier within the workflow.

async def execute(state: dict[str, Any]) -> dict[str, Any]

Execute this node with the current workflow state.

Parameters
ParameterTypeDescription
`state`dict[str, Any]Current shared workflow state.
Returns
TypeDescription
dict[str, Any]Dict of state updates to merge into the shared state.

Protocol for agent execution engines.

The executor runs an agent with governance checks, memory management, and observability integration.

async def run(
    agent: AgentProtocol,
    message: str,
    session_id: str | None = None,
    user_id: str | None = None,
    **kwargs: Any
) -> Any

Execute an agent and return the response.

Returns Result[AgentResponse, AgentError].


Protocol for AI agents.

An agent declares its identity, capabilities (tools), and persona (system prompt). The AgentExecutor uses this protocol to drive the reasoning loop.

property name() -> str

Unique agent identifier.

property tools() -> list[ToolProtocol]

Tools available to this agent.

property system_prompt() -> str

System prompt defining the agent’s persona and constraints.


Structural protocol for aggregate root objects.

Any class that implements event buffering and consistency boundaries satisfies this protocol. Type-check against it instead of the concrete AggregateRoot base class where possible.

def add_event(event: DomainEvent) -> None

Register a domain event with the aggregate.

def collect_events() -> list[DomainEvent]

Return and clear all buffered domain events.

def pull_events() -> list[DomainEvent]

Return buffered events without clearing them.

def clear_events() -> None

Discard all buffered domain events.

property has_uncommitted_events() -> bool

Return True when there are buffered events not yet published.


Protocol for a simple in-process async mutual-exclusion lock.

Mirrors the interface of asyncio.Lock. Implementations are single-process (i.e. not distributed); use DistributedLockProtocol or LockManagerProtocol when cross-process coordination is required.

Example

from lexigram.contracts.lock import AsyncLockProtocol
async def protected(lock: AsyncLockProtocol) -> None:
async with lock:
await do_critical_work()

Container registration

container.singleton(AsyncLockProtocol, InMemoryAsyncLock)
async def acquire() -> bool

Acquire the lock, blocking until it becomes available.

Returns
TypeDescription
bool``True`` once the lock has been acquired.
def release() -> None

Release the lock.

Raises
ExceptionDescription
RuntimeErrorIf the lock is not currently held.
def locked() -> bool

Return True if the lock is currently held by any coroutine.


Async persistent store for sensitive named secret values.

Typical usage

store = await container.resolve(AsyncSecretStoreProtocol)
token = await store.get("stripe_api_key")
async def get(name: str) -> str | None

Return the secret value for name, or None if absent.

Parameters
ParameterTypeDescription
`name`strUnique secret identifier.
async def get_bulk(*names: str) -> dict[str, str]

Return a mapping of name → value for all requested secrets.

Parameters
ParameterTypeDescription
`names`strOne or more secret names.
Returns
TypeDescription
dict[str, str]Dict containing only the names that were found.
async def set(
    name: str,
    value: str
) -> None

Write or overwrite a secret value.

Parameters
ParameterTypeDescription
`name`strUnique secret identifier.
`value`strPlaintext secret value.
async def delete(name: str) -> None

Remove a secret. No-op if absent.

Parameters
ParameterTypeDescription
`name`strUnique secret identifier.

Protocol for value serialization.

Implementations handle conversion between Python objects and string representations suitable for storage or transmission. Used across extensions (cache, messaging, tasks, etc.) for consistent serialization.

Example

class JSONSerializer:
async def serialize(self, value: Any) -> str:
return json.dumps(value)
async def deserialize(self, data: str) -> Any:
return json.loads(data)
async def serialize(value: Any) -> str

Serialize a Python object to string.

Parameters
ParameterTypeDescription
`value`AnyPython object to serialize.
Returns
TypeDescription
strString representation.
async def deserialize(data: str) -> Any

Deserialize string back to Python object.

Parameters
ParameterTypeDescription
`data`strSerialized string data.
Returns
TypeDescription
AnyReconstructed Python object.

Protocol for authentication and authorization providers.

Auth providers are responsible for user authentication, authorization, token management, and access control.

async def get_user(user_id: str) -> Any | None

Retrieve a user by their unique identifier.

Parameters
ParameterTypeDescription
`user_id`strUnique user identifier.
Returns
TypeDescription
Any | NoneUser object or None if not found.
async def verify_token(token: str) -> Any

Verify an encoded auth token and return verification details.

Parameters
ParameterTypeDescription
`token`strRaw encoded auth token (JWT or similar).
Returns
TypeDescription
AnyVerification result (typically ``Result[VerifiedToken, TokenError]``).
def has_any_role(
    user: Any,
    roles: list[str]
) -> bool

Return True if user holds at least one of the given roles.

Parameters
ParameterTypeDescription
`user`AnyAuthenticated user object.
`roles`list[str]Role names to check.
Returns
TypeDescription
boolTrue if the user has at least one of the supplied roles.
def has_any_permission(
    user: Any,
    permissions: list[str]
) -> bool

Return True if user has at least one of the given permissions.

Parameters
ParameterTypeDescription
`user`AnyAuthenticated user object.
`permissions`list[str]Permission names to check.
Returns
TypeDescription
boolTrue if the user has at least one of the supplied permissions.

Protocol for authenticated user attached to requests.

This protocol defines the contract that any authenticated user implementation must follow for type-safe user handling.

Example

def get_user_profile(user: AuthenticatedUserProtocol) -> dict:
return {"user_id": user.user_id, "roles": user.roles}
property user_id() -> str

Unique identifier for the user.

property name() -> str

Name of the authenticated user.

property email() -> str

Email address of the user.

property is_active() -> bool

Whether the user account is active.

property is_verified() -> bool

Whether the user’s email is verified.

property roles() -> list[str]

List of roles assigned to the user.

property permissions() -> list[str]

List of permissions granted to the user.

def has_role(role: str) -> bool

Check if user has a specific role.

Parameters
ParameterTypeDescription
`role`strThe role to check for.
Returns
TypeDescription
boolTrue if the user has the role.
def has_permission(permission: str) -> bool

Check if user has a specific permission.

Parameters
ParameterTypeDescription
`permission`strThe permission to check for.
Returns
TypeDescription
boolTrue if the user has the permission.

Protocol for authorization decisions.

Authorizers determine if an authenticated user can perform a specific action on a resource.

async def authorize(
    user: Any,
    action: str,
    resource: Any
) -> bool

Check if user is authorized for action on resource.

Parameters
ParameterTypeDescription
`user`AnyAuthenticated user.
`action`strAction to perform (e.g., "read", "write", "delete").
`resource`AnyTarget resource.
Returns
TypeDescription
boolTrue if authorized.
async def check_access(
    user: Any,
    allowed_roles: set[str],
    resource: str | None = None,
    action: str | None = None
) -> bool

Check if user has access based on roles and permissions.

Parameters
ParameterTypeDescription
`user`AnyAuthenticated user.
`allowed_roles`set[str]Set of roles that grant access.
`resource`str | NoneTarget resource (optional).
`action`str | NoneAction to perform (optional).
Returns
TypeDescription
boolTrue if access is granted.
async def can(
    user: Any,
    action: str,
    resource: str
) -> bool

Check if user can perform action on resource.

Parameters
ParameterTypeDescription
`user`AnyAuthenticated user.
`action`strAction to perform.
`resource`strTarget resource.
Returns
TypeDescription
boolTrue if authorized.

Protocol for blob storage operations.

This interface defines the contract for file storage backends (S3, GCS, Azure Blob, local filesystem, etc.).

Example

class S3BlobStore:
async def upload(self, path: str, data: bytes, **options) -> FileInfo:
await self._client.put_object(Bucket=self._bucket, Key=path, Body=data)
return FileInfo(path=path, size=len(data), ...)
async def upload(
    path: str,
    data: bytes | AsyncIterator[bytes],
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to the storage backend.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
`data`bytes | AsyncIterator[bytes]File content as bytes or async iterator.
`content_type`str | NoneMIME type of the content. **options: Additional upload options.
Returns
TypeDescription
FileInfoFileInfo with path, size, and metadata.
async def download(path: str) -> bytes

Download file content into memory.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
bytesFile content as bytes.
def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file content (memory efficient).

Parameters
ParameterTypeDescription
`path`strStorage path/key.
`chunk_size`intSize of each chunk in bytes.
Yields
TypeDescription
AsyncIterator[bytes]File content in chunks.
async def delete(path: str) -> None

Delete a file.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
async def exists(path: str) -> bool

Check if file exists.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
boolTrue if file exists.
async def info(path: str) -> FileInfo

Get file metadata.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
FileInfoFileInfo with size, content_type, etc.
def list(prefix: str = '') -> AsyncIterator[FileInfo]

List files with a given prefix.

Parameters
ParameterTypeDescription
`prefix`strPath prefix to filter by.
Yields
TypeDescription
AsyncIterator[FileInfo]FileInfo for each matching file.
async def get_url(path: str) -> str

Get public URL (if applicable).

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
strPublic URL string.
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get a temporary secure URL.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
`expires_in`timedeltaURL validity window as a timedelta (default one hour). Pass ``timedelta(minutes=5)`` for secure short-lived downloads or ``timedelta(hours=24)`` for bulk exports.
`method`strHTTP method (GET or PUT).
Returns
TypeDescription
strPresigned URL string.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Protocol for CORS (Cross-Origin Resource Sharing) policy configuration.

Defines how to evaluate CORS requests and provide the necessary headers and configuration for browser-based clients.

Example

class CORSPolicy:
def is_origin_allowed(self, origin: str) -> bool:
return origin in ("https://app.example.com", "https://admin.example.com")
def get_allowed_headers(self) -> list[str]:
return ["content-type", "authorization", "x-request-id"]
def get_allowed_methods(self) -> list[str]:
return ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
def get_max_age(self) -> int:
return 3600
def is_origin_allowed(origin: str) -> bool

Check if an origin is permitted for CORS requests.

Parameters
ParameterTypeDescription
`origin`strThe value of the Origin header from a CORS preflight request.
Returns
TypeDescription
boolTrue if the origin is allowed, False otherwise.
def get_allowed_headers() -> list[str]

Return the list of allowed request headers.

Returns
TypeDescription
list[str]List of header names that clients are allowed to send.
Common values["content-type", "authorization", "x-request-id"].
def get_allowed_methods() -> list[str]

Return the list of allowed HTTP methods.

Returns
TypeDescription
list[str]List of HTTP methods clients are allowed to use.
Typical["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"].
def get_max_age() -> int

Return the preflight cache duration in seconds.

Returns
TypeDescription
intMaximum time (in seconds) browsers should cache preflight responses. Typical values: 3600 (1 hour) to 86400 (24 hours).

Protocol for cache backend implementations.

This interface defines the contract that all cache backend implementations must follow. It provides a unified API for different storage mechanisms (memory, Redis, Memcached, etc.).

Example

class RedisBackend:
async def get(self, key: str) -> Any | None:
data = await self._redis.get(key)
return self._serializer.deserialize(data) if data else None
async def set(self, key: str, value: Any, ttl: int | None = None) -> bool:
data = self._serializer.serialize(value)
await self._redis.set(key, data, ex=ttl)
return True
async def get(key: str) -> Result[Any | None, CacheError]

Get a value from the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to retrieve.
Returns
TypeDescription
Result[Any | None, CacheError]Ok(value) if found, Ok(None) if not found, Err(CacheError) on failure.

Note

The return type is Any by design: the cache is a heterogeneous store and the caller knows the expected type. Use cast(T, result.unwrap()) at call sites for type safety.

async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> Result[None, CacheError]

Set a value in the cache with optional TTL.

Parameters
ParameterTypeDescription
`key`strThe cache key to set.
`value`AnyThe value to cache.
`ttl`int | NoneTime to live in seconds (optional).
Returns
TypeDescription
Result[None, CacheError]Ok(None) if successful, Err(CacheError) on failure.
async def delete(key: str) -> Result[bool, CacheError]

Delete a value from the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to delete.
Returns
TypeDescription
Result[bool, CacheError]Ok(True) if deleted, Ok(False) if not found, Err(CacheError) on failure.
async def delete_many(keys: list[str]) -> Result[int, CacheError]

Delete multiple values from the cache.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to delete.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def delete_pattern(pattern: str) -> Result[int, CacheError]

Delete all keys matching a glob-style pattern.

Parameters
ParameterTypeDescription
`pattern`strGlob pattern (e.g. ``"pet:list:*"``). Supports ``*`` as a wildcard matching any sequence of characters.
Returns
TypeDescription
Result[int, CacheError]Ok(count) of deleted keys, Err(CacheError) on failure.
async def exists(key: str) -> Result[bool, CacheError]

Check if a key exists in the cache.

Parameters
ParameterTypeDescription
`key`strThe cache key to check.
Returns
TypeDescription
Result[bool, CacheError]Ok(True) if exists, Ok(False) otherwise, Err(CacheError) on failure.
async def clear() -> Result[None, CacheError]

Clear all values from the cache.

Returns
TypeDescription
Result[None, CacheError]Ok(None) if successful, Err(CacheError) on failure.
async def get_many(keys: list[str]) -> Result[dict[str, Any], CacheError]

Get multiple values from the cache.

Parameters
ParameterTypeDescription
`keys`list[str]List of cache keys to retrieve.
Returns
TypeDescription
Result[dict[str, Any], CacheError]Ok(dict) mapping found keys to values, Err(CacheError) on failure.
async def set_many(
    items: dict[str, Any],
    ttl: int | None = None
) -> Result[None, CacheError]

Set multiple values in the cache.

Parameters
ParameterTypeDescription
`items`dict[str, Any]Dictionary of key-value pairs to cache.
`ttl`int | NoneTime to live in seconds for all items.
Returns
TypeDescription
Result[None, CacheError]Ok(None) if all items set successfully, Err(CacheError) on failure.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a health check on the cache backend.

Returns
TypeDescription
HealthCheckResultStructured HealthCheckResult.

Protocol for cache providers.

Cache providers are responsible for setting up caching backends, cache warming, and cache management.


Protocol for CLI contributors that register generators, commands, health checks, doctor checks, shell context, and hooks into the CLI.

Each contributor is discovered via the lexigram.cli.contributors entry point group. Install the package to make contributions available.

property contributor_id() -> str

Unique identifier for this contributor (e.g. ‘core’, ‘web’, ‘sql’).

def get_generators() -> list[GeneratorDefinition]

Return all generator definitions this contributor provides.

Returns
TypeDescription
list[GeneratorDefinition]A list of GeneratorDefinition instances. May be empty.
def get_commands() -> list[CommandContribution]

Return CLI command groups contributed by this package.

Returns
TypeDescription
list[CommandContribution]A list of CommandContribution instances. May be empty.
def get_health_checks() -> list[HealthCheckContribution]

Return runtime health checks contributed by this package.

Health checks require a booted DI container.

Returns
TypeDescription
list[HealthCheckContribution]A list of HealthCheckContribution instances. May be empty.
def get_doctor_checks() -> list[DoctorCheckContribution]

Return static environment/config diagnostic checks.

Doctor checks are sync and require no container.

Returns
TypeDescription
list[DoctorCheckContribution]A list of DoctorCheckContribution instances. May be empty.
def get_shell_context() -> list[ShellContextContribution]

Return objects to inject into the interactive shell namespace.

Returns
TypeDescription
list[ShellContextContribution]A list of ShellContextContribution instances. May be empty.
def get_hooks() -> list[HookContribution]

Return CLI lifecycle hooks contributed by this package.

Returns
TypeDescription
list[HookContribution]A list of HookContribution instances. May be empty.

Injectable time source for the framework.
def now() -> datetime

Return the current timezone-aware UTC time.

def monotonic() -> float

Return a monotonic elapsed-time counter.

def timestamp() -> float

Return the current Unix timestamp in UTC seconds.

def time() -> float

Backward-compatible alias for timestamp.


Protocol for command bus implementations.

Example

class CommandBusProtocol:
async def dispatch(self, command: Command) -> Any:
handler = self._handlers[type(command)]
return await handler.handle(command)
async def dispatch(command: Any) -> Any

Dispatch a command to its handler.

Parameters
ParameterTypeDescription
`command`AnyCommand to dispatch.
Returns
TypeDescription
AnyResult from the command handler.

Protocol for configuration access across the framework.

Any configuration object (BaseConfig, LexigramConfig, or custom implementations) can satisfy this protocol by implementing get(), get_section(), and has_section().

Example

config = container.resolve(ConfigProtocol)
db_url = config.get("database.url", "sqlite:///default.db")
logging = config.get_section("logging", LoggingConfig)
property environment() -> Environment

The active deployment environment.

property is_production() -> bool

Return True when the active environment is production.

property is_development() -> bool

Return True when the active environment is development.

property is_testing() -> bool

Return True when the active environment is testing.

property is_staging() -> bool

Return True when the active environment is staging.

property is_debug() -> bool

Return True when debug mode is enabled.

def get(
    key: str,
    default: Any = None
) -> Any

Get a configuration value by dot-notation key.

Parameters
ParameterTypeDescription
`key`strConfiguration key (e.g. ``"app.name"`` or ``"database.url"``).
`default`AnyValue returned when the key is not found.
Returns
TypeDescription
AnyThe configuration value, or *default* if not found.
def get_section(
    name: str,
    model_cls: type[T] | None = None
) -> T | dict[str, Any]

Get a typed configuration section.

Parameters
ParameterTypeDescription
`name`strSection name (e.g. ``"logging"``).
`model_cls`type[T] | NoneOptional model class to coerce the section into.
Returns
TypeDescription
T | dict[str, Any]A model instance when *model_cls* is provided, otherwise a raw dict or the attribute value.
def has_section(name: str) -> bool

Check whether a configuration section exists.

Parameters
ParameterTypeDescription
`name`strSection name to check.
Returns
TypeDescription
boolTrue if the section is present.

Protocol for connection pools.
property max_connections() -> int
property connection_timeout() -> float
async def initialize() -> None

Initialize the connection pool.

async def shutdown() -> None

Shutdown the connection pool.

def get_connection() -> AbstractAsyncContextManager[Any]

Get a connection from the pool.

async def get_pool_stats() -> dict[str, Any]

Get pool statistics.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check pool health.

async def get_query_stats(time_range_seconds: int = 3600) -> dict[str, Any]

Get query statistics.

async def warm(count: int | None = None) -> None

Pre-create count connections to avoid cold-start latency.

Parameters
ParameterTypeDescription
`count`int | NoneNumber of connections to open. Defaults to ``min_connections`` (or the pool minimum) if not specified.
async def validate_connections() -> int

Validate all idle connections in the pool, evicting dead ones.

Returns
TypeDescription
intNumber of valid connections remaining after validation.

Protocol for database connections.

Represents an active database connection that can execute queries.

async def execute(
    sql: str,
    params: list[Any] | None = None
) -> QueryResult

Execute a SQL query.

Parameters
ParameterTypeDescription
`sql`strSQL query string.
`params`list[Any] | NoneQuery parameters for parameterized queries.
Returns
TypeDescription
QueryResultQueryResult with rows and metadata.
async def close() -> None

Close the connection.

async def fetch(
    query: str,
    *args: Any,
    **kwargs: Any
) -> list[dict[str, Any]]

Fetch rows from the database.

Parameters
ParameterTypeDescription
`query`strSQL query string. *args: Positional query parameters. **kwargs: Named query parameters.
Returns
TypeDescription
list[dict[str, Any]]List of row dictionaries.

Combined DI container protocol covering registration, resolution, and validation.

Use this instead of Any when a component needs to both register services (via singleton/transient) and resolve them (via resolve) within the same method. This is a structural Protocol — any object implementing both ContainerRegistrarProtocol and ContainerResolverProtocol satisfies this type.


Protocol for registering dependencies in the container.

Used during the registration phase where resolution is not yet permitted.

def transient(
    service_type: type[T],
    factory: Any,
    validate: bool = True
) -> None

Register a transient service.

def singleton(
    service_type: type[T],
    instance: T | None = None,
    *,
    name: str | None = None,
    factory: Any | None = None,
    validate: bool = True
) -> None
def singleton(
    service_type: Any,
    instance: Any = None,
    *,
    name: str | None = None,
    factory: Any | None = None,
    validate: bool = True
) -> None
def singleton(
    service_type: Any,
    instance: Any = None,
    *,
    name: str | None = None,
    factory: Any | None = None,
    validate: bool = True
) -> None

Register a singleton service (shared instance).

Parameters
ParameterTypeDescription
`service_type`AnyThe abstract type (protocol/class) being registered. Accepts both concrete classes (``type[T]`` with full type inference) and Protocol types (via ``Any`` fallback).
`instance`AnyPre-built singleton instance.
`name`str | NoneOptional string key. When provided, the binding is stored under this name instead of ``service_type``. Resolve via ``Annotated[T, Named(name)]``.
`factory`Any | NoneFactory callable for lazy creation.
`validate`boolWhether to validate protocol conformance.
def scoped(
    service_type: type[T],
    factory: Any,
    validate: bool = True,
    *,
    name: str | None = None
) -> None

Register a scoped service (one instance per scope).

Parameters
ParameterTypeDescription
`service_type`type[T]The abstract type being registered.
`factory`AnyFactory callable, called once per scope.
`validate`boolWhether to validate protocol conformance.
`name`str | NoneOptional string key for named scoped registration. Resolve via ``Annotated[T, Named(name)]``.
def has(service_type: Any) -> bool

Check if a service is registered.


Protocol for resolving dependencies from the container.

Async-first code should depend on this protocol. It provides only an asynchronous resolve() method and makes no guarantees about sync behaviour. All synchronous helpers have been removed to keep the API simple and forward‑looking.

Used during the boot phase and runtime after registration is complete.

async def resolve(
    service_type: type[T],
    *,
    bypass_visibility: bool = False
) -> T
async def resolve(
    service_type: Any,
    *,
    bypass_visibility: bool = False
) -> Any
async def resolve(
    service_type: Any,
    *,
    bypass_visibility: bool = False
) -> Any

Asynchronously resolve a service by its type.

The framework is async-first and does not support string or other loose keys. Consumers should only register and resolve using types so that the DI container remains fully type-safe.

Accepts both concrete classes (type[T] with full return-type inference) and Protocol types (via Any fallback).

Parameters
ParameterTypeDescription
`service_type`AnyThe service type to resolve.
`bypass_visibility`boolIf True, skip module visibility enforcement. Use only in framework-internal resolution paths.
async def call(
    func: Callable[Ellipsis, Awaitable[T] | T],
    *args: Any,
    **kwargs: Any
) -> T

Call a function with dependency injection.

def create_scope() -> Any

Create a request-scoped resolution context.

def has(service_type: Any) -> bool

Check if a service is registered.

async def resolve_optional(service_type: type[T]) -> T | None
async def resolve_optional(service_type: Any) -> Any | None
async def resolve_optional(service_type: Any) -> Any | None

Resolve a service, returning None if not registered.

This provides a graceful way to handle optional dependencies without catching exceptions.

Parameters
ParameterTypeDescription
`service_type`AnyThe type to resolve.
Returns
TypeDescription
Any | NoneThe resolved instance or None if the service is not registered.
async def resolve_all(service_type: type[T]) -> list[T]
async def resolve_all(service_type: Any) -> list[Any]
async def resolve_all(service_type: Any) -> list[Any]

Resolve all registered implementations that are subtypes of a service type.

Useful for collecting multiple implementations of a protocol or abstract base class (e.g. all registered EventHandlerProtocol instances).

Parameters
ParameterTypeDescription
`service_type`AnyThe base type whose implementations to resolve.
Returns
TypeDescription
list[Any]A list of resolved instances.

Relevance-based conversation history pruning.

Unlike sliding-window truncation, implementations score each turn for relevance to the current query and preserve high-value turns regardless of their position in the history.

async def prune(
    history: list[ChatMessageProtocol],
    current_query: str,
    max_turns: int
) -> list[ChatMessageProtocol]

Prune history to at most max_turns, preserving relevant turns.

Parameters
ParameterTypeDescription
`history`list[ChatMessageProtocol]Full conversation history as ChatMessageProtocol instances.
`current_query`strThe current user query (used for relevance scoring).
`max_turns`intMaximum number of turns to retain.
Returns
TypeDescription
list[ChatMessageProtocol]Pruned history in chronological order.

Protocol for cursor-based paginated result pages.

Implemented by CursorPage and Relay-compliant GraphQL page types. Use this for code that should work with any cursor-pagination result.

property items() -> list[T]

Items in this page.

property next_cursor() -> str | None

Opaque cursor for the next page, or None if no more.

property has_more() -> bool

Whether there are more results after this page.


Protocol for database providers.

This defines the interface that all database providers must implement, regardless of the underlying database technology.

Example

class PostgresProvider:
async def connect(self) -> None:
self._pool = await asyncpg.create_pool(self._dsn)
async def execute_query(
self,
sql: str,
params: list[Any] | None = None,
) -> QueryResult:
async with self._pool.acquire() as conn:
rows = await conn.fetch(sql, *params or [])
return QueryResult(rows=list(map(dict, rows)), ...)
async def connect() -> None

Establish connection to the database.

async def disconnect() -> None

Close connection to the database.

async def is_connected() -> bool

Check if database is connected.

async def execute_query(
    sql: str,
    params: list[Any] | None = None,
    **kwargs: Any
) -> QueryResult

Execute a SELECT query.

Parameters
ParameterTypeDescription
`sql`strSQL query string.
`params`list[Any] | NoneQuery parameters. **kwargs: Additional options.
Returns
TypeDescription
QueryResultQueryResult with rows and execution metadata.
async def execute_insert(
    table: str,
    data: dict[str, Any],
    **kwargs: Any
) -> InsertResult

Execute an INSERT operation.

Parameters
ParameterTypeDescription
`table`strTable name.
`data`dict[str, Any]Column-value mapping. **kwargs: Additional options.
Returns
TypeDescription
InsertResultInsertResult with inserted ID and influenced rows.
async def execute_update(
    table: str,
    data: dict[str, Any],
    where_clause: str,
    where_params: list[Any] | None = None,
    **kwargs: Any
) -> UpdateResult

Execute an UPDATE operation.

Parameters
ParameterTypeDescription
`table`strTable name.
`data`dict[str, Any]Column-value updates.
`where_clause`strWHERE condition.
`where_params`list[Any] | NoneParameters for WHERE clause. **kwargs: Additional options.
Returns
TypeDescription
UpdateResultUpdateResult with affected rows.
async def execute_delete(
    table: str,
    where_clause: str,
    where_params: list[Any] | None = None,
    **kwargs: Any
) -> DeleteResult

Execute a DELETE operation.

Parameters
ParameterTypeDescription
`table`strTable name.
`where_clause`strWHERE condition.
`where_params`list[Any] | NoneParameters for WHERE clause. **kwargs: Additional options.
Returns
TypeDescription
DeleteResultDeleteResult with affected rows.
async def execute(
    sql: str,
    params: Any = None
) -> QueryResult

Execute a raw SQL query with parameters.

Parameters
ParameterTypeDescription
`sql`strSQL query string.
`params`AnyQuery parameters.
Returns
TypeDescription
QueryResultQueryResult with execution results.
def transaction(isolation_level: IsolationLevel | None = None) -> AbstractAsyncContextManager[Any]

Context manager for transactions.

Parameters
ParameterTypeDescription
`isolation_level`IsolationLevel | NoneOptional ANSI SQL isolation level. When ``None`` the driver's default isolation level is used.

Example

async with db.transaction(isolation_level=IsolationLevel.SERIALIZABLE):
await db.execute("INSERT INTO ...")
await db.execute("UPDATE ...")
async def begin_transaction() -> None

Begin a transaction.

async def commit_transaction() -> None

Commit current transaction.

async def rollback_transaction() -> None

Rollback current transaction.

async def table_exists(table_name: str) -> bool

Check if a table exists.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on database connection.

def scoped_context() -> AbstractAsyncContextManager[Any]

Return an async context manager that establishes a scoped session.

The scoped context binds a database session to the current async context so that get_scoped_connection can retrieve it without passing the connection around explicitly.

Returns
TypeDescription
AbstractAsyncContextManager[Any]Async context manager that yields no value (or the session).
async def get_scoped_connection() -> ConnectionProtocol

Return the connection bound to the current scoped context.

Must be called within an active scoped_context block.

Returns
TypeDescription
ConnectionProtocolActive ConnectionProtocol for the current scope.
async def acquire() -> ConnectionProtocol

Acquire a connection from the pool for manual management.

Use this when you need fine-grained control over connection lifecycle, but prefer scoped_context when possible for automatic cleanup.

Returns
TypeDescription
ConnectionProtocolAn acquired connection that must be released via release.

Example

conn = await db.acquire()
try:
result = await conn.execute("SELECT * FROM users")
finally:
await db.release(conn)
async def release(connection: ConnectionProtocol) -> None

Release a connection back to the pool.

Parameters
ParameterTypeDescription
`connection`ConnectionProtocolConnection acquired via acquire.

Protocol for distributed locking.

Distributed locks coordinate access to shared resources across multiple processes, threads, or services. They provide:

  • Mutual exclusion: Only one holder at a time
  • Safety: Locks expire after a TTL to prevent deadlocks
  • Non-blocking acquire: Returns immediately if lock is held

Implementations must be safe for use in async contexts and handle network partitions gracefully.

Example

lock = RedisLock(redis, "my-resource", ttl=30)
# Non-blocking acquire
if await lock.acquire():
try:
await process()
finally:
await lock.release()
# Blocking with timeout
if await lock.acquire_blocking(timeout=5.0):
try:
await process()
finally:
await lock.release()
# Context manager (preferred)
async with lock:
await process()
async def acquire() -> bool

Attempt to acquire the lock non-blocking.

Returns immediately with True if the lock was acquired, False if the lock is already held by another process.

Returns
TypeDescription
boolTrue if lock was acquired, False otherwise
async def acquire_blocking(timeout: float | None = None) -> bool

Attempt to acquire the lock with optional timeout.

Blocks until the lock is acquired or the timeout expires.

Parameters
ParameterTypeDescription
`timeout`float | NoneMaximum seconds to wait. None means wait forever.
Returns
TypeDescription
boolTrue if lock was acquired, False if timeout expired
async def release() -> bool

Release the lock.

Returns
TypeDescription
boolTrue if lock was released by this call, False if lock was not held or already expired
async def is_held() -> bool

Check if this instance currently holds the lock.

Returns
TypeDescription
boolTrue if this instance holds the lock
async def extend(additional_time: float) -> bool

Extend the lock TTL.

Useful for long-running operations that need to prevent the lock from expiring mid-operation.

Parameters
ParameterTypeDescription
`additional_time`floatSeconds to add to the TTL
Returns
TypeDescription
boolTrue if TTL was extended, False if lock not held

Document-centric vector store API for RAG and AI packages.

This is the AI-layer contract for add/search/delete against embedded documents. For connection lifecycle and collection management, use VectorStoreProtocol (infrastructure layer).

All vector database drivers (Qdrant, ChromaDB, PGVector, etc.) are adapted to this shape via lexigram-vector so callers can swap backends.

async def add(documents: list[DocumentProtocol]) -> Result[list[str], VectorError]

Add documents to the vector store.

Parameters
ParameterTypeDescription
`documents`list[DocumentProtocol]List of documents with text and metadata.
Returns
TypeDescription
Result[list[str], VectorError]``Ok(list[str])`` with document IDs on success, or ``Err(VectorStoreError)`` on failure.
async def batch_upsert(
    documents: list[DocumentProtocol],
    batch_size: int = 100
) -> Result[int, VectorError]

Upsert documents in batches.

Parameters
ParameterTypeDescription
`documents`list[DocumentProtocol]List of documents to upsert.
`batch_size`intNumber of documents per batch. defaults to 100.
Returns
TypeDescription
Result[int, VectorError]``Ok(int)`` with the total count of documents upserted, or ``Err(VectorStoreError)`` on failure.
async def search(
    query: list[float],
    *,
    top_k: int = 10,
    filters: dict[str, Any] | None = None,
    score_threshold: float | None = None
) -> Result[list[SearchResultProtocol], VectorError]

Search for similar documents using a vector.

Parameters
ParameterTypeDescription
`query`list[float]Query embedding vector.
`top_k`intNumber of results to return.
`filters`dict[str, Any] | NoneMetadata filters.
`score_threshold`float | NoneMinimum relevance score.
Returns
TypeDescription
Result[list[SearchResultProtocol], VectorError]``Ok(list of search results)`` on success, or ``Err(VectorStoreError)`` on failure.
async def search_text(
    query: str,
    *,
    top_k: int = 10,
    filters: dict[str, Any] | None = None
) -> Result[list[SearchResultProtocol], VectorError]

Search for similar documents using text.

Parameters
ParameterTypeDescription
`query`strQuery text.
`top_k`intNumber of results to return.
`filters`dict[str, Any] | NoneMetadata filters.
Returns
TypeDescription
Result[list[SearchResultProtocol], VectorError]``Ok(list of search results)`` on success, or ``Err(VectorStoreError)`` on failure.
async def delete(ids: list[str]) -> Result[int, VectorError]

Delete documents by ID.

Parameters
ParameterTypeDescription
`ids`list[str]List of document IDs to delete.
Returns
TypeDescription
Result[int, VectorError]``Ok(int)`` with the count of deleted documents on success, or ``Err(VectorStoreError)`` on failure.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Protocol for publishing domain events.
async def publish(event: Any) -> None

Publish a domain event.

Parameters
ParameterTypeDescription
`event`AnyDomainEvent instance.

Protocol for embedding client implementations.
async def embed(texts: list[str]) -> list[list[float]]

Generate embeddings for texts.

Parameters
ParameterTypeDescription
`texts`list[str]List of strings to embed.
Returns
TypeDescription
list[list[float]]List of embedding vectors.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check.

Returns
TypeDescription
HealthCheckResultStructured health check result.
async def close() -> None

Close the client and release resources.


Protocol for episodic memory (conversation history with temporal grounding).

Episodic memory stores timestamped events and conversations, supporting hybrid retrieval based on recency and relevance.

async def record(entry: MemoryEntry) -> None

Record a new episode/conversation turn.

Parameters
ParameterTypeDescription
`entry`MemoryEntryThe episode to record
async def recall(query: MemoryQuery) -> list[MemorySearchResult]

Recall episodes based on relevance and recency.

Parameters
ParameterTypeDescription
`query`MemoryQueryQuery parameters with weighting preferences
Returns
TypeDescription
list[MemorySearchResult]Ranked list of matching episodes
async def forget(entry_id: str) -> None

Forget/delete a specific episode.

Parameters
ParameterTypeDescription
`entry_id`strID of the episode to forget
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check if the episodic memory backend is reachable and healthy.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for a response.
Returns
TypeDescription
HealthCheckResultHealth check result with status and details.

Protocol for event bus implementations.

The event bus manages event publication and subscription.

Example

```python
class InMemoryEventBus:
async def publish(self, event: DomainEvent) -> "Result[None, EventError]":
for handler in self._handlers.get(type(event), []):
await handler.handle(event)
return Ok(None)
def subscribe(self, event_type, handler):
self._handlers.setdefault(event_type, []).append(handler)
<div style='padding-left: 1rem; border-left: 1px solid var(--sl-color-gray-5); margin-top: 2rem; margin-bottom: 2rem;'>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>async </span><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>publish</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>Any</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit; text-decoration:underline; text-decoration-color:rgba(128,128,128,0.3); text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L97' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Publish an event to all subscribers.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Any</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>DomainEvent to publish.</td></tr></tbody></table></div>
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Returns</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:45%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit; text-decoration:underline; text-decoration-color:rgba(128,128,128,0.3); text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Ok(None) when the event is successfully enqueued for dispatch. Err(EventError) when the event cannot be accepted (e.g.\ no handlers registered and the bus requires at least one).</td></tr></tbody></table></div>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>subscribe</span><span style='color: var(--lex-color-colon)'>(</span>
<span style='color: var(--lex-color-name)'>event_type</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>type</span><span style='color: var(--lex-color-colon)'>,</span>
<span style='color: var(--lex-color-name)'>handler</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>EventHandlerProtocol</span>
<span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-default) !important'>None</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L110' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Subscribe a handler to an event type.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event_type`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>type</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Type of event to subscribe to.</td></tr><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`handler`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>EventHandlerProtocol</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Handler to call when event is published.</td></tr></tbody></table></div>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>unsubscribe</span><span style='color: var(--lex-color-colon)'>(</span>
<span style='color: var(--lex-color-name)'>event_type</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>type</span><span style='color: var(--lex-color-colon)'>,</span>
<span style='color: var(--lex-color-name)'>handler</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>EventHandlerProtocol</span>
<span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-default) !important'>None</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L119' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Remove a handler subscription for an event type.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event_type`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>type</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Type of event to unsubscribe from.</td></tr><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`handler`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>EventHandlerProtocol</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Handler to remove.</td></tr></tbody></table></div>
</div>
<hr style='border:none;border-top:2px solid rgba(99,102,241,0.45);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventHandlerProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display: flex; justify-content: flex-end; margin-top: -1rem; margin-bottom: 1rem;'><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L37' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Protocol for event handlers.
<div style='padding-left: 1rem; border-left: 1px solid var(--sl-color-gray-5); margin-top: 2rem; margin-bottom: 2rem;'>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>async </span><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>handle</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>Any</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit; text-decoration:underline; text-decoration-color:rgba(128,128,128,0.3); text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L40' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Handle an event.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Any</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>DomainEvent to handle.</td></tr></tbody></table></div>
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Returns</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:45%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit; text-decoration:underline; text-decoration-color:rgba(128,128,128,0.3); text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>``Ok(None)`` on success, ``Err(EventError)`` if handling fails in an expected, recoverable way.</td></tr></tbody></table></div>
</div>
<hr style='border:none;border-top:2px solid rgba(99,102,241,0.45);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventStoreProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display: flex; justify-content: flex-end; margin-top: -1rem; margin-bottom: 1rem;'><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L253' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Protocol for event store implementations.
The event store persists and retrieves domain events.
**Example**
```python
class PostgresEventStore:
async def append(self, stream_id, events, expected_version=None):
async with self.db.transaction():
for event in events:
await self.db.insert("events", event.to_dict())
async def append(
    stream_id: str,
    events: list[Any],
    expected_version: int | None = None
) -> int

Append events to a stream.

Parameters
ParameterTypeDescription
`stream_id`strUnique stream identifier.
`events`list[Any]List of events to append.
`expected_version`int | NoneExpected stream version for optimistic concurrency.
Returns
TypeDescription
intNew stream version.
Raises
ExceptionDescription
ConcurrencyErrorIf expected version doesn't match.
async def read(
    stream_id: str,
    start: int = 0,
    count: int | None = None
) -> list[Any]

Read events from a stream.

Parameters
ParameterTypeDescription
`stream_id`strUnique stream identifier.
`start`intStarting position.
`count`int | NoneMaximum events to read.
Returns
TypeDescription
list[Any]List of events.
async def read_all(
    position: int = 0,
    count: int | None = None
) -> list[Any]

Read events from all streams.

Parameters
ParameterTypeDescription
`position`intStarting global position.
`count`int | NoneMaximum events to read.
Returns
TypeDescription
list[Any]List of events.

Standard protocol for services that support graceful shutdown.

Any service that holds background tasks, open connections, or in-flight work SHOULD implement this protocol. The framework will call shutdown() on all registered services during teardown.

Behavioral Contract: - shutdown() MUST be idempotent — calling it twice is safe. - shutdown() SHOULD complete within a reasonable timeout. - shutdown() MUST NOT raise exceptions. - shutdown() SHOULD release all held resources (tasks, connections, file handles) before returning.

async def shutdown() -> None

Request graceful shutdown and wait for completion.


General-purpose hashing protocol (non-password-specific).
property algorithm() -> str

Hash algorithm name (for example sha256 or blake2b).

def digest(data: str | bytes) -> str

Hash input data and return an encoded digest.

Parameters
ParameterTypeDescription
`data`str | bytesInput string or bytes to hash.
Returns
TypeDescription
strEncoded hash string.
def verify_digest(
    data: str | bytes,
    expected: str
) -> bool

Constant-time verification against an expected digest.

Parameters
ParameterTypeDescription
`data`str | bytesInput string or bytes to hash.
`expected`strExpected encoded digest.
Returns
TypeDescription
boolTrue when the digest matches, False otherwise.
async def hash(value: str) -> str

Backward-compatible async alias for digest.

async def verify(
    value: str,
    hashed_value: str
) -> bool

Backward-compatible async alias for verify_digest.


Protocol for services that aggregate health checks from multiple providers.

Implementations collect HealthCheckProtocol instances, run them concurrently, and return a single AggregateHealthResult.

def register(
    name: str,
    check: HealthCheckProtocol,
    *,
    category: HealthCheckCategory = HealthCheckCategory.READINESS
) -> None

Register a named health check.

Parameters
ParameterTypeDescription
`name`strUnique component name for this check.
`check`HealthCheckProtocolObject implementing HealthCheckProtocol.
`category`HealthCheckCategoryProbe category for this check. Defaults to HealthCheckCategory.READINESS.
async def run_all(
    timeout: float = 5.0,
    *,
    category: HealthCheckCategory | None = None
) -> AggregateHealthResult

Run all registered checks concurrently and aggregate results.

Parameters
ParameterTypeDescription
`timeout`floatPer-check timeout in seconds.
`category`HealthCheckCategory | NoneOptional probe category filter.
Returns
TypeDescription
AggregateHealthResultAggregateHealthResult containing all component results.
async def run_liveness(timeout: float = 5.0) -> AggregateHealthResult

Run only liveness checks and aggregate results.

async def run_readiness(timeout: float = 5.0) -> AggregateHealthResult

Run only readiness checks and aggregate results.

async def run_startup(timeout: float = 5.0) -> AggregateHealthResult

Run only startup checks and aggregate results.


Protocol for health check capability.

Expected Behavior:

  • health_check: MUST return a result within a reasonable timeout (~5s).
  • health_check: SHOULD NOT raise exceptions; wrap errors in HealthCheckResult.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Protocol for a categorised health check registry.

Implementations (e.g. HealthChecker) store checks tagged with a HealthCheckCategory so that callers can query subsets independently:

  • run_liveness — is the process alive and not deadlocked?
  • run_readiness — is the process ready to accept traffic?
  • run_startup — has initial startup completed?

This maps directly to the three Kubernetes probe types.

def add(
    name: str,
    check: Callable[[], Any],
    *,
    timeout: float | None = None,
    critical: bool = True,
    category: HealthCheckCategory = HealthCheckCategory.READINESS
) -> None

Register a categorised health check.

Parameters
ParameterTypeDescription
`name`strUnique identifier for the check.
`check`Callable[[], Any]Callable that performs the check.
`timeout`float | NoneOptional per-check timeout in seconds.
`critical`boolWhether a non-healthy result should make the aggregate readiness status ``UNHEALTHY``. Defaults to ``True``.
`category`HealthCheckCategoryHealthCheckCategory value. Defaults to ``READINESS``.
async def run_all() -> tuple[Any, dict[str, Any]]

Run all registered checks regardless of category.

Returns
TypeDescription
tuple[Any, dict[str, Any]]``(aggregate_status, per_check_results)`` tuple.
async def run_liveness() -> tuple[Any, dict[str, Any]]

Run only LIVENESS checks.

Returns
TypeDescription
tuple[Any, dict[str, Any]]``(aggregate_status, per_check_results)`` tuple.
async def run_readiness() -> tuple[Any, dict[str, Any]]

Run only READINESS checks.

Returns
TypeDescription
tuple[Any, dict[str, Any]]``(aggregate_status, per_check_results)`` tuple.
async def run_startup() -> tuple[Any, dict[str, Any]]

Run only STARTUP checks.

Returns
TypeDescription
tuple[Any, dict[str, Any]]``(aggregate_status, per_check_results)`` tuple.

Protocol for HTTP request/response logging middleware.

Defines how to log completed HTTP requests with duration, status code, and optional metadata for monitoring and audit purposes.

Example

class RequestLogger:
async def log_request(
self,
method: str,
path: str,
status_code: int,
duration_ms: float,
request_id: str | None = None,
**metadata: Any,
) -> None:
logger.info(
"request_completed",
method=method,
path=path,
status=status_code,
duration_ms=duration_ms,
request_id=request_id,
**metadata,
)
async def log_request(
    method: str,
    path: str,
    status_code: int,
    duration_ms: float,
    request_id: str | None = None,
    **metadata: Any
) -> None

Log a completed HTTP request.

Parameters
ParameterTypeDescription
`method`strHTTP method (GET, POST, etc.).
`path`strRequest path URI.
`status_code`intHTTP response status code.
`duration_ms`floatRequest processing duration in milliseconds.
`request_id`str | NoneOptional request identifier for tracing. **metadata: Additional context-specific data (client_id, user_id, etc.).

Injectable identity generator.
property strategy() -> IdStrategy

Return the active ID generation strategy.

def generate() -> str

Generate a new unique identifier.

def generate_for(entity_type: str) -> str

Generate a new identifier for a specific entity type.


Protocol for HTTP idempotency deduplication middleware.
async def process(
    headers: dict[str, str],
    handler: Any,
    *args: Any,
    **kwargs: Any
) -> Any

Run the handler with idempotency deduplication.

property ttl() -> float

Default TTL (seconds) for cached idempotency results.


Protocol for storing and checking idempotency keys.
async def get(key: str) -> Result[Any | None, IdempotencyError]

Retrieve a stored result by idempotency key.

Parameters
ParameterTypeDescription
`key`strThe idempotency key.
Returns
TypeDescription
Result[Any | None, IdempotencyError]``Ok(value)`` when a cached result exists. ``Ok(None)`` when the key is not present or expired. ``Err(IdempotencyError)`` on store failures.
async def get_record(key: str) -> Result[IdempotencyRecord | None, IdempotencyError]

Retrieve the full idempotency record, including metadata.

async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> Result[None, IdempotencyError]

Store a result with an optional time-to-live.

Parameters
ParameterTypeDescription
`key`strThe idempotency key.
`value`AnyThe result to store.
`ttl`float | NoneTime-to-live in seconds, or ``None`` for no expiry.
async def delete(key: str) -> Result[None, IdempotencyError]

Remove an idempotency record by key.

async def acquire(
    key: str,
    ttl: int
) -> Result[bool, IdempotencyError]

Atomically claim an idempotency key if it is not already held.

Parameters
ParameterTypeDescription
`key`strThe idempotency key to acquire.
`ttl`intTime-to-live in seconds for the claimed key.
Returns
TypeDescription
Result[bool, IdempotencyError]``Ok(True)`` if this caller should proceed. ``Ok(False)`` if the key is already claimed. ``Err(IdempotencyError)`` on store failures.

Protocol for JobProtocol objects.

Protocol for key derivation services.
async def derive(
    secret: str,
    *,
    salt: bytes | None = None
) -> str

Derive an encoded key from a secret.

Parameters
ParameterTypeDescription
`secret`strInput secret to derive from.
`salt`bytes | NoneOptional salt. When omitted, implementations generate one.
Returns
TypeDescription
strStable encoded key derivation string.
async def verify(
    secret: str,
    encoded: str
) -> bool

Verify a secret against a derived key payload.

Parameters
ParameterTypeDescription
`secret`strInput secret to verify.
`encoded`strStable encoded key derivation string.
Returns
TypeDescription
boolTrue when the secret matches the encoded payload, False otherwise.
async def hash(
    secret: str,
    *,
    salt: bytes | None = None
) -> str

Backward-compatible async alias for derive.


Protocol for LLM client implementations.

All LLM providers (OpenAI, Anthropic, Google, etc.) should implement this interface.

async def complete(
    messages: list[ChatMessageProtocol],
    *,
    model: str | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None,
    tools: list[ToolDefinition] | None = None,
    stop_sequences: list[str] | None = None,
    **kwargs: Any
) -> Result[CompletionProtocol, LLMError]

Generate a completion from messages.

Parameters
ParameterTypeDescription
`messages`list[ChatMessageProtocol]List of chat messages.
`model`str | NoneOptional model override.
`temperature`float | NoneOptional sampling temperature.
`max_tokens`int | NoneOptional max output tokens.
`tools`list[ToolDefinition] | NoneOptional tool definitions.
`stop_sequences`list[str] | NoneOptional list of stop sequences for early termination. **kwargs: Provider-specific options.
Returns
TypeDescription
Result[CompletionProtocol, LLMError]``Ok(Completion)`` on success, ``Err(LLMError)`` on recoverable failure.
def stream_chat(
    messages: list[ChatMessageProtocol],
    *,
    model: str | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None,
    tools: list[ToolDefinition] | None = None,
    stop_sequences: list[str] | None = None,
    **kwargs: Any
) -> AsyncStream[StreamChunk, LLMError]

Start a streaming completion.

This method returns an AsyncStream immediately (not wrapped in Result). The stream is established lazily when iteration begins. Setup failures and mid-stream failures are both surfaced through the stream’s typed error channel.

Parameters
ParameterTypeDescription
`messages`list[ChatMessageProtocol]List of chat messages.
`model`str | NoneOptional model override.
`temperature`float | NoneOptional sampling temperature.
`max_tokens`int | NoneOptional max output tokens.
`tools`list[ToolDefinition] | NoneOptional tool definitions.
`stop_sequences`list[str] | NoneOptional list of stop sequences for early termination. **kwargs: Provider-specific options.
Returns
TypeDescription
AsyncStream[StreamChunk, LLMError]``AsyncStream[StreamChunk, LLMError]`` that yields chunks or surfaces typed errors through terminal operations (``collect()``, ``first()``, ``drain()``).
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check.

Returns
TypeDescription
HealthCheckResultStructured health check result.
async def close() -> None

Close the client and release resources.


Protocol for a process-local or distributed lock manager.

A lock manager creates and tracks named locks for coordinating mutual exclusion of concurrent operations. Callers use acquire to obtain a per-key async context manager.

Implementations range from purely in-memory (single-process deduplication) to Redis-backed or advisory-lock-backed distributed variants.

Example

from lexigram.contracts.lock import LockManagerProtocol
async def process(manager: LockManagerProtocol) -> None:
async with manager.acquire("resource:42", timeout=30):
await do_work()

Container registration

container.singleton(LockManagerProtocol, InMemoryLockManager)
def acquire(
    key: str,
    timeout: float = 60.0
) -> AbstractAsyncContextManager[Any]

Return an async context manager that holds the named lock.

The caller must use the returned value as an async context manager. The lock is acquired on __aenter__ and released on __aexit__.

Parameters
ParameterTypeDescription
`key`strUnique, stable identifier for the resource to protect.
`timeout`floatInformational TTL in seconds; implementations may use this to automatically expire stale locks.
Returns
TypeDescription
AbstractAsyncContextManager[Any]An ``AbstractAsyncContextManager`` for the named lock.

Persistent distributed lock store.

Individual lock handles are not returned; all operations are done via the lock name and owner identifier.

async def acquire(
    lock_name: str,
    owner: str,
    ttl: int
) -> bool

Attempt to acquire the named lock.

Parameters
ParameterTypeDescription
`lock_name`strGlobally unique lock identifier.
`owner`strIdentifier of the requesting owner (e.g. host + PID).
`ttl`intLock time-to-live in seconds. The lock is automatically released after this duration to prevent deadlocks.
Returns
TypeDescription
bool``True`` if the lock was acquired; ``False`` if it is already held.
async def release(
    lock_name: str,
    owner: str
) -> bool

Release the named lock only if it is held by owner.

Parameters
ParameterTypeDescription
`lock_name`strGlobally unique lock identifier.
`owner`strMust match the owner that acquired the lock.
Returns
TypeDescription
bool``True`` if the lock was released; ``False`` if it was not held by *owner* or did not exist.
async def extend(
    lock_name: str,
    owner: str,
    ttl: int
) -> bool

Extend the TTL of a currently-held lock.

Parameters
ParameterTypeDescription
`lock_name`strGlobally unique lock identifier.
`owner`strMust match the owner that currently holds the lock.
`ttl`intNew time-to-live in seconds from now.
Returns
TypeDescription
bool``True`` if the extension was applied; ``False`` if the lock was not held by *owner*.

Protocol for the MCP server.

The server routes JSON-RPC messages to handlers and returns JSON-RPC responses.

async def handle_message(message: dict[str, Any]) -> dict[str, Any] | None

Handle a JSON-RPC message and return the response.

Returns None for notifications (no response expected).


Protocol for providing tools to the MCP server.

Satisfied by:

  • ToolRegistryAdapter (bridges lexigram-agents ToolRegistry)
  • Any custom tool provider
async def list_tools() -> list[dict[str, Any]]

List all available tools in MCP format.

Returns list of dicts with name, description, inputSchema keys.

async def call_tool(
    name: str,
    arguments: dict[str, Any]
) -> dict[str, Any]

Execute a tool by name with given arguments.


Protocol for MCP transport implementations.

A transport handles the I/O layer — reading requests and writing responses. The MCP server is transport-agnostic.

async def start() -> None

Start the transport.

async def stop() -> None

Stop the transport.


Protocol for consolidating and optimizing memory.

Consolidation compresses old entries, deduplicates, extracts entities, and manages memory size to prevent unbounded growth.

async def consolidate(entries: list[MemoryEntry]) -> ConsolidationResult

Consolidate a batch of memory entries.

Applies compression, deduplication, entity extraction, and pruning strategies to optimize memory storage and retrieval.

Parameters
ParameterTypeDescription
`entries`list[MemoryEntry]Entries to consolidate
Returns
TypeDescription
ConsolidationResultConsolidation result with statistics
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check if the memory consolidator backend is reachable and healthy.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for a response.
Returns
TypeDescription
HealthCheckResultHealth check result with status and details.

Protocol for conversation memory.
async def add_message(message: Any) -> None

Add a message to memory.

Parameters
ParameterTypeDescription
`message`AnyMessage to add.
async def get_messages() -> list[Any]

Get all messages from memory.

Returns
TypeDescription
list[Any]List of messages.
async def clear() -> None

Clear all messages from memory.


Protocol for storing and retrieving memory entries.

Implementations should provide persistent or semi-persistent storage for memory entries, supporting both sequential and search-based access.

async def store(entry: MemoryEntry) -> None

Store a single memory entry.

Parameters
ParameterTypeDescription
`entry`MemoryEntryThe memory entry to store
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Search for memory entries based on a query.

Parameters
ParameterTypeDescription
`query`MemoryQueryQuery parameters
Returns
TypeDescription
list[MemorySearchResult]List of matching entries with scores, ordered by relevance
async def get_recent(n: int) -> list[MemoryEntry]

Get the N most recent entries.

Parameters
ParameterTypeDescription
`n`intNumber of entries to return
Returns
TypeDescription
list[MemoryEntry]List of recent entries in descending temporal order
async def delete(entry_id: str) -> None

Delete an entry by ID.

Parameters
ParameterTypeDescription
`entry_id`strID of the entry to delete
async def clear() -> None

Clear all entries from the store.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check.


Protocol for migration management.
async def initialize_migration_table() -> None

Initialize the migration tracking table.

async def get_applied_migrations() -> list[MigrationRecord]

Get list of applied migrations.

async def apply_migration(
    version: str,
    name: str,
    sql: str
) -> bool

Apply a migration.

async def rollback_migration(version: str) -> bool

Rollback a migration.

async def get_pending_migrations(available_migrations: list[str]) -> list[str]

Get migrations that haven’t been applied yet.


Executes database migrations in the correct order.

Implementations must be idempotent: running run_migrations() on an already up-to-date schema must be a no-op. Each migration is identified by a unique string version identifier (e.g. "20240101_001" or the Alembic revision hash).

Typical usage

runner = await container.resolve(MigrationRunnerProtocol)
result = await runner.run_migrations()
if result:
logger.info("migrations_applied", count=len(result))
else:
logger.info("schema_up_to_date")
async def run_migrations() -> list[str]

Apply all pending migrations.

Returns
TypeDescription
list[str]List of migration version identifiers that were applied. Empty list means the schema was already up-to-date.
async def rollback(target: str | None = None) -> list[str]

Roll back migrations to target.

Parameters
ParameterTypeDescription
`target`str | NoneVersion identifier to roll back to. ``None`` rolls back the most recent migration only.
Returns
TypeDescription
list[str]List of migration version identifiers that were rolled back.
async def get_current_version() -> str | None

Return the version identifier of the most recently applied migration.

Returns
TypeDescription
str | NoneVersion string, or ``None`` if no migrations have been applied.
async def get_pending_migrations() -> list[str]

Return version identifiers of migrations not yet applied.

Returns
TypeDescription
list[str]Ordered list of pending migration version identifiers.

Protocol for a registry-based object-to-object mapper.

Provides a clean map() API for converting between registered type pairs without ad-hoc conversion logic scattered through the codebase.

def map(
    source: Any,
    dest_type: type[Any],
    *,
    validate: bool = False,
    validator: Any | None = None
) -> Any

Map a source object to the destination type.

Uses a registered mapping function to transform the source instance into an instance of dest_type.

Parameters
ParameterTypeDescription
`source`AnyThe source object to transform.
`dest_type`type[Any]The target type to produce.
`validate`boolWhether to validate the result after mapping.
`validator`Any | NoneOptional validator instance to use.
Returns
TypeDescription
AnyA new instance of dest_type.
Raises
ExceptionDescription
MappingNotFoundErrorIf no mapping is registered for this type pair.
def register(
    source_type: type[Any],
    dest_type: type[Any],
    mapper_func: Any
) -> None

Register a mapping function from source_type to dest_type.

Parameters
ParameterTypeDescription
`source_type`type[Any]The type to map from.
`dest_type`type[Any]The type to map to.
`mapper_func`AnyA callable that accepts a source instance and returns a dest instance.

Protocol for offset-based page request parameters.

Implemented by offset/limit style pagination inputs. Use this for code that should work with any offset-pagination type.

property offset() -> int

Zero-based record offset.

property limit() -> int

Maximum number of records to return.


Provider implements this to run after application bootstrap.

Called after ALL providers have been booted successfully. At this point every service is fully resolved and available.

Behavioral Contract: - Implementers MUST NOT throw exceptions — doing so indicates a fatal application startup failure. - Implementers CAN resolve any service from the container. - Implementers SHOULD start background tasks, schedulers, or listeners here rather than in on_module_init().

async def on_application_bootstrap() -> None

Run after all providers are booted.


Provider implements this to run after shutdown completes.

Called after ALL providers have been shut down. Use this for final cleanup like closing log files or flushing metrics.

Behavioral Contract: - Implementers MUST NOT resolve services — the container is already disposed at this point. - Implementers MUST NOT throw exceptions. - Implementers SHOULD release any remaining OS resources.

async def on_application_shutdown(signal: str | None = None) -> None

Run after all providers are shut down.


Provider implements this to run before shutdown begins.

Called before any provider’s shutdown() method is invoked. Use this to drain queues, finish in-flight requests, or save state.

Behavioral Contract: - Implementers MUST complete within a reasonable timeout. - Implementers SHOULD NOT start new work — only finish existing work. - Implementers MUST NOT throw exceptions.

async def on_before_shutdown(signal: str | None = None) -> None

Run before shutdown begins.


Provider implements this to run initialization logic.

Called once when the provider is first booted, BEFORE other providers that depend on it.

Behavioral Contract: - Implementers MUST be idempotent — calling on_module_init() twice MUST NOT produce side effects or raise. - Implementers SHOULD complete quickly — blocking here delays the entire boot sequence. - Implementers MUST NOT resolve services from other providers that have not yet been booted.

async def on_module_init() -> None

Initialize provider after registration but before other providers boot.


Protocol for password hashing services.

Responsible for hashing passwords and verifying them against hashes.

async def hash(password: str) -> str

Hash a plain text password.

Parameters
ParameterTypeDescription
`password`strPlain text password.
Returns
TypeDescription
strHashed password string.
async def verify(
    password: str,
    hashed_password: str
) -> bool

Verify a password against a hash.

Parameters
ParameterTypeDescription
`password`strPlain text password to check.
`hashed_password`strStored hash to compare against.
Returns
TypeDescription
boolTrue if password matches hash, False otherwise.

Protocol for an individual connection pool.
def get_stats() -> PoolStats
async def close() -> None

Manager that tracks pools by name.
def get_stats() -> dict[str, PoolStats]
async def get_pool(name: str) -> Pool

Snapshot of pool statistics.

Assembles prompt layers in cache-friendly static-to-dynamic order.

The static-first ordering maximizes provider-side KV cache reuse. The assembler also injects provider-specific cache annotations (Anthropic’s cache_control breakpoints, DeepSeek’s 64-token padding, etc.).

def assemble(
    system: str,
    tools: list[ToolDefinition] | None,
    reference_docs: list[str] | None,
    few_shot: list[ChatMessage] | None,
    history: list[ChatMessage],
    query: str,
    provider: str,
    dynamic_metadata: str | None = None
) -> list[ChatMessage]

Assemble a complete prompt message list.

Parameters
ParameterTypeDescription
`system`strSystem instructions (static, cached).
`tools`list[ToolDefinition] | NoneTool/function definitions.
`reference_docs`list[str] | NoneReference documents as text (semi-static).
`few_shot`list[ChatMessage] | NoneFew-shot example messages (static, cached).
`history`list[ChatMessage]Chat history messages (dynamic).
`query`strCurrent user query (dynamic).
`provider`strProvider name for cache annotation strategy.
`dynamic_metadata`str | NoneTimestamps, user IDs, etc. (dynamic).
Returns
TypeDescription
list[ChatMessage]Ordered list of ChatMessage instances ready for the LLM client.

Compresses text to fit within a token budget.

Implementations range from learned compression (LLMLingua-2) to heuristic truncation. The protocol guarantees that the returned text fits within target_token_count.

Placement note: Lives in ai/rag.py because its primary consumer is the RAG context compression stage. Also consumed by lexigram-ai-memory.

async def compress(
    text: str,
    target_token_count: int,
    force_tokens: list[str] | None = None
) -> str

Compress text to fit within target_token_count.

Parameters
ParameterTypeDescription
`text`strThe raw text to compress.
`target_token_count`intMaximum tokens in the result.
`force_tokens`list[str] | NoneTokens that must never be removed.
Returns
TypeDescription
strCompressed text fitting within the budget.

Contract for framework providers.

Providers are responsible for:

  • Registering bindings into the container (register phase)
  • Initializing external resources (boot phase)
  • Cleaning up resources on shutdown (shutdown phase)

Expected Behavior:

  • register: MUST be declarative. No external side effects or resolution.
  • boot: CAN resolve services. MUST be idempotent if called twice.
  • shutdown: MUST gracefully close resources. SHOULD NOT raise.
property name() -> str

Unique provider identifier (e.g. ‘auth’, ‘database’).

property priority() -> ProviderPriority

Initialization order hint.

property dependencies() -> Sequence[str]

Names of providers that must be booted before this one.

async def register(container: ContainerRegistrarProtocol) -> None

Bind services into the container. declarative phase.

async def boot(container: BootContainerProtocol) -> None

Initialize and wire services. Resolution and registration allowed here.

async def shutdown() -> None

Tear down resources on application exit.

async def on_error(
    error: Exception,
    phase: str
) -> None

Called when boot() or shutdown() raises an exception.

Override to perform cleanup on startup/shutdown failure.

Parameters
ParameterTypeDescription
`error`ExceptionThe exception that was raised.
`phase`strEither 'boot' or 'shutdown'.

Protocol for query bus implementations.

Example

class QueryBusProtocol:
async def execute(self, query: Query) -> Any:
handler = self._handlers[type(query)]
return await handler.handle(query)
async def execute(query: Any) -> Any

Execute a query through its handler.

Parameters
ParameterTypeDescription
`query`AnyQuery to execute.
Returns
TypeDescription
AnyQuery result.

Protocol for logging database queries.

Implement this protocol to enable query logging in database providers. The concrete lexigram.sql.logging.BaseQueryLogger implements this interface and adds helper methods for querying the stored entries.

async def log_query(entry: QueryLogEntry) -> None

Log a query execution.

async def get_recent_queries(limit: int = 100) -> list[QueryLogEntry]

Return the most recent limit entries.

async def get_slow_queries(
    threshold_seconds: float,
    limit: int = 50
) -> list[QueryLogEntry]

Return entries whose execution time exceeds threshold_seconds.

async def get_query_stats(time_range_seconds: int = 3600) -> dict[str, Any]

Return aggregated statistics over a time window.


Protocol for read-only repository operations.

Use this for query-only access patterns (CQRS query side).

Example

class UserQueryRepository:
async def get(self, id: str) -> User | None:
return await self.db.query("users").where(id=id).first()
async def list(self, skip: int = 0, limit: int = 100) -> list[User]:
return await self.db.query("users").offset(skip).limit(limit).all()
async def get(item_id: str) -> T | None

Get entity by ID.

Parameters
ParameterTypeDescription
`item_id`strEntity identifier.
Returns
TypeDescription
T | NoneEntity if found, None otherwise.
async def list(
    skip: int = 0,
    limit: int = 100,
    **filters: Any
) -> list[T]

List entities with pagination.

Parameters
ParameterTypeDescription
`skip`intNumber of records to skip.
`limit`intMaximum records to return. **filters: Optional filter criteria.
Returns
TypeDescription
list[T]List of entities.
async def find_by_spec(spec: SpecificationProtocol[T]) -> list[T]

Find entities matching a complex specification.

Parameters
ParameterTypeDescription
`spec`SpecificationProtocol[T]The DDD specification to evaluate.
Returns
TypeDescription
list[T]List of matching entities.
async def count(**filters: Any) -> int

Count entities matching filters.

Parameters
ParameterTypeDescription
Returns
TypeDescription
intTotal count of matching entities.

Protocol for full repository operations.

Extends ReadOnlyRepositoryProtocol with write operations.

Example

class UserRepository:
async def save(self, entity: User) -> User:
if entity.id:
await self.db.update("users", entity.dict()).where(id=entity.id)
else:
entity.id = await self.db.insert("users", entity.dict())
return entity
async def delete(self, id: str) -> bool:
result = await self.db.delete("users").where(id=id)
return result.affected_rows > 0
async def save(entity: T) -> T

Save (create or update) an entity.

Parameters
ParameterTypeDescription
`entity`TEntity to save.
Returns
TypeDescription
TSaved entity with any generated fields populated.
async def delete(item_id: str) -> bool

Delete entity by ID.

Parameters
ParameterTypeDescription
`item_id`strEntity identifier.
Returns
TypeDescription
boolTrue if deleted, False if not found.
async def save_many(entities: list[T]) -> list[T]

Save (create or update) multiple entities in a single operation.

Implementations SHOULD execute this as a batch for efficiency.

Parameters
ParameterTypeDescription
`entities`list[T]Entities to save.
Returns
TypeDescription
list[T]Saved entities with any generated fields populated.
async def delete_many(item_ids: list[str]) -> int

Delete multiple entities by ID.

Implementations SHOULD execute this as a batch for efficiency.

Parameters
ParameterTypeDescription
`item_ids`list[str]Entity identifiers to delete.
Returns
TypeDescription
intNumber of entities actually deleted.

Protocol for saga lifecycle management.

The manager routes incoming events to all registered sagas and coordinates their execution.

async def process(event: Any) -> None

Process an event through all relevant sagas.

Parameters
ParameterTypeDescription
`event`AnyDomain event to route and process.

Protocol for saga / process-manager implementations.

Sagas coordinate long-running business processes that span multiple aggregates by reacting to domain events and dispatching commands.

async def handle(event: Any) -> list[Any]

Handle a domain event and produce commands.

Parameters
ParameterTypeDescription
`event`AnyDomain event to handle.
Returns
TypeDescription
list[Any]List of commands to dispatch to the command bus.

Protocol for search engine backends.
async def search(
    query: str,
    filters: dict[str, Any] | None = None,
    sort: list[dict[str, str]] | None = None,
    limit: int | None = None,
    offset: int | None = None
) -> QueryResult

Execute a search query.

Parameters
ParameterTypeDescription
`query`strSearch query string
`filters`dict[str, Any] | NoneSearch filters
`sort`list[dict[str, str]] | NoneSort specifications
`limit`int | NoneMaximum results to return
`offset`int | NoneResults offset for pagination
Returns
TypeDescription
QueryResultSearch results
async def index_document(
    document_id: str,
    document: dict[str, Any],
    index_name: str | None = None
) -> None

Index a document.

Parameters
ParameterTypeDescription
`document_id`strUnique document identifier
`document`dict[str, Any]Document data to index
`index_name`str | NoneIndex name (optional)
async def index_many(
    documents: list[tuple[str, dict[str, Any]]],
    index_name: str | None = None
) -> None

Index multiple documents in a single bulk operation.

Callers should prefer this over N repeated index_document calls when processing batches. Backends may use a native bulk API to avoid per-document round-trip overhead.

Parameters
ParameterTypeDescription
`documents`list[tuple[str, dict[str, Any]]]Sequence of ``(document_id, document)`` pairs.
`index_name`str | NoneIndex name (optional, backend implementation chooses a default when ``None``).
async def delete_document(
    document_id: str,
    index_name: str | None = None
) -> None

Delete a document from index.

Parameters
ParameterTypeDescription
`document_id`strDocument identifier to delete
`index_name`str | NoneIndex name (optional)
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Protocol for retrieving, writing, and deleting named secrets.

Secret names may use any naming convention; a hierarchical path (e.g. "database/password") is recommended for readability and to align with most provider APIs.

All mutating operations (set_secret, delete_secret) are synchronous at the protocol level — implementations may perform async I/O internally but the public contract accepts simple calls from both sync and async contexts.

Example

store = EnvSecretStore()
val = store.get_secret("MY_API_KEY")

Async-first usage via a wrapping coroutine

val = await asyncio.to_thread(store.get_secret, "MY_API_KEY")
def get_secret(name: str) -> str

Return the value of a secret by name.

Parameters
ParameterTypeDescription
`name`strUnique secret identifier (e.g. ``"stripe/api-key"``).
Returns
TypeDescription
strThe plaintext secret value.
Raises
ExceptionDescription
SecretNotFoundErrorIf no secret with that name exists.
SecretAccessErrorIf the caller lacks permission.
def set_secret(
    name: str,
    value: str
) -> None

Write or overwrite a secret.

Parameters
ParameterTypeDescription
`name`strUnique secret identifier.
`value`strPlaintext secret value to store.
Raises
ExceptionDescription
SecretAccessErrorIf the caller lacks permission to write.
def delete_secret(name: str) -> None

Delete a secret by name.

Non-existent secrets are silently ignored (idempotent delete).

Parameters
ParameterTypeDescription
`name`strUnique secret identifier.
Raises
ExceptionDescription
SecretAccessErrorIf the caller lacks permission to delete.
def has_secret(name: str) -> bool

Return True if a secret with name exists, False otherwise.

Parameters
ParameterTypeDescription
`name`strUnique secret identifier.

Embedding-based semantic similarity cache for LLM responses.

Provides three-tier lookup: exact hash match (Tier 1), vector similarity match (Tier 2), and cache miss (caller invokes LLM).

Placement note: Lives in ai/llm.py (not infra/cache/) because it carries AI-domain semantics (model tracking in store(), LLM response caching).

async def lookup(query: str) -> str | None

Look up a query in the cache.

Checks Tier 1 (exact hash) then Tier 2 (vector similarity).

Parameters
ParameterTypeDescription
`query`strThe user query string.
Returns
TypeDescription
str | NoneCached response string, or ``None`` on cache miss.
async def store(
    query: str,
    response: str,
    model: str
) -> None

Store a query-response pair in both tiers.

Parameters
ParameterTypeDescription
`query`strThe user query string.
`response`strThe LLM response to cache.
`model`strThe model that produced the response.
async def invalidate(query: str) -> bool

Invalidate a cached entry by query.

Parameters
ParameterTypeDescription
`query`strThe user query string to invalidate.
Returns
TypeDescription
bool``True`` if the entry was found and removed, ``False`` otherwise.

Protocol for semantic memory (extracted facts and entities).

Semantic memory stores generalized knowledge in the form of facts, entities, and their relationships for long-term retention.

async def store_fact(
    subject: str,
    predicate: str,
    object_: str,
    confidence: float
) -> None

Store a fact as a subject-predicate-object triple.

Parameters
ParameterTypeDescription
`subject`strThe subject entity
`predicate`strThe relationship type
`object_`strThe object entity
`confidence`floatConfidence score (0-1) for this fact
async def query_facts(subject: str) -> list[dict[str, Any]]

Query facts by subject.

Parameters
ParameterTypeDescription
`subject`strThe subject entity to query
Returns
TypeDescription
list[dict[str, Any]]List of facts with this subject
async def get_entity_facts(entity: str) -> list[dict[str, Any]]

Get all facts mentioning an entity (subject or object).

Parameters
ParameterTypeDescription
`entity`strThe entity name
Returns
TypeDescription
list[dict[str, Any]]List of facts involving this entity
async def update_fact(
    fact_id: str,
    confidence: float
) -> None

Update the confidence score of a fact.

Parameters
ParameterTypeDescription
`fact_id`strID of the fact to update
`confidence`floatNew confidence score (0-1)
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check if the semantic memory backend is reachable and healthy.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for a response.
Returns
TypeDescription
HealthCheckResultHealth check result with status and details.

General-purpose serializer/deserializer with typed round-trip support.
def serialize(obj: Any) -> bytes

Serialize obj to raw bytes.

def deserialize(
    data: bytes,
    type_: type[T]
) -> T

Deserialize data into an instance of type_.


Persistent key-value store for arbitrary application state.

Supports bulk read/write operations on top of the basic get/set/delete primitives. Implementations may use any persistent backend (SQL, Redis, blob storage) that satisfies the interface.

Typical usage

state = await container.resolve(StateStoreProtocol)
await state.set("session:abc123", {"user_id": "u-1"}, ttl=3600)
data = await state.get("session:abc123")
async def get(key: str) -> Any | None

Get a value by key.

Parameters
ParameterTypeDescription
`key`strThe key to retrieve.
Returns
TypeDescription
Any | NoneThe value if found, None otherwise.
async def set(
    key: str,
    value: Any,
    ttl: int | None = None
) -> None

Set a value with optional TTL.

Parameters
ParameterTypeDescription
`key`strThe key to set.
`value`AnyThe value to store.
`ttl`int | NoneOptional time-to-live in seconds.
async def delete(key: str) -> bool

Delete a key.

Parameters
ParameterTypeDescription
`key`strThe key to delete.
Returns
TypeDescription
boolTrue if deleted, False if not found.
async def exists(key: str) -> bool

Check if a key exists.

Parameters
ParameterTypeDescription
`key`strThe key to check.
Returns
TypeDescription
boolTrue if exists, False otherwise.
async def expire(
    key: str,
    ttl: int
) -> bool

Set expiration on a key.

Parameters
ParameterTypeDescription
`key`strThe key to expire.
`ttl`intTime-to-live in seconds.
Returns
TypeDescription
boolTrue if timeout was set, False if key doesn't exist.
async def ttl(key: str) -> int

Get remaining TTL for a key.

Parameters
ParameterTypeDescription
`key`strThe key to check.
Returns
TypeDescription
intTTL in seconds, -1 if no expiry, -2 if key doesn't exist.
async def get_many(keys: list[str]) -> dict[str, Any]

Return a mapping of keys → values for all keys that exist.

Parameters
ParameterTypeDescription
`keys`list[str]List of storage keys to fetch.
Returns
TypeDescription
dict[str, Any]Dict containing only keys that were found; absent keys are omitted.
async def set_many(
    items: dict[str, Any],
    ttl: int | None = None
) -> None

Persist multiple key-value pairs in a single operation.

Parameters
ParameterTypeDescription
`items`dict[str, Any]Mapping of storage keys to JSON-serializable values.
`ttl`int | NoneOptional time-to-live in seconds applied to all entries.

Protocol for agent reasoning strategies.

A strategy implements the reasoning loop that drives an agent’s behavior. Built-in strategies: ReActStrategy (reason → act → observe) and PlanAndExecuteStrategy (plan → execute steps).

async def execute(
    message: str,
    tools: list[ToolProtocol],
    history: list[dict[str, Any]],
    llm: Any,
    **kwargs: Any
) -> Any

Execute the reasoning strategy.

Parameters
ParameterTypeDescription
`message`strThe user's input message.
`tools`list[ToolProtocol]Tools available to the agent.
`history`list[dict[str, Any]]Conversation history as list of message dicts.
`llm`AnyLLM client for reasoning. **kwargs: Additional strategy-specific parameters (system_prompt, temperature, tool_registry, etc.)
Returns
TypeDescription
AnyResult[AgentResponse, Exception]

Protocol for task executor implementations.

Executors process tasks from a queue by dispatching them to registered handlers.

Example

class TaskExecutorProtocol:
async def execute(self, task: Task) -> Any:
handler = self._handlers.get(task.name)
if handler:
return await handler(task.payload)
raise UnknownTaskError(task.name)
async def execute(task: Any) -> Any

Execute a task using the registered handler.

Parameters
ParameterTypeDescription
`task`AnyTask to execute.
Returns
TypeDescription
AnyTask execution result.
Raises
ExceptionDescription
UnknownTaskErrorIf no handler registered for task.
def register_handler(
    task_name: str,
    handler: Any
) -> None

Register a handler for a task type.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
`handler`AnyAsync callable to handle the task.
def get_handler(task_name: str) -> Any | None

Get the handler for a task type.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
Returns
TypeDescription
Any | NoneHandler if registered, None otherwise.

Protocol for task provider implementations.

Task providers integrate task processing with the framework, providing dependency injection, lifecycle management, and health monitoring.

async def register(container: Any) -> None

Register task services with the DI container.

Parameters
ParameterTypeDescription
`container`AnyDI container to register services in.
async def boot(container: Any | None = None) -> None

Start the task provider.

Called by the framework on application startup.

Parameters
ParameterTypeDescription
`container`Any | NoneOptional DI container.
async def shutdown(app: Any) -> None

Shutdown the task provider gracefully.

Called by the framework on application shutdown.

Parameters
ParameterTypeDescription
`app`AnyApplication instance.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check task provider health.

Returns
TypeDescription
HealthCheckResultHealthCheckResult with current status and metrics.
def register_handler(
    task_name: str,
    handler: Any
) -> None

Register a task handler.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
`handler`AnyAsync handler function.
def register_scheduled_task(task_func: Any) -> None

Register a decorated task function for scheduling.

Parameters
ParameterTypeDescription
`task_func`AnyTask function decorated with @scheduled.
async def enqueue_job(job: Any) -> str

Enqueue a job for processing.

Parameters
ParameterTypeDescription
`job`AnyJobProtocol instance to enqueue.
Returns
TypeDescription
strJobProtocol ID.
def schedule_job(
    job_template: Any,
    cron_expression: str,
    job_id: str | None = None
) -> str | None

Schedule a job with cron expression.

Parameters
ParameterTypeDescription
`job_template`AnyJobProtocol or JobTemplateProtocol instance.
`cron_expression`strCron expression for scheduling.
`job_id`str | NoneOptional job ID.
Returns
TypeDescription
str | NoneScheduled job ID if successful, None otherwise.
def unschedule_job(job_id: str) -> bool

Remove a scheduled job.

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID to unschedule.
Returns
TypeDescription
boolTrue if job was unscheduled, False otherwise.
def get_worker_stats() -> dict[str, Any] | None

Get worker pool statistics.

Returns
TypeDescription
dict[str, Any] | NoneDictionary with worker statistics or None.
def get_scheduled_jobs() -> dict[str, Any] | None

Get scheduled jobs information.

Returns
TypeDescription
dict[str, Any] | NoneDictionary with scheduled jobs info or None.

Protocol for task queue implementations.

This defines the contract for background task queues (Redis-based, PostgreSQL-based, in-memory, etc.).

Example

class RedisTaskQueue:
async def enqueue(self, task: Task) -> str:
task_id = str(uuid4())
await self._redis.rpush("tasks", task.to_json())
return task_id
async def dequeue(self) -> Task | None:
data = await self._redis.lpop("tasks")
return Task.from_json(data) if data else None
async def enqueue(task: Any) -> Result[str, TaskQueueError]

Add a task to the queue.

Parameters
ParameterTypeDescription
`task`AnyTask instance to enqueue.
Returns
TypeDescription
Result[str, TaskQueueError]Ok(task_id) on success; Err(TaskQueueError) if the queue reports a recoverable failure (e.g. full queue, invalid payload). Only infrastructure failures (lost connection) are raised as exceptions.
async def dequeue() -> Any | None

Remove and return the next task.

Returns
TypeDescription
Any | NoneNext Task or None if queue is empty.
async def get_task_count() -> int

Get the number of tasks in the queue.

Returns
TypeDescription
intNumber of pending tasks.
async def clear() -> None

Clear all tasks from the queue.

async def ack(task_id: str) -> None

Acknowledge successful processing of a dequeued task.

Signals that the task has been processed successfully and can be permanently discarded from any in-flight tracking.

Parameters
ParameterTypeDescription
`task_id`strID of the task to acknowledge.
async def nack(
    task_id: str,
    requeue: bool = True
) -> None

Negative-acknowledge a dequeued task.

Signals that the task could not be processed. The task is optionally requeued for another processing attempt.

Parameters
ParameterTypeDescription
`task_id`strID of the task to negative-acknowledge.
`requeue`boolIf True, return the task to the queue for retry. If False, discard the task permanently.
async def close() -> None

Close the queue connection and cleanup resources.


Per-tenant configuration key-value store.

Provides a low-level get/set interface. The higher-level TenantConfigService adds default fallback and event emission on top of this protocol.

async def get_config(
    tenant_id: str,
    key: str
) -> Any | None

Retrieve a single configuration value for a tenant.

Parameters
ParameterTypeDescription
`tenant_id`strThe tenant whose configuration is queried.
`key`strThe configuration key.
Returns
TypeDescription
Any | NoneThe stored value, or ``None`` if the key is not set for this tenant.
async def get_all_config(tenant_id: str) -> dict[str, Any]

Retrieve all configuration entries for a tenant.

Parameters
ParameterTypeDescription
`tenant_id`strThe tenant whose configuration is retrieved.
Returns
TypeDescription
dict[str, Any]A dictionary of all key-value pairs for the tenant. Returns an empty dict if no overrides are set.
async def set_config(
    tenant_id: str,
    key: str,
    value: Any
) -> None

Set a configuration value for a tenant.

Parameters
ParameterTypeDescription
`tenant_id`strThe tenant whose configuration is updated.
`key`strThe configuration key.
`value`AnyThe new value (must be JSON-serialisable).

Pluggable data isolation strategy.

Implementations provide the mechanics of isolating tenant data at the database layer (row-level, schema-per-tenant, or database-per-tenant).

Attributes: name: Strategy identifier used by the registry ("row_level", "schema", "database").

async def apply_isolation(
    tenant_id: str,
    context: dict[str, Any]
) -> None

Apply tenant isolation to the given execution context.

For row-level isolation this is a no-op; for schema isolation this sets the search_path in context.

Parameters
ParameterTypeDescription
`tenant_id`strThe active tenant.
`context`dict[str, Any]Mutable execution context dict to annotate.
async def remove_isolation(tenant_id: str) -> None

Remove any active isolation for the tenant.

Parameters
ParameterTypeDescription
`tenant_id`strThe tenant whose isolation context to tear down.
async def provision_isolation(tenant_id: str) -> Result[None, TenantError]

Provision isolation resources for a newly created tenant.

For row-level isolation this is a no-op. For schema isolation this creates the schema.

Parameters
ParameterTypeDescription
`tenant_id`strThe newly created tenant.
Returns
TypeDescription
Result[None, TenantError]``Ok(None)`` on success, ``Err(TenantError)`` on failure.
async def deprovision_isolation(tenant_id: str) -> Result[None, TenantError]

Tear down isolation resources for a deactivated tenant.

Parameters
ParameterTypeDescription
`tenant_id`strThe tenant being deactivated.
Returns
TypeDescription
Result[None, TenantError]``Ok(None)`` on success, ``Err(TenantError)`` on failure.

Storage-agnostic tenant CRUD operations.

Applications may supply their own implementation by binding a class to this protocol in the DI container. The default implementations are InMemoryTenantProvider (for testing/dev) and SQLTenantProvider (when lexigram-tenancy[sql] is installed).

async def get_tenant(tenant_id: str) -> TenantInfo | None

Retrieve a tenant by its unique identifier.

Parameters
ParameterTypeDescription
`tenant_id`strThe unique tenant identifier.
Returns
TypeDescription
TenantInfo | NoneThe TenantInfo record, or ``None`` if no tenant with that ID exists.
async def get_tenant_by_slug(slug: str) -> TenantInfo | None

Retrieve a tenant by its URL-safe slug.

Parameters
ParameterTypeDescription
`slug`strThe tenant slug (e.g. ``acme-corp``).
Returns
TypeDescription
TenantInfo | NoneThe matching TenantInfo, or ``None`` if not found.
async def list_tenants(
    *,
    active_only: bool = True
) -> list[TenantInfo]

List tenants, optionally filtering to active ones only.

Parameters
ParameterTypeDescription
`active_only`boolWhen ``True`` (default), return only tenants with ``status == ACTIVE``.
Returns
TypeDescription
list[TenantInfo]List of matching TenantInfo records.
async def create_tenant(command: CreateTenantCommand) -> Result[TenantInfo, TenantError]

Persist a new tenant record.

Parameters
ParameterTypeDescription
`command`CreateTenantCommandCreateTenantCommand with the new tenant's attributes.
Returns
TypeDescription
Result[TenantInfo, TenantError]``Ok(TenantInfo)`` on success, ``Err(TenantError)`` on failure.
async def update_tenant(
    tenant_id: str,
    command: UpdateTenantCommand
) -> Result[TenantInfo, TenantError]

Update mutable fields on an existing tenant record.

Parameters
ParameterTypeDescription
`tenant_id`strIdentifier of the tenant to update.
`command`UpdateTenantCommandUpdateTenantCommand with the fields to apply.
Returns
TypeDescription
Result[TenantInfo, TenantError]``Ok(TenantInfo)`` with the updated record, or ``Err(TenantError)``.
async def deactivate_tenant(tenant_id: str) -> Result[None, TenantError]

Mark a tenant as inactive.

Parameters
ParameterTypeDescription
`tenant_id`strIdentifier of the tenant to deactivate.
Returns
TypeDescription
Result[None, TenantError]``Ok(None)`` on success, ``Err(TenantError)`` on failure.
async def activate_tenant(tenant_id: str) -> Result[None, TenantError]

Mark a tenant as active.

Parameters
ParameterTypeDescription
`tenant_id`strIdentifier of the tenant to activate.
Returns
TypeDescription
Result[None, TenantError]``Ok(None)`` on success, ``Err(TenantError)`` on failure.
async def suspend_tenant(
    tenant_id: str,
    reason: str | None = None
) -> Result[None, TenantError]

Mark a tenant as suspended.

Parameters
ParameterTypeDescription
`tenant_id`strIdentifier of the tenant to suspend.
`reason`str | NoneOptional human-readable reason for the suspension.
Returns
TypeDescription
Result[None, TenantError]``Ok(None)`` on success, ``Err(TenantError)`` on failure.

Resolves tenant identity from request context.

Implementations are tried in priority order by CompositeResolver. Lower priority value = tried first = higher trust level.

Attributes: name: Unique resolver name (e.g. "header", "jwt_claim"). priority: Ordering weight; lower = tried first.

async def resolve(context: TenantResolutionContext) -> str | None

Attempt to resolve the tenant identifier from the given context.

Parameters
ParameterTypeDescription
`context`TenantResolutionContextImmutable snapshot of request data.
Returns
TypeDescription
str | NoneThe resolved ``tenant_id`` string, or ``None`` if this resolver cannot determine the tenant from the provided context.

Model-aware token counter.

Implementations use the exact tokenizer for the target model (tiktoken for OpenAI, AutoTokenizer for HuggingFace, mistral-common for Mistral). A character-estimate fallback is always available.

Note: Methods are synchronous because tokenization is CPU-bound with no I/O.

def count(text: str) -> int

Count tokens in a text string.

def count_messages(messages: list[ChatMessageProtocol]) -> int

Count tokens in a list of chat messages, including message overhead.

property model() -> str

The model this counter is calibrated for.


Protocol for token management implementations.

Defines the contract for JWT token creation, verification, and refresh operations.

Example

class JWTTokenManager:
def create_token(self, user: AuthenticatedUserProtocol) -> AuthToken:
payload = {"sub": user.user_id, "roles": user.roles}
access = jwt.encode(payload, self._secret, algorithm="HS256")
return AuthToken(access_token=access, ...)
def create_token(user: AuthenticatedUserProtocol) -> Any

Create an authentication token for a user.

Parameters
ParameterTypeDescription
`user`AuthenticatedUserProtocolThe authenticated user.
Returns
TypeDescription
AnyAuthToken containing access and refresh tokens.
def verify_token(token: str) -> Result[VerifiedToken, TokenError]

Verify and decode a token.

Returns a Result rather than raising domain exceptions so that the caller can pattern-match on success vs. failure explicitly. Infrastructure errors (cache unavailable, network failure) are still raised as exceptions and must not be wrapped in Result.

Parameters
ParameterTypeDescription
`token`strThe JWT token string.
Returns
TypeDescription
Result[VerifiedToken, TokenError]``Ok(VerifiedToken)`` if the token is valid and not revoked, or ``Err(TokenError)`` for any expected domain failure.
def refresh_token(refresh_token: str) -> Result[Any, TokenError]

Refresh an access token using a refresh token.

Parameters
ParameterTypeDescription
`refresh_token`strThe refresh token string.
Returns
TypeDescription
Result[Any, TokenError]``Ok(AuthToken)`` if the refresh token is valid, or ``Err(TokenError)`` for expected domain failures.

Protocol for agent tools.

Tools are the atomic capabilities an agent can invoke during reasoning. Each tool has a name, description, JSON parameter schema (for LLM function calling), and an async execute method.

Satisfied by:

  • @tool decorated functions (FunctionTool)
  • Classes extending Tool base class
property name() -> str

Unique tool identifier.

property description() -> str

Human-readable description for the LLM.

property parameters_schema() -> dict[str, Any]

JSON Schema describing the tool’s parameters.

Auto-generated from type hints by the @tool decorator, or manually defined for class-based tools.

Format follows OpenAI function calling schema

{
"type": "object",
"properties": {
"order_id": {"type": "string"},
"reason": {"type": "string"}
},
"required": ["order_id"]
}
async def execute(**kwargs: Any) -> Any

Execute the tool with the given arguments.

Returns the tool’s result. Errors should be raised as exceptions — the executor wraps them in Result.


Protocol for tool registries.

A registry stores tools by name and provides execution with error handling. When module visibility is enabled, tool access is checked against the compiled module graph.

def register(
    tool: ToolProtocol,
    module_class: type | None = None
) -> None

Register a tool.

def get(name: str) -> ToolProtocol | None

Get a tool by name.

def list_tools() -> list[ToolProtocol]

List all registered tools.

def list_visible_tools() -> list[ToolProtocol]

List tools visible to the current caller module.

def list_visible_tool_names() -> list[str]

List names of tools visible to the current caller module.

def set_module_graph(graph: CompiledModuleGraphProtocol | None) -> None

Set the compiled module graph for visibility enforcement.

def set_caller_module(module_class: type | None) -> None

Set the calling module for visibility checks.

async def execute(
    name: str,
    **kwargs: Any
) -> Result[Any, ToolError]

Execute a tool by name.

Returns Result[Any, ToolError].


Protocol for Unit of Work pattern.

Coordinates multiple repository operations within a single transaction boundary.

Example

async with uow:
user = await uow.users.get(user_id)
user.email = new_email
await uow.users.save(user)
await uow.audit_log.save(AuditEntry(...))
await uow.commit()
async def commit() -> None

Commit all changes in this unit of work.

async def rollback() -> None

Rollback all changes in this unit of work.

def register_new(entity: Any) -> None

Register a new entity for insertion.

def register_event(event: DomainEvent) -> None

Register a domain event with this unit of work.

Handlers or repositories may call this method when they detect that a domain event needs to be published once the transaction successfully commits. The unit of work is responsible for retaining the events until they are gathered by the application and dispatched via an event bus.

def collect_events() -> list[DomainEvent]

Return and clear all domain events registered with this unit of work.

Calling this method yields the list of events that have been registered either explicitly via register_event or implicitly by the UoW when entities with collect_events semantics are registered. The returned list is cleared from the unit of work; subsequent calls will return an empty list until more events are registered.

def register_dirty(entity: Any) -> None

Register an entity for update.

def register_deleted(entity: Any) -> None

Register an entity for deletion.


Protocol for user entity.

Defines the minimal contract for a user in the system.

property user_id() -> str

Unique user identifier.

property email() -> str

User email address.

property is_active() -> bool

Whether user account is active.


Top-level vector store lifecycle and collection management.

Implementations manage connections, collection CRUD, and delegate vector operations to VectorCollectionProtocol instances.

async def connect() -> None

Establish connection to the vector store.

async def disconnect() -> None

Close all connections and release resources.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check store connectivity and readiness.

async def list_collections() -> list[CollectionInfo]

List all collections with metadata.

async def create_collection(config: CollectionConfig) -> None

Create a new vector collection.

Raises
ExceptionDescription
CollectionAlreadyExistsErrorIf collection name is taken.
VectorConfigErrorIf config is invalid for this backend.
async def delete_collection(name: str) -> None

Delete a collection and all its vectors.

Raises
ExceptionDescription
CollectionNotFoundErrorIf collection does not exist.
async def collection_exists(name: str) -> bool

Check whether a collection exists.

async def get_collection(name: str) -> VectorCollectionProtocol

Get a handle to an existing collection.

Raises
ExceptionDescription
CollectionNotFoundErrorIf collection does not exist.

Verifies the authenticity of inbound webhook payloads.

Implementations must provide both a verification method and a signature computation method so callers can independently validate or generate signatures without exposing the underlying algorithm.

The canonical implementation is HMAC-SHA256, but any MAC or asymmetric scheme that fulfils this protocol is acceptable.

Typical usage

verifier = HMACWebhookVerifier()
if not verifier.verify(payload=body, signature=sig_header, secret=secret):
raise PermissionError("Invalid webhook signature")
def verify(
    payload: bytes,
    signature: str,
    secret: str
) -> bool

Return True when the signature matches the payload.

Implementations must use a constant-time comparison to prevent timing side-channel attacks.

Parameters
ParameterTypeDescription
`payload`bytesRaw request body bytes.
`signature`strSignature string as received in the request header (may include algorithm prefix such as ``"sha256=..."``).
`secret`strShared secret used to compute the expected signature.
Returns
TypeDescription
bool``True`` if the signature is valid, ``False`` otherwise.
def compute_signature(
    payload: bytes,
    secret: str
) -> str

Compute the expected signature for payload using secret.

Parameters
ParameterTypeDescription
`payload`bytesRaw request body bytes.
`secret`strShared secret.
Returns
TypeDescription
strHex-encoded signature string in the same format that verify expects as input.

Protocol for individual workflow step nodes.

A workflow node represents a single unit of work in a directed graph. Nodes receive the current shared state, perform their work, and return a state update dict that is merged into the global state.

property name() -> str

Unique node identifier within the workflow.

async def execute(state: dict[str, Any]) -> dict[str, Any]

Execute this node with the current workflow state.

Parameters
ParameterTypeDescription
`state`dict[str, Any]Current shared workflow state.
Returns
TypeDescription
dict[str, Any]Dict of state updates to merge into the shared state.

Protocol for working memory that assembles context for the current request.

Working memory is the in-context window that gets passed to the LLM, assembled from episodic and semantic memory based on token budgets.

async def assemble(
    query: str,
    token_budget: int
) -> list[MemoryEntry]

Assemble context window from available memory tiers.

Retrieves relevant entries from episodic and semantic memory and fits them within the token budget.

Parameters
ParameterTypeDescription
`query`strThe current query/prompt
`token_budget`intMaximum tokens available for context
Returns
TypeDescription
list[MemoryEntry]Ordered list of memory entries for context
async def add(entry: MemoryEntry) -> None

Add a new entry to the working memory stream.

Parameters
ParameterTypeDescription
`entry`MemoryEntryThe entry to add
async def get_context_entries() -> list[MemoryEntry]

Get current assembled context entries.

Returns
TypeDescription
list[MemoryEntry]List of entries currently in context window
async def flush() -> None

Clear the current context assembly.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check if the working memory backend is reachable and healthy.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for a response.
Returns
TypeDescription
HealthCheckResultHealth check result with status and details.

Complete response from an agent execution.

Contains the final message, the full reasoning trace (steps), all tool calls made, token usage, cost, and timing metadata.

Note: ToolCall and ReasoningStep are defined in lexigram-ai-agents and imported here for use in this type’s field annotations.

property tool_call_count() -> int

Number of tool calls made.

property step_count() -> int

Number of reasoning steps taken.

property successful_tool_calls() -> list[Any]

Tool calls that completed without error.

property failed_tool_calls() -> list[Any]

Tool calls that failed.

def to_dict() -> dict[str, Any]

Serialize to a JSON-compatible dict.


Composite health result that collects checks from multiple components.

The aggregate status follows a worst-case rule:

  • Any UNHEALTHY check → overall UNHEALTHY
  • Any DEGRADED check (no UNHEALTHY) → overall DEGRADED
  • All HEALTHY → overall HEALTHY
  • No checks registered → UNKNOWN
property status() -> HealthStatus

Overall status, computed from component checks.

def is_healthy() -> bool

Return True only if all components are healthy.

def to_dict() -> dict[str, Any]

Convert to a dictionary suitable for a /health HTTP response.


Immutable record of a security-relevant or compliance-relevant operation.

Canonical audit type used by all packages. Supersedes the former AuditEvent (simpler, no old/new values) and the AuditEntry from contracts/observability/audit.

Attributes: action: Dot-notation action identifier (e.g. "user.update"). actor_id: ID of the user or service that performed the action. resource_type: Kind of affected resource (e.g. "User"). resource_id: ID of the affected resource. outcome: "success" or "failure". severity: Severity level (defaults to MEDIUM). occurred_at: UTC datetime of the event (defaults to now). metadata: Arbitrary additional context. old_values: Field values before the action (optional). new_values: Field values after the action (optional). source: Originating subsystem (e.g. "sql", "admin", "ai"). tenant_id: Multi-tenant scoping (optional).


Configuration for circuit breaker.

Attributes: failure_threshold: Number of failures before opening the circuit. recovery_timeout: Seconds in open state before attempting half-open. expected_exception: Exception types that count as failures. success_threshold: Consecutive successes required to close from half-open. timeout: Per-call timeout in seconds. name: Human-readable identifier for this breaker instance. sliding_window_seconds: Window for computing the failure rate. failure_rate_threshold: Fraction of calls that must fail to trip. backend: State store backend for distributed circuit breaker coordination. "memory" (default) — in-process state, no coordination. "redis" — uses a CacheBackendProtocol resolved from the DI container. "consul" — uses Consul KV for cross-datacenter coordination.


Cursor-based pagination for large datasets.

Attributes: items: The entities in this page. next_cursor: Opaque cursor for the next page, or None if no more. prev_cursor: Opaque cursor for the previous page, or None. has_more: Whether there are more results after this page. has_previous: Whether there are results before this page. total_count: Optional total count (may be None for performance).

def to_dict() -> dict[str, Any]

Serialize to API-friendly dictionary.


Result of a delete operation.

Base class for domain events.
def __init__(**kwargs: Any) -> None

Initialize the event setting base fields and any extra subclass fields.

Declared dataclass fields receive their default values when not supplied. Extra keyword arguments (e.g. fields declared on plain subclasses without @dataclass) are set directly on the instance via object.__setattr__, which bypasses the frozen=True guard safely during construction.

def to_dict() -> dict[str, Any]

Return a JSON-serializable dictionary representation.

def from_dict(
    cls,
    data: dict[str, Any]
) -> DomainEvent
def for_aggregate(
    aggregate_id: UUID,
    aggregate_type: str
) -> DomainEvent

Immutable duration value object.

The class stores total seconds internally and provides lightweight parsing, formatting, and arithmetic helpers for configuration and API boundaries.

def seconds(
    cls,
    value: float
) -> Duration

Create a duration from seconds.

def minutes(
    cls,
    value: float
) -> Duration

Create a duration from minutes.

def hours(
    cls,
    value: float
) -> Duration

Create a duration from hours.

def days(
    cls,
    value: float
) -> Duration

Create a duration from days.

def zero(cls) -> Duration

Create a zero duration.

def parse(
    cls,
    value: str
) -> Duration

Parse a human-readable duration string.

Supported units are seconds (s), minutes (m), hours (h), and days (d). Chained forms like 1h30m are supported.

property seconds_value() -> float

Backward-compatible alias for the total seconds.

def to_timedelta() -> timedelta

Convert the duration to datetime.timedelta.


Single error detail in an HTTP error response.

Attributes: code: Machine-readable error code (e.g., “INVALID_INPUT”). message: Human-readable error message. field: Optional field name that caused the error (for form validation).


Standard HTTP error response envelope.

Provides a consistent structure for error responses across all endpoints. Should be used with appropriate HTTP status codes (4xx, 5xx).

Attributes: error: High-level error category. message: Human-readable summary of what went wrong. details: List of specific error details (empty for simple errors). request_id: Optional request identifier for tracing and support.


The result of evaluating a single feature flag.

Attributes: key: The flag identifier. value: The evaluated value (type depends on FlagType). flag_type: The type of flag evaluated. reason: Machine-readable reason code (e.g. “DEFAULT”, “TARGETING”). variant: Variant key for VARIANT-type flags, None otherwise. metadata: Arbitrary evaluation metadata from the provider.


The evaluation strategy of a feature flag.

Describes a single executable scaffolding generator contributed to the CLI.

Generators are discovered via CliContributorProtocol.get_generators() and assembled into the unified gen command by CommandAssembler. generator_path is required so every definition can be executed once it has been discovered and registered.


A single CLI option/flag accepted by a generator command.

Used for auto-generated help text and input validation.


Result of a health check.
def to_dict() -> dict[str, Any]

Convert to dictionary format.

def is_healthy() -> bool

Check if status is healthy.

def is_degraded() -> bool

Check if status is degraded.


Unified health status.

Available ID generation strategies.

Result of an insert operation.

JobProtocol execution status.

Provider lifecycle stages.

Immutable snapshot of a distributed lock's state.

Attributes: resource: The resource being locked. holder: Identifier of the current holder. acquired_at: Unix timestamp when the lock was acquired. expires_at: Unix timestamp when the lock expires (TTL). metadata: Additional implementation-specific data.

property is_expired() -> bool

Check if the lock has expired.

property ttl_remaining() -> float | None

Get remaining TTL in seconds.


Record of a database migration.

Standard paginated HTTP response envelope.

Encapsulates a page of results with metadata for navigation. Supports both offset-based and cursor-based pagination.

Attributes: items: The entities in this page. total: Total count of items across all pages. page: Current page number (1-indexed, 0-indexed for cursor). page_size: Maximum items returned per page. has_next: Whether there are more pages after this one. has_prev: Whether there are pages before this one. next_cursor: Opaque cursor for the next page (for cursor-based pagination).


Provider initialization priority.

Providers are booted in ascending order of priority. Lower values run first; the typical ordering is:

  • CRITICAL (0) – absolutely foundational services that others depend on (e.g. configuration, diagnostics).
  • INFRASTRUCTURE (10) – low-level plumbing such as database connections, messaging clients, and cache providers.
  • SECURITY (20) – authentication/authorization infrastructure.
  • NORMAL (30) – everyday domain services with no special ordering.
  • APPLICATION (40) – application-level tools (CLI, admin utilities) that depend on domain services but are not themselves domain logic.
  • DOMAIN (50) – business-logic providers that may depend on earlier layers.
  • PRESENTATION (80) – web/API layers and other entry points.
  • COMMS (90) – outbound communication providers (email, SMS, webhooks) often run late to avoid interfering with core initialization.
  • LOW (100) – lowest-priority, optional providers that can boot last.

Represents a database query log entry.

This is a concrete dataclass rather than a Protocol so that callers can instantiate it directly.


Result of a database query.

Implements the iterator and sequence protocols so that code expecting a plain list[dict] from a query result continues to work after the return type is normalised to QueryResult.

Example

result = await provider.execute_query("SELECT * FROM users")
for row in result: # iterate directly
print(row["email"])
if not result: # bool coercion
raise LookupError("no rows")
first = result[0] # index access
rows = list(result) # convert to plain list

Base Result type. Not abstract — Ok and Err are the only variants.
def is_ok() -> bool
def is_err() -> bool
def unwrap() -> T
def unwrap_err() -> E
def unwrap_or(default: T) -> T
def unwrap_or_else(op: Callable[[E], T]) -> T
def map_sync(op: Callable[[T], U]) -> Result[U, E]
def map_err(op: Callable[[E], F]) -> Result[T, F]
def and_then_sync(op: Callable[[T], Result[U, E]]) -> Result[U, E]
def or_else_sync(op: Callable[[E], Result[T, F]]) -> Result[T, F]
def expect(message: str) -> T
def match(
    ok: Callable[[T], U],
    err: Callable[[E], U]
) -> U
async def map(op: Callable[[T], Awaitable[U]]) -> Result[U, E]
async def and_then(op: Callable[[T], Awaitable[Result[U, E]]]) -> Result[U, E]
async def or_else(op: Callable[[E], Awaitable[Result[T, F]]]) -> Result[T, F]
def flatten() -> Result[Any, E]
def filter(
    predicate: Callable[[T], bool],
    error: E
) -> Result[T, E]
def ok_or(default: U) -> T | U
def from_exception(
    cls,
    exc: Exception,
    ok_type: type[T] = type(None)
) -> Result[T, Exception]

Wrap a caught exception into an Err result.

def to_optional() -> T | None
def inspect(op: Callable[[T], None]) -> Result[T, E]
def inspect_err(op: Callable[[E], None]) -> Result[T, E]

Configuration for retry policy.

Dependency injection scopes.

Protocol for the SpecificationProtocol pattern.

This class was previously defined in lexigram.contracts.specification before the 2026 reorg. It has been moved here alongside the other DDD primitives. A migration mapping ensures imports are rewritten.

def is_satisfied_by(candidate: T) -> bool

Core tenant identity record.

The config field here is the tenant’s static provisioning config (plan limits, feature flags set at creation time). Runtime per-tenant overrides are managed separately by TenantConfigProviderProtocol / TenantConfigService. The two are independent — TenantInfo.config is a snapshot stored alongside the tenant record; TenantConfigService is a live, cached, key-value overlay.

Attributes: tenant_id: Unique identifier for the tenant. slug: URL-safe, human-readable identifier (e.g. acme-corp). name: Display name of the tenant. status: Current lifecycle status. plan: Subscription plan name (optional). config: Static provisioning configuration snapshot. metadata: Arbitrary application-defined metadata. created_at: Timestamp when the tenant was created.


Immutable snapshot of request data available for tenant resolution.

Passed to every TenantResolverProtocol during the resolution chain so resolvers can inspect request metadata without depending on any web framework type.

Attributes: headers: HTTP request headers (lowercased keys). host: The Host header value (e.g. acme.app.com). path: The request path (e.g. /api/tenants/acme/users). claims: Decoded JWT claims from the current authentication context.


Lifecycle status of a tenant.

Attributes: ACTIVE: Tenant is fully operational and accepts requests. INACTIVE: Tenant has been deactivated and no longer accepts requests. SUSPENDED: Tenant has been suspended (e.g. for non-payment) and is temporarily blocked from accessing the system. PROVISIONING: Tenant is being created; isolation resources are being set up.


Immutable token budget that pipeline stages consult.

Every field is calculated once at the start of a request. Stages read remaining capacity via properties — they never mutate the budget. Builder methods return new instances (immutable value semantics).

property total_used() -> int

Total tokens consumed by all components.

property remaining() -> int

Tokens available for additional content.

property remaining_for_history() -> int

Tokens available specifically for chat history.

property over_budget() -> bool

True if total consumption exceeds the model context limit.

def with_rag_tokens(rag_tokens: int) -> TokenBudget

Return a new budget with updated RAG token count.

def with_history_tokens(history_tokens: int) -> TokenBudget

Return a new budget with updated history token count.


Result of an update operation.

Base exception for all agent errors.
def __init__(
    message: str = 'Agent error',
    **kwargs: Any
) -> None

Duplicate handler registration.

Raised when attempting to register a handler for a type that already has one.

def __init__(
    message: str = 'Duplicate handler',
    message_type: str | None = None,
    **kwargs: Any
) -> None

Event/command handler not found.

Raised when no handler is registered for a specific event or command type.

def __init__(
    message: str = 'Handler not found',
    handler_type: str | None = None,
    message_type: str | None = None,
    **kwargs: Any
) -> None

Base exception for all MCP errors.
def __init__(
    message: str = 'MCP error',
    **kwargs: Any
) -> None

MCP server initialization failed.

Raised when the MCP server cannot be initialized, usually due to missing required handlers or configuration errors.

def __init__(
    message: str = 'MCP initialization failed',
    *,
    reason: str | None = None,
    **kwargs: Any
) -> None

Unknown MCP method requested by client.

Raised when the client requests an MCP method that the server does not support.

def __init__(
    message: str = 'MCP method not found',
    *,
    method: str | None = None,
    **kwargs: Any
) -> None

Prompt retrieval or list failed.

Raised when a prompt operation fails, such as when a prompt is not found or arguments are invalid.

def __init__(
    message: str = 'MCP prompt error',
    *,
    prompt_name: str | None = None,
    **kwargs: Any
) -> None

Protocol violation (malformed message, invalid state).

Raised when an MCP message violates the protocol specification, such as missing required fields or invalid JSON-RPC structure.

def __init__(
    message: str = 'MCP protocol error',
    *,
    details: dict[str, Any] | None = None,
    **kwargs: Any
) -> None

Resource read or list failed.

Raised when a resource operation fails, such as when a resource is not found or cannot be read.

def __init__(
    message: str = 'MCP resource error',
    *,
    uri: str | None = None,
    **kwargs: Any
) -> None

Tool call failed during MCP execution.

Raised when a tool invocation fails, either due to the tool not existing or raising an exception.

def __init__(
    message: str = 'MCP tool call failed',
    *,
    tool_name: str | None = None,
    **kwargs: Any
) -> None

Raised when the caller lacks permission to access or modify a secret.

Attributes: secret_name: Name of the secret that was denied access. operation: The operation that was denied (read, write, delete, etc.).

def __init__(
    secret_name: str,
    operation: str = 'read',
    **kwargs: Any
) -> None

Raised when a requested secret does not exist.
def __init__(
    secret_name: str,
    **kwargs: Any
) -> None

Reasoning strategy failed.

Raised when the agent’s strategy encounters an error during reasoning (LLM failure, invalid response, etc.).

def __init__(
    message: str = 'Strategy execution failed',
    *,
    strategy_name: str | None = None,
    **kwargs: Any
) -> None

Raised when per-tenant configuration access or mutation fails.
def __init__(
    message: str = 'Tenant config error',
    **kwargs: Any
) -> None

Initialise a TenantConfigError.

Parameters
ParameterTypeDescription
`message`strHuman-readable description of the config error. **kwargs: Additional context forwarded to the base class.

Base class for all tenancy-related domain errors.
def __init__(
    message: str = 'Tenant error',
    **kwargs: Any
) -> None

Initialise a TenantError.

Parameters
ParameterTypeDescription
`message`strHuman-readable description of the error. **kwargs: Additional context forwarded to the base class.

Raised when an operation is attempted on an inactive tenant.
def __init__(
    tenant_id: str = '',
    **kwargs: Any
) -> None

Initialise a TenantInactiveError.

Parameters
ParameterTypeDescription
`tenant_id`strThe inactive tenant identifier. **kwargs: Additional context forwarded to the base class.

Raised when a requested tenant does not exist.
def __init__(
    tenant_id: str = '',
    **kwargs: Any
) -> None

Initialise a TenantNotFoundError.

Parameters
ParameterTypeDescription
`tenant_id`strThe tenant identifier that was not found. **kwargs: Additional context forwarded to the base class.

Raised when tenant provisioning (isolation setup) fails.
def __init__(
    message: str = 'Tenant provisioning failed',
    **kwargs: Any
) -> None

Initialise a TenantProvisioningError.

Parameters
ParameterTypeDescription
`message`strHuman-readable description of the provisioning failure. **kwargs: Additional context forwarded to the base class.

Raised when tenant resolution fails unexpectedly.
def __init__(
    message: str = 'Tenant resolution failed',
    **kwargs: Any
) -> None

Initialise a TenantResolutionError.

Parameters
ParameterTypeDescription
`message`strHuman-readable description of the resolution failure. **kwargs: Additional context forwarded to the base class.

Raised when a tenant slug is already in use.
def __init__(
    slug: str = '',
    **kwargs: Any
) -> None

Initialise a TenantSlugConflictError.

Parameters
ParameterTypeDescription
`slug`strThe conflicting tenant slug. **kwargs: Additional context forwarded to the base class.

Raised when an operation is attempted on a suspended tenant.
def __init__(
    tenant_id: str = '',
    **kwargs: Any
) -> None

Initialise a TenantSuspendedError.

Parameters
ParameterTypeDescription
`tenant_id`strThe suspended tenant identifier. **kwargs: Additional context forwarded to the base class.

Base exception for tool errors.
def __init__(
    message: str = 'Tool error',
    **kwargs: Any
) -> None

Data validation error.
def __init__(
    message: str = 'Validation failed',
    errors: list[FieldError] | None = None,
    **kwargs: Any
) -> None
property errors() -> list[FieldError]

Return the list of field errors.

def add_error(
    field: str,
    message: str,
    code: str = 'invalid'
) -> ValidationError

Add a field error and return self for chaining.