
API Reference

BM25-based keyword retrieval.

Example

retriever = BM25Retriever()
docs = [
    Document(id="1", text="Python programming"),
    Document(id="2", text="Java programming"),
]
retriever.add_documents(docs)
results = retriever.search("python", k=5)

def __init__(
    tokenizer: Tokenizer | None = None,
    k1: float = 1.5,
    b: float = 0.75
)

Initialize BM25 retriever.

Parameters

  • `tokenizer` (Tokenizer | None): Text tokenizer (default: SimpleTokenizer)
  • `k1` (float): BM25 k1 parameter (controls term-frequency saturation)
  • `b` (float): BM25 b parameter (controls document-length normalization)
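For reference, k1 and b are the standard Okapi BM25 knobs. A minimal sketch of the per-term score under that formula (not this library's internal implementation):

import math

def bm25_term_score(
    tf: float,           # term frequency in the document
    df: int,             # number of documents containing the term
    n_docs: int,         # total documents in the index
    doc_len: int,        # length of this document in tokens
    avg_doc_len: float,  # average document length in the index
    k1: float = 1.5,
    b: float = 0.75,
) -> float:
    # Higher k1 lets repeated terms keep adding score;
    # b scales the penalty applied to long documents.
    idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
    norm = tf + k1 * (1 - b + b * doc_len / avg_doc_len)
    return idf * (tf * (k1 + 1)) / norm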
def add_documents(documents: list[Document]) -> None

Add documents to the index.

Parameters

  • `documents` (list[Document]): Documents to add
def search(
    query: str,
    k: int = 10,
    min_score: float = 0.0
) -> list[SearchResult]

Search for documents using BM25.

Parameters

  • `query` (str): Search query
  • `k` (int): Number of results to return
  • `min_score` (float): Minimum score threshold

Returns

  • list[SearchResult]: List of search results
def clear() -> None

Clear all documents.


Common logic for all vector collection implementations.
def __init__(
    name: str,
    dimension: int,
    distance_metric: DistanceMetric
)
property name() -> str
property dimension() -> int
property distance_metric() -> DistanceMetric

Common logic for all vector store implementations.
async def connect() -> None

Establish connection.

async def disconnect() -> None

Close connection.


ChromaDB vector store backend.

Supports two operating modes:

  • In-process (use_http_client=False): Uses EphemeralClient for fast in-memory storage. Ideal for testing and local development.
  • HTTP (use_http_client=True): Connects to a running ChromaDB server via HttpClient.

Usage

store = ChromaStore(config)
await store.connect()
col = await store.get_collection("my-collection")
await col.upsert([VectorRecord(id="1", vector=[0.1, 0.2], metadata={})])
results = await col.search(SearchQuery(vector=[0.1, 0.2], top_k=5))
await store.disconnect()
def __init__(config: ChromaConfig) -> None
async def connect() -> None

Initialize the ChromaDB client.

async def disconnect() -> None

Release the ChromaDB client.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check ChromaDB connectivity.

async def list_collections() -> list[CollectionInfo]

List all ChromaDB collections.

async def create_collection(config: CollectionConfig) -> None

Create a new ChromaDB collection.

async def delete_collection(name: str) -> None

Delete a ChromaDB collection.

async def collection_exists(name: str) -> bool

Check if a collection exists.

async def get_collection(name: str) -> ChromaCollection

Get a handle to an existing collection.

async def get_or_create_collection(config: CollectionConfig) -> ChromaCollection

Get or create a collection.


Cross-encoder pattern for reranking.

This class provides the structure and interface for cross-encoder reranking but doesn’t include actual model loading (reserved for enterprise).

In the community edition, this serves as a protocol/pattern that can be extended with actual models in the enterprise edition.

Example

reranker = CrossEncoderReranker()

reranker = CrossEncoderReranker(model="cross-encoder/ms-marco-MiniLM-L-6-v2")

def __init__(
    model: str | None = None,
    batch_size: int = 32
)

Initialize cross-encoder reranker.

Parameters

  • `model` (str | None): Model name/path (enterprise only)
  • `batch_size` (int): Batch size for inference
async def rerank(
    query: str,
    results: list[SearchResult],
    top_k: int | None = None
) -> list[SearchResult]

Rerank using cross-encoder model.

Parameters

  • `query` (str): Search query
  • `results` (list[SearchResult]): Initial results
  • `top_k` (int | None): Number of results to return

Returns

  • list[SearchResult]: Reranked results

Raises

  • NotImplementedError: Raised in the community edition, which does not include model loading

Diversity-based reranking.

Reranks results to maximize diversity while maintaining relevance. Uses Maximal Marginal Relevance (MMR) approach.

Example

reranker = DiversityReranker(lambda_param=0.7)
results = await reranker.rerank("python", search_results)

def __init__(
    lambda_param: float = 0.7,
    similarity_threshold: float = 0.8
)

Initialize diversity reranker.

Parameters

  • `lambda_param` (float): Trade-off between relevance and diversity (0-1); 1.0 = pure relevance, 0.0 = pure diversity
  • `similarity_threshold` (float): Similarity score above which two documents are considered similar
async def rerank(
    query: str,
    results: list[SearchResult],
    top_k: int | None = None
) -> list[SearchResult]

Rerank results using MMR for diversity.

Parameters

  • `query` (str): Search query
  • `results` (list[SearchResult]): Initial results
  • `top_k` (int | None): Number of results to return

Returns

  • list[SearchResult]: Diverse reranked results
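As a rough sketch of the greedy MMR selection this reranker is based on (the similarity lookups here are hypothetical inputs; the library's actual scoring may differ):

def mmr_select(query_sim, pairwise_sim, candidates, lambda_param=0.7, top_k=5):
    # Greedy MMR: each step picks the candidate that best balances
    # relevance to the query against redundancy with already-picked results.
    selected = []
    remaining = list(candidates)
    while remaining and len(selected) < top_k:
        def mmr_score(c):
            redundancy = max((pairwise_sim[c][s] for s in selected), default=0.0)
            return lambda_param * query_sim[c] - (1 - lambda_param) * redundancy
        best = max(remaining, key=mmr_score)
        selected.append(best)
        remaining.remove(best)
    return selected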

Adapts PgVectorStore + EmbeddingClientProtocol into DocumentVectorStoreProtocol.

This adapter is the bridge between the AI-layer contract (DocumentVectorStoreProtocol) and the infrastructure layer (PgVectorStore + PgVectorCollection).

Responsibilities:

  • Call the embedding client to convert text → vector before upsert / search.
  • Map Document objects to VectorRecord objects for storage.
  • Map SearchResult rows back to RAGSearchResult for consumers.
  • Honour Document.embedding when pre-computed embeddings are provided (skips the embedding API call for those documents).

Lifecycle

adapter = DocumentVectorStoreAdapter(
    store=pgvector_store,
    embedding_client=embedding_client,
    collection_name="pet_knowledge",
    dimension=768,
)
await adapter.connect()  # creates collection if absent
...
await adapter.close()  # closes embedding client session

Example

docs = [Document(text="dog limping after run", metadata={"species": "dog"})]
result = await adapter.add(docs)
if result.is_ok():
    ids = result.unwrap()
search_result = await adapter.search_text("cruciate ligament", top_k=5)
if search_result.is_ok():
    for hit in search_result.unwrap():
        logger.info("search_result", score=hit.score, text=hit.document.text)
def __init__(
    store: PgVectorStore,
    embedding_client: OpenAICompatibleEmbeddingClient,
    collection_name: str,
    dimension: int,
    distance_metric: DistanceMetric = DistanceMetric.COSINE
) -> None

Initialise the adapter.

Parameters

  • `store` (PgVectorStore): Connected `PgVectorStore` (`connect()` already called).
  • `embedding_client` (OpenAICompatibleEmbeddingClient): Embedding client for text → vector conversion.
  • `collection_name` (str): Name of the pgvector table / collection to use.
  • `dimension` (int): Vector dimension; must match the embedding model output.
  • `distance_metric` (DistanceMetric): Similarity metric for search (default: cosine).
async def connect() -> None

Ensure the vector collection (table) exists and cache the handle.

Safe to call multiple times — uses CREATE TABLE IF NOT EXISTS under the hood via PgVectorStore.create_collection().

async def close() -> None

Release the embedding client’s HTTP session.

async def add(documents: list[DocumentProtocol]) -> Result[list[str], VectorError]

Embed and store documents in the vector collection.

Documents with a pre-computed .embedding field skip the API call. Documents without an id receive a generated UUID.

Parameters

  • `documents` (list[DocumentProtocol]): Documents to embed and store.

Returns

  • Result[list[str], VectorError]: `Ok(list[str])` with the stored document IDs, or `Err(VectorUpsertError)` on failure.
async def batch_upsert(
    documents: list[DocumentProtocol],
    batch_size: int = 100
) -> Result[int, VectorError]

Upsert documents in batches to avoid overwhelming the embedding API.

Parameters

  • `documents` (list[DocumentProtocol]): Documents to embed and store.
  • `batch_size` (int): Documents per batch.

Returns

  • Result[int, VectorError]: `Ok(int)` with the total count upserted, or `Err(VectorUpsertError)` on failure.
async def search(
    query: list[float],
    *,
    top_k: int = 10,
    filters: MetadataFilter | None = None,
    score_threshold: float | None = None
) -> Result[list[SearchResultProtocol], VectorError]

Search using a pre-computed query vector.

Parameters

  • `query` (list[float]): Query embedding vector.
  • `top_k` (int): Maximum results to return.
  • `filters` (MetadataFilter | None): Optional metadata filter (`MetadataFilter` from contracts).
  • `score_threshold` (float | None): Minimum similarity score (0–1 for cosine).

Returns

  • Result[list[SearchResultProtocol], VectorError]: `Ok(list[RAGSearchResult])` or `Err(VectorSearchError)`.
async def search_text(
    query: str,
    *,
    top_k: int = 10,
    filters: MetadataFilter | None = None
) -> Result[list[SearchResultProtocol], VectorError]

Embed query text then perform similarity search.

Parameters

  • `query` (str): Natural language query string.
  • `top_k` (int): Maximum results to return.
  • `filters` (MetadataFilter | None): Optional metadata filter.

Returns

  • Result[list[SearchResultProtocol], VectorError]: `Ok(list[RAGSearchResult])` or `Err(VectorSearchError)`.
async def delete(ids: list[str]) -> Result[int, VectorError]

Delete documents by ID.

Parameters

  • `ids` (list[str]): Document IDs to remove.

Returns

  • Result[int, VectorError]: `Ok(int)` count deleted, or `Err(VectorError)` on failure.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check both the vector store and embedding client health.

Parameters

  • `timeout` (float): Timeout for each check.

Returns

  • HealthCheckResult: Aggregated `HealthCheckResult`.

An embedding vector.

Cache layer for embedding operations.

Caches embedding vectors to avoid recomputing expensive operations. Integrates with lexigram-cache for distributed caching support.

Example

from lexigram.vector.embedding.cache import EmbeddingCache

cache = EmbeddingCache(cache_service=cache_service)

embedding = await cache.get("My text to embed")
if embedding is None:
    # Cache miss - compute embedding
    embedding = await embedding_model.embed("My text to embed")
    await cache.set("My text to embed", embedding)

def __init__(
    cache_service: CacheBackendProtocol | None,
    ttl: int = 86400,
    key_prefix: str = 'embedding:',
    enabled: bool = True
)

Initialize embedding cache.

Parameters

  • `cache_service` (CacheBackendProtocol | None): Cache backend implementing CacheBackendProtocol
  • `ttl` (int): Time-to-live for cache entries in seconds
  • `key_prefix` (str): Prefix for cache keys
  • `enabled` (bool): Whether caching is enabled
async def get(
    text: str,
    model: str | None = None
) -> Embedding | None

Get embedding from cache.

Parameters

  • `text` (str): Text to get embedding for
  • `model` (str | None): Optional model name

Returns

  • Embedding | None: Cached embedding vector, or None if not found
async def set(
    text: str,
    embedding: Embedding | list[float],
    model: str | None = None,
    ttl: int | None = None
) -> bool

Set embedding in cache.

Parameters

  • `text` (str): Text that was embedded
  • `embedding` (Embedding | list[float]): Embedding vector to cache
  • `model` (str | None): Optional model name
  • `ttl` (int | None): Optional custom TTL (uses default if not provided)

Returns

  • bool: True if cached successfully, False otherwise
async def delete(
    text: str,
    model: str | None = None
) -> bool

Delete embedding from cache.

Parameters

  • `text` (str): Text to delete embedding for
  • `model` (str | None): Optional model name

Returns

  • bool: True if deleted successfully, False otherwise
async def clear(model: str | None = None) -> bool

Clear all embeddings from cache.

Parameters

  • `model` (str | None): If provided, only clear embeddings for this model

Returns

  • bool: True if cleared successfully, False otherwise
async def get_stats() -> dict[str, Any]

Get cache statistics.

Returns

  • dict[str, Any]: Dictionary with cache statistics

Configuration for the OpenAI-compatible embedding HTTP client.

Reads from LEX_VECTOR__EMBEDDING__* environment variables. Supports any backend that implements the OpenAI /v1/embeddings API: text-embeddings-inference (local), LM Studio, OpenAI, Cohere, etc.

Example

# Local (docker-compose, infinity embedding server):
LEX_VECTOR__EMBEDDING__API_BASE=http://fastembed
LEX_VECTOR__EMBEDDING__MODEL=nomic-ai/nomic-embed-text-v1.5
LEX_VECTOR__EMBEDDING__DIMENSION=768
# Production (OpenAI — uses /v1/embeddings):
LEX_VECTOR__EMBEDDING__API_BASE=https://api.openai.com/v1
LEX_VECTOR__EMBEDDING__MODEL=text-embedding-3-small
LEX_VECTOR__EMBEDDING__DIMENSION=1536
LEX_VECTOR__EMBEDDING__API_KEY=sk-...

Hybrid retrieval combining BM25 and vector search.

Uses Reciprocal Rank Fusion (RRF) to combine results from both methods.

Example

retriever = HybridRetriever(
    vector_store=my_vector_store,
    bm25_weight=0.5,
    vector_weight=0.5,
)
results = await retriever.search("python programming", k=10)

def __init__(
    vector_store: VectorRetriever,
    bm25_weight: float = 0.5,
    vector_weight: float = 0.5,
    rrf_k: int = 60,
    tokenizer: Tokenizer | None = None
)

Initialize hybrid retriever.

Parameters

  • `vector_store` (VectorRetriever): Vector store for semantic search
  • `bm25_weight` (float): Weight for BM25 results (default: 0.5)
  • `vector_weight` (float): Weight for vector results (default: 0.5)
  • `rrf_k` (int): RRF k parameter (default: 60)
  • `tokenizer` (Tokenizer | None): Text tokenizer for BM25
async def add_documents(documents: list[Document]) -> None

Add documents to both BM25 and vector stores.

Parameters

  • `documents` (list[Document]): Documents to add
async def search(
    query: str,
    k: int = 10,
    bm25_k: int | None = None,
    vector_k: int | None = None,
    filter: dict[str, Any] | None = None
) -> list[SearchResult]

Perform hybrid search.

Parameters

  • `query` (str): Search query
  • `k` (int): Number of final results to return
  • `bm25_k` (int | None): Number of BM25 results (default: k * 2)
  • `vector_k` (int | None): Number of vector results (default: k * 2)
  • `filter` (dict[str, Any] | None): Optional metadata filter for vector search

Returns

  • list[SearchResult]: Fused and re-ranked search results
def clear() -> None

Clear BM25 index.

Note: Vector store should be cleared separately.


Configuration for hybrid search.

Example

config = HybridSearchConfig(
    bm25_weight=0.6,
    vector_weight=0.4,
    rrf_k=60,
)


Simple in-memory embedding cache for development/testing.

This is a lightweight alternative when lexigram-cache is not available. Should not be used in production for distributed systems.

def __init__(
    max_size: int = 1000,
    ttl: int = 86400
)

Initialize in-memory cache.

Parameters

  • `max_size` (int): Maximum number of entries to cache
  • `ttl` (int): Time-to-live (not enforced in this simple implementation)
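A minimal usage sketch (the vector values are placeholders, and passing a plain list assumes it is accepted as an Embedding; entries appear to be keyed per model, given the `model` arguments on get/set):

cache = InMemoryEmbeddingCache(max_size=100)
await cache.set("hello world", [0.1, 0.2, 0.3], model="my-model")
hit = await cache.get("hello world", model="my-model")  # [0.1, 0.2, 0.3]
miss = await cache.get("hello world", model="other-model")  # None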
async def get(
    text: str,
    model: str | None = None
) -> Embedding | None

Get embedding from cache.

async def set(
    text: str,
    embedding: Embedding,
    model: str | None = None,
    ttl: int | None = None
) -> bool

Set embedding in cache.

async def delete(
    text: str,
    model: str | None = None
) -> bool

Delete embedding from cache.

async def clear(model: str | None = None) -> bool

Clear cache.

async def get_stats() -> dict[str, Any]

Get cache statistics.


Configuration for the in-memory backend (testing/development).

Non-persistent in-memory vector store.
def __init__(config: MemoryConfig | None = None)
async def connect() -> None
async def disconnect() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult
async def list_collections() -> list[CollectionInfo]
async def create_collection(config: CollectionConfig) -> None
async def delete_collection(name: str) -> None
async def collection_exists(name: str) -> bool
async def get_collection(name: str) -> MemoryVectorCollection

Mock vector store for testing.

Provides simple in-memory storage with simulated similarity search. Does not require actual embeddings - uses text similarity instead.

Example

store = MockVectorStore()
await store.add([
    Document(text="Python is a programming language", id="1"),
    Document(text="JavaScript is for web development", id="2"),
])
results = await store.search(
    query_vector=[],  # Ignored in mock
    top_k=1,
)

def __init__(
    config: Any | None = None,
    **kwargs: Any
)

Initialize mock vector store.

Parameters

  • `config` (Any | None): Optional vector store configuration
  • `**kwargs` (Any): Dependencies, including:
    - `similarity_threshold`: Minimum similarity score for results
    - `dimension_size`: Dimension size of embeddings to validate
async def add(documents: list[Document]) -> Result[list[str], VectorStoreError]

Add documents to store.

Parameters

  • `documents` (list[Document]): Documents to add

Returns

  • Result[list[str], VectorStoreError]: `Ok(list[str])` with document IDs on success.
async def batch_upsert(
    documents: list[Document],
    batch_size: int = 100
) -> Result[int, VectorStoreError]

Upsert documents in batches.

Parameters

  • `documents` (list[Document]): List of documents to upsert.
  • `batch_size` (int): Number of documents per batch (default: 100).

Returns

  • Result[int, VectorStoreError]: `Ok(int)` with the total count of documents upserted, or `Err(VectorStoreError)` on failure.
async def search(
    query_vector: list[float] | None = None,
    query: list[float] | str | None = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict | None = None,
    filters: dict[str, Any] | None = None,
    filter_: dict | None = None,
    **kwargs: Any
) -> Result[list[SearchResult], VectorStoreError]

Search for similar documents.

Uses simple text matching instead of vector similarity. This is sufficient for testing most RAG workflows.

Parameters

  • `query_vector` (list[float] | None): Query embedding (ignored in mock)
  • `k` / `top_k` (int | None): Number of results
  • `filter` / `filters` / `filter_` (dict | None): Metadata filters

Returns

  • Result[list[SearchResult], VectorStoreError]: `Ok(list[SearchResult])` on success.
async def delete(ids: list[str]) -> Result[int, VectorStoreError]

Delete documents by ID.

Parameters

  • `ids` (list[str]): Document IDs to delete

Returns

  • Result[int, VectorStoreError]: `Ok(int)` with the count of deleted documents.
def get_document_count() -> int

Get total number of documents.

Returns

  • int: Document count
def clear() -> None

Clear all documents.

async def close() -> None

Close store.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns

  • HealthCheckResult: Structured health check result.
async def add_texts(
    texts: list[str],
    embeddings: list[list[float]] | None = None,
    metadatas: list[dict[str, Any]] | None = None,
    collection_name: str | None = None
) -> list[str]

Convenience wrapper to add raw texts with optional embeddings/metadata.

Raises

  • VectorStoreError: If adding documents fails (unwraps the Result).
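A usage sketch based on the signature above (the texts and metadata are illustrative):

store = MockVectorStore()
ids = await store.add_texts(
    texts=["Python is a programming language", "JavaScript is for web development"],
    metadatas=[{"lang": "python"}, {"lang": "js"}],
)
# Unlike add(), failures surface as a raised VectorStoreError
# rather than an Err result.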

Mock vector store that can simulate errors.

Useful for testing error handling.

Example

store = MockVectorStoreWithErrors(fail_on_search=True)
await store.search([0.1, 0.2], top_k=5)  # Returns Err(VectorStoreError)

def __init__(
    fail_on_add: bool = False,
    fail_on_search: bool = False,
    fail_on_delete: bool = False,
    error_rate: float | None = None,
    error_message: str = 'Mock vector store error'
)

Initialize error-simulating mock.

Parameters

  • `fail_on_add` (bool): Whether to fail on add()
  • `fail_on_search` (bool): Whether to fail on search()
  • `fail_on_delete` (bool): Whether to fail on delete()
  • `error_rate` (float | None): Probabilistic error rate (0-1), compatible with fixtures
  • `error_message` (str): Error message to raise
async def add(documents: list[Document]) -> Result[list[str], VectorStoreError]

Add with possible error.

async def search(
    query_vector: list[float] | None = None,
    query: list[float] | str | None = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict | None = None,
    filters: dict[str, Any] | None = None,
    filter_: dict | None = None,
    **kwargs: Any
) -> Result[list[SearchResult], VectorStoreError]

Search with possible error.

async def delete(ids: list[str]) -> Result[int, VectorStoreError]

Delete with possible error.


Mock vector store with actual similarity calculation.

Uses cosine similarity on provided embeddings if available, falls back to text matching otherwise.

Example

store = MockVectorStoreWithSimilarity()
await store.add([
    Document(
        text="Python programming",
        embedding=[0.1, 0.2, 0.3],
        id="1",
    )
])
results = await store.search(
    query_vector=[0.1, 0.2, 0.3],
    top_k=1,
)
results[0].score  # High similarity (1.0)

async def search(
    query_vector: list[float] | None = None,
    query: list[float] | str | None = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict | None = None,
    filters: dict[str, Any] | None = None,
    filter_: dict | None = None,
    **kwargs: Any
) -> Result[list[SearchResult], VectorStoreError]

Search with actual similarity calculation.

Parameters

  • `query_vector` (list[float] | None): Query embedding vector
  • `k` / `top_k` (int | None): Number of results
  • `filter` / `filters` / `filter_` (dict | None): Metadata filters

Returns

  • Result[list[SearchResult], VectorStoreError]: `Ok(list[SearchResult])` sorted by similarity.
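The cosine similarity used here is the standard formula; a minimal reference implementation:

import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # dot(a, b) / (|a| * |b|); 1.0 means identical direction.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0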

Configuration for a single named vector store backend.

Used in VectorConfig.backends to declare multiple vector stores that the framework registers as named DI bindings.

Example

backends:
  - name: primary
    primary: true
    backend: qdrant
    qdrant:
      url: http://qdrant-primary:6333
  - name: rag
    backend: pgvector
    pgvector:
      database: rag

Parameters

  • `name`: Unique backend identifier used as the `Named()` DI key.
  • `primary`: Whether this backend also receives the unnamed `VectorStoreProtocol` binding for backward compatibility.
  • `backend`: Which vector store driver to use for this entry.
  • `pgvector`: pgvector-specific connection configuration.
  • `pinecone`: Pinecone-specific connection configuration.
  • `qdrant`: Qdrant-specific connection configuration.
  • `memory`: In-memory backend configuration.

Async embedding client for any OpenAI-compatible ``/v1/embeddings`` endpoint.

Implements EmbeddingClientProtocol from lexigram-contracts so it can be registered in the DI container and consumed by DocumentVectorStoreAdapter or any other service that needs text embeddings.

Supports any backend exposing the OpenAI embeddings API:

  • text-embeddings-inference (self-hosted, local docker-compose)
  • LM Studio
  • OpenAI
  • Cohere (OpenAI-compat mode)

Texts are split into config.batch_size chunks and embedded in parallel. The underlying aiohttp.ClientSession is created lazily on the first call and reused across all requests until close() is called.

Example

config = EmbeddingClientConfig(
    api_base="http://fastembed/v1",
    model="nomic-ai/nomic-embed-text-v1.5",
    dimension=768,
)
client = OpenAICompatibleEmbeddingClient(config)
vectors = await client.embed(["dog limping", "cat vomiting"])
await client.close()
def __init__(config: EmbeddingClientConfig) -> None

Initialise the embedding client.

Parameters

  • `config` (EmbeddingClientConfig): Client configuration (URL, model, dimension, timeout, batch size).
async def embed(texts: list[str]) -> list[list[float]]

Generate embeddings for a list of texts.

Splits input into batches of config.batch_size, sends each batch to the /embeddings endpoint, then concatenates results preserving order.

Parameters

  • `texts` (list[str]): Texts to embed. Empty list returns immediately.

Returns

  • list[list[float]]: One embedding vector per input text, in the same order.

Raises

  • RuntimeError: If the HTTP request fails or the response is malformed.
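A simplified sketch of the batching behaviour described above (the `embed_one_batch` callable stands in for the HTTP request; this is not the client's actual implementation):

import asyncio

async def embed_batched(embed_one_batch, texts: list[str], batch_size: int):
    # Split into fixed-size batches, embed them concurrently, and
    # concatenate the per-batch results in the original order.
    if not texts:
        return []
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    results = await asyncio.gather(*(embed_one_batch(batch) for batch in batches))
    return [vector for batch_result in results for vector in batch_result]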
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Verify the embedding endpoint is reachable.

Embeds a short test string with a reduced timeout.

Parameters

  • `timeout` (float): Connection timeout in seconds.

Returns

  • HealthCheckResult: Healthy if the endpoint responds correctly, Unhealthy otherwise.
async def close() -> None

Close the underlying aiohttp session and release connections.


Configuration for the pgvector backend.

PostgreSQL vector store using pgvector extension.
def __init__(
    provider: DatabaseProviderProtocol,
    config: PgVectorConfig | None = None
)
async def connect() -> None
async def disconnect() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult
async def list_collections() -> list[CollectionInfo]
async def create_collection(config: CollectionConfig) -> None
async def delete_collection(name: str) -> None
async def collection_exists(name: str) -> bool
async def get_collection(name: str) -> PgVectorCollection

Configuration for the Pinecone backend.

Pinecone managed vector store driver (SDK v6+).
def __init__(config: PineconeConfig)
async def connect() -> None
async def disconnect() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult
async def list_collections() -> list[CollectionInfo]
async def create_collection(config: CollectionConfig) -> None
async def delete_collection(name: str) -> None
async def collection_exists(name: str) -> bool
async def get_collection(name: str) -> PineconeCollection

Configuration for the Qdrant backend.

Qdrant vector database driver.
def __init__(config: QdrantConfig)
async def connect() -> None
async def disconnect() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult
async def list_collections() -> list[CollectionInfo]
async def create_collection(config: CollectionConfig) -> None
async def delete_collection(name: str) -> None
async def collection_exists(name: str) -> bool
async def get_collection(name: str) -> QdrantCollection

Reciprocal Rank Fusion (RRF) for combining ranked lists.

RRF is a simple and effective method for combining multiple ranked lists. It gives higher scores to items that appear near the top of multiple lists.

Example

reranker = RRFReranker(k=60)
fused = reranker.fuse([results1, results2])

def __init__(k: int = 60)

Initialize RRF reranker.

Parameters

  • `k` (int): RRF constant parameter (default: 60)
def fuse(
    ranked_lists: list[list[SearchResult]],
    weights: list[float] | None = None
) -> list[SearchResult]

Fuse multiple ranked lists using RRF.

Parameters

  • `ranked_lists` (list[list[SearchResult]]): List of ranked result lists
  • `weights` (list[float] | None): Optional weights for each list (default: equal weights)

Returns

  • list[SearchResult]: Fused and re-ranked results
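The fusion rule itself is compact; a minimal sketch, assuming each result exposes an `id` attribute:

from collections import defaultdict

def rrf_scores(ranked_lists, k=60, weights=None):
    # score(d) = sum_i weight_i / (k + rank_i(d)), with ranks starting at 1.
    # Items near the top of several lists accumulate the highest scores.
    weights = weights or [1.0] * len(ranked_lists)
    scores = defaultdict(float)
    for weight, results in zip(weights, ranked_lists):
        for rank, result in enumerate(results, start=1):
            scores[result.id] += weight / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)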

Pipeline for applying multiple rerankers in sequence.

Example

pipeline = RerankerPipeline([
    SimilarityReranker(),
    DiversityReranker(),
])
results = await pipeline.rerank("python", search_results)

def __init__(rerankers: list[Reranker])

Initialize reranker pipeline.

Parameters

  • `rerankers` (list[Reranker]): List of rerankers to apply in sequence
async def rerank(
    query: str,
    results: list[SearchResult],
    top_k: int | None = None
) -> list[SearchResult]

Apply all rerankers in sequence.

Parameters

  • `query` (str): Search query
  • `results` (list[SearchResult]): Initial results
  • `top_k` (int | None): Number of final results to return

Returns

  • list[SearchResult]: Reranked results after all stages

Configuration for reranking.

Example

config = RerankingConfig(
    strategy=RerankingStrategy.SIMILARITY,
    score_boost=0.15,
    top_k=10,
)


Reranking strategy types.

Similarity-based reranking.

Reranks results based on text similarity between query and documents. Uses simple scoring strategies suitable for the community edition.

Example

reranker = SimilarityReranker(score_boost=0.1)
results = await reranker.rerank("python programming", search_results)

def __init__(
    score_boost: float = 0.1,
    exact_match_boost: float = 0.5,
    case_sensitive: bool = False
)

Initialize similarity reranker.

Parameters

  • `score_boost` (float): Score boost for term matches (default: 0.1)
  • `exact_match_boost` (float): Additional boost for exact matches (default: 0.5)
  • `case_sensitive` (bool): Whether matching is case-sensitive (default: False)
async def rerank(
    query: str,
    results: list[SearchResult],
    top_k: int | None = None
) -> list[SearchResult]

Rerank results based on similarity.

Parameters

  • `query` (str): Search query
  • `results` (list[SearchResult]): Initial results
  • `top_k` (int | None): Number of results to return

Returns

  • list[SearchResult]: Reranked results

Top-level vector store configuration.
def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check config is safe for the target environment.

def from_named(
    cls,
    entry: NamedVectorConfig,
    base: VectorConfig | None = None
) -> VectorConfig

Build a single-backend VectorConfig from a NamedVectorConfig entry.

Used internally by VectorProvider to create per-backend configs from a multi-backend declaration. Global settings (distance metric, index type, dimension, batch sizes, retry policy) are copied from base; per-backend driver config and backend selector come from entry.

Parameters

  • `entry` (NamedVectorConfig): The named backend entry to materialise.
  • `base` (VectorConfig | None): Optional base config to inherit top-level settings from. Defaults to a freshly constructed `VectorConfig`.

Returns

  • VectorConfig: A `VectorConfig` configured for the single named backend, with `backends=[]` to prevent recursive resolution.
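A usage sketch based on the signature above (the constructor arguments are illustrative, not a confirmed field list):

base = VectorConfig(dimension=768)
entry = NamedVectorConfig(name="rag", backend="pgvector")
rag_config = VectorConfig.from_named(entry, base=base)
# rag_config copies the global settings from `base`, takes the driver
# selection from `entry`, and sets backends=[].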

Raised when a document is removed from a vector store.

Downstream: audit logging, cache invalidation.


Raised when a document is successfully indexed into a vector store.

Downstream: observability, audit logging.


Payload fired when a vector embedding is written to the store.

Attributes

  • `collection`: Name of the vector collection that was written to.
  • `document_id`: Identifier of the document whose embedding was stored.


Vector storage module.

Registers VectorStoreProtocol and VectorCollectionProtocol for constructor injection.

Call configure to select the vector backend (Qdrant, ChromaDB, PGVector, or in-memory), or stub for an isolated setup with no external service dependencies.

Usage

from lexigram.vector.config import VectorConfig

@module(
    imports=[VectorModule.configure(VectorConfig(backend="qdrant"))]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: VectorConfig | Any | None = None,
    enable_reranking: bool = False
) -> DynamicModule

Create a VectorModule with explicit configuration.

Parameters

  • `config` (VectorConfig | Any | None): VectorConfig, a plain `dict` of config values, or `None` to use environment variable defaults.
  • `enable_reranking` (bool): Enable cross-encoder reranking of retrieval results to improve relevance scoring. Defaults to `False`.

Returns

  • DynamicModule: A DynamicModule descriptor.

Raises

  • TypeError: If *config* is not a `VectorConfig`, `dict`, or `None`.
def stub(
    cls,
    config: VectorConfig | None = None
) -> DynamicModule

Create a VectorModule suitable for unit and integration testing.

Uses an in-memory backend with no external service dependencies.

Parameters

  • `config` (VectorConfig | None): Optional VectorConfig override. Uses safe in-memory defaults when `None`.

Returns

  • DynamicModule: A DynamicModule descriptor.

Register vector store services into the DI container.

Selects the backend based on VectorConfig.backend and registers the appropriate VectorStoreProtocol implementation.

Supports single-backend and multi-backend (VectorConfig.backends) modes. In multi-backend mode each entry is registered under its name via container.singleton(..., name=entry.name). The primary backend (primary=True or the first entry) also receives the unnamed binding for backward compatibility.

def __init__(config: VectorConfig | None = None) -> None

Initialize with optional config.

async def register(container: ContainerRegistrarProtocol) -> None

Register the vector store config and (lazy) singleton bindings.

async def boot(container: ContainerResolverProtocol) -> None

Connect to the vector store(s).

Multi-backend mode: All stores are connected in parallel via asyncio.gather. pgvector entries whose store could not be created at register time are instantiated here using the fully-resolved DI container. On partial failure the successfully-connected stores are disconnected before re-raising the first error.

Single-backend mode: Preserves the original sequential boot behaviour.

async def shutdown() -> None

Disconnect from all vector store(s).

Multi-backend mode: stores are disconnected in LIFO order; errors are suppressed so all stores get a chance to clean up.

Single-backend mode: original behaviour is preserved.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check provider health across all registered backends.

Multi-backend mode: health checks run in parallel; the overall status is the worst individual status (UNHEALTHY > DEGRADED > HEALTHY).

Single-backend mode: original behaviour is preserved.

Parameters

  • `timeout` (float): Maximum seconds to wait for each health check response.

Returns

  • HealthCheckResult: HealthCheckResult with status and per-backend component details.

Raised when a similarity search is performed against a vector collection.

Downstream: analytics, quota tracking.


Payload fired after a vector similarity search completes.

Attributes

  • `collection`: Name of the vector collection that was searched.
  • `result_count`: Number of nearest-neighbour results returned.


Adapts infra ``VectorCollectionProtocol`` to AI ``VectorStoreProtocol``.

Receives a VectorStoreProtocol from lexigram-vector via constructor injection. Collection access is lazy — get_collection is called on the first vector operation so this class is safe to construct before the infra store has booted.

Embedding generation is delegated to an optional embedding_fn (typically EmbeddingClientProtocol.embed). When embedding_fn is None, add() requires doc.embedding to be pre-set and search_text() returns an error.
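A construction sketch showing how an embedding client's `embed` method can serve as `embedding_fn` (the variable names are illustrative):

adapter = VectorStoreAdapter(
    infra_store=infra_store,              # VectorStoreProtocol from lexigram-vector
    collection_name="pet_knowledge",
    dimension=768,
    config=vector_config,
    embedding_fn=embedding_client.embed,  # async (texts) -> list[list[float]]
)
result = await adapter.search_text("cruciate ligament", top_k=5)
# Without embedding_fn, add() requires pre-set doc.embedding and
# search_text() returns an Err.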

def __init__(
    infra_store: InfraVectorStoreProtocol,
    collection_name: str,
    dimension: int,
    config: VectorConfig,
    embedding_cache: EmbeddingCache | None = None,
    embedding_fn: EmbeddingFn | None = None
) -> None

Initialize the adapter.

Parameters

  • `infra_store` (InfraVectorStoreProtocol): Infra-level vector store (from lexigram-vector's provider).
  • `collection_name` (str): Name of the target vector collection.
  • `dimension` (int): Vector dimensionality, used when creating the collection.
  • `config` (VectorConfig): Vector configuration block.
  • `embedding_cache` (EmbeddingCache | None): Optional cache for generated embeddings.
  • `embedding_fn` (EmbeddingFn | None): Optional async callable `(texts) -> list[list[float]]`.
async def add(documents: list[Document]) -> Result[list[str], VectorStoreError]

Add documents to the vector store.

Documents without embedding will be embedded via embedding_fn if provided; otherwise an Err is returned.

Parameters

  • `documents` (list[Document]): Documents to add. Must each have `text` set.

Returns

  • Result[list[str], VectorStoreError]: `Ok(list[str])` with document IDs, or `Err(VectorStoreError)`.
async def batch_upsert(
    documents: list[Document],
    batch_size: int = 100
) -> Result[int, VectorStoreError]

Upsert documents in fixed-size batches.

Parameters

  • `documents` (list[Document]): Documents to upsert.
  • `batch_size` (int): Documents per batch.

Returns

  • Result[int, VectorStoreError]: `Ok(int)` with total upserted count, or `Err(VectorStoreError)`.
async def search(
    query: list[float],
    *,
    top_k: int = 10,
    filters: dict[str, Any] | None = None,
    score_threshold: float | None = None
) -> Result[list[SearchResult], VectorStoreError]

Search for similar documents using a pre-computed query vector.

Parameters

  • `query` (list[float]): Query embedding vector.
  • `top_k` (int): Maximum number of results.
  • `filters` (dict[str, Any] | None): Metadata equality filters.
  • `score_threshold` (float | None): Minimum similarity score to include.

Returns

  • Result[list[SearchResult], VectorStoreError]: `Ok(list[SearchResult])` ordered by similarity, or `Err(VectorStoreError)`.
async def search_text(
    query: str,
    *,
    top_k: int = 10,
    filters: dict[str, Any] | None = None
) -> Result[list[SearchResult], VectorStoreError]

Search for similar documents using a text query.

Embeds query via embedding_fn then delegates to search.

Parameters

  • `query` (str): Query text to embed and search.
  • `top_k` (int): Maximum number of results.
  • `filters` (dict[str, Any] | None): Metadata equality filters.

Returns

  • Result[list[SearchResult], VectorStoreError]: `Ok(list[SearchResult])` on success, or `Err(VectorStoreError)` if no `embedding_fn` is configured.
async def delete(ids: list[str]) -> Result[int, VectorStoreError]

Delete documents by ID.

Parameters

  • `ids` (list[str]): Document IDs to delete.

Returns

  • Result[int, VectorStoreError]: `Ok(int)` with deletion count, or `Err(VectorStoreError)`.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a health check by querying the collection document count.

Returns

  • HealthCheckResult: Structured health check result.

def create_hybrid_retriever(
    vector_store: VectorRetriever,
    config: HybridSearchConfig | None = None
) -> HybridRetriever

Create a hybrid retriever with configuration.

Parameters

  • `vector_store` (VectorRetriever): Vector store for semantic search
  • `config` (HybridSearchConfig | None): Hybrid search configuration

Returns

  • HybridRetriever: Configured hybrid retriever

Example

config = HybridSearchConfig(bm25_weight=0.6, vector_weight=0.4)
retriever = create_hybrid_retriever(my_vector_store, config)


def create_reranker(
    strategy: RerankingStrategy = RerankingStrategy.SIMILARITY,
    config: RerankingConfig | None = None,
    **kwargs: Any
) -> Reranker

Create a reranker based on strategy.

Parameters

  • `strategy` (RerankingStrategy): Reranking strategy
  • `config` (RerankingConfig | None): Optional configuration
  • `**kwargs` (Any): Additional arguments for specific rerankers

Returns

  • Reranker: Configured reranker

Example

reranker = create_reranker(RerankingStrategy.SIMILARITY)
results = await reranker.rerank("query", search_results)


async def create_vector_store(
    config: VectorConfig,
    *,
    infra_store: InfraVectorStoreProtocol | None = None,
    cache_backend: CacheBackendProtocol | None = None,
    embedding_fn: EmbeddingFn | None = None
) -> Any

Create an AI-layer vector store, delegating to the infra store when available.

When infra_store is provided (registered by lexigram-vector’s provider), a VectorStoreAdapter is returned that wraps it. Otherwise, a MockVectorStore is returned — suitable for testing and environments without an infra store.

Parameters

  • `config` (VectorConfig): Vector configuration block.
  • `infra_store` (InfraVectorStoreProtocol | None): Infra-level vector store (from lexigram-vector), if present.
  • `cache_backend` (CacheBackendProtocol | None): Platform cache backend for embedding caching.
  • `embedding_fn` (EmbeddingFn | None): Async callable `(texts) -> list[list[float]]` for embedding.

Returns

  • Any: A configured vector store instance satisfying the AI-layer protocol.
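A usage sketch under the behaviour described above (the variable names are illustrative):

store = await create_vector_store(
    config,
    infra_store=infra_store,             # omit / pass None to get a MockVectorStore
    embedding_fn=embedding_client.embed,
)
result = await store.search_text("query", top_k=5)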

Attempted to create a collection that already exists.
def __init__(collection_name: str) -> None

Initialize with the conflicting collection name.


Requested collection does not exist.
def __init__(collection_name: str) -> None

Initialize with the missing collection name.


Vector dimensionality does not match the collection.
def __init__(
    expected: int,
    actual: int,
    record_id: str | None = None
) -> None

Initialize with expected and actual dimensions.


Failed to compile a metadata filter for the target backend.
def __init__(
    message: str,
    backend: str
) -> None

Initialize with error details and backend name.


Invalid vector store configuration.

Failed to connect to the vector store.

Failed to delete vectors.

Base exception for all vector store operations.

Failed to execute similarity search.

Vector store operation timed out.

Failed to upsert vectors.