Relevance-based conversation history pruning.
Unlike sliding-window truncation, implementations score each turn for
relevance to the current query and preserve high-value turns regardless
of their position in the history.
async def prune (
history : list[ ChatMessageProtocol] ,
current_query : str ,
max_turns : int
) -> list[ ChatMessageProtocol]
Prune history to at most max_turns, preserving relevant turns.
Parameters Parameter Type Description `history` list[ ChatMessageProtocol] Full conversation history as ChatMessageProtocol instances. `current_query` str The current user query (used for relevance scoring). `max_turns` int Maximum number of turns to retain.
Returns Type Description list[ ChatMessageProtocol] Pruned history in chronological order.
Protocol for merging a branch session back into a parent session.
Implementors decide which turns and variables from the branch are
incorporated into the parent when a branch is merged.
async def merge (
parent : SessionState ,
branch : SessionState
) -> SessionState
Merge branch into parent and return the combined state.
Parameters Parameter Type Description `parent` SessionState The original session state. `branch` SessionState The forked branch to merge back.
Returns Type Description SessionState A new ``SessionState`` combining both sessions.
Protocol for per-request session context.
Provides access to the current session within a request scope,
with ability to bind and retrieve session state.
property session_id ( ) -> str
Get the current session ID.
Returns Type Description str The current session ID
Raises Exception Description SessionError If no session is currently bound
property state ( ) -> SessionState
Get the current session state.
Returns Type Description SessionState The current session state
Raises Exception Description SessionError If no session is currently bound
async def get_or_create ( user_id : str ) -> SessionState
Get an existing session or create a new one.
Parameters Parameter Type Description `user_id` str The user ID
Returns Type Description SessionState An existing or newly created session
Protocol for managing session lifecycle.
Handles session creation, resumption, state management,
checkpointing, and restoration.
async def create (
user_id : str ,
metadata : dict[ str, Any] | None = None
) -> SessionState
Create a new session.
Parameters Parameter Type Description `user_id` str The user ID `metadata` dict[ str, Any] | None Optional session metadata
Returns Type Description SessionState The newly created session
async def resume ( session_id : str ) -> SessionState | None
Resume an existing session.
Parameters Parameter Type Description `session_id` str The session ID to resume
Returns Type Description SessionState | None The session state, or None if not found or closed
async def add_turn (
session_id : str ,
turn : SessionTurn
) -> None
Add a turn to a session.
Parameters Parameter Type Description `session_id` str The session to update `turn` SessionTurn The turn to add
async def get_state ( session_id : str ) -> SessionState | None
Get the current state of a session.
Parameters Parameter Type Description `session_id` str The session ID
Returns Type Description SessionState | None The session state, or None if not found
async def checkpoint ( session_id : str ) -> SessionCheckpoint
Create a checkpoint of the session.
Parameters Parameter Type Description `session_id` str The session to checkpoint
Returns Type Description SessionCheckpoint The created checkpoint
async def restore ( checkpoint_id : str ) -> SessionState
Restore session state from a checkpoint.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint ID to restore from
Returns Type Description SessionState The restored session state
async def close ( session_id : str ) -> None
Close/terminate a session.
Parameters Parameter Type Description `session_id` str The session to close
async def suspend ( session_id : str ) -> SessionState
Suspend an active session without closing it.
Parameters Parameter Type Description `session_id` str The session to suspend.
Returns Type Description SessionState Updated session state with status ``suspended``.
Protocol for storing and retrieving session state.
Implementations provide persistence for session state, supporting
creation, loading, deletion, and enumeration of sessions.
async def save ( state : SessionState ) -> None
Save or update a session state.
Parameters Parameter Type Description `state` SessionState The session state to save
async def load ( session_id : str ) -> SessionState | None
Load a session state by ID.
Parameters Parameter Type Description `session_id` str The session ID to load
Returns Type Description SessionState | None The session state, or None if not found
async def delete ( session_id : str ) -> None
Delete a session.
Parameters Parameter Type Description `session_id` str The session ID to delete
async def list_sessions ( user_id : str ) -> list[ SessionState]
List all sessions for a user.
Parameters Parameter Type Description `user_id` str The user ID to query
Returns Type Description list[ SessionState] List of sessions owned by this user
async def save_checkpoint ( checkpoint : SessionCheckpoint ) -> None
Persist an immutable session checkpoint.
Parameters Parameter Type Description `checkpoint` SessionCheckpoint The checkpoint to save.
async def load_checkpoint ( checkpoint_id : str ) -> SessionCheckpoint | None
Load a checkpoint by ID.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint ID to load.
Returns Type Description SessionCheckpoint | None The checkpoint, or None if not found.
async def list_checkpoints ( session_id : str ) -> list[ SessionCheckpoint]
List all checkpoints for a session.
Parameters Parameter Type Description `session_id` str The session to list checkpoints for.
Returns Type Description list[ SessionCheckpoint] All checkpoints in chronological order.
Protocol for multi-agent turn selection.
Implementations decide which agent should speak next and whether
the conversation should continue.
def register (
agent_name : str ,
role : str
) -> None
Register an agent with this turn manager.
Parameters Parameter Type Description `agent_name` str Unique name of the agent. `role` str Role of the agent (e.g. ``participant``, ``coordinator``).
async def select_next ( session_id : str ) -> str | None
Return the name of the next agent to act, or None to stop.
Parameters Parameter Type Description `session_id` str The currently active group session.
Returns Type Description str | None Agent name, or ``None`` when the conversation should end.
async def is_complete ( session_id : str ) -> bool
Return True when the group conversation should terminate.
Parameters Parameter Type Description `session_id` str The group session ID.
Returns Type Description bool True if the conversation is complete.
def filter_visible (
turns : list[ SessionTurn] ,
* ,
agent_name : str
) -> list[ SessionTurn]
Filter to only the turns visible to the given agent.
Parameters Parameter Type Description `turns` list[ SessionTurn] Full turn list from the session. `agent_name` str The agent whose visibility to apply.
Returns Type Description list[ SessionTurn] Subset of turns the agent is allowed to see.
Append the branch's new turns (after the fork point) to the parent.
Variables from the branch are shallow-merged into the parent, with
branch values taking precedence for conflicting keys.
async def merge (
parent : SessionState ,
branch : SessionState
) -> SessionState
Append new branch turns after the parent’s last turn.
Parameters Parameter Type Description `parent` SessionState The parent session state. `branch` SessionState The branch session state.
Returns Type Description SessionState Merged ``SessionState`` with branch turns appended.
Fork and merge session timelines.
Enables exploration of alternative conversation paths by creating
independent branches from a session (or a specific checkpoint).
Branches can later be merged back into the parent.
Parameters Parameter Type Description `store` Session store that persists session states and checkpoints. `max_branches_per_session` Maximum allowed forks from a single session.
async def fork (
session_id : str ,
branch_name : str ,
* ,
from_checkpoint : str | None = None
) -> SessionState
Fork a session into a new independent branch.
The branch inherits all turns from the source session (or checkpoint)
and is assigned a new session ID with parent_session_id pointing
back to the origin.
Parameters Parameter Type Description `session_id` str The session to fork from. `branch_name` str A human-readable name for the branch. `from_checkpoint` str | None If provided, fork from this checkpoint instead of the current live state.
Returns Type Description SessionState Newly created branch ``SessionState``.
async def merge (
parent_session_id : str ,
branch_session_id : str ,
* ,
strategy : MergeStrategy | None = None
) -> SessionState
Merge a branch back into its parent session.
Parameters Parameter Type Description `parent_session_id` str The parent session to merge into. `branch_session_id` str The branch session to merge from. `strategy` MergeStrategy | None Merge strategy to use; defaults to ``AppendMerge``.
Returns Type Description SessionState Updated parent ``SessionState`` with branch content merged in.
Redis-backed session store for multi-process deployments.
Uses CacheBackendProtocol from lexigram-contracts so the
concrete Redis client is never imported directly. Sessions and
checkpoints are JSON-serialised. A per-user set index tracks which
sessions belong to a given user.
Parameters Parameter Type Description `cache` Any object implementing ``CacheBackendProtocol`` (get/set/delete). `ttl` Session TTL in seconds (default 24 h).
def __init__ (
cache : CacheBackendProtocol ,
ttl : int = 86400
) -> None
async def save ( state : SessionState ) -> None
Persist state in the cache.
Parameters Parameter Type Description `state` SessionState Session state to save.
async def load ( session_id : str ) -> SessionState | None
Return the session state for session_id, or None.
Parameters Parameter Type Description `session_id` str The session to load.
Returns Type Description SessionState | None Deserialised ``SessionState`` or ``None``.
async def delete ( session_id : str ) -> None
Remove a session from the cache.
Parameters Parameter Type Description `session_id` str The session to delete.
async def list_sessions ( user_id : str ) -> list[ SessionState]
List all sessions for user_id.
Parameters Parameter Type Description `user_id` str The user to query.
Returns Type Description list[ SessionState] All sessions found for that user.
async def save_checkpoint ( checkpoint : SessionCheckpoint ) -> None
Persist a checkpoint in the cache.
Parameters Parameter Type Description `checkpoint` SessionCheckpoint The checkpoint to save.
async def load_checkpoint ( checkpoint_id : str ) -> SessionCheckpoint | None
Return the checkpoint for checkpoint_id, or None.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to load.
Returns Type Description SessionCheckpoint | None Deserialised ``SessionCheckpoint`` or ``None``.
async def list_checkpoints ( session_id : str ) -> list[ SessionCheckpoint]
List all checkpoints for session_id, oldest-first.
Parameters Parameter Type Description `session_id` str The session to query.
Returns Type Description list[ SessionCheckpoint] Checkpoints in chronological order.
async def delete_checkpoint ( checkpoint_id : str ) -> None
Remove a checkpoint from the cache.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to delete.
Creates, loads, and prunes session checkpoints.
Wraps the store’s checkpoint methods with pruning logic so the
per-session checkpoint count never exceeds the configured maximum.
Parameters Parameter Type Description `store` Session store providing checkpoint persistence. `max_checkpoints_per_session` Maximum checkpoints to retain per session.
async def create (
state : SessionState ,
* ,
metadata : dict[ str, Any] | None = None
) -> SessionCheckpoint
Snapshot the current session state as an immutable checkpoint.
Prunes the oldest checkpoints if the per-session limit is exceeded.
Parameters Parameter Type Description `state` SessionState The session state to snapshot. `metadata` dict[ str, Any] | None Optional metadata to attach to the checkpoint.
Returns Type Description SessionCheckpoint The newly created ``SessionCheckpoint``.
async def restore ( checkpoint_id : str ) -> SessionState
Restore a session state from a checkpoint.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint ID to restore.
Returns Type Description SessionState Deep copy of the session state at checkpoint time.
async def list ( session_id : str ) -> list[ SessionCheckpoint]
Return all checkpoints for session_id, oldest first.
Parameters Parameter Type Description `session_id` str The session to query.
Returns Type Description list[ SessionCheckpoint] List of checkpoints in chronological order.
SQL-backed session store for durable production storage.
Uses DatabaseProviderProtocol from lexigram-contracts so the
concrete driver (asyncpg, aiosqlite, …) is never imported directly.
All state is JSON-serialised for the SQL TEXT columns.
Tables used:
ai_sessions — one row per session
ai_checkpoints — one row per checkpoint
Parameters Parameter Type Description `db` Any object implementing ``DatabaseProviderProtocol`` (provides ``scoped_context()`` and ``get_scoped_connection()``).
def __init__ ( db : DatabaseProviderProtocol ) -> None
async def save ( state : SessionState ) -> None
Upsert state in the database.
Parameters Parameter Type Description `state` SessionState The session state to persist.
async def load ( session_id : str ) -> SessionState | None
Return the session state for session_id, or None.
Parameters Parameter Type Description `session_id` str The session to load.
Returns Type Description SessionState | None Deserialised ``SessionState`` or ``None``.
async def delete ( session_id : str ) -> None
Remove a session from the database.
Parameters Parameter Type Description `session_id` str The session to delete.
async def list_sessions ( user_id : str ) -> list[ SessionState]
List all sessions belonging to user_id.
Parameters Parameter Type Description `user_id` str User to filter by.
Returns Type Description list[ SessionState] All sessions for that user.
async def save_checkpoint ( checkpoint : SessionCheckpoint ) -> None
Persist an immutable checkpoint.
Parameters Parameter Type Description `checkpoint` SessionCheckpoint The checkpoint to save.
async def load_checkpoint ( checkpoint_id : str ) -> SessionCheckpoint | None
Return the checkpoint for checkpoint_id, or None.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to load.
Returns Type Description SessionCheckpoint | None Deserialised ``SessionCheckpoint`` or ``None``.
async def list_checkpoints ( session_id : str ) -> list[ SessionCheckpoint]
List all checkpoints for session_id, oldest-first.
Parameters Parameter Type Description `session_id` str The session to query.
Returns Type Description list[ SessionCheckpoint] Checkpoints in chronological order.
async def delete_checkpoint ( checkpoint_id : str ) -> None
Remove a checkpoint from the database.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to delete.
A session shared by multiple agents with coordinated turn-taking.
Wraps a SessionManagerImpl and a TurnManager to run a structured
multi-agent conversation. Each agent is called in the order determined
by the turn manager, and each agent sees only the turns that
RoleIsolation makes visible to it.
Parameters Parameter Type Description `session_manager` Session lifecycle manager. `turn_manager` Strategy for selecting the next speaking agent. `role_isolation` Optional turn-visibility filter; defaults to all-visible.
def __init__ (
session_manager : Any ,
turn_manager : TurnManager | None = None ,
role_isolation : RoleIsolation | None = None
) -> None
def add_agent (
agent : Any ,
role : str = 'participant'
) -> None
Register an agent with the group session.
Parameters Parameter Type Description `agent` Any An object with a ``name`` attribute and an async ``execute(message, history)`` method returning a ``Result``. `role` str Role label for turn management.
async def run (
session_id : str ,
initial_message : str ,
* ,
max_rounds : int = 10
) -> SessionState
Run the multi-agent conversation and return the final session state.
Posts initial_message as a user turn, then iterates until all agents
are done or max_rounds cycles have completed.
Parameters Parameter Type Description `session_id` str The session to run within (must already exist). `initial_message` str The opening user message. `max_rounds` int Maximum conversation rounds (passed to turn manager).
Returns Type Description SessionState Final ``SessionState`` after the conversation completes.
property agent_names ( ) -> list[ str]
Names of all registered agents in registration order.
Returns Type Description list[ str] List of agent names.
In-process session and checkpoint storage.
All data is stored in plain Python dicts and is lost when the process
exits. Suitable for development, testing, and single-process deployments.
async def save ( state : SessionState ) -> None
Persist or overwrite a session state.
Parameters Parameter Type Description `state` SessionState The session state to save.
async def load ( session_id : str ) -> SessionState | None
Return the session state for session_id, or None.
Parameters Parameter Type Description `session_id` str The session to load.
Returns Type Description SessionState | None The session state, or ``None`` if not found.
async def delete ( session_id : str ) -> None
Remove a session from the store.
Parameters Parameter Type Description `session_id` str The session to delete.
async def list_sessions ( user_id : str ) -> list[ SessionState]
List all sessions belonging to user_id.
Parameters Parameter Type Description `user_id` str The user to filter by.
Returns Type Description list[ SessionState] All sessions owned by that user.
async def save_checkpoint ( checkpoint : SessionCheckpoint ) -> None
Persist an immutable session checkpoint.
Parameters Parameter Type Description `checkpoint` SessionCheckpoint The checkpoint to store.
async def load_checkpoint ( checkpoint_id : str ) -> SessionCheckpoint | None
Return the checkpoint for checkpoint_id, or None.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to load.
Returns Type Description SessionCheckpoint | None The checkpoint, or ``None`` if not found.
async def list_checkpoints ( session_id : str ) -> list[ SessionCheckpoint]
List all checkpoints for session_id, sorted oldest-first.
Parameters Parameter Type Description `session_id` str The session to query.
Returns Type Description list[ SessionCheckpoint] Checkpoints in chronological order.
async def delete_checkpoint ( checkpoint_id : str ) -> None
Remove a checkpoint from the store.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to delete.
async def expire_old_sessions ( ttl_seconds : int ) -> int
Delete sessions whose last update is older than ttl_seconds.
Parameters Parameter Type Description `ttl_seconds` int Maximum allowed age in seconds.
Returns Type Description int Number of sessions expired.
Select agents by explicit priority; highest-priority agent goes first.
Agents with higher priority values speak first. After each full rotation
at the highest priority, lower-priority agents get a chance.
Parameters Parameter Type Description `priorities` Mapping of agent name → integer priority (higher = first). `max_rounds` Maximum full cycles before completion.
def __init__ (
priorities : dict[ str, int] | None = None ,
max_rounds : int = 10
) -> None
def register (
agent_name : str ,
role : str
) -> None
Register an agent.
Parameters Parameter Type Description `agent_name` str Agent name. `role` str Agent role label.
async def select_next ( session_id : str ) -> str | None
Return the highest-priority agent that has not yet spoken this round.
Parameters Parameter Type Description `session_id` str Unused; kept for protocol compatibility.
Returns Type Description str | None Agent name or ``None``.
async def is_complete ( session_id : str ) -> bool
Return True after max_rounds full cycles.
Parameters Parameter Type Description `session_id` str Unused; kept for protocol compatibility.
Returns Type Description bool True when the conversation is complete.
def filter_visible (
turns : list[ SessionTurn] ,
* ,
agent_name : str
) -> list[ SessionTurn]
All turns visible to all agents in priority mode.
Parameters Parameter Type Description `turns` list[ SessionTurn] Full session turn list. `agent_name` str The querying agent name.
Returns Type Description list[ SessionTurn] All turns unchanged.
Keyword-overlap relevance pruner for conversation history.
Implements ContextPrunerProtocol. When the history exceeds
max_turns, system messages are always preserved and non-system
turns are scored by keyword overlap with the current query plus a
recency bonus, then the top-scoring turns are retained in their
original chronological order.
async def prune (
history : list[ ChatMessageProtocol] ,
current_query : str ,
max_turns : int
) -> list[ ChatMessageProtocol]
Prune history to at most max_turns, keeping relevant turns.
Scoring formula per non-system turn:
score = overlap_count + recency_ratio * 0.3
where overlap_count is the number of tokens shared between the
turn content and current_query, and recency_ratio is the
turn’s normalised position in the non-system history (0 = oldest,
1 = newest).
Parameters Parameter Type Description `history` list[ ChatMessageProtocol] Full conversation history. `current_query` str The current user query for relevance scoring. `max_turns` int Maximum number of turns to retain.
Returns Type Description list[ ChatMessageProtocol] Pruned history in chronological order.
Control which turns each agent can see within a group session.
By default all turns are visible to everyone. Role isolation allows
marking individual turns as private to a specific agent, so sensitive
reasoning by one agent is not leaked to peers.
Turn visibility rules (checked in order):
1. A turn tagged visible_to in its metadata is only shown to
   the listed agent names.
2. A turn tagged hidden_from in its metadata is hidden from those
   agent names.
3. Otherwise the turn is visible to everyone.
def filter_for_agent (
turns : list[ SessionTurn] ,
agent_name : str
) -> list[ SessionTurn]
Return only the turns the given agent is allowed to see.
Parameters Parameter Type Description `turns` list[ SessionTurn] Complete session turn list. `agent_name` str The agent whose view to compute.
Returns Type Description list[ SessionTurn] Filtered list of turns visible to *agent_name*.
Cycle through registered agents in registration order.
Each call to select_next advances to the next agent in the list,
wrapping around when the end is reached. Returns None after a
configurable number of total rounds.
Parameters Parameter Type Description `max_rounds` Total rounds before the conversation is considered complete.
def __init__ ( max_rounds : int = 10 ) -> None
def register (
agent_name : str ,
role : str
) -> None
Register an agent for round-robin participation.
Parameters Parameter Type Description `agent_name` str Unique agent name. `role` str Agent role label.
async def select_next ( session_id : str ) -> str | None
Return the next agent in the rotation, or None when done.
Parameters Parameter Type Description `session_id` str Unused; kept for protocol compatibility.
Returns Type Description str | None Agent name or ``None``.
async def is_complete ( session_id : str ) -> bool
Return True after max_rounds full cycles.
Parameters Parameter Type Description `session_id` str Unused; kept for protocol compatibility.
Returns Type Description bool True when the conversation is complete.
def filter_visible (
turns : list[ SessionTurn] ,
* ,
agent_name : str
) -> list[ SessionTurn]
All turns are visible to all agents in round-robin mode.
Parameters Parameter Type Description `turns` list[ SessionTurn] Full session turn list. `agent_name` str The agent querying visibility.
Returns Type Description list[ SessionTurn] All turns unchanged.
Cherry-pick specific turns from the branch by turn ID.
Parameters Parameter Type Description `turn_ids` IDs of the branch turns to include in the merge.
def __init__ ( turn_ids : list[ str] ) -> None
async def merge (
parent : SessionState ,
branch : SessionState
) -> SessionState
Copy only the specified branch turns into the parent.
Parameters Parameter Type Description `parent` SessionState The parent session state. `branch` SessionState The branch session state.
Returns Type Description SessionState Merged ``SessionState`` with selected branch turns appended.
Computed analytics for a single session.
All fields are derived from SessionState
and its SessionTurn records.
Attributes:
session_id: The session this report covers.
total_turns: Number of turns (all roles) in the session.
total_tokens: Cumulative tokens across all turns.
total_cost: Cumulative cost (USD) across all turns.
duration_seconds: Wall-clock time from first to last turn. 0 if fewer
than two turns exist.
agents_used: Deduplicated list of provider names seen across turns.
tools_invoked: Deduplicated flat list of tool names referenced in
SessionTurn.tool_calls.
avg_response_time_ms: Average milliseconds between consecutive turns.
0.0 if fewer than two turns.
models_used: Deduplicated list of model identifiers used across turns.
Payload fired when checkpointing persists a session snapshot.
Emitted when an AI session is closed or expires.
Consumed by: audit, analytics, resource cleanup.
Payload fired when a session is closed for further writes.
Configuration for the session management subsystem.
Controls lifecycle timeouts, checkpointing thresholds, multi-agent
settings, backend selection, and web middleware behaviour.
def validate_for_environment ( env : Environment | None = None ) -> list[ ConfigIssue]
Check config is safe for the target environment.
Per-request session context backed by a ``ContextVar``.
Implements SessionContextProtocol. Each asyncio task (request) has
its own isolated slot in _SESSION_STATE; no cross-request leakage
occurs even under high concurrency.
The typical lifecycle inside a request middleware is:
token = ctx.bind(state)  # request start
ctx.unbind(token)  # request end
Parameters Parameter Type Description `manager` Session lifecycle manager used by ``get_or_create``.
property session_id ( ) -> str
Current session ID from the task-scoped context variable.
Returns Type Description str The bound session ID.
Raises Exception Description SessionError If no session has been bound for this task.
property state ( ) -> SessionState
Current session state from the task-scoped context variable.
Returns Type Description SessionState The bound ``SessionState``.
Raises Exception Description SessionError If no session has been bound for this task.
async def get_or_create ( user_id : str ) -> SessionState
Return the bound session, or create a new one if unbound.
Parameters Parameter Type Description `user_id` str Owner of the session (only used when creating).
Returns Type Description SessionState An existing or freshly created ``SessionState``.
def bind ( state : SessionState ) -> contextvars.Token[ SessionState | None ]
Bind a session state to the current asyncio task.
Should be called at the start of a request by middleware.
Parameters Parameter Type Description `state` SessionState The ``SessionState`` to bind.
Returns Type Description contextvars.Token[ SessionState | None ] A ``Token`` that must be passed to ``unbind`` to restore the previous context value.
def unbind ( token : contextvars.Token[ SessionState | None ] ) -> None
Restore the context variable to its pre-bind value.
Should be called at the end of a request (in a finally block)
by middleware.
Parameters Parameter Type Description `token` contextvars.Token[ SessionState | None ] The token returned by the corresponding ``bind`` call.
Emitted when a new AI session is created.
Consumed by: audit, analytics, billing.
Manages session lifecycle with persistence and optional memory integration.
Implements SessionManagerProtocol. Orchestrates create, resume,
suspend, close, add_turn, checkpoint, and restore operations. Publishes
domain events via an optional event bus and triggers memory consolidation
on session close when a consolidator is available.
Constructor parameters are injected by the DI container; every dependency
except config and store is optional to allow graceful degradation.
Parameters Parameter Type Description `config` Session configuration. `store` Persistence backend for session state and checkpoints. `memory` Optional working memory for recording turns. `consolidator` Optional memory consolidator triggered on close. `event_bus` Optional event bus for lifecycle events.
async def create (
user_id : str ,
metadata : dict[ str, Any] | None = None
) -> SessionState
Create a new session.
Parameters Parameter Type Description `user_id` str The owning user. `metadata` dict[ str, Any] | None Optional session metadata.
Returns Type Description SessionState Newly created ``SessionState`` with status ``active``.
async def resume ( session_id : str ) -> SessionState | None
Resume an existing session.
Transitions SUSPENDED → ACTIVE. Returns the updated state or None
if the session does not exist.
Parameters Parameter Type Description `session_id` str ID of the session to resume.
Returns Type Description SessionState | None Updated ``SessionState``, or ``None`` if not found.
async def add_turn (
session_id : str ,
turn : SessionTurn
) -> None
Add a conversation turn and update session metrics.
Also records the turn in working memory if available, and triggers
an auto-checkpoint when the configured interval is reached.
Parameters Parameter Type Description `session_id` str Target session ID. `turn` SessionTurn The turn to append.
async def get_state ( session_id : str ) -> SessionState | None
Return the current state of a session, or None if not found.
Parameters Parameter Type Description `session_id` str The session ID to look up.
Returns Type Description SessionState | None ``SessionState`` or ``None``.
async def checkpoint ( session_id : str ) -> SessionCheckpoint
Create an immutable snapshot of the current session state.
Parameters Parameter Type Description `session_id` str The session to snapshot.
Returns Type Description SessionCheckpoint The created ``SessionCheckpoint``.
async def restore ( checkpoint_id : str ) -> SessionState
Restore a session to a previous checkpoint.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to restore from.
Returns Type Description SessionState The restored ``SessionState``.
async def close ( session_id : str ) -> None
Close a session and optionally trigger memory consolidation.
Parameters Parameter Type Description `session_id` str The session to close.
async def suspend ( session_id : str ) -> SessionState
Suspend an active session without closing it.
Parameters Parameter Type Description `session_id` str The session to suspend.
Returns Type Description SessionState Updated ``SessionState`` with status ``suspended``.
Manages session lifecycle with persistence and optional memory integration.
Implements SessionManagerProtocol. Orchestrates create, resume,
suspend, close, add_turn, checkpoint, and restore operations. Publishes
domain events via an optional event bus and triggers memory consolidation
on session close when a consolidator is available.
Constructor parameters are injected by the DI container; every dependency
except config and store is optional to allow graceful degradation.
Parameters Parameter Type Description `config` Session configuration. `store` Persistence backend for session state and checkpoints. `memory` Optional working memory for recording turns. `consolidator` Optional memory consolidator triggered on close. `event_bus` Optional event bus for lifecycle events.
async def create (
user_id : str ,
metadata : dict[ str, Any] | None = None
) -> SessionState
Create a new session.
Parameters Parameter Type Description `user_id` str The owning user. `metadata` dict[ str, Any] | None Optional session metadata.
Returns Type Description SessionState Newly created ``SessionState`` with status ``active``.
async def resume ( session_id : str ) -> SessionState | None
Resume an existing session.
Transitions SUSPENDED → ACTIVE. Returns the updated state or None
if the session does not exist.
Parameters Parameter Type Description `session_id` str ID of the session to resume.
Returns Type Description SessionState | None Updated ``SessionState``, or ``None`` if not found.
async def add_turn (
session_id : str ,
turn : SessionTurn
) -> None
Add a conversation turn and update session metrics.
Also records the turn in working memory if available, and triggers
an auto-checkpoint when the configured interval is reached.
Parameters Parameter Type Description `session_id` str Target session ID. `turn` SessionTurn The turn to append.
async def get_state ( session_id : str ) -> SessionState | None
Return the current state of a session, or None if not found.
Parameters Parameter Type Description `session_id` str The session ID to look up.
Returns Type Description SessionState | None ``SessionState`` or ``None``.
async def checkpoint ( session_id : str ) -> SessionCheckpoint
Create an immutable snapshot of the current session state.
Parameters Parameter Type Description `session_id` str The session to snapshot.
Returns Type Description SessionCheckpoint The created ``SessionCheckpoint``.
async def restore ( checkpoint_id : str ) -> SessionState
Restore a session to a previous checkpoint.
Parameters Parameter Type Description `checkpoint_id` str The checkpoint to restore from.
Returns Type Description SessionState The restored ``SessionState``.
async def close ( session_id : str ) -> None
Close a session and optionally trigger memory consolidation.
Parameters Parameter Type Description `session_id` str The session to close.
async def suspend ( session_id : str ) -> SessionState
Suspend an active session without closing it.
Parameters Parameter Type Description `session_id` str The session to suspend.
Returns Type Description SessionState Updated ``SessionState`` with status ``suspended``.
ASGI middleware that resolves or creates a session for each request.
Session ID is resolved from (in priority order):
1. X-Session-ID header (configurable via config.header_name).
2. ?session_id= query parameter.
3. Cookie named config.cookie_name (when set).
Auto-creates a new session when none of the above yields a valid ID.
The resolved SessionContext is attached to request.scope["session"]
(when the ASGI scope exposes a mutable scope dict, as Starlette/FastAPI
do) and can be injected into route handlers.
Parameters Parameter Type Description `session_manager` Session lifecycle manager. `config` Session configuration (controls cookie/header names and TTL).
AI conversation session management integration.
Call configure to register SessionStoreProtocol and SessionManagerProtocol
implementations for injection.
Usage
from lexigram.ai.session.config import SessionConfig
module = SessionModule.configure(
    SessionConfig(backend="cache"),
    enable_cleanup_scheduler=True,
)
Error Handling
Session operations surface typed exceptions that can be caught
directly or handled via the Result pattern::
from lexigram.ai.session.exceptions import (
    SessionError,             # base — catch-all
    SessionNotFoundError,     # session ID not in store
    SessionClosedError,       # write on a closed session
    SessionExpiredError,      # TTL exceeded
    CheckpointNotFoundError,  # checkpoint ID not found
)
Exports:
SessionStoreProtocol ,
SessionManagerProtocol ,
SessionError ,
SessionNotFoundError ,
SessionClosedError ,
SessionExpiredError ,
CheckpointNotFoundError
Create a SessionModule with the given configuration.
Parameters Parameter Type Description `config` SessionConfig | None SessionConfig, a plain ``dict`` of the same keys, or ``None`` to use defaults. `enable_cleanup_scheduler` bool Start the background expired-session cleanup loop during boot. Defaults to ``True``.
Create a SessionModule suitable for unit and integration testing.
Uses in-memory store with the cleanup scheduler disabled to avoid
background tasks during tests.
Parameters Parameter Type Description `config` SessionConfig | None Optional config override. Uses safe test defaults when None.
Registers session management services in the DI container.
Wires SessionStoreProtocol and SessionManagerProtocol based on the
configured backend. Background cleanup is started during boot().
def __init__ (
config : SessionConfig | dict | None = None ,
* ,
enable_cleanup_scheduler : bool = True
) -> None
Initialise the provider.
Register session services in the container.
async def boot ( container : BootContainerProtocol ) -> None
Pre-warm session services and start the cleanup scheduler.
async def shutdown ( ) -> None
Stop the background cleanup loop.
async def health_check ( timeout : float = 5.0 ) -> HealthCheckResult
Health check — always healthy (in-process domain provider).
No external backend to ping.
Parameters Parameter Type Description `timeout` float Ignored for in-process providers.
Returns Type Description HealthCheckResult Always HEALTHY — no external backend to ping.
Payload fired when a session manager opens a new session.
Validates and enforces session lifecycle status transitions.
Encodes the allowed state graph:
ACTIVE → SUSPENDED or CLOSED
SUSPENDED → ACTIVE or CLOSED
CLOSED / EXPIRED → terminal (no transitions allowed)
def validate (
session_id : str ,
from_status : str ,
to_status : str
) -> None
Assert that a transition is allowed, raise otherwise.
Parameters Parameter Type Description `session_id` str The session being transitioned (for error messages). `from_status` str Current status string. `to_status` str Desired target status string.
def can_transition (
from_status : str ,
to_status : str
) -> bool
Return True if the transition from from_status to to_status is valid.
Parameters Parameter Type Description `from_status` str Current status string. `to_status` str Desired target status string.
Returns Type Description bool True when the transition is allowed.
def is_terminal ( status : str ) -> bool
Return True if the given status is a terminal (no-exit) state.
Parameters Parameter Type Description `status` str Status string to check.
Returns Type Description bool True for ``closed`` and ``expired``.
Compute and apply incremental differences between session states.
Reduces checkpoint storage cost by storing only what changed between
consecutive checkpoints rather than full state copies.
def compute (
old : SessionState ,
new : SessionState
) -> dict[ str, Any]
Return a dict containing only the fields that changed.
Compares old and new and returns a minimal diff that, when
applied to old, reproduces new.
Parameters Parameter Type Description `old` SessionState The baseline session state. `new` SessionState The updated session state.
Returns Type Description dict[ str, Any] Mapping of changed field names to their new values.
def apply (
base : SessionState ,
diff : dict[ str, Any]
) -> SessionState
Apply a diff produced by compute to reproduce the new state.
Parameters Parameter Type Description `base` SessionState The baseline session state. `diff` dict[ str, Any] A dict previously returned by compute.
Returns Type Description SessionState A new ``SessionState`` with the diff applied.
Route to agents based on keyword/topic detection in the latest turn.
Parameters Parameter Type Description `topic_map` Mapping of keyword (lowercase) → agent name. `fallback_agent` Agent to use when no topic match is found. `max_rounds` Maximum total delegations before completion.
def __init__ (
topic_map : dict[ str, str] ,
fallback_agent : str | None = None ,
max_rounds : int = 10
) -> None
def register (
agent_name : str ,
role : str
) -> None
Register an agent.
Parameters Parameter Type Description `agent_name` str Agent name. `role` str Agent role label.
async def select_next ( session_id : str ) -> str | None
Select an agent based on the content of the most recent turn.
Falls back to the configured fallback agent if no keyword matches.
Parameters Parameter Type Description `session_id` str Session ID for retrieving the last known turns.
Returns Type Description str | None Agent name or ``None``.
def record_turns (
session_id : str ,
turns : list[ SessionTurn]
) -> None
Update the local turn cache used for topic detection.
Parameters Parameter Type Description `session_id` str Session ID. `turns` list[ SessionTurn] Current turn list for that session.
async def is_complete ( session_id : str ) -> bool
Return True after max_rounds delegations.
Parameters Parameter Type Description `session_id` str Unused; kept for protocol compatibility.
Returns Type Description bool True when the conversation is complete.
def filter_visible (
turns : list[ SessionTurn] ,
* ,
agent_name : str
) -> list[ SessionTurn]
All turns are visible in topic-based mode.
Parameters Parameter Type Description `turns` list[ SessionTurn] Full session turn list. `agent_name` str The querying agent name.
Returns Type Description list[ SessionTurn] All turns unchanged.
Raised when a checkpoint cannot be found by its ID.
Parameters Parameter Type Description `checkpoint_id` The ID of the checkpoint that was not found.
def __init__ ( checkpoint_id : str ) -> None
Raised when a session limit is exceeded.
Parameters Parameter Type Description `detail` Human-readable description of the exceeded limit.
def __init__ ( detail : str ) -> None
Raised when an operation is attempted on a closed session.
Parameters Parameter Type Description `session_id` The ID of the closed session.
def __init__ ( session_id : str ) -> None
Base error for all session operations.
All session-specific exceptions inherit from this class.
Raised when an operation is attempted on an expired session.
Parameters Parameter Type Description `session_id` The ID of the expired session.
def __init__ ( session_id : str ) -> None
Raised when a session cannot be found by its ID.
Parameters Parameter Type Description `session_id` The ID of the session that was not found.
def __init__ ( session_id : str ) -> None
Raised when an invalid state transition is attempted.
Parameters Parameter Type Description `session_id` The session ID. `from_status` Current status. `to_status` Attempted target status.
def __init__ (
session_id : str ,
from_status : str ,
to_status : str
) -> None