
API Reference

Relevance-based conversation history pruning.

Unlike sliding-window truncation, implementations score each turn for relevance to the current query and preserve high-value turns regardless of their position in the history.

async def prune(
    history: list[ChatMessageProtocol],
    current_query: str,
    max_turns: int
) -> list[ChatMessageProtocol]

Prune history to at most max_turns, preserving relevant turns.

Parameters
`history` (list[ChatMessageProtocol]): Full conversation history as ChatMessageProtocol instances.
`current_query` (str): The current user query (used for relevance scoring).
`max_turns` (int): Maximum number of turns to retain.
Returns
list[ChatMessageProtocol]: Pruned history in chronological order.

Protocol for merging a branch session back into a parent session.

Implementors decide which turns and variables from the branch are incorporated into the parent when a branch is merged.

async def merge(
    parent: SessionState,
    branch: SessionState
) -> SessionState

Merge branch into parent and return the combined state.

Parameters
`parent` (SessionState): The original session state.
`branch` (SessionState): The forked branch to merge back.
Returns
SessionState: A new ``SessionState`` combining both sessions.

Protocol for per-request session context.

Provides access to the current session within a request scope, with ability to bind and retrieve session state.

property session_id() -> str

Get the current session ID.

Returns
str: The current session ID.
Raises
SessionError: If no session is currently bound.
property state() -> SessionState

Get the current session state.

Returns
SessionState: The current session state.
Raises
SessionError: If no session is currently bound.
async def get_or_create(user_id: str) -> SessionState

Get an existing session or create a new one.

Parameters
`user_id` (str): The user ID.
Returns
SessionState: An existing or newly created session.

Protocol for managing session lifecycle.

Handles session creation, resumption, state management, checkpointing, and restoration.

async def create(
    user_id: str,
    metadata: dict[str, Any] | None = None
) -> SessionState

Create a new session.

Parameters
ParameterTypeDescription
`user_id`strThe user ID
`metadata`dict[str, Any] | NoneOptional session metadata
Returns
TypeDescription
SessionStateThe newly created session
async def resume(session_id: str) -> SessionState | None

Resume an existing session.

Parameters
ParameterTypeDescription
`session_id`strThe session ID to resume
Returns
TypeDescription
SessionState | NoneThe session state, or None if not found or closed
async def add_turn(
    session_id: str,
    turn: SessionTurn
) -> None

Add a turn to a session.

Parameters
ParameterTypeDescription
`session_id`strThe session to update
`turn`SessionTurnThe turn to add
async def get_state(session_id: str) -> SessionState | None

Get the current state of a session.

Parameters
ParameterTypeDescription
`session_id`strThe session ID
Returns
TypeDescription
SessionState | NoneThe session state, or None if not found
async def checkpoint(session_id: str) -> SessionCheckpoint

Create a checkpoint of the session.

Parameters
ParameterTypeDescription
`session_id`strThe session to checkpoint
Returns
TypeDescription
SessionCheckpointThe created checkpoint
async def restore(checkpoint_id: str) -> SessionState

Restore session state from a checkpoint.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint ID to restore from
Returns
TypeDescription
SessionStateThe restored session state
async def close(session_id: str) -> None

Close/terminate a session.

Parameters
ParameterTypeDescription
`session_id`strThe session to close
async def suspend(session_id: str) -> SessionState

Suspend an active session without closing it.

Parameters
ParameterTypeDescription
`session_id`strThe session to suspend.
Returns
TypeDescription
SessionStateUpdated session state with status ``suspended``.

Protocol for storing and retrieving session state.

Implementations provide persistence for session state, supporting creation, loading, deletion, and enumeration of sessions.

async def save(state: SessionState) -> None

Save or update a session state.

Parameters
ParameterTypeDescription
`state`SessionStateThe session state to save
async def load(session_id: str) -> SessionState | None

Load a session state by ID.

Parameters
ParameterTypeDescription
`session_id`strThe session ID to load
Returns
TypeDescription
SessionState | NoneThe session state, or None if not found
async def delete(session_id: str) -> None

Delete a session.

Parameters
ParameterTypeDescription
`session_id`strThe session ID to delete
async def list_sessions(user_id: str) -> list[SessionState]

List all sessions for a user.

Parameters
ParameterTypeDescription
`user_id`strThe user ID to query
Returns
TypeDescription
list[SessionState]List of sessions owned by this user
async def save_checkpoint(checkpoint: SessionCheckpoint) -> None

Persist an immutable session checkpoint.

Parameters
ParameterTypeDescription
`checkpoint`SessionCheckpointThe checkpoint to save.
async def load_checkpoint(checkpoint_id: str) -> SessionCheckpoint | None

Load a checkpoint by ID.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint ID to load.
Returns
TypeDescription
SessionCheckpoint | NoneThe checkpoint, or None if not found.
async def list_checkpoints(session_id: str) -> list[SessionCheckpoint]

List all checkpoints for a session.

Parameters
ParameterTypeDescription
`session_id`strThe session to list checkpoints for.
Returns
TypeDescription
list[SessionCheckpoint]All checkpoints in chronological order.

Protocol for multi-agent turn selection.

Implementations decide which agent should speak next and whether the conversation should continue.

def register(
    agent_name: str,
    role: str
) -> None

Register an agent with this turn manager.

Parameters
ParameterTypeDescription
`agent_name`strUnique name of the agent.
`role`strRole of the agent (e.g. ``participant``, ``coordinator``).
async def select_next(session_id: str) -> str | None

Return the name of the next agent to act, or None to stop.

Parameters
ParameterTypeDescription
`session_id`strThe currently active group session.
Returns
TypeDescription
str | NoneAgent name, or ``None`` when the conversation should end.
async def is_complete(session_id: str) -> bool

Return True when the group conversation should terminate.

Parameters
ParameterTypeDescription
`session_id`strThe group session ID.
Returns
TypeDescription
boolTrue if the conversation is complete.
def filter_visible(
    turns: list[SessionTurn],
    *,
    agent_name: str
) -> list[SessionTurn]

Filter to only the turns visible to the given agent.

Parameters
ParameterTypeDescription
`turns`list[SessionTurn]Full turn list from the session.
`agent_name`strThe agent whose visibility to apply.
Returns
TypeDescription
list[SessionTurn]Subset of turns the agent is allowed to see.

Append the branch's new turns (after the fork point) to the parent.

Variables from the branch are shallow-merged into the parent, with branch values taking precedence for conflicting keys.

async def merge(
    parent: SessionState,
    branch: SessionState
) -> SessionState

Append new branch turns after the parent’s last turn.

Parameters
`parent` (SessionState): The parent session state.
`branch` (SessionState): The branch session state.
Returns
SessionState: Merged ``SessionState`` with branch turns appended.
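
The append behaviour described above can be illustrated with a minimal sketch. The `Turn` and `State` dataclasses, their field names, and the use of turn IDs to detect turns added after the fork point are all assumptions made for illustration; the real `SessionState` may track the fork point differently, and the real `merge` is async.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class Turn:
    """Hypothetical stand-in for SessionTurn."""
    turn_id: str
    content: str


@dataclass(frozen=True)
class State:
    """Hypothetical stand-in for SessionState."""
    session_id: str
    turns: tuple[Turn, ...] = ()
    variables: dict[str, Any] = field(default_factory=dict)


def append_merge(parent: State, branch: State) -> State:
    # Assumption: turns inherited at fork time keep their IDs, so any branch
    # turn whose ID is unknown to the parent was added after the fork point.
    known = {t.turn_id for t in parent.turns}
    new_turns = tuple(t for t in branch.turns if t.turn_id not in known)

    # Shallow merge: branch values win for conflicting keys.
    merged_vars = {**parent.variables, **branch.variables}

    # Return a new state; neither input is mutated.
    return replace(parent, turns=parent.turns + new_turns, variables=merged_vars)
```

Returning a fresh object rather than mutating `parent` keeps the sketch in line with the protocol's statement that merge returns a new ``SessionState``.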

Fork and merge session timelines.

Enables exploration of alternative conversation paths by creating independent branches from a session (or a specific checkpoint). Branches can later be merged back into the parent.

Parameters
`store` (SessionStoreProtocol): Session store that persists session states and checkpoints.
`max_branches_per_session` (int): Maximum allowed forks from a single session.
def __init__(
    store: SessionStoreProtocol,
    max_branches_per_session: int = 10
) -> None
async def fork(
    session_id: str,
    branch_name: str,
    *,
    from_checkpoint: str | None = None
) -> SessionState

Fork a session into a new independent branch.

The branch inherits all turns from the source session (or checkpoint) and is assigned a new session ID with parent_session_id pointing back to the origin.

Parameters
`session_id` (str): The session to fork from.
`branch_name` (str): A human-readable name for the branch.
`from_checkpoint` (str | None): If provided, fork from this checkpoint instead of the current live state.
Returns
SessionState: Newly created branch ``SessionState``.
Raises
SessionNotFoundError: If the session cannot be found.
CheckpointNotFoundError: If *from_checkpoint* cannot be found.
SessionCapacityError: If the branch limit is exceeded.
async def merge(
    parent_session_id: str,
    branch_session_id: str,
    *,
    strategy: MergeStrategy | None = None
) -> SessionState

Merge a branch back into its parent session.

Parameters
`parent_session_id` (str): The parent session to merge into.
`branch_session_id` (str): The branch session to merge from.
`strategy` (MergeStrategy | None): Merge strategy to use; defaults to ``AppendMerge``.
Returns
SessionState: Updated parent ``SessionState`` with branch content merged in.
Raises
SessionNotFoundError: If either session cannot be found.

Redis-backed session store for multi-process deployments.

Uses CacheBackendProtocol from lexigram-contracts, so the concrete Redis client is never imported directly. Sessions and checkpoints are JSON-serialised. A per-user set index tracks which sessions belong to a given user.

Parameters
`cache` (CacheBackendProtocol): Any object implementing ``CacheBackendProtocol`` (get/set/delete).
`ttl` (int): Session TTL in seconds (default 24 h).
def __init__(
    cache: CacheBackendProtocol,
    ttl: int = 86400
) -> None
async def save(state: SessionState) -> None

Persist state in the cache.

Parameters
ParameterTypeDescription
`state`SessionStateSession state to save.
async def load(session_id: str) -> SessionState | None

Return the session state for session_id, or None.

Parameters
ParameterTypeDescription
`session_id`strThe session to load.
Returns
TypeDescription
SessionState | NoneDeserialised ``SessionState`` or ``None``.
async def delete(session_id: str) -> None

Remove a session from the cache.

Parameters
ParameterTypeDescription
`session_id`strThe session to delete.
async def list_sessions(user_id: str) -> list[SessionState]

List all sessions for user_id.

Parameters
ParameterTypeDescription
`user_id`strThe user to query.
Returns
TypeDescription
list[SessionState]All sessions found for that user.
async def save_checkpoint(checkpoint: SessionCheckpoint) -> None

Persist a checkpoint in the cache.

Parameters
ParameterTypeDescription
`checkpoint`SessionCheckpointThe checkpoint to save.
async def load_checkpoint(checkpoint_id: str) -> SessionCheckpoint | None

Return the checkpoint for checkpoint_id, or None.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint to load.
Returns
TypeDescription
SessionCheckpoint | NoneDeserialised ``SessionCheckpoint`` or ``None``.
async def list_checkpoints(session_id: str) -> list[SessionCheckpoint]

List all checkpoints for session_id, oldest-first.

Parameters
ParameterTypeDescription
`session_id`strThe session to query.
Returns
TypeDescription
list[SessionCheckpoint]Checkpoints in chronological order.
async def delete_checkpoint(checkpoint_id: str) -> None

Remove a checkpoint from the cache.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint to delete.

Creates, loads, and prunes session checkpoints.

Wraps the store’s checkpoint methods with pruning logic so the per-session checkpoint count never exceeds the configured maximum.

Parameters
`store` (SessionStoreProtocol): Session store providing checkpoint persistence.
`max_checkpoints_per_session` (int): Maximum checkpoints to retain per session.
def __init__(
    store: SessionStoreProtocol,
    max_checkpoints_per_session: int = 50
) -> None
async def create(
    state: SessionState,
    *,
    metadata: dict[str, Any] | None = None
) -> SessionCheckpoint

Snapshot the current session state as an immutable checkpoint.

Prunes the oldest checkpoints if the per-session limit is exceeded.

Parameters
ParameterTypeDescription
`state`SessionStateThe session state to snapshot.
`metadata`dict[str, Any] | NoneOptional metadata to attach to the checkpoint.
Returns
TypeDescription
SessionCheckpointThe newly created ``SessionCheckpoint``.
async def restore(checkpoint_id: str) -> SessionState

Restore a session state from a checkpoint.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint ID to restore.
Returns
TypeDescription
SessionStateDeep copy of the session state at checkpoint time.
Raises
ExceptionDescription
CheckpointNotFoundErrorIf no checkpoint with that ID exists.
async def list(session_id: str) -> list[SessionCheckpoint]

Return all checkpoints for session_id, oldest first.

Parameters
`session_id` (str): The session to query.
Returns
list[SessionCheckpoint]: List of checkpoints in chronological order.
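
The pruning step that keeps the per-session checkpoint count within the configured maximum might look like the sketch below. The `Checkpoint` dataclass, its `created_at` field, and the `prune_oldest` helper are hypothetical names; the sketch only decides which checkpoints to drop and leaves the actual deletion to the store.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical stand-in for SessionCheckpoint."""
    checkpoint_id: str
    session_id: str
    created_at: float  # assumption: creation time as epoch seconds


def prune_oldest(
    checkpoints: list[Checkpoint], max_per_session: int
) -> tuple[list[Checkpoint], list[str]]:
    """Return (kept checkpoints oldest-first, IDs of checkpoints to delete)."""
    ordered = sorted(checkpoints, key=lambda c: c.created_at)
    # Number of checkpoints over the limit; never negative.
    excess = max(len(ordered) - max_per_session, 0)
    removed_ids = [c.checkpoint_id for c in ordered[:excess]]
    return ordered[excess:], removed_ids
```

A caller would pass the result of listing a session's checkpoints and then delete each returned ID through the store.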

SQL-backed session store for durable production storage.

Uses DatabaseProviderProtocol from lexigram-contracts so the concrete driver (asyncpg, aiosqlite, …) is never imported directly. All state is JSON-serialised for the SQL TEXT columns.

Tables used:

  • ai_sessions — one row per session
  • ai_checkpoints — one row per checkpoint
Parameters
`db` (DatabaseProviderProtocol): Any object implementing ``DatabaseProviderProtocol`` (provides ``scoped_context()`` and ``get_scoped_connection()``).
def __init__(db: DatabaseProviderProtocol) -> None
async def save(state: SessionState) -> None

Upsert state in the database.

Parameters
ParameterTypeDescription
`state`SessionStateThe session state to persist.
async def load(session_id: str) -> SessionState | None

Return the session state for session_id, or None.

Parameters
ParameterTypeDescription
`session_id`strThe session to load.
Returns
TypeDescription
SessionState | NoneDeserialised ``SessionState`` or ``None``.
async def delete(session_id: str) -> None

Remove a session from the database.

Parameters
ParameterTypeDescription
`session_id`strThe session to delete.
async def list_sessions(user_id: str) -> list[SessionState]

List all sessions belonging to user_id.

Parameters
ParameterTypeDescription
`user_id`strUser to filter by.
Returns
TypeDescription
list[SessionState]All sessions for that user.
async def save_checkpoint(checkpoint: SessionCheckpoint) -> None

Persist an immutable checkpoint.

Parameters
ParameterTypeDescription
`checkpoint`SessionCheckpointThe checkpoint to save.
async def load_checkpoint(checkpoint_id: str) -> SessionCheckpoint | None

Return the checkpoint for checkpoint_id, or None.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint to load.
Returns
TypeDescription
SessionCheckpoint | NoneDeserialised ``SessionCheckpoint`` or ``None``.
async def list_checkpoints(session_id: str) -> list[SessionCheckpoint]

List all checkpoints for session_id, oldest-first.

Parameters
ParameterTypeDescription
`session_id`strThe session to query.
Returns
TypeDescription
list[SessionCheckpoint]Checkpoints in chronological order.
async def delete_checkpoint(checkpoint_id: str) -> None

Remove a checkpoint from the database.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint to delete.

A session shared by multiple agents with coordinated turn-taking.

Wraps a SessionManagerImpl and a TurnManager to run a structured multi-agent conversation. Agents are called in the order determined by the turn manager, and each agent sees only the turns that RoleIsolation makes visible to it.

Parameters
`session_manager` (Any): Session lifecycle manager.
`turn_manager` (TurnManager | None): Strategy for selecting the next speaking agent.
`role_isolation` (RoleIsolation | None): Optional turn-visibility filter; defaults to all-visible.
def __init__(
    session_manager: Any,
    turn_manager: TurnManager | None = None,
    role_isolation: RoleIsolation | None = None
) -> None
def add_agent(
    agent: Any,
    role: str = 'participant'
) -> None

Register an agent with the group session.

Parameters
ParameterTypeDescription
`agent`AnyAn object with a ``name`` attribute and an async ``execute(message, history)`` method returning a ``Result``.
`role`strRole label for turn management.
async def run(
    session_id: str,
    initial_message: str,
    *,
    max_rounds: int = 10
) -> SessionState

Run the multi-agent conversation and return the final session state.

Posts initial_message as a user turn, then iterates until all agents are done or max_rounds cycles complete.

Parameters
ParameterTypeDescription
`session_id`strThe session to run within (must already exist).
`initial_message`strThe opening user message.
`max_rounds`intMaximum conversation rounds (passed to turn manager).
Returns
TypeDescription
SessionStateFinal ``SessionState`` after the conversation completes.
property agent_names() -> list[str]

Names of all registered agents in registration order.

Returns
TypeDescription
list[str]List of agent names.

In-process session and checkpoint storage.

All data is stored in plain Python dicts and is lost when the process exits. Suitable for development, testing, and single-process deployments.

def __init__() -> None
async def save(state: SessionState) -> None

Persist or overwrite a session state.

Parameters
ParameterTypeDescription
`state`SessionStateThe session state to save.
async def load(session_id: str) -> SessionState | None

Return the session state for session_id, or None.

Parameters
ParameterTypeDescription
`session_id`strThe session to load.
Returns
TypeDescription
SessionState | NoneThe session state, or ``None`` if not found.
async def delete(session_id: str) -> None

Remove a session from the store.

Parameters
ParameterTypeDescription
`session_id`strThe session to delete.
async def list_sessions(user_id: str) -> list[SessionState]

List all sessions belonging to user_id.

Parameters
ParameterTypeDescription
`user_id`strThe user to filter by.
Returns
TypeDescription
list[SessionState]All sessions owned by that user.
async def save_checkpoint(checkpoint: SessionCheckpoint) -> None

Persist an immutable session checkpoint.

Parameters
ParameterTypeDescription
`checkpoint`SessionCheckpointThe checkpoint to store.
async def load_checkpoint(checkpoint_id: str) -> SessionCheckpoint | None

Return the checkpoint for checkpoint_id, or None.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint to load.
Returns
TypeDescription
SessionCheckpoint | NoneThe checkpoint, or ``None`` if not found.
async def list_checkpoints(session_id: str) -> list[SessionCheckpoint]

List all checkpoints for session_id, sorted oldest-first.

Parameters
ParameterTypeDescription
`session_id`strThe session to query.
Returns
TypeDescription
list[SessionCheckpoint]Checkpoints in chronological order.
async def delete_checkpoint(checkpoint_id: str) -> None

Remove a checkpoint from the store.

Parameters
ParameterTypeDescription
`checkpoint_id`strThe checkpoint to delete.
async def expire_old_sessions(ttl_seconds: int) -> int

Delete sessions whose last update is older than ttl_seconds.

Parameters
`ttl_seconds` (int): Maximum allowed age in seconds.
Returns
int: Number of sessions expired.
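
As a rough illustration of TTL-based expiry over a dict-backed store, consider the sketch below. The class name, the `updated_at` field stored as epoch seconds, the `now` parameter (added only to make the example deterministic), and the synchronous methods are assumptions; the documented method is async and takes only `ttl_seconds`.

```python
import time
from typing import Optional


class InMemoryStoreSketch:
    """Hypothetical dict-backed store; not the library's implementation."""

    def __init__(self) -> None:
        self._sessions: dict[str, dict] = {}

    def save(self, session_id: str, updated_at: float) -> None:
        # Insert or overwrite the session record.
        self._sessions[session_id] = {"updated_at": updated_at}

    def session_ids(self) -> list[str]:
        return list(self._sessions)

    def expire_old_sessions(self, ttl_seconds: int, now: Optional[float] = None) -> int:
        now = time.time() if now is None else now
        # Collect stale IDs first so the dict is not mutated while iterating.
        stale = [
            sid
            for sid, record in self._sessions.items()
            if now - record["updated_at"] > ttl_seconds
        ]
        for sid in stale:
            del self._sessions[sid]
        return len(stale)
```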

Select agents by explicit priority; highest-priority agent goes first.

Agents with higher priority values speak first. After each full rotation at the highest priority, lower-priority agents get a chance.

Parameters
`priorities` (dict[str, int] | None): Mapping of agent name → integer priority (higher = first).
`max_rounds` (int): Maximum full cycles before completion.
def __init__(
    priorities: dict[str, int] | None = None,
    max_rounds: int = 10
) -> None
def register(
    agent_name: str,
    role: str
) -> None

Register an agent.

Parameters
ParameterTypeDescription
`agent_name`strAgent name.
`role`strAgent role label.
async def select_next(session_id: str) -> str | None

Return the highest-priority agent that has not yet spoken this round.

Parameters
ParameterTypeDescription
`session_id`strUnused; kept for protocol compatibility.
Returns
TypeDescription
str | NoneAgent name or ``None``.
async def is_complete(session_id: str) -> bool

Return True after max_rounds full cycles.

Parameters
ParameterTypeDescription
`session_id`strUnused; kept for protocol compatibility.
Returns
TypeDescription
boolTrue when the conversation is complete.
def filter_visible(
    turns: list[SessionTurn],
    *,
    agent_name: str
) -> list[SessionTurn]

All turns are visible to all agents in priority mode.

Parameters
`turns` (list[SessionTurn]): Full session turn list.
`agent_name` (str): The querying agent name.
Returns
list[SessionTurn]: All turns unchanged.
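
One way to realise "highest-priority agent that has not yet spoken this round" is sketched below. The class name, the default priority of 0 for agents missing from the mapping, and the tie-break by registration order are assumptions rather than documented behaviour.

```python
import asyncio
from typing import Optional


class PriorityTurnSketch:
    """Hypothetical priority-based turn selection; not the library's class."""

    def __init__(self, priorities: Optional[dict[str, int]] = None, max_rounds: int = 10) -> None:
        self._priorities = dict(priorities or {})
        self._agents: list[str] = []
        self._spoken: set[str] = set()
        self._rounds_done = 0
        self._max_rounds = max_rounds

    def register(self, agent_name: str, role: str) -> None:
        if agent_name not in self._agents:
            self._agents.append(agent_name)

    async def is_complete(self, session_id: str) -> bool:
        return not self._agents or self._rounds_done >= self._max_rounds

    async def select_next(self, session_id: str) -> Optional[str]:
        if await self.is_complete(session_id):
            return None
        waiting = [a for a in self._agents if a not in self._spoken]
        # max() keeps the first maximal item, so ties fall back to
        # registration order. Unlisted agents default to priority 0.
        agent = max(waiting, key=lambda a: self._priorities.get(a, 0))
        self._spoken.add(agent)
        if len(self._spoken) == len(self._agents):
            # Everyone has spoken: one full cycle is complete.
            self._spoken.clear()
            self._rounds_done += 1
        return agent


async def demo() -> list[Optional[str]]:
    tm = PriorityTurnSketch({"critic": 5, "writer": 1}, max_rounds=1)
    for name in ("writer", "critic", "reviewer"):
        tm.register(name, "participant")
    return [await tm.select_next("s1") for _ in range(4)]
```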

Keyword-overlap relevance pruner for conversation history.

Implements ContextPrunerProtocol. When the history exceeds max_turns, system messages are always preserved and non-system turns are scored by keyword overlap with the current query plus a recency bonus, then the top-scoring turns are retained in their original chronological order.

async def prune(
    history: list[ChatMessageProtocol],
    current_query: str,
    max_turns: int
) -> list[ChatMessageProtocol]

Prune history to at most max_turns, keeping relevant turns.

Scoring formula per non-system turn

score = overlap_count + recency_ratio * 0.3

where overlap_count is the number of tokens shared between the turn content and current_query, and recency_ratio is the turn’s normalised position in the non-system history (0 = oldest, 1 = newest).

Parameters
`history` (list[ChatMessageProtocol]): Full conversation history.
`current_query` (str): The current user query for relevance scoring.
`max_turns` (int): Maximum number of turns to retain.
Returns
list[ChatMessageProtocol]: Pruned history in chronological order.
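
The scoring formula above can be sketched as follows. The `Message` dataclass, the lowercase whitespace tokenizer, counting shared tokens as a set intersection, and counting system messages against `max_turns` are assumptions made for illustration; the sketch is also synchronous, whereas the documented method is async.

```python
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for a ChatMessageProtocol implementation."""
    role: str
    content: str


def tokenize(text: str) -> set[str]:
    # Assumption: lowercase whitespace tokens; the real tokenizer may differ.
    return set(text.lower().split())


def prune(history: list[Message], current_query: str, max_turns: int) -> list[Message]:
    if len(history) <= max_turns:
        return list(history)

    query_tokens = tokenize(current_query)
    system = [(i, m) for i, m in enumerate(history) if m.role == "system"]
    others = [(i, m) for i, m in enumerate(history) if m.role != "system"]

    # Assumption: preserved system messages use up part of the budget.
    budget = max(max_turns - len(system), 0)

    scored = []
    for pos, (i, m) in enumerate(others):
        overlap_count = len(tokenize(m.content) & query_tokens)
        # 0 = oldest non-system turn, 1 = newest.
        recency_ratio = pos / (len(others) - 1) if len(others) > 1 else 1.0
        scored.append((overlap_count + recency_ratio * 0.3, i, m))

    # Keep the top-scoring turns, then restore chronological order by index.
    top = sorted(scored, key=lambda t: t[0], reverse=True)[:budget]
    kept = system + [(i, m) for _, i, m in top]
    return [m for _, m in sorted(kept, key=lambda t: t[0])]
```

Because the recency bonus is capped at 0.3, a single shared token always outweighs position; recency only breaks ties between turns with equal overlap.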

Control which turns each agent can see within a group session.

By default all turns are visible to everyone. Role isolation allows marking individual turns as private to a specific agent, so sensitive reasoning by one agent is not leaked to peers.

Turn visibility rules (checked in order):

  1. A turn tagged visible_to in its metadata is only shown to the listed agent names.
  2. A turn tagged hidden_from in its metadata is hidden from those agent names.
  3. Otherwise the turn is visible to everyone.
def filter_for_agent(
    turns: list[SessionTurn],
    agent_name: str
) -> list[SessionTurn]

Return only the turns the given agent is allowed to see.

Parameters
`turns` (list[SessionTurn]): Complete session turn list.
`agent_name` (str): The agent whose view to compute.
Returns
list[SessionTurn]: Filtered list of turns visible to *agent_name*.
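
The three visibility rules can be expressed as a short predicate. In this sketch the `Turn` dataclass and the assumption that `visible_to` and `hidden_from` hold lists of agent names are illustrative; only the metadata key names and the order of the checks come from the description above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Turn:
    """Hypothetical stand-in for SessionTurn."""
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)


def is_visible(turn: Turn, agent_name: str) -> bool:
    # Rule 1: an explicit allow-list takes precedence.
    visible_to = turn.metadata.get("visible_to")
    if visible_to is not None:
        return agent_name in visible_to
    # Rule 2: otherwise honour the deny-list.
    hidden_from = turn.metadata.get("hidden_from")
    if hidden_from is not None:
        return agent_name not in hidden_from
    # Rule 3: untagged turns are visible to everyone.
    return True


def filter_for_agent(turns: list[Turn], agent_name: str) -> list[Turn]:
    return [t for t in turns if is_visible(t, agent_name)]
```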

Cycle through registered agents in registration order.

Each call to select_next advances to the next agent in the list, wrapping around when the end is reached. select_next returns None after a configurable number of total rounds.

Parameters
`max_rounds` (int): Total rounds before the conversation is considered complete.
def __init__(max_rounds: int = 10) -> None
def register(
    agent_name: str,
    role: str
) -> None

Register an agent for round-robin participation.

Parameters
ParameterTypeDescription
`agent_name`strUnique agent name.
`role`strAgent role label.
async def select_next(session_id: str) -> str | None

Return the next agent in the rotation, or None when done.

Parameters
ParameterTypeDescription
`session_id`strUnused; kept for protocol compatibility.
Returns
TypeDescription
str | NoneAgent name or ``None``.
async def is_complete(session_id: str) -> bool

Return True after max_rounds full cycles.

Parameters
ParameterTypeDescription
`session_id`strUnused; kept for protocol compatibility.
Returns
TypeDescription
boolTrue when the conversation is complete.
def filter_visible(
    turns: list[SessionTurn],
    *,
    agent_name: str
) -> list[SessionTurn]

All turns are visible to all agents in round-robin mode.

Parameters
`turns` (list[SessionTurn]): Full session turn list.
`agent_name` (str): The agent querying visibility.
Returns
list[SessionTurn]: All turns unchanged.
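
A minimal sketch of the rotation, assuming that one "round" means every registered agent has been selected once, is shown below. The class name and the internal call counter are hypothetical.

```python
import asyncio
from typing import Optional


class RoundRobinSketch:
    """Hypothetical round-robin turn selection; not the library's class."""

    def __init__(self, max_rounds: int = 10) -> None:
        self._agents: list[str] = []
        self._max_rounds = max_rounds
        self._selections = 0  # total agents handed out so far

    def register(self, agent_name: str, role: str) -> None:
        if agent_name not in self._agents:
            self._agents.append(agent_name)

    async def is_complete(self, session_id: str) -> bool:
        if not self._agents:
            return True
        # Assumption: a round is one full pass over all registered agents.
        return self._selections >= self._max_rounds * len(self._agents)

    async def select_next(self, session_id: str) -> Optional[str]:
        if await self.is_complete(session_id):
            return None
        # The modulo wraps back to the first agent after the last one.
        agent = self._agents[self._selections % len(self._agents)]
        self._selections += 1
        return agent


async def demo() -> list[Optional[str]]:
    tm = RoundRobinSketch(max_rounds=2)
    tm.register("planner", "coordinator")
    tm.register("coder", "participant")
    return [await tm.select_next("s1") for _ in range(5)]
```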

Cherry-pick specific turns from the branch by turn ID.
Parameters
`turn_ids` (list[str]): IDs of the branch turns to include in the merge.
def __init__(turn_ids: list[str]) -> None
async def merge(
    parent: SessionState,
    branch: SessionState
) -> SessionState

Copy only the specified branch turns into the parent.

Parameters
ParameterTypeDescription
`parent`SessionStateThe parent session state.
`branch`SessionStateThe branch session state.
Returns
TypeDescription
SessionStateMerged ``SessionState`` with selected branch turns appended.

Computed analytics for a single session.

All fields are derived from SessionState and its SessionTurn records.

Attributes:

  • session_id: The session this report covers.
  • total_turns: Number of turns (all roles) in the session.
  • total_tokens: Cumulative tokens across all turns.
  • total_cost: Cumulative cost (USD) across all turns.
  • duration_seconds: Wall-clock time from first to last turn. 0 if fewer than two turns exist.
  • agents_used: Deduplicated list of provider names seen across turns.
  • tools_invoked: Deduplicated flat list of tool names referenced in SessionTurn.tool_calls.
  • avg_response_time_ms: Average milliseconds between consecutive turns. 0.0 if fewer than two turns.
  • models_used: Deduplicated list of model identifiers used across turns.
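
The timing and deduplication fields could be derived along the lines of the sketch below. It assumes each turn carries a timestamp and that timestamps arrive in chronological order; the helper names are hypothetical, and order-preserving deduplication is an assumption about how the "deduplicated list" fields are built.

```python
from datetime import datetime, timedelta


def duration_seconds(timestamps: list[datetime]) -> float:
    """Wall-clock time from first to last turn; 0 with fewer than two turns."""
    if len(timestamps) < 2:
        return 0.0
    return (timestamps[-1] - timestamps[0]).total_seconds()


def avg_response_time_ms(timestamps: list[datetime]) -> float:
    """Average gap between consecutive turns in milliseconds."""
    if len(timestamps) < 2:
        return 0.0
    gaps = [
        (later - earlier).total_seconds() * 1000
        for earlier, later in zip(timestamps, timestamps[1:])
    ]
    return sum(gaps) / len(gaps)


def dedupe(items: list[str]) -> list[str]:
    # dict keys preserve insertion order, so first occurrences are kept.
    return list(dict.fromkeys(items))
```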


Payload fired when checkpointing persists a session snapshot.

Emitted when an AI session is closed or expires.

Consumed by: audit, analytics, resource cleanup.


Payload fired when a session is closed for further writes.

Configuration for the session management subsystem.

Controls lifecycle timeouts, checkpointing thresholds, multi-agent settings, backend selection, and web middleware behaviour.

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check that the config is safe for the target environment.


Per-request session context backed by a ``ContextVar``.

Implements SessionContextProtocol. Each asyncio task (request) has its own isolated slot in _SESSION_STATE; no cross-request leakage occurs even under high concurrency.

The typical lifecycle inside a request middleware is:

token = ctx.bind(state)  # request start
try:
    ...  # handle request
finally:
    ctx.unbind(token)  # request end

Parameters
`manager` (SessionManagerProtocol): Session lifecycle manager used by ``get_or_create``.
def __init__(manager: SessionManagerProtocol) -> None
property session_id() -> str

Current session ID from the task-scoped context variable.

Returns
TypeDescription
strThe bound session ID.
Raises
ExceptionDescription
SessionErrorIf no session has been bound for this task.
property state() -> SessionState

Current session state from the task-scoped context variable.

Returns
TypeDescription
SessionStateThe bound ``SessionState``.
Raises
ExceptionDescription
SessionErrorIf no session has been bound for this task.
async def get_or_create(user_id: str) -> SessionState

Return the bound session, or create a new one if unbound.

Parameters
ParameterTypeDescription
`user_id`strOwner of the session (only used when creating).
Returns
TypeDescription
SessionStateAn existing or freshly created ``SessionState``.
def bind(state: SessionState) -> contextvars.Token[SessionState | None]

Bind a session state to the current asyncio task.

Should be called at the start of a request by middleware.

Parameters
ParameterTypeDescription
`state`SessionStateThe ``SessionState`` to bind.
Returns
TypeDescription
contextvars.Token[SessionState | None]A ``Token`` that must be passed to ``unbind`` to restore the previous context value.
def unbind(token: contextvars.Token[SessionState | None]) -> None

Restore the context variable to its pre-bind value.

Should be called at the end of a request (in a finally block) by middleware.

Parameters
`token` (contextvars.Token[SessionState | None]): The token returned by the corresponding ``bind`` call.
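
The bind/unbind pattern rests on the standard-library `contextvars` module. The sketch below uses a plain dict in place of ``SessionState`` and module-level functions in place of the context class, both of which are simplifications for illustration; it shows that two concurrently running tasks each see only their own bound value.

```python
import asyncio
import contextvars
from typing import Optional

# A dict stands in for SessionState in this sketch.
_SESSION_STATE: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
    "session_state", default=None
)


class SessionError(RuntimeError):
    """Stand-in for the library's SessionError."""


def bind(state: dict) -> contextvars.Token:
    # set() returns a token that remembers the previous value.
    return _SESSION_STATE.set(state)


def unbind(token: contextvars.Token) -> None:
    # reset() restores whatever value was present before the matching set().
    _SESSION_STATE.reset(token)


def current_session_id() -> str:
    state = _SESSION_STATE.get()
    if state is None:
        raise SessionError("no session bound for this task")
    return state["session_id"]


async def handle_request(session_id: str) -> str:
    token = bind({"session_id": session_id})
    try:
        await asyncio.sleep(0)  # yield so the two requests interleave
        return current_session_id()
    finally:
        unbind(token)


async def main() -> list[str]:
    # gather() runs each coroutine in its own task with a copied context.
    return await asyncio.gather(handle_request("a"), handle_request("b"))
```

Each task created by `asyncio.gather` receives a copy of the current context, which is why a value set in one request is not observed by the other.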

Emitted when a new AI session is created.

Consumed by: audit, analytics, billing.


Manages session lifecycle with persistence and optional memory integration.

Implements SessionManagerProtocol. Orchestrates create, resume, suspend, close, add_turn, checkpoint, and restore operations. Publishes domain events via an optional event bus and triggers memory consolidation on session close when a consolidator is available.

Constructor parameters are injected by the DI container; every dependency except config and store is optional to allow graceful degradation.

Parameters
`config` (SessionConfig): Session configuration.
`store` (SessionStoreProtocol): Persistence backend for session state and checkpoints.
`memory` (WorkingMemoryProtocol | None): Optional working memory for recording turns.
`consolidator` (Any | None): Optional memory consolidator triggered on close.
`event_bus` (Any | None): Optional event bus for lifecycle events.
def __init__(
    config: SessionConfig,
    store: SessionStoreProtocol,
    memory: WorkingMemoryProtocol | None = None,
    consolidator: Any | None = None,
    event_bus: Any | None = None
) -> None
async def create(
    user_id: str,
    metadata: dict[str, Any] | None = None
) -> SessionState

Create a new session.

Parameters
ParameterTypeDescription
`user_id`strThe owning user.
`metadata`dict[str, Any] | NoneOptional session metadata.
Returns
TypeDescription
SessionStateNewly created ``SessionState`` with status ``active``.
Raises
ExceptionDescription
SessionCapacityErrorIf the per-user session limit is reached.
async def resume(session_id: str) -> SessionState | None

Resume an existing session.

Transitions SUSPENDED → ACTIVE. Returns the updated state or None if the session does not exist.

Parameters
ParameterTypeDescription
`session_id`strID of the session to resume.
Returns
TypeDescription
SessionState | NoneUpdated ``SessionState``, or ``None`` if not found.
Raises
ExceptionDescription
SessionClosedErrorIf the session is already closed.
async def add_turn(
    session_id: str,
    turn: SessionTurn
) -> None

Add a conversation turn and update session metrics.

Also records the turn in working memory if available, and triggers an auto-checkpoint when the configured interval is reached.

Parameters
ParameterTypeDescription
`session_id`strTarget session ID.
`turn`SessionTurnThe turn to append.
Raises
ExceptionDescription
SessionNotFoundErrorIf the session does not exist.
SessionClosedErrorIf the session is closed.
SessionCapacityErrorIf the per-session turn limit is reached.
async def get_state(session_id: str) -> SessionState | None

Return the current state of a session, or None if not found.

Parameters
ParameterTypeDescription
`session_id`strThe session ID to look up.
Returns
TypeDescription
SessionState | None``SessionState`` or ``None``.
async def checkpoint(session_id: str) -> SessionCheckpoint

Create an immutable snapshot of the current session state.

Parameters
ParameterTypeDescription
`session_id`strThe session to snapshot.
Returns
TypeDescription
SessionCheckpointThe created ``SessionCheckpoint``.
Raises
ExceptionDescription
SessionNotFoundErrorIf the session does not exist.
async def restore(checkpoint_id: str) -> SessionState

Restore a session to a previous checkpoint.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `checkpoint_id` | str | The checkpoint to restore from. |
Returns
| Type | Description |
| --- | --- |
| SessionState | The restored ``SessionState``. |
Raises
| Exception | Description |
| --- | --- |
| CheckpointNotFoundError | If the checkpoint does not exist. |
async def close(session_id: str) -> None

Close a session and optionally trigger memory consolidation.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session to close. |
Raises
| Exception | Description |
| --- | --- |
| SessionNotFoundError | If the session does not exist. |
async def suspend(session_id: str) -> SessionState

Suspend an active session without closing it.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session to suspend. |
Returns
| Type | Description |
| --- | --- |
| SessionState | Updated ``SessionState`` with status ``suspended``. |
Raises
| Exception | Description |
| --- | --- |
| SessionNotFoundError | If the session does not exist. |
| SessionTransitionError | If the session cannot be suspended. |

Manages session lifecycle with persistence and optional memory integration.

Implements SessionManagerProtocol. Orchestrates create, resume, suspend, close, add_turn, checkpoint, and restore operations. Publishes domain events via an optional event bus and triggers memory consolidation on session close when a consolidator is available.

Constructor parameters are injected by the DI container; every dependency except config and store is optional to allow graceful degradation.

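Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `config` | SessionConfig | Session configuration. |
| `store` | SessionStoreProtocol | Persistence backend for session state and checkpoints. |
| `memory` | WorkingMemoryProtocol \| None | Optional working memory for recording turns. |
| `consolidator` | Any \| None | Optional memory consolidator triggered on close. |
| `event_bus` | Any \| None | Optional event bus for lifecycle events. |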
def __init__(
    config: SessionConfig,
    store: SessionStoreProtocol,
    memory: WorkingMemoryProtocol | None = None,
    consolidator: Any | None = None,
    event_bus: Any | None = None
) -> None
async def create(
    user_id: str,
    metadata: dict[str, Any] | None = None
) -> SessionState

Create a new session.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `user_id` | str | The owning user. |
| `metadata` | dict[str, Any] \| None | Optional session metadata. |
Returns
| Type | Description |
| --- | --- |
| SessionState | Newly created ``SessionState`` with status ``active``. |
Raises
| Exception | Description |
| --- | --- |
| SessionCapacityError | If the per-user session limit is reached. |
async def resume(session_id: str) -> SessionState | None

Resume an existing session.

Transitions SUSPENDED → ACTIVE. Returns the updated state or None if the session does not exist.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | ID of the session to resume. |
Returns
| Type | Description |
| --- | --- |
| SessionState \| None | Updated ``SessionState``, or ``None`` if not found. |
Raises
| Exception | Description |
| --- | --- |
| SessionClosedError | If the session is already closed. |
async def add_turn(
    session_id: str,
    turn: SessionTurn
) -> None

Add a conversation turn and update session metrics.

Also records the turn in working memory if available, and triggers an auto-checkpoint when the configured interval is reached.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | Target session ID. |
| `turn` | SessionTurn | The turn to append. |
Raises
| Exception | Description |
| --- | --- |
| SessionNotFoundError | If the session does not exist. |
| SessionClosedError | If the session is closed. |
| SessionCapacityError | If the per-session turn limit is reached. |
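The behaviour described for add_turn (append the turn, record it in working memory only when one was injected, and checkpoint every N turns) can be sketched with a toy in-memory manager. Everything below is a hypothetical stand-in: `MiniManager`, `MiniSession`, `Turn` and the `checkpoint_interval` argument are illustrative names, not part of the documented API, and the sketch raises `KeyError` where the documented manager raises SessionNotFoundError.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class Turn:
    role: str
    content: str


@dataclass
class MiniSession:
    turns: list[Turn] = field(default_factory=list)
    checkpoints: list[tuple[Turn, ...]] = field(default_factory=list)


class MiniManager:
    """Toy stand-in for a session manager (assumption, not the real class)."""

    def __init__(self, checkpoint_interval: int, memory: list[Turn] | None = None) -> None:
        self._interval = checkpoint_interval  # hypothetical config value
        self._memory = memory  # optional dependency; None means "skip memory"
        self._sessions: dict[str, MiniSession] = {}

    def create(self, session_id: str) -> None:
        self._sessions[session_id] = MiniSession()

    async def add_turn(self, session_id: str, turn: Turn) -> None:
        session = self._sessions.get(session_id)
        if session is None:
            raise KeyError(session_id)
        session.turns.append(turn)
        if self._memory is not None:
            # Record in working memory only when one was injected.
            self._memory.append(turn)
        if len(session.turns) % self._interval == 0:
            # Auto-checkpoint: store an immutable snapshot of the turns so far.
            session.checkpoints.append(tuple(session.turns))


async def demo() -> MiniSession:
    manager = MiniManager(checkpoint_interval=2)  # no memory injected
    manager.create("s1")
    for i in range(5):
        await manager.add_turn("s1", Turn("user", f"message {i}"))
    return manager._sessions["s1"]


session = asyncio.run(demo())
print(len(session.turns), len(session.checkpoints))
```

With an interval of 2 and five turns, snapshots are taken after the second and fourth turn. Passing `memory=None` exercises the graceful-degradation path: the manager still works, it just skips the memory write.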
async def get_state(session_id: str) -> SessionState | None

Return the current state of a session, or None if not found.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session ID to look up. |
Returns
| Type | Description |
| --- | --- |
| SessionState \| None | ``SessionState`` or ``None``. |
async def checkpoint(session_id: str) -> SessionCheckpoint

Create an immutable snapshot of the current session state.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session to snapshot. |
Returns
| Type | Description |
| --- | --- |
| SessionCheckpoint | The created ``SessionCheckpoint``. |
Raises
| Exception | Description |
| --- | --- |
| SessionNotFoundError | If the session does not exist. |
async def restore(checkpoint_id: str) -> SessionState

Restore a session to a previous checkpoint.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `checkpoint_id` | str | The checkpoint to restore from. |
Returns
| Type | Description |
| --- | --- |
| SessionState | The restored ``SessionState``. |
Raises
| Exception | Description |
| --- | --- |
| CheckpointNotFoundError | If the checkpoint does not exist. |
async def close(session_id: str) -> None

Close a session and optionally trigger memory consolidation.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session to close. |
Raises
| Exception | Description |
| --- | --- |
| SessionNotFoundError | If the session does not exist. |
async def suspend(session_id: str) -> SessionState

Suspend an active session without closing it.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session to suspend. |
Returns
| Type | Description |
| --- | --- |
| SessionState | Updated ``SessionState`` with status ``suspended``. |
Raises
| Exception | Description |
| --- | --- |
| SessionNotFoundError | If the session does not exist. |
| SessionTransitionError | If the session cannot be suspended. |

ASGI middleware that resolves or creates a session for each request.

Session ID is resolved from (in priority order):

  1. X-Session-ID header (configurable via config.header_name).
  2. ?session_id= query parameter.
  3. Cookie named config.cookie_name (when set).
  4. Auto-creates a new session when none of the above yield a valid ID.

The resolved SessionContext is attached to request.scope["session"] (when the ASGI scope exposes a mutable scope dict, as Starlette/FastAPI do) and can be injected into route handlers.
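The priority order above can be sketched as a small pure function. This is an illustration under stated assumptions, not the middleware's actual code: `resolve_session_id` and its parameters are hypothetical names, inputs are plain mappings rather than an ASGI scope, and "valid" is reduced to "non-empty" (the real middleware presumably also checks the ID against the session manager).

```python
from __future__ import annotations

from collections.abc import Mapping


def resolve_session_id(
    headers: Mapping[str, str],
    query: Mapping[str, str],
    cookies: Mapping[str, str],
    header_name: str = "X-Session-ID",
    cookie_name: str | None = None,
) -> str | None:
    """Return the first session ID found, or None to signal auto-creation."""
    # HTTP header names are case-insensitive, so compare in lowercase.
    lowered = {key.lower(): value for key, value in headers.items()}
    candidates = [
        lowered.get(header_name.lower()),                   # 1. header
        query.get("session_id"),                            # 2. query parameter
        cookies.get(cookie_name) if cookie_name else None,  # 3. cookie, when configured
    ]
    for candidate in candidates:
        if candidate:  # skip missing and empty values
            return candidate
    return None  # 4. caller creates a new session


print(resolve_session_id({"x-session-id": "h1"}, {"session_id": "q1"}, {}))
print(resolve_session_id({}, {}, {"sid": "c1"}, cookie_name="sid"))
```

Keeping the lookup free of I/O makes the precedence rule easy to test on its own; creating the session on a `None` result stays the caller's job.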

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_manager` | SessionManagerProtocol | Session lifecycle manager. |
| `config` | SessionConfig | Session configuration (controls cookie/header names and TTL). |
def __init__(
    session_manager: SessionManagerProtocol,
    config: SessionConfig
) -> None

AI conversation session management integration.

Call configure to register SessionStoreProtocol and SessionManagerProtocol implementations for injection.

Usage

from lexigram.ai.session.config import SessionConfig

@module(
    imports=[
        SessionModule.configure(
            SessionConfig(backend="cache"),
            enable_cleanup_scheduler=True,
        )
    ]
)
class AppModule(Module):
    pass

Error Handling

Session operations surface typed exceptions that can be caught directly or handled via the Result pattern:

from lexigram.ai.session.exceptions import (
    SessionError,             # base class; catch-all
    SessionNotFoundError,     # session ID not in store
    SessionClosedError,       # write on a closed session
    SessionExpiredError,      # TTL exceeded
    CheckpointNotFoundError,  # checkpoint ID not found
)

Exports: SessionStoreProtocol, SessionManagerProtocol, SessionError, SessionNotFoundError, SessionClosedError, SessionExpiredError, CheckpointNotFoundError
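To illustrate catching these exceptions directly, the sketch below defines local stand-in classes that mirror the documented hierarchy (a common base with specific subclasses) and shows the usual ordering of `except` clauses. The classes here are assumptions for demonstration only; in real code they would be imported from the exceptions module, and the message format and the `run_guarded` helper are invented for this example.

```python
from collections.abc import Callable


class SessionError(Exception):
    """Stand-in for the documented base error."""


class SessionNotFoundError(SessionError):
    def __init__(self, session_id: str) -> None:
        super().__init__(f"session not found: {session_id}")  # assumed message
        self.session_id = session_id


class SessionClosedError(SessionError):
    def __init__(self, session_id: str) -> None:
        super().__init__(f"session closed: {session_id}")  # assumed message
        self.session_id = session_id


def run_guarded(action: Callable[[], None]) -> str:
    """Run a session operation and map failures to a short status string."""
    try:
        action()
    except SessionNotFoundError as exc:
        # Specific handlers go first; the base class would otherwise shadow them.
        return f"missing:{exc.session_id}"
    except SessionError as exc:
        # Catch-all for every other session failure.
        return f"session-error:{type(exc).__name__}"
    return "ok"


def raise_missing() -> None:
    raise SessionNotFoundError("s1")


def raise_closed() -> None:
    raise SessionClosedError("s2")


print(run_guarded(raise_missing))
print(run_guarded(raise_closed))
print(run_guarded(lambda: None))
```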

def configure(
    cls,
    config: SessionConfig | None = None,
    *,
    enable_cleanup_scheduler: bool = True
) -> DynamicModule

Create a SessionModule with the given configuration.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `config` | SessionConfig \| None | ``SessionConfig``, a plain ``dict`` of the same keys, or ``None`` to use defaults. |
| `enable_cleanup_scheduler` | bool | Start the background expired-session cleanup loop during boot. Defaults to ``True``. |
Returns
| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule descriptor. |
def stub(
    cls,
    config: SessionConfig | None = None
) -> DynamicModule

Create a SessionModule suitable for unit and integration testing.

Uses in-memory store with the cleanup scheduler disabled to avoid background tasks during tests.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `config` | SessionConfig \| None | Optional config override. Uses safe test defaults when None. |
Returns
| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule descriptor. |

Registers session management services in the DI container.

Wires SessionStoreProtocol and SessionManagerProtocol based on the configured backend. Background cleanup is started during boot().

def __init__(
    config: SessionConfig | dict | None = None,
    *,
    enable_cleanup_scheduler: bool = True
) -> None

Initialise the provider.

async def register(container: ContainerRegistrarProtocol) -> None

Register session services in the container.

async def boot(container: BootContainerProtocol) -> None

Pre-warm session services and start the cleanup scheduler.

async def shutdown() -> None

Stop the background cleanup loop.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report provider health. The result is always healthy, because this is an in-process domain provider with no external backend to ping.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float | Ignored for in-process providers. |
Returns
| Type | Description |
| --- | --- |
| HealthCheckResult | Always HEALTHY. |

Payload fired when a session manager opens a new session.

Validates and enforces session lifecycle status transitions.

Encodes the allowed state graph:

  • ACTIVE → SUSPENDED or CLOSED
  • SUSPENDED → ACTIVE or CLOSED
  • CLOSED / EXPIRED → terminal (no transitions allowed)
def validate(
    session_id: str,
    from_status: str,
    to_status: str
) -> None

Assert that a transition is allowed, raise otherwise.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session being transitioned (for error messages). |
| `from_status` | str | Current status string. |
| `to_status` | str | Desired target status string. |
Raises
| Exception | Description |
| --- | --- |
| SessionTransitionError | If the transition is not permitted. |
def can_transition(
    from_status: str,
    to_status: str
) -> bool

Return True if the transition from from_status to to_status is valid.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `from_status` | str | Current status string. |
| `to_status` | str | Desired target status string. |
Returns
| Type | Description |
| --- | --- |
| bool | True when the transition is allowed. |
def is_terminal(status: str) -> bool

Return True if the given status is a terminal (no-exit) state.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `status` | str | Status string to check. |
Returns
| Type | Description |
| --- | --- |
| bool | True for ``closed`` and ``expired``. |
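The state graph and the three methods above can be sketched with a lookup table of allowed targets per status. This is a minimal illustration, not the library's implementation: the `ALLOWED` table, the lowercase status strings, and the local `TransitionError` class (standing in for SessionTransitionError) are assumptions.

```python
# Allowed targets per status, mirroring the documented graph.
ALLOWED: dict[str, frozenset[str]] = {
    "active": frozenset({"suspended", "closed"}),
    "suspended": frozenset({"active", "closed"}),
    "closed": frozenset(),   # terminal
    "expired": frozenset(),  # terminal
}


class TransitionError(Exception):
    """Stand-in for the documented SessionTransitionError."""


def can_transition(from_status: str, to_status: str) -> bool:
    # Unknown source statuses have no allowed targets.
    return to_status in ALLOWED.get(from_status, frozenset())


def is_terminal(status: str) -> bool:
    # Terminal means: a known status with no outgoing transitions.
    return status in ALLOWED and not ALLOWED[status]


def validate(session_id: str, from_status: str, to_status: str) -> None:
    if not can_transition(from_status, to_status):
        raise TransitionError(
            f"session {session_id}: cannot go from {from_status!r} to {to_status!r}"
        )


validate("s1", "active", "suspended")  # allowed, returns None
print(can_transition("closed", "active"), is_terminal("expired"))
```

A table-driven check keeps the graph in one place, so adding a status means editing data rather than branching logic.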

Compute and apply incremental differences between session states.

Reduces checkpoint storage cost by storing only what changed between consecutive checkpoints rather than full state copies.

def compute(
    old: SessionState,
    new: SessionState
) -> dict[str, Any]

Return a dict containing only the fields that changed.

Compares old and new and returns a minimal diff that, when applied to old, reproduces new.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `old` | SessionState | The baseline session state. |
| `new` | SessionState | The updated session state. |
Returns
| Type | Description |
| --- | --- |
| dict[str, Any] | Mapping of changed field names to their new values. |
def apply(
    base: SessionState,
    diff: dict[str, Any]
) -> SessionState

Apply a diff produced by compute to reproduce the new state.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `base` | SessionState | The baseline session state. |
| `diff` | dict[str, Any] | A dict previously returned by compute. |
Returns
| Type | Description |
| --- | --- |
| SessionState | A new ``SessionState`` with the diff applied. |
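The compute/apply round trip can be sketched with a small frozen dataclass standing in for SessionState. `MiniState` and its fields are hypothetical, and the field-by-field comparison is one straightforward way to build a shallow diff; the real class may diff nested structures differently.

```python
from dataclasses import dataclass, field, fields, replace
from typing import Any


@dataclass(frozen=True)
class MiniState:
    """Hypothetical stand-in for SessionState."""

    session_id: str
    status: str
    turn_count: int
    variables: dict = field(default_factory=dict)


def compute(old: MiniState, new: MiniState) -> dict[str, Any]:
    """Return only the fields whose values differ between old and new."""
    return {
        f.name: getattr(new, f.name)
        for f in fields(new)
        if getattr(old, f.name) != getattr(new, f.name)
    }


def apply(base: MiniState, diff: dict[str, Any]) -> MiniState:
    """Build a new state from base with the changed fields overwritten."""
    return replace(base, **diff)


old = MiniState("s1", "active", 3, {"lang": "en"})
new = MiniState("s1", "suspended", 4, {"lang": "en"})

diff = compute(old, new)
print(diff)
print(apply(old, diff) == new)
```

Only `status` and `turn_count` end up in the diff here, which is the storage saving the class description refers to: unchanged fields such as `variables` are not copied into each checkpoint.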

Route to agents based on keyword/topic detection in the latest turn.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `topic_map` | dict[str, str] | Mapping of keyword (lowercase) → agent name. |
| `fallback_agent` | str \| None | Agent to use when no topic match is found. |
| `max_rounds` | int | Maximum total delegations before completion. |
def __init__(
    topic_map: dict[str, str],
    fallback_agent: str | None = None,
    max_rounds: int = 10
) -> None
def register(
    agent_name: str,
    role: str
) -> None

Register an agent.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `agent_name` | str | Agent name. |
| `role` | str | Agent role label. |
async def select_next(session_id: str) -> str | None

Select an agent based on the content of the most recent turn.

Falls back to the configured fallback agent if no keyword matches.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | Session ID for retrieving the last known turns. |
Returns
| Type | Description |
| --- | --- |
| str \| None | Agent name or ``None``. |
def record_turns(
    session_id: str,
    turns: list[SessionTurn]
) -> None

Update the local turn cache used for topic detection.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | Session ID. |
| `turns` | list[SessionTurn] | Current turn list for that session. |
async def is_complete(session_id: str) -> bool

Return True after max_rounds delegations.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | Unused; kept for protocol compatibility. |
Returns
| Type | Description |
| --- | --- |
| bool | True when the conversation is complete. |
def filter_visible(
    turns: list[SessionTurn],
    *,
    agent_name: str
) -> list[SessionTurn]

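Return all turns unchanged; in topic-based mode every turn is visible to every agent.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `turns` | list[SessionTurn] | Full session turn list. |
| `agent_name` | str | The querying agent name. |
Returns
| Type | Description |
| --- | --- |
| list[SessionTurn] | All turns unchanged. |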
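The routing behaviour documented above (match a keyword in the latest turn, fall back otherwise, stop after `max_rounds` delegations) can be sketched as follows. `MiniTopicRouter` is a hypothetical, synchronous simplification: turns are plain strings, matching is a case-insensitive substring test, and every call to `select_next` is counted as one delegation. The documented class is async and may match and count differently.

```python
from __future__ import annotations


class MiniTopicRouter:
    """Toy keyword router; an illustration, not the documented class."""

    def __init__(
        self,
        topic_map: dict[str, str],
        fallback_agent: str | None = None,
        max_rounds: int = 10,
    ) -> None:
        self._topic_map = {key.lower(): agent for key, agent in topic_map.items()}
        self._fallback = fallback_agent
        self._max_rounds = max_rounds
        self._rounds = 0
        self._latest: dict[str, str] = {}  # session_id -> text of the last turn

    def record_turns(self, session_id: str, turns: list[str]) -> None:
        # Only the most recent turn matters for topic detection.
        self._latest[session_id] = turns[-1] if turns else ""

    def select_next(self, session_id: str) -> str | None:
        self._rounds += 1  # assumption: each selection is one delegation
        text = self._latest.get(session_id, "").lower()
        for keyword, agent in self._topic_map.items():
            if keyword in text:
                return agent
        return self._fallback

    def is_complete(self) -> bool:
        return self._rounds >= self._max_rounds


router = MiniTopicRouter(
    {"refund": "billing_agent", "password": "support_agent"},
    fallback_agent="general_agent",
    max_rounds=2,
)
router.record_turns("s1", ["Hi", "I forgot my PASSWORD"])
first = router.select_next("s1")
router.record_turns("s1", ["Hi", "I forgot my PASSWORD", "What is the weather?"])
second = router.select_next("s1")
print(first, second, router.is_complete())
```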

Raised when a checkpoint cannot be found by its ID.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `checkpoint_id` | str | The ID of the checkpoint that was not found. |
def __init__(checkpoint_id: str) -> None

Raised when a session limit is exceeded.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `detail` | str | Human-readable description of the exceeded limit. |
def __init__(detail: str) -> None

Raised when an operation is attempted on a closed session.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The ID of the closed session. |
def __init__(session_id: str) -> None

Base error for all session operations.

All session-specific exceptions inherit from this class.


Raised when an operation is attempted on an expired session.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The ID of the expired session. |
def __init__(session_id: str) -> None

Raised when a session cannot be found by its ID.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The ID of the session that was not found. |
def __init__(session_id: str) -> None

Raised when an invalid state transition is attempted.
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | The session ID. |
| `from_status` | str | Current status. |
| `to_status` | str | Attempted target status. |
def __init__(
    session_id: str,
    from_status: str,
    to_status: str
) -> None