API Reference

Strategy for consolidating episodic memories into semantic memories.
consolidate
async def consolidate(entries: list[MemoryEntry]) -> list[MemoryEntry]

Indexes and retrieves memory entries by semantic similarity.
index
async def index(entry: MemoryEntry) -> str

Index a memory entry and return its ID.

search
async def search(
    query: str,
    top_k: int
) -> list[MemoryEntry]

Scores a MemoryEntry for pruning priority (higher = keep).
score
def score(
    entry: MemoryEntry,
    query: str | None = None
) -> float

Score a memory entry for retention.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | The memory entry to score. |
| `query` | str \| None | Optional query context for relevance scoring. |

Returns

| Type | Description |
| --- | --- |
| float | A score nominally in [0, 1] that may exceed 1; higher means more important to keep. |

Preserves high-importance entries; prunes low-importance stale ones.
__init__
def __init__(importance_threshold: float = 0.1) -> None

Initialise the importance threshold strategy.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `importance_threshold` | float | Entries below this value are candidates for pruning. |

should_prune
def should_prune(entry: MemoryEntry) -> bool

Return True if entry importance is below threshold.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Entry to evaluate. |

Returns

| Type | Description |
| --- | --- |
| bool | True if importance is below threshold. |

filter
def filter(entries: list[MemoryEntry]) -> tuple[list[MemoryEntry], list[MemoryEntry]]

Split entries into (kept, pruned).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list[MemoryEntry] | Entries to evaluate. |

Returns

| Type | Description |
| --- | --- |
| tuple[list[MemoryEntry], list[MemoryEntry]] | Tuple of (kept, pruned) entry lists. |
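
As a rough illustration of the split that `filter` describes, the sketch below partitions entries around a threshold. It is not the library's implementation: `Entry` is a hypothetical stand-in for MemoryEntry, and its float `importance` field is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical stand-in for MemoryEntry (fields are assumptions)."""
    id: str
    importance: float


def split_by_importance(
    entries: list[Entry], threshold: float = 0.1
) -> tuple[list[Entry], list[Entry]]:
    """Return (kept, pruned); entries strictly below the threshold are pruned."""
    kept = [e for e in entries if e.importance >= threshold]
    pruned = [e for e in entries if e.importance < threshold]
    return kept, pruned


kept, pruned = split_by_importance(
    [Entry("a", 0.9), Entry("b", 0.05), Entry("c", 0.1)]
)
```

An entry whose importance equals the threshold is kept here, matching the "below threshold" wording.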

Fixed-capacity FIFO buffer of raw conversation turns.

The oldest entries are evicted when the buffer reaches max_entries.

__init__
def __init__(max_entries: int = 100) -> None

Initialise the buffer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_entries` | int | Maximum number of entries to retain. |

store
async def store(entry: MemoryEntry) -> None

Append entry to the buffer, evicting the oldest if full.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Memory entry to store. |

retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return the top_k most recent entries as search results.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | MemoryQuery | Search parameters (only top_k is used). |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | Most recent entries wrapped in search results with score 1.0. |

get_recent
async def get_recent(n: int) -> list[MemoryEntry]

Return the n most recent entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | int | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Entries ordered newest-first. |

delete
async def delete(entry_id: str) -> None

Remove entry with entry_id from the buffer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | str | ID of the entry to remove. |

clear
async def clear() -> None

Clear all entries from the buffer.
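
A fixed-capacity FIFO with automatic eviction maps naturally onto `collections.deque` with `maxlen`. The sketch below shows that behaviour on plain strings. It only illustrates the eviction and newest-first ordering described above, not the buffer's actual code, and `most_recent` is a hypothetical helper.

```python
from collections import deque

# With maxlen set, appending to a full deque drops the oldest item.
buffer: deque[str] = deque(maxlen=3)
for turn in ["t1", "t2", "t3", "t4"]:
    buffer.append(turn)


def most_recent(n: int) -> list[str]:
    """Return up to n items, newest first."""
    return list(buffer)[::-1][:n]
```

After the fourth append, `t1` has been evicted and `most_recent(2)` yields the two newest turns.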


MemoryStoreProtocol backed by a CacheBackendProtocol (e.g. Redis).
__init__
def __init__(
    cache: CacheBackendProtocol,
    ttl: int = 86400 * 30
) -> None
store
async def store(entry: MemoryEntry) -> None
retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
get_recent
async def get_recent(n: int) -> list[MemoryEntry]
delete
async def delete(entry_id: str) -> None
clear
async def clear() -> None
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Configuration for the background consolidation pipeline.

Attributes:

- `enabled`: Whether automatic consolidation is active.
- `interval_seconds`: How often to run a consolidation pass.
- `age_threshold_hours`: Minimum entry age before it can be consolidated.
- `importance_prune_threshold`: Entries below this importance are eligible for pruning.
- `batch_size`: Maximum entries processed per consolidation pass.


Runs MemoryConsolidator on a configurable interval.

Designed to be started once and cancelled on shutdown. Consolidation is only triggered when the interval elapses and entries are available.

__init__
def __init__(
    store: MemoryStoreProtocol,
    consolidator: MemoryConsolidatorProtocol,
    config: ConsolidationConfig | None = None
) -> None

Initialise the scheduler.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `store` | MemoryStoreProtocol | Memory store to read entries from. |
| `consolidator` | MemoryConsolidatorProtocol | Consolidator run on each cycle. |
| `config` | ConsolidationConfig \| None | Scheduling configuration. |

start
async def start() -> None

Start the background consolidation loop.

stop
async def stop() -> None

Cancel the background consolidation loop.

run_once
async def run_once() -> ConsolidationResult

Execute a single consolidation pass immediately.

Returns

| Type | Description |
| --- | --- |
| ConsolidationResult | Result of the consolidation pass. |
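
The start-once, cancel-on-shutdown pattern described for the scheduler can be sketched with a plain asyncio task. This is an illustration under assumptions rather than the scheduler's code: `IntervalRunner` and `job` are hypothetical names, and the real scheduler's check for available entries is omitted.

```python
import asyncio
import contextlib
from typing import Awaitable, Callable


class IntervalRunner:
    """Hypothetical helper: awaits `job` every `interval_seconds` until stopped."""

    def __init__(self, interval_seconds: float, job: Callable[[], Awaitable[None]]) -> None:
        self.interval_seconds = interval_seconds
        self.job = job
        self._task: asyncio.Task | None = None

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self.interval_seconds)
            await self.job()

    async def start(self) -> None:
        # Starting twice is a no-op so only one loop ever runs.
        if self._task is None:
            self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            # Awaiting the cancelled task lets it finish unwinding.
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None


runs: list[int] = []


async def job() -> None:
    runs.append(1)  # stand-in for one consolidation pass


async def demo() -> IntervalRunner:
    runner = IntervalRunner(0.01, job)
    await runner.start()
    await asyncio.sleep(0.1)  # let a few intervals elapse
    await runner.stop()
    return runner


runner = asyncio.run(demo())
```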

FIFO buffer that keeps the most recent conversation turns.

Provides a simple, bounded working-memory strategy that auto-evicts the oldest entries when max_turns or max_tokens limits are hit.

Example

buffer = ConversationBuffer(max_turns=20, max_tokens=4096)
await buffer.add(entry)
context = buffer.get_context()
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_turns` | int | Maximum number of turns to retain. |
| `max_tokens` | int | Soft token cap: the oldest entries are evicted until the total estimated token count is at or below this limit. Set to 0 to disable token-based eviction. |

__init__
def __init__(
    max_turns: int = 20,
    max_tokens: int = 4096
) -> None

Initialise the conversation buffer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_turns` | int | Maximum number of turns to retain. |
| `max_tokens` | int | Soft token cap (0 = no token limit). |

add
async def add(entry: MemoryEntry) -> None

Add a memory entry to the buffer.

If adding the entry would exceed max_turns, the oldest entry is automatically evicted. After insertion, token-based eviction is applied if max_tokens > 0.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | The memory entry to add. |

get_context
def get_context() -> list[MemoryEntry]

Return all entries currently in the buffer, oldest-first.

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Ordered list of memory entries. |

clear
def clear() -> None

Remove all entries from the buffer.

size
property size() -> int

Number of entries currently in the buffer.

total_tokens
property total_tokens() -> int

Estimated total token count across all buffered entries.
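
The two eviction stages described for `add` (turn cap first, then token cap) can be sketched as follows. This is a minimal illustration, not the library's code: `TurnBuffer` is a hypothetical class working on plain strings, the 4-characters-per-token estimate is an assumption, and so is the choice to always keep the newest entry even if it alone exceeds the cap.

```python
from collections import deque


def estimate_tokens(text: str) -> int:
    # Assumption: roughly 4 characters per token.
    return max(1, len(text) // 4)


class TurnBuffer:
    """Hypothetical sketch of turn-count plus token-count eviction."""

    def __init__(self, max_turns: int = 20, max_tokens: int = 4096) -> None:
        self.max_turns = max_turns
        self.max_tokens = max_tokens
        self._turns: deque[str] = deque()

    @property
    def total_tokens(self) -> int:
        return sum(estimate_tokens(t) for t in self._turns)

    def add(self, text: str) -> None:
        # Stage 1: make room if the turn cap is already reached.
        if self._turns and len(self._turns) >= self.max_turns:
            self._turns.popleft()
        self._turns.append(text)
        # Stage 2: evict oldest turns until the token cap is met (0 disables it).
        if self.max_tokens > 0:
            while len(self._turns) > 1 and self.total_tokens > self.max_tokens:
                self._turns.popleft()

    def get_context(self) -> list[str]:
        return list(self._turns)  # oldest first


buf = TurnBuffer(max_turns=3, max_tokens=10)
for text in ["a" * 16, "b" * 16, "c" * 16]:  # 4 estimated tokens each
    buf.add(text)
```

The third `add` pushes the estimate to 12 tokens, so the oldest turn is dropped to get back under the cap of 10.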


Stores complete conversation history partitioned by session_id.

Each session’s turns are appended in order. Retrieval returns the most recent turns for the requested session (filter via metadata).

__init__
def __init__(max_turns_per_session: int = 1000) -> None

Initialise the conversation store.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_turns_per_session` | int | Maximum turns kept per session. |

store
async def store(entry: MemoryEntry) -> None

Persist entry under its session (from metadata).

The session is derived from entry.metadata["session_id"] when present, otherwise filed under a "_default" session.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Memory entry to store. |

retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return entries for the session specified in query.filters.

Falls back to all entries if no session_id filter is given.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | MemoryQuery | Search parameters including optional `session_id` filter. |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | Most recent entries for the matching session. |

get_recent
async def get_recent(n: int) -> list[MemoryEntry]

Return the n globally most recent entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | int | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Entries ordered newest-first. |

delete
async def delete(entry_id: str) -> None

Remove entry from all sessions.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | str | ID of the entry to remove. |

clear
async def clear() -> None

Clear all sessions and entries.

get_session_entries
async def get_session_entries(session_id: str) -> list[MemoryEntry]

Return all entries for a given session in chronological order.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `session_id` | str | Session identifier. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Entries in insertion order. |

MemoryStoreProtocol backed by an SQL database.
__init__
def __init__(provider: DatabaseProviderProtocol) -> None
store
async def store(entry: MemoryEntry) -> None
retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
get_recent
async def get_recent(n: int) -> list[MemoryEntry]
delete
async def delete(entry_id: str) -> None
clear
async def clear() -> None
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Removes near-duplicate entries based on content similarity.

Two entries are treated as duplicates if their lowercased contents share more than a similarity_threshold fraction of the characters of the shorter one (a Jaccard-like overlap).

__init__
def __init__(similarity_threshold: float = 0.85) -> None

Initialise the deduplication strategy.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `similarity_threshold` | float | Jaccard-like overlap above which entries are considered duplicates. |

deduplicate
def deduplicate(entries: list[MemoryEntry]) -> tuple[list[MemoryEntry], list[MemoryEntry]]

Return (unique, duplicates).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list[MemoryEntry] | Entries to deduplicate. |

Returns

| Type | Description |
| --- | --- |
| tuple[list[MemoryEntry], list[MemoryEntry]] | Tuple of (unique, duplicate) entry lists. |
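
One possible reading of the overlap measure is sketched below: the share of distinct lowercased characters common to both strings, divided by the size of the smaller character set. The exact measure used by the library is not specified here, so treat `overlap_similarity` and this `deduplicate` helper as hypothetical and deliberately crude.

```python
def overlap_similarity(a: str, b: str) -> float:
    """Shared distinct characters divided by the smaller character set."""
    set_a, set_b = set(a.lower()), set(b.lower())
    if not set_a or not set_b:
        return 0.0
    return len(set_a & set_b) / min(len(set_a), len(set_b))


def deduplicate(texts: list[str], threshold: float = 0.85) -> tuple[list[str], list[str]]:
    """Return (unique, duplicates), keeping the first occurrence of each group."""
    unique: list[str] = []
    duplicates: list[str] = []
    for text in texts:
        if any(overlap_similarity(text, seen) > threshold for seen in unique):
            duplicates.append(text)
        else:
            unique.append(text)
    return unique, duplicates


unique, duplicates = deduplicate(["The cat sat", "the cat sat", "xyz"])
```

A character-set measure ignores order and repetition, so anagrams count as identical; a real implementation would likely use something stricter.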

Prunes MemoryEntry lists to fit within a token budget.

Uses pluggable scoring strategies to rank entries by importance, then greedily selects entries until the token budget is exhausted.

Attributes:

- `token_counter`: Protocol for counting tokens in text.
- `default_strategy`: Default pruning strategy when none is specified.

__init__
def __init__(
    token_counter: TokenCounterProtocol,
    default_strategy: PruningStrategy = PruningStrategy.HYBRID
) -> None

Initialize the pruner.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `token_counter` | TokenCounterProtocol | Implementation of TokenCounterProtocol for token counting. |
| `default_strategy` | PruningStrategy | Default strategy to use if not overridden. Default is HYBRID. |

prune
async def prune(
    entries: list[MemoryEntry],
    token_budget: int,
    query: str | None = None,
    strategy: PruningStrategy | None = None,
    **kwargs
) -> PruningResult

Prune a list of memory entries to fit within a token budget.

Scores all entries using the specified strategy, sorts them by score (descending), then greedily keeps entries until adding the next entry would exceed the remaining budget.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list[MemoryEntry] | List of MemoryEntry objects to prune. |
| `token_budget` | int | Maximum number of tokens to keep. |
| `query` | str \| None | Optional query context for relevance-based scoring. |
| `strategy` | PruningStrategy \| None | Override the default pruning strategy. If None, uses default_strategy. |
| `**kwargs` | | Additional keyword arguments (reserved for future use). |

Returns

| Type | Description |
| --- | --- |
| PruningResult | PruningResult containing kept entries (score-ordered), counts, and metadata about the pruning operation. |
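
The score-then-greedy selection described for `prune` can be sketched as below. This is an illustration, not the pruner's code: entries are modelled as hypothetical `(text, score, token_count)` tuples, and stopping at the first entry that does not fit (rather than skipping it and trying smaller ones) is one reading of the description.

```python
def prune_to_budget(
    items: list[tuple[str, float, int]], token_budget: int
) -> list[str]:
    """Keep highest-scoring items until the next one would exceed the budget."""
    kept: list[str] = []
    remaining = token_budget
    for text, _score, tokens in sorted(items, key=lambda item: item[1], reverse=True):
        if tokens > remaining:
            break  # assumption: stop at the first entry that does not fit
        kept.append(text)
        remaining -= tokens
    return kept


kept_texts = prune_to_budget(
    [("a", 0.9, 50), ("b", 0.5, 40), ("c", 0.7, 30), ("d", 0.1, 5)],
    token_budget=100,
)
```

Here `a` and `c` use 80 of the 100 tokens; `b` needs 40 and does not fit, so selection stops even though `d` would have fit.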

Extracts structured subject/predicate/object triples from memory entries.

When an LLM extraction callable is injected, extraction is delegated to it. Otherwise a lightweight heuristic fallback is used, which is suitable for testing and for environments without LLM access.

__init__
def __init__(extract_fn: Callable[[str], Awaitable[list[Triple]]] | None = None) -> None

Initialise the extractor.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `extract_fn` | Callable[[str], Awaitable[list[Triple]]] \| None | Async callable that returns triples from raw text. When *None*, a heuristic fallback is used. |

extract
async def extract(entry: MemoryEntry) -> list[Triple]

Extract triples from entry.content.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Memory entry whose content to parse. |

Returns

| Type | Description |
| --- | --- |
| list[Triple] | List of (subject, predicate, object_) tuples. |

Raises

| Exception | Description |
| --- | --- |
| FactExtractionError | If the extraction callable raises. |

Indexes memory entries by the entities they mention.

Entries are stored normally but also indexed by lowercase entity token so that entity-scoped retrieval is O(1) per entity.

__init__
def __init__() -> None

Initialise the entity store.

store
async def store(
    entry: MemoryEntry,
    entities: list[str] | None = None
) -> None

Store entry and index it under each entity in entities.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Memory entry to persist. |
| `entities` | list[str] \| None | Entity names to associate with this entry. |

retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return entries scored by importance.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | MemoryQuery | Search parameters. |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | Top-k entries ranked by importance. |

get_recent
async def get_recent(n: int) -> list[MemoryEntry]

Return the n most recent entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | int | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Entries ordered newest-first. |

delete
async def delete(entry_id: str) -> None

Remove entry and its entity index entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | str | ID of the entry to remove. |

clear
async def clear() -> None

Remove all entries and entity index data.

get_by_entity
async def get_by_entity(entity: str) -> list[MemoryEntry]

Return all entries mentioning entity.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entity` | str | Entity name to search. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Entries referencing the entity, newest-first. |

Compresses episodic memory entries by LLM-assisted summarisation.

When the episodic store grows beyond a threshold, older entries are grouped by session and collapsed into a single summary entry.

__init__
def __init__(summarise_fn: Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] | None = None) -> None

Initialise the compressor.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `summarise_fn` | Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] \| None | Async callable that accepts a list of entries and returns a single summary entry. When *None*, a simple concatenation fallback is used. |

compress
async def compress(
    entries: list[MemoryEntry],
    *,
    max_tokens: int = 200
) -> MemoryEntry

Compress entries into a single condensed memory entry.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list[MemoryEntry] | Chronologically ordered entries to compress. |
| `max_tokens` | int | Hint to the summariser for output length. |

Returns

| Type | Description |
| --- | --- |
| MemoryEntry | A new MemoryEntry representing the compressed form. |

Raises

| Exception | Description |
| --- | --- |
| ConsolidationError | If the summarisation callable raises. |

Configuration for episodic memory tier.

Attributes:

- `default_top_k`: Default number of episodes to retrieve.
- `recency_weight`: Weight applied to temporal recency during scoring.
- `importance_weight`: Weight applied to entry importance during scoring.
- `relevance_weight`: Weight applied to semantic similarity during scoring.
- `ttl_seconds`: Time-to-live for entries (0 = never expire).


Episodic memory layer backed by a pluggable MemoryStoreProtocol.
__init__
def __init__(backend: MemoryStoreProtocol) -> None
record
async def record(entry: MemoryEntry) -> None
recall
async def recall(query: MemoryQuery) -> list[MemorySearchResult]
forget
async def forget(entry_id: str) -> None
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

In-memory graph of subject/predicate/object facts.

Used by SemanticMemoryStore to persist extracted knowledge.

__init__
def __init__() -> None

Initialise an empty fact store.

add
def add(
    subject: str,
    predicate: str,
    object_: str,
    confidence: float = 1.0,
    metadata: dict | None = None
) -> str

Add a new fact triple.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `subject` | str | Subject entity. |
| `predicate` | str | Relationship type. |
| `object_` | str | Object value. |
| `confidence` | float | Confidence score in [0.0, 1.0]. |
| `metadata` | dict \| None | Optional additional metadata. |

Returns

| Type | Description |
| --- | --- |
| str | Unique ID assigned to the stored fact. |

query_by_subject
def query_by_subject(subject: str) -> list[dict]

Return all facts where subject matches (case-insensitive prefix).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `subject` | str | Subject to filter by. |

Returns

| Type | Description |
| --- | --- |
| list[dict] | List of fact dicts (id, subject, predicate, object_, confidence). |

get_entity_facts
def get_entity_facts(entity: str) -> list[dict]

Return all facts mentioning entity as subject or object.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entity` | str | Entity name to search. |

Returns

| Type | Description |
| --- | --- |
| list[dict] | List of matching fact dicts. |

update_confidence
def update_confidence(
    fact_id: str,
    confidence: float
) -> None

Update the confidence of an existing fact.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fact_id` | str | ID of the fact to update. |
| `confidence` | float | New confidence value in [0.0, 1.0]. |

delete
def delete(fact_id: str) -> None

Remove a fact.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fact_id` | str | ID of the fact to remove. |

clear
def clear() -> None

Remove all facts.


Weighted blend of recency and content length as a proxy for relevance.

Content length serves as a simple heuristic for information density: longer entries are assumed to contain more contextual information.

Attributes:

- `recency_weight`: Weight for the recency component (default 0.6).
- `relevance_weight`: Weight for the content length component (default 0.4).

__init__
def __init__(
    recency_weight: float = 0.6,
    relevance_weight: float = 0.4
) -> None

Initialize the hybrid scorer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `recency_weight` | float | Weight for recency in blended score. Default 0.6. |
| `relevance_weight` | float | Weight for content length (relevance proxy). Default 0.4. |

score_batch
def score_batch(
    entries: list,
    query: str | None = None
) -> list[float]

Score all entries together, normalizing recency to [0, 1].

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list | List of memory entries to score. |
| `query` | str \| None | Optional query context (unused by HybridScorerImpl). |

Returns

| Type | Description |
| --- | --- |
| list[float] | List of scores parallel to the input entries list. |

score
def score(
    entry: MemoryEntry,
    query: str | None = None
) -> float

Score a single entry (recency not normalized — use score_batch for batches).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | The memory entry to score. |
| `query` | str \| None | Optional query (unused by HybridScorerImpl). |

Returns

| Type | Description |
| --- | --- |
| float | Weighted score of content length (recency omitted without batch context). |
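
A batch-normalised blend of recency and content length might look like the sketch below. The normalisation details are assumptions (min-max scaling of timestamps, length divided by the longest entry in the batch), and entries are modelled as hypothetical `(timestamp, text)` tuples rather than MemoryEntry objects.

```python
def score_batch(
    items: list[tuple[float, str]],
    recency_weight: float = 0.6,
    relevance_weight: float = 0.4,
) -> list[float]:
    """Blend batch-normalised recency with relative content length."""
    if not items:
        return []
    times = [timestamp for timestamp, _ in items]
    oldest, newest = min(times), max(times)
    span = newest - oldest
    longest = max(len(text) for _, text in items) or 1  # avoid dividing by zero
    scores = []
    for timestamp, text in items:
        # Newest entry scores 1.0, oldest 0.0; a single-timestamp batch scores 1.0.
        recency = (timestamp - oldest) / span if span > 0 else 1.0
        length = len(text) / longest
        scores.append(recency_weight * recency + relevance_weight * length)
    return scores


scores = score_batch([(0.0, "aaaa"), (10.0, "aa"), (5.0, "")])
```

The oldest but longest entry gets only the length term, while the newest entry gets the full recency term plus half of the length term.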

MemoryStoreProtocol backed by an in-process dictionary.
__init__
def __init__() -> None
store
async def store(entry: MemoryEntry) -> None
retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
get_recent
async def get_recent(n: int) -> list[MemoryEntry]
delete
async def delete(entry_id: str) -> None
clear
async def clear() -> None
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Top-level configuration for lexigram-ai-memory.

Attributes:

- `working`: Working memory token budget configuration.
- `episodic`: Episodic memory retrieval configuration.
- `semantic`: Semantic memory fact storage configuration.
- `consolidation`: Background consolidation pipeline configuration.
- `default_backend`: Backend type to use (`in_memory`, `cache`, `database`, `vector`).
- `ttl_seconds`: Default entry TTL in seconds (0 = never expire).


Payload fired after a memory consolidation pass completes.

Attributes:

- `strategy`: Name of the consolidation strategy that ran (e.g. "recency_decay" or "deduplication").


Orchestrates consolidation of a batch of MemoryEntry objects.

Applies deduplication, recency decay pruning, and importance-floor pruning in sequence. Optionally runs a summarisation pass on the remaining aged entries.

__init__
def __init__(
    config: ConsolidationConfig | None = None,
    summarise_fn: Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] | None = None
) -> None

Initialise the consolidator.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | ConsolidationConfig \| None | Consolidation thresholds. Defaults to `ConsolidationConfig()`. |
| `summarise_fn` | Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] \| None | Optional async callable for summarising aged entry groups. |

consolidate
async def consolidate(entries: list[MemoryEntry]) -> ConsolidationResult

Consolidate entries via deduplication, decay, and importance pruning.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list[MemoryEntry] | Entries to process. |

Returns

| Type | Description |
| --- | --- |
| ConsolidationResult | ConsolidationResult with counts of processed, consolidated, pruned, and extracted entities. |

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report consolidator health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float | Maximum seconds for the health check. |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | HealthCheckResult indicating HEALTHY status. |

Three-tier AI memory: working, episodic, and semantic.

Call configure to register MemoryProvider and expose all memory protocol contracts for injection.

Usage

from lexigram.ai.memory.config import MemoryConfig
from lexigram.ai.memory import MemoryModule

@module(
    imports=[MemoryModule.configure(MemoryConfig(...))]
)
class AppModule(Module):
    pass

configure
def configure(
    cls,
    config: MemoryConfig | None = None,
    enable_consolidation: bool = True
) -> DynamicModule

Create a MemoryModule with explicit configuration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | MemoryConfig \| None | MemoryConfig, or `None` for defaults (in-memory backend, standard budget fractions). |
| `enable_consolidation` | bool | Register the ConsolidationScheduler, which periodically promotes episodic memories into semantic storage. Defaults to `True`. |

Returns

| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule descriptor. |

stub
def stub(
    cls,
    config: MemoryConfig | None = None
) -> DynamicModule

Create a MemoryModule suitable for unit and integration testing.

Uses in-memory backends with minimal side effects. Consolidation scheduling is disabled by default to avoid background timer tasks during tests.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | MemoryConfig \| None | Optional MemoryConfig override. Uses safe test defaults when `None`. |

Returns

| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule descriptor. |

Registers all memory services in the DI container.

Wires up the MemoryStoreProtocol, EpisodicMemoryProtocol, SemanticMemoryProtocol, WorkingMemoryProtocol, and consolidation services. The default backend is InMemoryMemoryBackend; callers with persistent needs should register a MemoryStoreProtocol override after this provider runs.

__init__
def __init__(
    config: MemoryConfig | None = None,
    enable_consolidation: bool = True
) -> None

Initialise the provider.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | MemoryConfig \| None | Memory configuration. Defaults to `MemoryConfig()`. |
| `enable_consolidation` | bool | Start the consolidation scheduler during boot. When `False`, the scheduler is never started regardless of `config.consolidation.enabled`. |

register
async def register(container: ContainerRegistrarProtocol) -> None

Register memory services.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `container` | ContainerRegistrarProtocol | DI container registrar. |

boot
async def boot(container: ContainerResolverProtocol) -> None

Start the consolidation scheduler if enabled.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `container` | ContainerResolverProtocol | Resolved DI container. |

shutdown
async def shutdown() -> None

Stop the consolidation scheduler.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check — always healthy (in-process domain provider).

No external backend to ping.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float | Ignored for in-process providers. |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | Always HEALTHY; there is no external backend to ping. |

Emitted when a memory query returns results.

Consumed by: AI audit trails, context relevance analytics.


Payload fired when entries are retrieved from a memory store.

Attributes:

- `tier`: Memory tier that was queried.
- `result_count`: Number of entries returned by the retrieval.


Queries one or more MemoryStoreProtocol backends and merges results.

Results from each backend are pooled, deduplicated by entry ID, and re-ranked by the configured RelevanceRanker. Retrieval counts are tracked per entry to enable downstream analytics and relevance decay.

__init__
def __init__(
    sources: list[MemoryStoreProtocol],
    ranker: RelevanceRanker | None = None
) -> None

Initialise the retriever.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `sources` | list[MemoryStoreProtocol] | Memory store backends to query in parallel. |
| `ranker` | RelevanceRanker \| None | Optional ranker. Defaults to a new `RelevanceRanker`. |

retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Retrieve and merge results from all configured sources.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | MemoryQuery | Search query with weights and filters. |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | De-duplicated, re-ranked results capped at `query.top_k`. |

add_source
def add_source(source: MemoryStoreProtocol) -> None

Register a new backend source.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `source` | MemoryStoreProtocol | Additional MemoryStoreProtocol to query. |

get_retrieval_count
def get_retrieval_count(entry_id: str) -> int

Return how many times a specific entry has been retrieved.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | str | The memory entry ID. |

Returns

| Type | Description |
| --- | --- |
| int | Number of times the entry appeared in retrieval results. |

get_retrieval_stats
def get_retrieval_stats() -> dict[str, Any]

Return aggregate retrieval statistics.

Returns

| Type | Description |
| --- | --- |
| dict[str, Any] | Dictionary with total retrievals, unique entries, and top-10 most-retrieved entries. |

reset_stats
def reset_stats() -> None

Clear all retrieval tracking data.
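
The pool, de-duplicate, re-rank, and count flow described for the retriever can be sketched with `asyncio.gather` and a `Counter`. Everything here is a hypothetical simplification: sources are plain async functions returning `(entry_id, score)` pairs, duplicates keep their highest score (an assumption), and sorting by raw score stands in for the RelevanceRanker.

```python
import asyncio
from collections import Counter


async def source_a(query: str) -> list[tuple[str, float]]:
    return [("e1", 0.9), ("e2", 0.4)]  # stand-in backend


async def source_b(query: str) -> list[tuple[str, float]]:
    return [("e2", 0.7), ("e3", 0.5)]  # stand-in backend


retrieval_counts: Counter[str] = Counter()


async def retrieve(query: str, sources, top_k: int) -> list[tuple[str, float]]:
    # Query every source concurrently, then pool the results.
    batches = await asyncio.gather(*(source(query) for source in sources))
    best: dict[str, float] = {}
    for batch in batches:
        for entry_id, score in batch:
            # De-duplicate by entry ID, keeping the highest score seen.
            if entry_id not in best or score > best[entry_id]:
                best[entry_id] = score
    ranked = sorted(best.items(), key=lambda pair: pair[1], reverse=True)[:top_k]
    # Track how often each entry appears in returned results.
    retrieval_counts.update(entry_id for entry_id, _ in ranked)
    return ranked


results = asyncio.run(retrieve("tea", [source_a, source_b], top_k=2))
```

Only entries that survive the `top_k` cut are counted, so `e3` stays at zero in this run.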


Emitted when a memory entry is persisted to the memory store.

Consumed by: AI audit trails, context management, safety review.


Payload fired when an entry is written to a memory store.

Attributes:

- `tier`: Memory tier that received the write (e.g. "working", "episodic", or "semantic").
- `backend`: Backend identifier that persisted the entry (e.g. "in_memory" or "vector").


Result of a context pruning operation.

Attributes:

- `kept`: List of MemoryEntry items kept (score-ordered, highest first).
- `pruned_count`: Number of entries that were removed.
- `original_count`: Number of entries that came in.
- `token_budget`: The token budget that was applied.
- `strategy`: The pruning strategy used.
- `metadata`: Optional metadata dictionary with additional pruning details.


Enum of pruning scoring strategies.

Attributes:

- `RECENCY`: Keep most recent entries by timestamp.
- `RELEVANCE`: Keep highest-relevance entries (placeholder for future embedding-based scoring).
- `HYBRID`: Weighted blend of recency and relevance (content length as proxy).


Prunes entries whose recency score falls below a threshold.

Uses an exponential decay model with a configurable half-life.

__init__
def __init__(
    half_life_hours: float = 24.0,
    threshold: float = 0.05
) -> None

Initialise the recency decay strategy.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `half_life_hours` | float | Time in hours for importance to halve. |
| `threshold` | float | Entries with recency below this are pruned. |

should_prune
def should_prune(entry: MemoryEntry) -> bool

Return True if entry should be pruned.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Entry to evaluate. |

Returns

| Type | Description |
| --- | --- |
| bool | True if the entry's recency score is below threshold. |

filter
def filter(entries: list[MemoryEntry]) -> tuple[list[MemoryEntry], list[MemoryEntry]]

Split entries into (kept, pruned).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entries` | list[MemoryEntry] | Entries to evaluate. |

Returns

| Type | Description |
| --- | --- |
| tuple[list[MemoryEntry], list[MemoryEntry]] | Tuple of (kept, pruned) entry lists. |
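
The exact decay formula is not given here; a standard half-life model, assumed for this sketch, scores an entry as 0.5 raised to the number of half-lives elapsed. The functions below are hypothetical and take the entry's age in hours directly instead of a MemoryEntry.

```python
def recency_score(age_hours: float, half_life_hours: float = 24.0) -> float:
    """Exponential decay: 1.0 at age 0, halving every half_life_hours."""
    return 0.5 ** (age_hours / half_life_hours)


def should_prune(
    age_hours: float, half_life_hours: float = 24.0, threshold: float = 0.05
) -> bool:
    """True once the decayed score has fallen below the threshold."""
    return recency_score(age_hours, half_life_hours) < threshold


one_day = recency_score(24.0)
two_days = recency_score(48.0)
```

Under this model and the default arguments, an entry crosses the 0.05 threshold after about 104 hours, since 24 × log2(1 / 0.05) ≈ 103.7.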

Scores memory entries by recency — more recent entries get higher scores.

Uses the entry’s timestamp to produce a normalized score relative to the entire batch of entries being pruned.

score
def score(
    entry: MemoryEntry,
    query: str | None = None
) -> float

Score entry by recency.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | The memory entry to score. |
| `query` | str \| None | Optional query (unused by RecencyScorerImpl). |

Returns

| Type | Description |
| --- | --- |
| float | Score based on entry's timestamp (normalized in batch context). |

Re-ranks a list of MemorySearchResult by a multi-factor score.

Combines the raw retrieval score with recency and importance using the weights from the originating MemoryQuery.

rank
def rank(
    results: list[MemorySearchResult],
    query: MemoryQuery
) -> list[MemorySearchResult]

Re-rank results and return a new sorted list.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `results` | list[MemorySearchResult] | Raw search results to re-rank. |
| `query` | MemoryQuery | Original query with weighting parameters. |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | Results sorted by descending combined score. |

top_k
def top_k(
    results: list[MemorySearchResult],
    query: MemoryQuery,
    k: int | None = None
) -> list[MemorySearchResult]

Re-rank and return the top k results.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `results` | list[MemorySearchResult] | Raw results to rank. |
| `query` | MemoryQuery | Query parameters. |
| `k` | int \| None | Maximum results to return. Defaults to `query.top_k`. |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | Top-k results after re-ranking. |
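
A multi-factor re-rank of this kind is often a weighted sum, which is what the sketch below assumes. `Result` is a hypothetical stand-in for MemorySearchResult with a pre-computed `recency` in [0, 1], and the default weights are illustrative values, not the library's.

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for MemorySearchResult (fields are assumptions)."""
    entry_id: str
    score: float       # raw retrieval score
    recency: float     # assumed already normalised to [0, 1]
    importance: float


def rank(
    results: list[Result],
    relevance_weight: float = 0.5,
    recency_weight: float = 0.3,
    importance_weight: float = 0.2,
) -> list[Result]:
    """Return a new list sorted by descending weighted score."""
    def combined(result: Result) -> float:
        return (
            relevance_weight * result.score
            + recency_weight * result.recency
            + importance_weight * result.importance
        )
    return sorted(results, key=combined, reverse=True)


def top_k(results: list[Result], k: int) -> list[Result]:
    return rank(results)[:k]


ranked = rank([
    Result("a", 0.9, 0.1, 0.1),
    Result("b", 0.5, 0.9, 0.9),
    Result("c", 0.2, 0.2, 0.2),
])
```

Entry `b` overtakes `a` despite a lower raw score because its recency and importance contributions outweigh the difference.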

Configuration for semantic memory tier.

Attributes:

- `min_confidence`: Minimum confidence to store a fact.
- `max_facts_per_entity`: Hard cap on stored facts per entity.


Semantic memory backed by an in-process FactStore.
__init__
def __init__(
    fact_store: FactStore | None = None,
    extractor: EntityExtractor | None = None,
    min_confidence: float = 0.5,
    max_facts_per_entity: int = 100
) -> None
store_fact
async def store_fact(
    subject: str,
    predicate: str,
    object_: str,
    confidence: float = 1.0
) -> str
query_facts
async def query_facts(subject: str) -> list[dict[str, Any]]
get_entity_facts
async def get_entity_facts(entity: str) -> list[dict[str, Any]]
update_fact
async def update_fact(
    fact_id: str,
    confidence: float
) -> None
ingest
async def ingest(entry: MemoryEntry) -> int
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Memory store that periodically compresses older turns into summaries.

Maintains a hot buffer of recent turns and a list of compressed summary entries. When the hot buffer exceeds compress_threshold, the oldest compress_batch entries are collapsed via summarise_fn.

__init__
def __init__(
    compress_threshold: int = 20,
    compress_batch: int = 10,
    summarise_fn: Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] | None = None
) -> None

Initialise the summary store.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `compress_threshold` | int | Number of hot entries that triggers compression. |
| `compress_batch` | int | Number of oldest entries to compress per run. |
| `summarise_fn` | Callable[[list[MemoryEntry]], Awaitable[MemoryEntry]] \| None | Async callable that reduces a batch to one entry. |

store
async def store(entry: MemoryEntry) -> None

Add entry to the hot buffer, compressing if necessary.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Memory entry to add. |

retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]

Return summaries plus most recent hot entries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | MemoryQuery | Search parameters. |

Returns

| Type | Description |
| --- | --- |
| list[MemorySearchResult] | Combined results from summaries and hot buffer. |

get_recent
async def get_recent(n: int) -> list[MemoryEntry]

Return the n most recent entries (hot buffer only).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `n` | int | Maximum entries to return. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Most recent entries, newest-first. |

delete
async def delete(entry_id: str) -> None

Remove entry with entry_id from hot buffer or summaries.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry_id` | str | ID of the entry to remove. |

clear
async def clear() -> None

Clear all hot entries and summaries.
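
The compression behaviour described for this store (a hot buffer that collapses its oldest `compress_batch` entries once it exceeds `compress_threshold`) can be sketched roughly as follows. This is an illustration under assumptions, not the library's code: `Entry` is a hypothetical stand-in for `MemoryEntry` with made-up `id` and `content` fields, `ToySummaryStore` is a hypothetical class, and `join_summary` is a placeholder summariser that a real deployment would presumably replace with an LLM call.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class Entry:
    """Hypothetical stand-in for MemoryEntry; the real fields may differ."""
    id: str
    content: str


async def join_summary(batch: list[Entry]) -> Entry:
    # Placeholder summariser: concatenates the batch into one entry.
    return Entry(
        id=f"summary:{batch[0].id}..{batch[-1].id}",
        content=" | ".join(e.content for e in batch),
    )


class ToySummaryStore:
    def __init__(
        self,
        compress_threshold: int = 20,
        compress_batch: int = 10,
        # Assumption: the sketch defaults to join_summary instead of None.
        summarise_fn: Callable[[list[Entry]], Awaitable[Entry]] = join_summary,
    ) -> None:
        self.compress_threshold = compress_threshold
        self.compress_batch = compress_batch
        self.summarise_fn = summarise_fn
        self.hot: list[Entry] = []        # oldest first
        self.summaries: list[Entry] = []

    async def store(self, entry: Entry) -> None:
        self.hot.append(entry)
        if len(self.hot) > self.compress_threshold:
            # Collapse the oldest batch into a single summary entry.
            batch = self.hot[: self.compress_batch]
            self.hot = self.hot[self.compress_batch:]
            self.summaries.append(await self.summarise_fn(batch))

    async def get_recent(self, n: int) -> list[Entry]:
        # Hot buffer only, newest first.
        return self.hot[::-1][:n]


async def demo() -> ToySummaryStore:
    store = ToySummaryStore(compress_threshold=4, compress_batch=2)
    for i in range(5):
        await store.store(Entry(id=f"t{i}", content=f"turn {i}"))
    return store


if __name__ == "__main__":
    s = asyncio.run(demo())
    print([e.id for e in s.hot], [e.content for e in s.summaries])
```

With a threshold of 4 and a batch of 2, the fifth `store` call triggers one compression, leaving three hot entries and one summary.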


Distributes a total token budget across working memory sections.

The budget is split in this order:

  1. System prompt receives a fixed allocation.
  2. The remaining budget is divided among recent turns, episodic recall, semantic facts, and tool descriptions using the configured fractions.
__init__
def __init__(config: WorkingMemoryConfig | None = None) -> None

Initialise with optional config.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | WorkingMemoryConfig \| None | Working memory configuration; uses defaults if None. |

allocate
def allocate(total_tokens: int) -> dict[str, int]

Compute token allocations for each memory section.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `total_tokens` | int | Total token budget available. |

Returns

| Type | Description |
| --- | --- |
| dict[str, int] | Mapping of section name to token allocation. |

budget_for
def budget_for(
    section: str,
    total_tokens: int
) -> int

Return the token budget for a single named section.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `section` | str | Section name (e.g. 'episodic', 'semantic'). |
| `total_tokens` | int | Total token budget. |

Returns

| Type | Description |
| --- | --- |
| int | Token count allocated to the requested section. |

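
The two-step split described for this allocator (a fixed system-prompt allocation, then fractions of the remainder) might look like the sketch below. Several details are assumptions made for illustration: `ToyBudgetConfig` is a hypothetical config whose default values are invented, the section keys other than `'episodic'` and `'semantic'` are guesses, fractional results are truncated with `int()`, and the system-prompt allocation is clamped when the total budget is smaller than it.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyBudgetConfig:
    # Field names follow the documented attributes; the values are made up.
    system_prompt_tokens: int = 500
    recent_turns_fraction: float = 0.5
    episodic_fraction: float = 0.25
    semantic_fraction: float = 0.125
    tool_descriptions_fraction: float = 0.125


def allocate(total_tokens: int, config: ToyBudgetConfig | None = None) -> dict[str, int]:
    config = config or ToyBudgetConfig()
    # Step 1: the system prompt gets its fixed allocation (clamped to the total).
    system = min(config.system_prompt_tokens, total_tokens)
    remaining = total_tokens - system
    # Step 2: the remainder is divided by the configured fractions.
    return {
        "system_prompt": system,
        "recent_turns": int(remaining * config.recent_turns_fraction),
        "episodic": int(remaining * config.episodic_fraction),
        "semantic": int(remaining * config.semantic_fraction),
        "tool_descriptions": int(remaining * config.tool_descriptions_fraction),
    }


def budget_for(section: str, total_tokens: int, config: ToyBudgetConfig | None = None) -> int:
    # Single-section lookup built on top of allocate().
    return allocate(total_tokens, config)[section]


if __name__ == "__main__":
    print(allocate(4500))
    print(budget_for("episodic", 4500))
```

With these invented defaults, a 4500-token budget leaves 4000 tokens after the system prompt, of which the episodic section receives a quarter.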

MemoryStoreProtocol that persists entries as vector-searchable documents.
__init__
def __init__(
    vector_store: DocumentVectorStoreProtocol,
    embed_fn: Callable[[str], Awaitable[list[float]]] | None = None,
    collection: str = 'memory',
    fallback: MemoryStoreProtocol | None = None
) -> None
store
async def store(entry: MemoryEntry) -> None
retrieve
async def retrieve(query: MemoryQuery) -> list[MemorySearchResult]
get_recent
async def get_recent(n: int) -> list[MemoryEntry]
delete
async def delete(entry_id: str) -> None
clear
async def clear() -> None
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult
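
The `embed_fn` parameter is typed as an async callable from text to a vector of floats. As a rough illustration of that contract only, the sketch below pairs a deliberately crude embedding (normalised letter counts) with cosine-similarity ranking. `toy_embed`, `cosine`, and `rank` are hypothetical helpers, not part of the documented API, and a real deployment would presumably call an embedding model and delegate the search to the vector store.

```python
from __future__ import annotations

import asyncio
import math
from typing import Awaitable, Callable


async def toy_embed(text: str) -> list[float]:
    # Crude stand-in embedding: unit-normalised counts of the letters a-z.
    counts = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            counts[ord(ch) - ord("a")] += 1.0
    norm = math.sqrt(sum(c * c for c in counts)) or 1.0
    return [c / norm for c in counts]


def cosine(a: list[float], b: list[float]) -> float:
    # Inputs are already unit length, so the dot product is the cosine.
    return sum(x * y for x, y in zip(a, b))


async def rank(
    query: str,
    docs: list[str],
    top_k: int,
    embed_fn: Callable[[str], Awaitable[list[float]]] = toy_embed,
) -> list[str]:
    q = await embed_fn(query)
    scored = []
    for doc in docs:
        scored.append((cosine(q, await embed_fn(doc)), doc))
    # Highest similarity first.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc for _, doc in scored[:top_k]]


DOCS = ["alice likes coffee", "the deploy failed", "coffee order placed"]

if __name__ == "__main__":
    print(asyncio.run(rank("coffee order", DOCS, top_k=2)))
```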

Token-budget allocation for working memory assembly.

Attributes:
    system_prompt_tokens: Fixed token allocation for system prompt.
    recent_turns_fraction: Fraction of remaining budget for recent turns.
    episodic_fraction: Fraction of remaining budget for episodic recall.
    semantic_fraction: Fraction of remaining budget for semantic facts.
    tool_descriptions_fraction: Fraction of remaining budget for tool descriptions.
    max_recent_turns: Hard cap on recent turns regardless of budget.


Assembles the optimal context window from all memory tiers.

Pulls recent turns from episodic memory and relevant facts from semantic memory, fitting everything within a configurable token budget.

__init__
def __init__(
    episodic: EpisodicMemoryProtocol | None = None,
    semantic: SemanticMemoryProtocol | None = None,
    config: WorkingMemoryConfig | None = None
) -> None

Initialise the working memory manager.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `episodic` | EpisodicMemoryProtocol \| None | Episodic memory source for past interactions. |
| `semantic` | SemanticMemoryProtocol \| None | Semantic memory source for structured facts. |
| `config` | WorkingMemoryConfig \| None | Token budget configuration. |

assemble
async def assemble(
    query: str,
    token_budget: int,
    session_id: str | None = None
) -> list[MemoryEntry]

Assemble context window from all available memory tiers.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | str | Current user query used to retrieve relevant memories. |
| `token_budget` | int | Total token budget for the assembled context. |
| `session_id` | str \| None | Optional session scope for retrieval filtering. |

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Ordered list of memory entries ready for LLM context injection. |

add
async def add(entry: MemoryEntry) -> None

Add an entry to the current working memory stream.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | MemoryEntry | Memory entry to add. |

get_context_entries
async def get_context_entries() -> list[MemoryEntry]

Return the entries currently assembled in working context.

Returns

| Type | Description |
| --- | --- |
| list[MemoryEntry] | Current context entry list. |

flush
async def flush() -> None

Clear the current context assembly.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check health of underlying memory tiers.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float | Maximum seconds for the health check. |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | HealthCheckResult aggregating episodic and semantic tier health. |

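
Fitting candidate entries within a token budget, as `assemble` is described as doing, can be approximated with a greedy pass. The sketch below is one plausible approach and not necessarily what the manager does: `Entry` is a hypothetical stand-in for `MemoryEntry`, `estimate_tokens` is an assumed whitespace-based counter (a real tokenizer would give different numbers), and `fit_to_budget` is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical stand-in for MemoryEntry; only content is modelled."""
    content: str


def estimate_tokens(text: str) -> int:
    # Assumption: one token per whitespace-separated word.
    return len(text.split())


def fit_to_budget(candidates: list[Entry], token_budget: int) -> list[Entry]:
    """Keep candidates in priority order while the running total fits."""
    kept: list[Entry] = []
    used = 0
    for entry in candidates:
        cost = estimate_tokens(entry.content)
        if used + cost > token_budget:
            # Skip entries that do not fit; a later, smaller one still may.
            continue
        kept.append(entry)
        used += cost
    return kept


if __name__ == "__main__":
    candidates = [Entry("a b c"), Entry("d e f g h"), Entry("i j")]
    print([e.content for e in fit_to_budget(candidates, token_budget=5)])
```

Skipping an oversized entry rather than stopping at it lets smaller, lower-priority entries fill the leftover budget; stopping at the first miss would be the stricter alternative.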

Error raised during memory consolidation.

Raised when generating or storing an embedding fails.

Raised when entity/fact extraction from text fails.

Raised when a memory store is at capacity and cannot accept new entries.
__init__
def __init__(
    message: str,
    capacity: int | None = None
) -> None

Initialise with optional capacity context.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `message` | str | Human-readable description. |
| `capacity` | int \| None | Maximum capacity that was exceeded. |


Raised when a backend store operation fails.
__init__
def __init__(
    message: str,
    store: str | None = None
) -> None

Initialise with optional store name context.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `message` | str | Human-readable description. |
| `store` | str \| None | Name of the store that raised the error. |


Base exception for memory operations.
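
The class names of these exceptions are not shown in this section, so the sketch below uses hypothetical names (`MemoryBaseError`, `CapacityExceededError`, `StoreOperationError`) to illustrate how a shared base class and the optional context arguments could be used together. It also assumes that the specific errors inherit from the base exception and that the constructor arguments are kept as attributes; neither is confirmed here.

```python
from __future__ import annotations


class MemoryBaseError(Exception):
    """Hypothetical name for the base exception for memory operations."""


class CapacityExceededError(MemoryBaseError):
    """Hypothetical name for the at-capacity error."""

    def __init__(self, message: str, capacity: int | None = None) -> None:
        super().__init__(message)
        self.capacity = capacity  # assumed attribute


class StoreOperationError(MemoryBaseError):
    """Hypothetical name for the backend store failure."""

    def __init__(self, message: str, store: str | None = None) -> None:
        super().__init__(message)
        self.store = store  # assumed attribute


def describe(exc: MemoryBaseError) -> str:
    # Use the extra context when the specific error type provides it.
    if isinstance(exc, CapacityExceededError):
        return f"capacity={exc.capacity}: {exc}"
    if isinstance(exc, StoreOperationError):
        return f"store={exc.store}: {exc}"
    return str(exc)


if __name__ == "__main__":
    try:
        raise CapacityExceededError("buffer full", capacity=100)
    except MemoryBaseError as exc:  # one handler covers every memory error
        print(describe(exc))
```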