
API Reference

Strategy for consolidating episodic memories into semantic memories.
async def consolidate(entries: list[MemoryEntry]) -> list[MemoryEntry]

Indexes and retrieves memory entries by semantic similarity.
async def index(entry: MemoryEntry) -> str

Index a memory entry and return its ID.

async def search(
    query: str,
    top_k: int
) -> list[MemoryEntry]

Scores a MemoryEntry for pruning priority (higher = keep).
def score(
    entry: MemoryEntry,
    query: str | None = None
) -> float

Score a memory entry for retention.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | The memory entry to score. |
| `query` | `str \| None` | Optional query context for relevance scoring. |

Returns

| Type | Description |
| --- | --- |
| `float` | A score (typically in [0, 1], though strategies may return higher values) where higher means more important to keep. |

Preserves high-importance entries; prunes low-importance stale ones.
def __init__(importance_threshold: float = 0.1) -> None

Initialise the importance threshold strategy.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `importance_threshold` | `float` | Entries below this value are candidates for pruning. |
def should_prune(entry: MemoryEntry) -> bool

Return True if entry importance is below threshold.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Entry to evaluate. |

Returns

| Type | Description |
| --- | --- |
| `bool` | True if importance is below threshold. |
def filter(entries: list[MemoryEntry]) -> tuple[list[MemoryEntry], list[MemoryEntry]]

Split entries into (kept, pruned).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list[MemoryEntry]` | Entries to evaluate. |

Returns

| Type | Description |
| --- | --- |
| `tuple[list[MemoryEntry], list[MemoryEntry]]` | Tuple of (kept, pruned) entry lists. |

Fixed-capacity FIFO buffer of raw conversation turns.

The oldest entries are evicted when the buffer reaches max_entries.
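The eviction behaviour can be sketched with a bounded deque (a minimal stand-in; the real buffer stores `MemoryEntry` objects behind async methods):

```python
from collections import deque

# Minimal sketch of the fixed-capacity FIFO behaviour: deque(maxlen=...)
# silently drops the oldest item once the buffer is full. Strings stand in
# for MemoryEntry objects here.
buffer = deque(maxlen=3)
for turn in ["turn-1", "turn-2", "turn-3", "turn-4"]:
    buffer.append(turn)

print(list(buffer))  # ['turn-2', 'turn-3', 'turn-4']; the oldest was evicted
```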

def __init__(max_entries: int = 100) -> None

Initialise the buffer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_entries` | `int` | Maximum number of entries to retain. |
async def store(entry: MemoryEntry) -> None

Append entry to the buffer, evicting the oldest if full.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Memory entry to store. |
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return the top_k most recent entries as search results.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `MemoryQuery` | Search parameters (only top_k is used). |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | Most recent entries wrapped in search results with score 1.0. |
async def get_recent(n: int) -> list[MemoryEntry]

Return the n most recent entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | `int` | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Entries ordered newest-first. |
async def delete(entry_id: str) -> None

Remove entry with entry_id from the buffer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | `str` | ID of the entry to remove. |
async def clear() -> None

Clear all entries from the buffer.


MemoryStoreProtocol backed by a CacheBackendProtocol (e.g. Redis).
def __init__(
    cache: CacheBackendProtocol,
    ttl: int = 86400 * 30
) -> None
async def store(entry: MemoryEntry) -> None
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
async def get_recent(n: int) -> list[MemoryEntry]
async def delete(entry_id: str) -> None
async def clear() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Configuration for the background consolidation pipeline.

Attributes:

- `enabled`: Whether automatic consolidation is active.
- `interval_seconds`: How often to run a consolidation pass.
- `age_threshold_hours`: Minimum entry age before it can be consolidated.
- `importance_prune_threshold`: Entries below this importance are eligible for pruning.
- `batch_size`: Maximum entries processed per consolidation pass.


Runs MemoryConsolidator on a configurable interval.

Designed to be started once and cancelled on shutdown. Consolidation is only triggered when the interval elapses and entries are available.

def __init__(
    store: MemoryStoreProtocol,
    consolidator: MemoryConsolidatorProtocol,
    config: ConsolidationConfig | None = None
) -> None

Initialise the scheduler.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `store` | `MemoryStoreProtocol` | Memory store to read entries from. |
| `consolidator` | `MemoryConsolidatorProtocol` | Consolidator run on each cycle. |
| `config` | `ConsolidationConfig \| None` | Scheduling configuration. |
async def start() -> None

Start the background consolidation loop.

async def stop() -> None

Cancel the background consolidation loop.

async def run_once() -> ConsolidationResult

Execute a single consolidation pass immediately.

Returns

| Type | Description |
| --- | --- |
| `ConsolidationResult` | Result of the consolidation pass. |

FIFO buffer that keeps the most recent conversation turns.

Provides a simple, bounded working-memory strategy that auto-evicts the oldest entries when max_turns or max_tokens limits are hit.

Example

buffer = ConversationBuffer(max_turns=20, max_tokens=4096)
await buffer.add(entry)
context = buffer.get_context()
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_turns` | `int` | Maximum number of turns to retain. |
| `max_tokens` | `int` | Soft token cap; the oldest entries are evicted until the total estimated token count is at or below this limit. Set to 0 to disable token-based eviction. |
def __init__(
    max_turns: int = 20,
    max_tokens: int = 4096
) -> None

Initialise the conversation buffer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_turns` | `int` | Maximum number of turns to retain. |
| `max_tokens` | `int` | Soft token cap (0 = no token limit). |
async def add(entry: MemoryEntry) -> None

Add a memory entry to the buffer.

If adding the entry would exceed max_turns, the oldest entry is automatically evicted. After insertion, token-based eviction is applied if max_tokens > 0.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | The memory entry to add. |
def get_context() -> list[MemoryEntry]

Return all entries currently in the buffer, oldest-first.

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Ordered list of memory entries. |
def clear() -> None

Remove all entries from the buffer.

property size() -> int

Number of entries currently in the buffer.

property total_tokens() -> int

Estimated total token count across all buffered entries.


Stores complete conversation history partitioned by session_id.

Each session’s turns are appended in order. Retrieval returns the most recent turns for the requested session (filter via metadata).

def __init__(max_turns_per_session: int = 1000) -> None

Initialise the conversation store.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_turns_per_session` | `int` | Maximum turns kept per session. |
async def store(entry: MemoryEntry) -> None

Persist entry under its session (from metadata).

The session is derived from entry.metadata["session_id"] when present, otherwise filed under a "_default" session.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Memory entry to store. |
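The session-partitioning rule can be sketched as follows (an illustrative stand-in using plain dicts, not the library's implementation):

```python
from collections import defaultdict

# Entries are filed under metadata["session_id"], falling back to "_default".
sessions: dict[str, list[dict]] = defaultdict(list)

def store(entry: dict) -> None:
    session = entry.get("metadata", {}).get("session_id", "_default")
    sessions[session].append(entry)

store({"content": "hello", "metadata": {"session_id": "s1"}})
store({"content": "orphan turn"})  # no session_id, so it goes to "_default"
```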
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return entries for the session specified in query.filters.

Falls back to all entries if no session_id filter is given.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `MemoryQuery` | Search parameters including optional `session_id` filter. |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | Most recent entries for the matching session. |
async def get_recent(n: int) -> list[MemoryEntry]

Return the n globally most recent entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | `int` | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Entries ordered newest-first. |
async def delete(entry_id: str) -> None

Remove entry from all sessions.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | `str` | ID of the entry to remove. |
async def clear() -> None

Clear all sessions and entries.

async def get_session_entries(session_id: str) -> list[MemoryEntry]

Return all entries for a given session in chronological order.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | `str` | Session identifier. |

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Entries in insertion order. |

MemoryStoreProtocol backed by an SQL database.
def __init__(provider: DatabaseProviderProtocol) -> None
async def store(entry: MemoryEntry) -> None
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
async def get_recent(n: int) -> list[MemoryEntry]
async def delete(entry_id: str) -> None
async def clear() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Removes near-duplicate entries based on content similarity.

Two entries are considered duplicates if their lowercased contents overlap in more than similarity_threshold of the shorter entry's characters (a Jaccard-like measure).

def __init__(similarity_threshold: float = 0.85) -> None

Initialise the deduplication strategy.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `similarity_threshold` | `float` | Jaccard-like overlap above which entries are considered duplicates. |
def deduplicate(entries: list[MemoryEntry]) -> tuple[list[MemoryEntry], list[MemoryEntry]]

Return (unique, duplicates).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list[MemoryEntry]` | Entries to deduplicate. |

Returns

| Type | Description |
| --- | --- |
| `tuple[list[MemoryEntry], list[MemoryEntry]]` | Tuple of (unique, duplicate) entry lists. |
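One way to realise the Jaccard-like check described above (a sketch; the library's actual similarity computation may differ in detail):

```python
def is_duplicate(a: str, b: str, similarity_threshold: float = 0.85) -> bool:
    # Compare lowercased character sets; overlap is measured relative to the
    # smaller set, so a near-substring entry scores high.
    chars_a, chars_b = set(a.lower()), set(b.lower())
    shorter = min(len(chars_a), len(chars_b))
    if shorter == 0:
        return False
    return len(chars_a & chars_b) / shorter > similarity_threshold

print(is_duplicate("User prefers dark mode", "user prefers dark mode!"))  # True
```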

Prunes MemoryEntry lists to fit within a token budget.

Uses pluggable scoring strategies to rank entries by importance, then greedily selects entries until the token budget is exhausted.

Attributes:

- `token_counter`: Protocol for counting tokens in text.
- `default_strategy`: Default pruning strategy when none is specified.

def __init__(
    token_counter: TokenCounterProtocol,
    default_strategy: PruningStrategy = PruningStrategy.HYBRID
) -> None

Initialise the pruner.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `token_counter` | `TokenCounterProtocol` | Implementation of TokenCounterProtocol for token counting. |
| `default_strategy` | `PruningStrategy` | Default strategy to use if not overridden. Default is HYBRID. |
async def prune(
    entries: list[MemoryEntry],
    token_budget: int,
    query: str | None = None,
    strategy: PruningStrategy | None = None,
    **kwargs
) -> PruningResult

Prune a list of memory entries to fit within a token budget.

Scores all entries using the specified strategy, sorts them by score (descending), then greedily keeps entries until adding the next entry would exceed the remaining budget.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list[MemoryEntry]` | List of MemoryEntry objects to prune. |
| `token_budget` | `int` | Maximum number of tokens to keep. |
| `query` | `str \| None` | Optional query context for relevance-based scoring. |
| `strategy` | `PruningStrategy \| None` | Override the default pruning strategy. If None, uses default_strategy. |
| `**kwargs` | | Additional keyword arguments (reserved for future use). |

Returns

| Type | Description |
| --- | --- |
| `PruningResult` | PruningResult containing kept entries (score-ordered), counts, and metadata about the pruning operation. |
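The score-then-greedily-select loop can be sketched as a simplified synchronous stand-in for `prune` (the word-count tokenizer is a made-up placeholder for the injected token counter):

```python
def greedy_prune(entries: list[str], scores: list[float], token_budget: int) -> list[str]:
    def count_tokens(text: str) -> int:
        return len(text.split())  # crude word count standing in for a real counter

    # Rank by score descending, then keep entries until the budget runs out.
    ranked = sorted(zip(entries, scores), key=lambda pair: pair[1], reverse=True)
    kept, used = [], 0
    for entry, _score in ranked:
        cost = count_tokens(entry)
        if used + cost > token_budget:
            break  # the next entry would exceed the remaining budget
        kept.append(entry)
        used += cost
    return kept

kept = greedy_prune(["a b c d", "e f", "g h i"], scores=[0.9, 0.7, 0.2], token_budget=6)
print(kept)  # ['a b c d', 'e f']
```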

Extracts structured subject/predicate/object triples from memory entries.

When an LLM extraction callable is injected, extraction is delegated to it. Otherwise a lightweight heuristic fallback is used, suitable for testing and environments without LLM access.
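A heuristic fallback of this kind might look like the following sketch (the library's actual fallback patterns are not documented here; this only illustrates the idea):

```python
import re

def heuristic_triples(text: str) -> list[tuple[str, str, str]]:
    # Naive pattern matching: at most one "subject VERB object" triple per
    # sentence, for a small set of copula-like verbs.
    triples = []
    for sentence in re.split(r"[.!?]", text):
        match = re.match(r"\s*(.+?)\s+(is|has|likes)\s+(.+)", sentence)
        if match:
            triples.append((match.group(1), match.group(2), match.group(3)))
    return triples

print(heuristic_triples("Alice likes tea. Bob is tall."))
```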

def __init__(extract_fn: Callable[[str], Awaitable[list[Triple]]] | None = None) -> None

Initialise the extractor.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `extract_fn` | `Callable[[str], Awaitable[list[Triple]]] \| None` | Async callable that returns triples from raw text. When `None`, a heuristic fallback is used. |
async def extract(entry: MemoryEntry) -> list[Triple]

Extract triples from entry.content.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Memory entry whose content to parse. |

Returns

| Type | Description |
| --- | --- |
| `list[Triple]` | List of (subject, predicate, object_) tuples. |

Raises

| Exception | Description |
| --- | --- |
| `FactExtractionError` | If the extraction callable raises. |

Indexes memory entries by the entities they mention.

Entries are stored normally but also indexed by lowercase entity token so that entity-scoped retrieval is O(1) per entity.

def __init__() -> None

Initialise the entity store.

async def store(
    entry: MemoryEntry,
    entities: list[str] | None = None
) -> None

Store entry and index it under each entity in entities.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Memory entry to persist. |
| `entities` | `list[str] \| None` | Entity names to associate with this entry. |
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return entries scored by importance.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `MemoryQuery` | Search parameters. |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | Top-k entries ranked by importance. |
async def get_recent(n: int) -> list[MemoryEntry]

Return the n most recent entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | `int` | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Entries ordered newest-first. |
async def delete(entry_id: str) -> None

Remove entry and its entity index entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | `str` | ID of the entry to remove. |
async def clear() -> None

Remove all entries and entity index data.

async def get_by_entity(entity: str) -> list[MemoryEntry]

Return all entries mentioning entity.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entity` | `str` | Entity name to search. |

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Entries referencing the entity, newest-first. |

Compresses episodic memory entries by LLM-assisted summarisation.

When the episodic store grows beyond a threshold, older entries are grouped by session and collapsed into a single summary entry.

def __init__(summarise_fn: Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] | None = None) -> None

Initialise the compressor.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `summarise_fn` | `Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] \| None` | Async callable that accepts a list of entries and returns a single summary entry. When `None`, a simple concatenation fallback is used. |
async def compress(
    entries: list[MemoryEntry],
    *,
    max_tokens: int = 200
) -> MemoryEntry

Compress entries into a single condensed memory entry.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list[MemoryEntry]` | Chronologically ordered entries to compress. |
| `max_tokens` | `int` | Hint to the summariser for output length. |

Returns

| Type | Description |
| --- | --- |
| `MemoryEntry` | A new MemoryEntry representing the compressed form. |

Raises

| Exception | Description |
| --- | --- |
| `ConsolidationError` | If the summarisation callable raises. |

Configuration for episodic memory tier.

Attributes:

- `default_top_k`: Default number of episodes to retrieve.
- `recency_weight`: Weight applied to temporal recency during scoring.
- `importance_weight`: Weight applied to entry importance during scoring.
- `relevance_weight`: Weight applied to semantic similarity during scoring.
- `ttl_seconds`: Time-to-live for entries (0 = never expire).


Episodic memory layer backed by a pluggable MemoryStoreProtocol.
def __init__(backend: MemoryStoreProtocol) -> None
async def record(entry: MemoryEntry) -> None
async def recall(query: MemoryQuery) -> list[MemorySearchResult]
async def forget(entry_id: str) -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult

In-memory graph of subject/predicate/object facts.

Used by SemanticMemoryStore to persist extracted knowledge.

def __init__() -> None

Initialise an empty fact store.

def add(
    subject: str,
    predicate: str,
    object_: str,
    confidence: float = 1.0,
    metadata: dict | None = None
) -> str

Add a new fact triple.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `subject` | `str` | Subject entity. |
| `predicate` | `str` | Relationship type. |
| `object_` | `str` | Object value. |
| `confidence` | `float` | Confidence score in [0.0, 1.0]. |
| `metadata` | `dict \| None` | Optional additional metadata. |

Returns

| Type | Description |
| --- | --- |
| `str` | Unique ID assigned to the stored fact. |
def query_by_subject(subject: str) -> list[dict]

Return all facts where subject matches (case-insensitive prefix).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `subject` | `str` | Subject to filter by. |

Returns

| Type | Description |
| --- | --- |
| `list[dict]` | List of fact dicts (id, subject, predicate, object_, confidence). |
def get_entity_facts(entity: str) -> list[dict]

Return all facts mentioning entity as subject or object.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entity` | `str` | Entity name to search. |

Returns

| Type | Description |
| --- | --- |
| `list[dict]` | List of matching fact dicts. |
def update_confidence(
    fact_id: str,
    confidence: float
) -> None

Update the confidence of an existing fact.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fact_id` | `str` | ID of the fact to update. |
| `confidence` | `float` | New confidence value in [0.0, 1.0]. |
def delete(fact_id: str) -> None

Remove a fact.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fact_id` | `str` | ID of the fact to remove. |
def clear() -> None

Remove all facts.


Weighted blend of recency and content length as a proxy for relevance.

Content length serves as a simple heuristic for information density: longer entries are assumed to contain more contextual information.

Attributes:

- `recency_weight`: Weight for the recency component (default 0.6).
- `relevance_weight`: Weight for the content length component (default 0.4).
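The blend can be sketched as follows, with recency min-max normalised over the batch and content length normalised against the longest entry (the normalisation details here are assumptions, not the library's exact implementation):

```python
def score_batch(entries: list[tuple[float, str]],
                recency_weight: float = 0.6,
                relevance_weight: float = 0.4) -> list[float]:
    # entries are (timestamp, content) pairs. Recency is min-max normalised
    # over the batch; content length, relative to the longest entry, stands
    # in for relevance.
    times = [t for t, _ in entries]
    oldest = min(times)
    span = (max(times) - oldest) or 1.0
    longest = max(len(c) for _, c in entries) or 1
    return [
        recency_weight * ((t - oldest) / span)
        + relevance_weight * (len(c) / longest)
        for t, c in entries
    ]

scores = score_batch([(0.0, "ok"), (10.0, "much longer entry")])
print(scores)  # the newest and longest entry scores 0.6*1 + 0.4*1 = 1.0
```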

def __init__(
    recency_weight: float = 0.6,
    relevance_weight: float = 0.4
) -> None

Initialise the hybrid scorer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `recency_weight` | `float` | Weight for recency in blended score. Default 0.6. |
| `relevance_weight` | `float` | Weight for content length (relevance proxy). Default 0.4. |
def score_batch(
    entries: list,
    query: str | None = None
) -> list[float]

Score all entries together, normalizing recency to [0, 1].

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list` | List of memory entries to score. |
| `query` | `str \| None` | Optional query context (unused by HybridScorerImpl). |

Returns

| Type | Description |
| --- | --- |
| `list[float]` | List of scores parallel to the input entries list. |
def score(
    entry: MemoryEntry,
    query: str | None = None
) -> float

Score a single entry (recency not normalized — use score_batch for batches).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | The memory entry to score. |
| `query` | `str \| None` | Optional query (unused by HybridScorerImpl). |

Returns

| Type | Description |
| --- | --- |
| `float` | Weighted score of content length (recency omitted without batch context). |

MemoryStoreProtocol backed by an in-process dictionary.
def __init__() -> None
async def store(entry: MemoryEntry) -> None
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
async def get_recent(n: int) -> list[MemoryEntry]
async def delete(entry_id: str) -> None
async def clear() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Top-level configuration for lexigram-ai-memory.

Attributes:

- `working`: Working memory token budget configuration.
- `episodic`: Episodic memory retrieval configuration.
- `semantic`: Semantic memory fact storage configuration.
- `consolidation`: Background consolidation pipeline configuration.
- `default_backend`: Backend type to use ('in_memory', 'cache', 'database', 'vector').
- `ttl_seconds`: Default entry TTL in seconds (0 = never expire).


Payload fired after a memory consolidation pass completes.

Attributes:

- `strategy`: Name of the consolidation strategy that ran (e.g. "recency_decay" or "deduplication").


Orchestrates consolidation of a batch of MemoryEntry objects.

Applies deduplication, recency decay pruning, and importance-floor pruning in sequence. Optionally runs a summarisation pass on the remaining aged entries.

def __init__(
    config: ConsolidationConfig | None = None,
    summarise_fn: Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] | None = None
) -> None

Initialise the consolidator.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `ConsolidationConfig \| None` | Consolidation thresholds. Defaults to `ConsolidationConfig()`. |
| `summarise_fn` | `Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] \| None` | Optional async callable for summarising aged entry groups. |
async def consolidate(entries: list[MemoryEntry]) -> ConsolidationResult

Consolidate entries via deduplication, decay, and importance pruning.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list[MemoryEntry]` | Entries to process. |

Returns

| Type | Description |
| --- | --- |
| `ConsolidationResult` | ConsolidationResult with counts of processed, consolidated, pruned, and extracted entities. |
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report consolidator health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | `float` | Maximum seconds for the health check. |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | HealthCheckResult indicating HEALTHY status. |

Three-tier AI memory: working, episodic, and semantic.

Call configure to register MemoryProvider and expose all memory protocol contracts for injection.

Usage

from lexigram.ai.memory.config import MemoryConfig
from lexigram.ai.memory import MemoryModule
@module(
    imports=[MemoryModule.configure(MemoryConfig(...))]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: MemoryConfig | None = None,
    enable_consolidation: bool = True
) -> DynamicModule

Create a MemoryModule with explicit configuration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `MemoryConfig \| None` | MemoryConfig or `None` for defaults (in-memory backend, standard budget fractions). |
| `enable_consolidation` | `bool` | Register the ConsolidationScheduler, which periodically promotes episodic memories into semantic storage. Defaults to `True`. |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule descriptor. |
def stub(
    cls,
    config: MemoryConfig | None = None
) -> DynamicModule

Create a MemoryModule suitable for unit and integration testing.

Uses in-memory backends with minimal side effects. Consolidation scheduling is disabled by default to avoid background timer tasks during tests.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `MemoryConfig \| None` | Optional MemoryConfig override. Uses safe test defaults when `None`. |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule descriptor. |

Registers all memory services in the DI container.

Wires up the MemoryStoreProtocol, EpisodicMemoryProtocol, SemanticMemoryProtocol, WorkingMemoryProtocol, and consolidation services. The default backend is InMemoryMemoryBackend; callers with persistent needs should register a MemoryStoreProtocol override after this provider runs.

def __init__(
    config: MemoryConfig | None = None,
    enable_consolidation: bool = True
) -> None

Initialise the provider.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `MemoryConfig \| None` | Memory configuration. Defaults to `MemoryConfig()`. |
| `enable_consolidation` | `bool` | Start the consolidation scheduler during boot. When `False`, the scheduler is never started regardless of `config.consolidation.enabled`. |
async def register(container: ContainerRegistrarProtocol) -> None

Register memory services.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `container` | `ContainerRegistrarProtocol` | DI container registrar. |
async def boot(container: ContainerResolverProtocol) -> None

Start the consolidation scheduler if enabled.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `container` | `ContainerResolverProtocol` | Resolved DI container. |
async def shutdown() -> None

Stop the consolidation scheduler.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check — always healthy (in-process domain provider).

No external backend to ping.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | `float` | Ignored for in-process providers. |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | Always HEALTHY; there is no external backend to ping. |

Emitted when a memory query returns results.

Consumed by: AI audit trails, context relevance analytics.


Payload fired when entries are retrieved from a memory store.

Attributes:

- `tier`: Memory tier that was queried.
- `result_count`: Number of entries returned by the retrieval.


Queries one or more MemoryStoreProtocol backends and merges results.

Results from each backend are pooled, deduplicated by entry ID, and re-ranked by the configured RelevanceRanker. Retrieval counts are tracked per entry to enable downstream analytics and relevance decay.
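Pooling, ID-deduplication, and re-ranking can be sketched like this (stub async sources returning dicts; the real backends implement MemoryStoreProtocol and ranking is done by the RelevanceRanker):

```python
import asyncio

async def retrieve_merged(sources, query: str, top_k: int = 5) -> list[dict]:
    # Query every source concurrently, drop duplicate entry IDs (first
    # occurrence wins), then re-rank by score descending.
    batches = await asyncio.gather(*(source(query) for source in sources))
    seen, merged = set(), []
    for result in (r for batch in batches for r in batch):
        if result["id"] not in seen:
            seen.add(result["id"])
            merged.append(result)
    merged.sort(key=lambda r: r["score"], reverse=True)
    return merged[:top_k]

async def source_a(query):
    return [{"id": "e1", "score": 0.9}, {"id": "e2", "score": 0.4}]

async def source_b(query):
    return [{"id": "e2", "score": 0.8}, {"id": "e3", "score": 0.6}]

results = asyncio.run(retrieve_merged([source_a, source_b], "query"))
print([r["id"] for r in results])  # ['e1', 'e3', 'e2']
```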

def __init__(
    sources: list[MemoryStoreProtocol],
    ranker: RelevanceRanker | None = None
) -> None

Initialise the retriever.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `sources` | `list[MemoryStoreProtocol]` | Memory store backends to query in parallel. |
| `ranker` | `RelevanceRanker \| None` | Optional ranker. Defaults to a new `RelevanceRanker`. |
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Retrieve and merge results from all configured sources.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `MemoryQuery` | Search query with weights and filters. |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | De-duplicated, re-ranked results capped at `query.top_k`. |
def add_source(source: MemoryStoreProtocol) -> None

Register a new backend source.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `source` | `MemoryStoreProtocol` | Additional MemoryStoreProtocol to query. |
def get_retrieval_count(entry_id: str) -> int

Return how many times a specific entry has been retrieved.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | `str` | The memory entry ID. |

Returns

| Type | Description |
| --- | --- |
| `int` | Number of times the entry appeared in retrieval results. |
def get_retrieval_stats() -> dict[str, Any]

Return aggregate retrieval statistics.

Returns

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with total retrievals, unique entries, and the top-10 most-retrieved entries. |
def reset_stats() -> None

Clear all retrieval tracking data.


Emitted when a memory entry is persisted to the memory store.

Consumed by: AI audit trails, context management, safety review.


Payload fired when an entry is written to a memory store.

Attributes:

- `tier`: Memory tier that received the write (e.g. "working", "episodic", or "semantic").
- `backend`: Backend identifier that persisted the entry (e.g. "in_memory" or "vector").


Result of a context pruning operation.

Attributes:

- `kept`: List of MemoryEntry items kept (score-ordered, highest first).
- `pruned_count`: Number of entries that were removed.
- `original_count`: Number of entries that came in.
- `token_budget`: The token budget that was applied.
- `strategy`: The pruning strategy used.
- `metadata`: Optional metadata dictionary with additional pruning details.


Enum of pruning scoring strategies.

Attributes:

- `RECENCY`: Keep the most recent entries by timestamp.
- `RELEVANCE`: Keep the highest-relevance entries (placeholder for future embedding-based scoring).
- `HYBRID`: Weighted blend of recency and relevance (content length as proxy).


Prunes entries whose recency score falls below a threshold.

Uses an exponential decay model with a configurable half-life.
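The decay model is the standard half-life formula; a minimal sketch of the scoring and pruning decision, using the default 24-hour half-life and 0.05 threshold:

```python
def recency_score(age_hours: float, half_life_hours: float = 24.0) -> float:
    # Exponential decay: the score halves every half_life_hours.
    return 0.5 ** (age_hours / half_life_hours)

def should_prune(age_hours: float, threshold: float = 0.05) -> bool:
    return recency_score(age_hours) < threshold

print(recency_score(24.0))   # 0.5: exactly one half-life old
print(should_prune(2.0))     # False: still fresh
print(should_prune(240.0))   # True: ten half-lives old, score ~0.001
```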

def __init__(
    half_life_hours: float = 24.0,
    threshold: float = 0.05
) -> None

Initialise the recency decay strategy.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `half_life_hours` | `float` | Time in hours for the score to halve. |
| `threshold` | `float` | Entries with recency below this are pruned. |
def should_prune(entry: MemoryEntry) -> bool

Return True if entry should be pruned.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Entry to evaluate. |

Returns

| Type | Description |
| --- | --- |
| `bool` | True if the entry's recency score is below threshold. |
def filter(entries: list[MemoryEntry]) -> tuple[list[MemoryEntry], list[MemoryEntry]]

Split entries into (kept, pruned).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | `list[MemoryEntry]` | Entries to evaluate. |

Returns

| Type | Description |
| --- | --- |
| `tuple[list[MemoryEntry], list[MemoryEntry]]` | Tuple of (kept, pruned) entry lists. |

Scores memory entries by recency — more recent entries get higher scores.

Uses the entry’s timestamp to produce a normalized score relative to the entire batch of entries being pruned.

def score(
    entry: MemoryEntry,
    query: str | None = None
) -> float

Score entry by recency.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | The memory entry to score. |
| `query` | `str \| None` | Optional query (unused by RecencyScorerImpl). |

Returns

| Type | Description |
| --- | --- |
| `float` | Score based on the entry's timestamp (normalized in batch context). |

Re-ranks a list of MemorySearchResult by a multi-factor score.

Combines the raw retrieval score with recency and importance using the weights from the originating MemoryQuery.

def rank(
    results: list[MemorySearchResult],
    query: MemoryQuery
) -> list[MemorySearchResult]

Re-rank results and return a new sorted list.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `results` | `list[MemorySearchResult]` | Raw search results to re-rank. |
| `query` | `MemoryQuery` | Original query with weighting parameters. |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | Results sorted by descending combined score. |
def top_k(
    results: list[MemorySearchResult],
    query: MemoryQuery,
    k: int | None = None
) -> list[MemorySearchResult]

Re-rank and return the top k results.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `results` | `list[MemorySearchResult]` | Raw results to rank. |
| `query` | `MemoryQuery` | Query parameters. |
| `k` | `int \| None` | Maximum results to return. Defaults to `query.top_k`. |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | Top-k results after re-ranking. |

Configuration for semantic memory tier.

Attributes:

- `min_confidence`: Minimum confidence to store a fact.
- `max_facts_per_entity`: Hard cap on stored facts per entity.


Semantic memory backed by an in-process FactStore.
def __init__(
    fact_store: FactStore | None = None,
    extractor: EntityExtractor | None = None,
    min_confidence: float = 0.5,
    max_facts_per_entity: int = 100
) -> None
async def store_fact(
    subject: str,
    predicate: str,
    object_: str,
    confidence: float = 1.0
) -> str
async def query_facts(subject: str) -> list[dict[str, Any]]
async def get_entity_facts(entity: str) -> list[dict[str, Any]]
async def update_fact(
    fact_id: str,
    confidence: float
) -> None
async def ingest(entry: MemoryEntry) -> int
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Memory store that periodically compresses older turns into summaries.

Maintains a hot buffer of recent turns and a list of compressed summary entries. When the hot buffer exceeds compress_threshold, the oldest compress_batch entries are collapsed via summarise_fn.
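The compression trigger can be sketched as follows (the concatenation summariser stands in for summarise_fn, and the thresholds are shrunk for illustration):

```python
hot: list[str] = []        # recent raw turns
summaries: list[str] = []  # compressed summary entries

def store(entry: str, compress_threshold: int = 4, compress_batch: int = 2) -> None:
    hot.append(entry)
    if len(hot) > compress_threshold:
        batch = hot[:compress_batch]  # the oldest entries
        del hot[:compress_batch]
        summaries.append(" | ".join(batch))  # concatenation fallback summariser

for i in range(1, 6):
    store(f"turn-{i}")

print(hot)        # ['turn-3', 'turn-4', 'turn-5']
print(summaries)  # ['turn-1 | turn-2']
```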

def __init__(
    compress_threshold: int = 20,
    compress_batch: int = 10,
    summarise_fn: Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] | None = None
) -> None

Initialise the summary store.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `compress_threshold` | `int` | Number of hot entries that triggers compression. |
| `compress_batch` | `int` | Number of oldest entries to compress per run. |
| `summarise_fn` | `Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] \| None` | Async callable that reduces a batch to one entry. |
async def store(entry: MemoryEntry) -> None

Add entry to the hot buffer, compressing if necessary.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `MemoryEntry` | Memory entry to add. |
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return summaries plus most recent hot entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `MemoryQuery` | Search parameters. |

Returns

| Type | Description |
| --- | --- |
| `list[MemorySearchResult]` | Combined results from summaries and the hot buffer. |
async def get_recent(n: int) -> list[MemoryEntry]

Return the n most recent entries (hot buffer only).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | `int` | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| `list[MemoryEntry]` | Most recent entries, newest-first. |
async def delete(entry_id: str) -> None

Remove entry with entry_id from hot buffer or summaries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | `str` | ID of the entry to remove. |
async def clear() -> None

Clear all hot entries and summaries.


Distributes a total token budget across working memory sections.

The budget is split in this order:

  1. System prompt receives a fixed allocation.
  2. The remaining budget is divided among recent turns, episodic recall, semantic facts, and tool descriptions using the configured fractions.
def __init__(config: WorkingMemoryConfig | None = None) -> None

Initialise with optional config.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `WorkingMemoryConfig \| None` | Working memory configuration; uses defaults if None. |
def allocate(total_tokens: int) -> dict[str, int]

Compute token allocations for each memory section.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `total_tokens` | `int` | Total token budget available. |

Returns

| Type | Description |
| --- | --- |
| `dict[str, int]` | Mapping of section name to token allocation. |
def budget_for(
    section: str,
    total_tokens: int
) -> int

Return the token budget for a single named section.

Parameters
- `section` (`str`): Section name (e.g. `'episodic'`, `'semantic'`).
- `total_tokens` (`int`): Total token budget.

Returns
- `int`: Token count allocated to the requested section.

A MemoryStoreProtocol implementation that persists entries as vector-searchable documents.
def __init__(
    vector_store: DocumentVectorStoreProtocol,
    embed_fn: Callable[[str], Awaitable[list[float]]] | None = None,
    collection: str = 'memory',
    fallback: MemoryStoreProtocol | None = None
) -> None
async def store(entry: MemoryEntry) -> None
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
async def get_recent(n: int) -> list[MemoryEntry]
async def delete(entry_id: str) -> None
async def clear() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult
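To make the vector-store pattern concrete, here is a toy in-memory analogue: entries are embedded via an async `embed_fn` on store, and retrieval ranks documents by cosine similarity to the embedded query. The class name, the letter-frequency embedding, and the return types are all illustrative stand-ins, not the library's `DocumentVectorStoreProtocol` API:

```python
import asyncio
import math
import uuid


class InMemoryVectorMemoryStore:
    """Toy sketch of a vector-searchable memory store (illustrative only)."""

    def __init__(self, embed_fn):
        self.embed_fn = embed_fn  # async callable: str -> list[float]
        self.docs: dict[str, tuple[list[float], str]] = {}

    async def store(self, text: str) -> str:
        # Embed the entry text and keep the vector alongside the document.
        vec = await self.embed_fn(text)
        doc_id = str(uuid.uuid4())
        self.docs[doc_id] = (vec, text)
        return doc_id

    async def retrieve(self, query: str, top_k: int = 3) -> list[str]:
        # Rank stored documents by cosine similarity to the embedded query.
        qv = await self.embed_fn(query)
        scored = sorted(self.docs.values(),
                        key=lambda item: self._cosine(qv, item[0]),
                        reverse=True)
        return [text for _, text in scored[:top_k]]

    @staticmethod
    def _cosine(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a)) or 1.0
        nb = math.sqrt(sum(x * x for x in b)) or 1.0
        return dot / (na * nb)


async def char_embed(text: str) -> list[float]:
    # Crude stand-in embedding: a letter-frequency histogram.
    vec = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            vec[ord(ch) - 97] += 1.0
    return vec


async def demo() -> list[str]:
    store = InMemoryVectorMemoryStore(char_embed)
    await store.store("the cat sat on the mat")
    await store.store("stock prices fell sharply")
    return await store.retrieve("a cat on a mat", top_k=1)


results = asyncio.run(demo())
```

The `fallback` parameter in the real constructor suggests a second MemoryStoreProtocol is consulted when the vector backend is unavailable; that path is omitted here for brevity.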

Token-budget allocation for working memory assembly.

Attributes
- `system_prompt_tokens`: Fixed token allocation for the system prompt.
- `recent_turns_fraction`: Fraction of the remaining budget for recent turns.
- `episodic_fraction`: Fraction of the remaining budget for episodic recall.
- `semantic_fraction`: Fraction of the remaining budget for semantic facts.
- `tool_descriptions_fraction`: Fraction of the remaining budget for tool descriptions.
- `max_recent_turns`: Hard cap on recent turns regardless of budget.


Assembles the optimal context window from all memory tiers.

Pulls recent turns from episodic memory and relevant facts from semantic memory, fitting everything within a configurable token budget.

def __init__(
    episodic: EpisodicMemoryProtocol | None = None,
    semantic: SemanticMemoryProtocol | None = None,
    config: WorkingMemoryConfig | None = None
) -> None

Initialise the working memory manager.

Parameters
- `episodic` (`EpisodicMemoryProtocol | None`): Episodic memory source for past interactions.
- `semantic` (`SemanticMemoryProtocol | None`): Semantic memory source for structured facts.
- `config` (`WorkingMemoryConfig | None`): Token budget configuration.
async def assemble(
    query: str,
    token_budget: int,
    session_id: str | None = None
) -> list[MemoryEntry]

Assemble context window from all available memory tiers.

Parameters
- `query` (`str`): Current user query used to retrieve relevant memories.
- `token_budget` (`int`): Total token budget for the assembled context.
- `session_id` (`str | None`): Optional session scope for retrieval filtering.

Returns
- `list[MemoryEntry]`: Ordered list of memory entries ready for LLM context injection.
async def add(entry: MemoryEntry) -> None

Add an entry to the current working memory stream.

Parameters
- `entry` (`MemoryEntry`): Memory entry to add.
async def get_context_entries() -> list[MemoryEntry]

Return the entries currently assembled in working context.

Returns
- `list[MemoryEntry]`: Current context entry list.
async def flush() -> None

Clear the current context assembly.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check health of underlying memory tiers.

Parameters
- `timeout` (`float`): Maximum seconds for the health check.

Returns
- `HealthCheckResult`: Aggregated health of the episodic and semantic tiers.
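The core of `assemble` (fit retrieved entries into a token budget) can be sketched as a greedy fill. The function name, the priority-order policy, and the whitespace token counter below are assumptions; the real manager pulls candidates from the episodic and semantic tiers and may use a proper tokenizer:

```python
def assemble(candidates: list[str], token_budget: int,
             count_tokens=lambda s: len(s.split())) -> list[str]:
    """Greedy sketch of context assembly: take candidate entries in priority
    order until the token budget is exhausted (policy is an assumption)."""
    context: list[str] = []
    used = 0
    for text in candidates:
        cost = count_tokens(text)
        if used + cost > token_budget:
            continue  # skip entries that would overflow the budget
        context.append(text)
        used += cost
    return context
```

A greedy fill like this keeps the highest-priority entries whole rather than truncating them, at the cost of occasionally skipping a large entry in favour of smaller later ones.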

Error raised during memory consolidation.

Raised when generating or storing an embedding fails.

Raised when entity/fact extraction from text fails.

Raised when a memory store is at capacity and cannot accept new entries.
def __init__(
    message: str,
    capacity: int | None = None
) -> None

Initialise with optional capacity context.

Parameters
- `message` (`str`): Human-readable description.
- `capacity` (`int | None`): Maximum capacity that was exceeded.

Raised when a backend store operation fails.
def __init__(
    message: str,
    store: str | None = None
) -> None

Initialise with optional store name context.

Parameters
- `message` (`str`): Human-readable description.
- `store` (`str | None`): Name of the store that raised the error.

Base exception for memory operations.