
Retrieval-Augmented Generation

lexigram-ai-rag provides a configurable RAG pipeline that ingests documents, chunks them, generates embeddings, retrieves relevant context, and synthesizes grounded answers. The pipeline is protocol-driven: swap chunking strategies, retrieval backends, and synthesis models without changing application code.

For the full configuration reference and advanced features (HyDE, reranking, evaluation), see the lexigram-ai-rag package docs.


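The RAG system is built on protocols from lexigram.contracts.ai.rag. RAGPipelineProtocol.execute returns a Result rather than raising, so pipeline failures are explicit values the caller must handle. Retrieval strategies, by contrast, return a plain list of search results: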

```python
from typing import Any, Protocol

from lexigram.result import Result
from lexigram.contracts.ai.rag import RAGContext, RAGResponse, RAGError
from lexigram.contracts.ai.vector import SearchResultProtocol


class RAGPipelineProtocol(Protocol):
    async def execute(self, context: RAGContext) -> Result[RAGResponse, RAGError]: ...


class RetrievalStrategyProtocol(Protocol):
    async def retrieve(
        self,
        query: str,
        candidates: list[SearchResultProtocol],
        *,
        top_k: int = 5,
        **kwargs: Any,
    ) -> list[SearchResultProtocol]: ...
```
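Because these are typing.Protocol classes, a custom strategy does not need to inherit from anything. Any class with a matching retrieve signature satisfies RetrievalStrategyProtocol for a static type checker. The following is a self-contained sketch that does not import lexigram. Hit is a hypothetical stand-in for SearchResultProtocol, and its content and score fields are assumptions. ThresholdStrategy is an illustrative name, not part of the package. Its score cutoff loosely mirrors what a similarity_threshold setting might do.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Hit:
    """Hypothetical stand-in for SearchResultProtocol: text plus a similarity score."""
    content: str
    score: float
    metadata: dict[str, Any] = field(default_factory=dict)


class ThresholdStrategy:
    """Matches the RetrievalStrategyProtocol shape structurally (no inheritance)."""

    def __init__(self, min_score: float = 0.7) -> None:
        self.min_score = min_score

    async def retrieve(
        self,
        query: str,
        candidates: list[Hit],
        *,
        top_k: int = 5,
        **kwargs: Any,
    ) -> list[Hit]:
        # Drop weak matches, then keep the top_k highest-scoring candidates.
        kept = [c for c in candidates if c.score >= self.min_score]
        kept.sort(key=lambda c: c.score, reverse=True)
        return kept[:top_k]


async def main() -> list[Hit]:
    candidates = [Hit("a", 0.9), Hit("b", 0.5), Hit("c", 0.8), Hit("d", 0.75)]
    return await ThresholdStrategy(min_score=0.7).retrieve("q", candidates, top_k=2)


if __name__ == "__main__":
    print([h.content for h in asyncio.run(main())])  # → ['a', 'c']
```

With a real SearchResultProtocol type in place of Hit, the same class could be handed to anything typed against RetrievalStrategyProtocol.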

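RAGContext carries the query together with optional per-request config, metadata filters, and a session ID. RAGResponse contains the synthesized answer and the retrieved sources, plus citations and a confidence score when available: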

```python
from dataclasses import dataclass
from typing import Any

from lexigram.contracts.ai.vector import SearchResultProtocol


@dataclass(frozen=True)
class RAGContext:
    query: str
    config: dict[str, Any] | None = None
    filters: dict[str, Any] | None = None
    session_id: str | None = None


@dataclass(frozen=True)
class RAGResponse:
    answer: str
    sources: list[SearchResultProtocol]
    citations: list[Any] | None = None
    confidence: float | None = None
```

Add the RAGModule and configure chunking, retrieval, and synthesis:

```python
from lexigram import Application
from lexigram.ai.rag import RAGModule, RAGConfig

app = Application(name="my-app")
app.add_module(RAGModule.configure(
    RAGConfig(
        chunk_size=512,
        chunk_overlap=50,
        embedding_provider="openai",
        top_k=5,
        enable_citations=True,
        collection_name="knowledge_base",
    ),
))
```
These options can also be set in application.yaml:
```yaml
ai_rag:
  enabled: true
  vector_store_type: pgvector
  collection_name: knowledge_base
  top_k: 5
  chunk_size: 512
  chunk_overlap: 50
  chunking_strategy: recursive
  embedding_provider: openai
  embedding_model: text-embedding-3-small
  enable_citations: true
  enable_hyde: false
  enable_query_expansion: true
  use_hybrid_search: true
  similarity_threshold: 0.7
  enable_caching: true
  cache_ttl: 3600
```
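The page does not describe how enable_caching and cache_ttl behave internally. The following is a minimal sketch of one common approach, a time-to-live cache keyed by query text. It assumes cache_ttl is in seconds. TTLCache is a hypothetical helper, not a lexigram class. The clock is injectable so expiry can be shown without sleeping.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

V = TypeVar("V")


class TTLCache(Generic[V]):
    """Minimal time-to-live cache; an entry expires ttl seconds after it is stored."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self._clock = clock
        self._entries: dict[str, tuple[float, V]] = {}

    def get(self, key: str) -> Optional[V]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # stale: evict it and report a miss
            return None
        return value

    def put(self, key: str, value: V) -> None:
        self._entries[key] = (self._clock() + self.ttl, value)


# Fake clock (seconds) so expiry can be demonstrated deterministically.
now = [0.0]
cache: TTLCache[str] = TTLCache(ttl=3600, clock=lambda: now[0])
cache.put("what is rag?", "cached answer")
now[0] = 3599.0
print(cache.get("what is rag?"))  # still fresh: returns the cached value
now[0] = 3600.0
print(cache.get("what is rag?"))  # expired: returns None
```

A cache like this only pays off for repeated identical queries. Normalizing the key (for example, lowercasing and trimming) is a further design choice this sketch leaves out.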

The pipeline supports multiple chunking strategies configured via chunking_strategy:

| Strategy | Description |
|---|---|
| recursive | Recursive character splitting with overlap (default) |
| token | Token-aware splitting at model boundaries |
| semantic | Semantic boundary detection using embeddings |

Use the create_chunker factory for programmatic access:

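```python
from lexigram.ai.rag import create_chunker, ChunkingConfig

chunker = create_chunker(
    ChunkingConfig(
        strategy="recursive",
        chunk_size=512,
        chunk_overlap=50,
    )
)
# chunk() is a coroutine, so call it from inside an async function.
# document_text holds the raw text of the document to split.
chunks = await chunker.chunk(document_text)
```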
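The following sketch shows how chunk_size and chunk_overlap interact. Each window starts chunk_size - chunk_overlap characters after the previous one, so neighbouring chunks share chunk_overlap characters. This is a plain sliding-window splitter, not lexigram's recursive strategy, which also prefers to break at natural separators. It measures size in characters, which is an assumption here. chunk_text is a hypothetical name.

```python
def chunk_text(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> list[str]:
    """Fixed-size character windows; consecutive windows share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1))  # → ['abcd', 'defg', 'ghij']
```

Overlap keeps a sentence that straddles a boundary retrievable from either side. The cost is storing and embedding roughly chunk_size / (chunk_size - chunk_overlap) times as much text.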

Ingest documents into the vector store through the pipeline. Documents are chunked, embedded, and stored automatically:

```python
from lexigram import Application
from lexigram.ai.rag import RAGModule, RAGConfig
from lexigram.contracts.ai.rag import RAGContext, RAGPipelineProtocol


async def index_documents() -> None:
    async with Application.boot(
        modules=[RAGModule.configure(RAGConfig(collection_name="kb"))]
    ) as app:
        pipeline = await app.container.resolve(RAGPipelineProtocol)
        # Documents are chunked, embedded, and indexed automatically
        result = await pipeline.execute(
            RAGContext(query="seed document", config={"index_only": True})
        )
        if not result.is_ok():
            print(f"Indexing failed: {result.unwrap_err()}")
```
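The following is a toy sketch of the same ingest flow: chunk, embed, store, then rank stored chunks by cosine similarity. It does not use lexigram. The embedding is a hashed bag of words (feature hashing), which stands in for a real embedding model such as text-embedding-3-small. InMemoryIndex is a hypothetical stand-in for the configured vector store.

```python
import hashlib
import math

DIMS = 256


def embed(text: str, dims: int = DIMS) -> list[float]:
    """Toy feature-hashing embedding: hashed bag of lowercase words, L2-normalized."""
    vec = [0.0] * dims
    for word in text.lower().split():
        digest = hashlib.sha256(word.encode("utf-8")).digest()
        vec[int.from_bytes(digest[:4], "little") % dims] += 1.0
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else vec


class InMemoryIndex:
    """Hypothetical stand-in for a vector store: keeps (chunk, vector) rows in a list."""

    def __init__(self) -> None:
        self._rows: list[tuple[str, list[float]]] = []

    def add(self, chunks: list[str]) -> None:
        # Embed each chunk once at ingest time and store the vector with its text.
        for chunk in chunks:
            self._rows.append((chunk, embed(chunk)))

    def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]:
        q = embed(query)
        # Both vectors are unit length (or zero), so the dot product is the cosine similarity.
        scored = [(chunk, sum(a * b for a, b in zip(q, vec))) for chunk, vec in self._rows]
        scored.sort(key=lambda row: row[1], reverse=True)
        return scored[:top_k]


index = InMemoryIndex()
index.add([
    "refunds are issued within 14 days",
    "shipping takes 3 to 5 days",
    "contact support by email",
])
for chunk, score in index.search("how are refunds issued", top_k=2):
    print(f"{score:.2f}  {chunk}")
```

A production store replaces the linear scan with an approximate nearest-neighbour index, but the contract is the same: vectors in, ranked chunks out.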

Run a RAG query to retrieve relevant documents and synthesize an answer:

```python
from lexigram import Application
from lexigram.ai.rag import RAGModule, RAGConfig
from lexigram.contracts.ai.rag import RAGPipelineProtocol, RAGContext


async def ask(query: str) -> None:
    async with Application.boot(
        modules=[RAGModule.configure(RAGConfig(top_k=5))]
    ) as app:
        pipeline = await app.container.resolve(RAGPipelineProtocol)
        result = await pipeline.execute(RAGContext(query=query))
        if result.is_ok():
            response = result.unwrap()
            print(f"Answer: {response.answer}")
            for source in response.sources:
                print(f"  Source: {source}")
            if response.citations:
                for citation in response.citations:
                    print(f"  Citation: {citation}")
        else:
            error = result.unwrap_err()
            print(f"RAG failed: {error}")
```

When enable_citations is True, sources are cited inline in the response. The sources list contains SearchResultProtocol objects with metadata about each retrieved document.
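The page does not specify the citation format. One common way to get inline citations is to number each retrieved source in the prompt, ask the model to cite by number, and then map the [n] markers in the answer back to sources. The following sketch uses that approach. Source, build_grounded_prompt and cited_sources are hypothetical names, and the prompt wording is illustrative.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Source:
    """Hypothetical retrieved chunk: a document id plus its text."""
    doc_id: str
    text: str


def build_grounded_prompt(question: str, sources: list[Source]) -> str:
    """Number each source so the model can cite it inline as [1], [2], ..."""
    lines = [
        "Answer using only the sources below. Cite each claim with its source number, e.g. [1].",
        "",
    ]
    for i, src in enumerate(sources, start=1):
        lines.append(f"[{i}] ({src.doc_id}) {src.text}")
    lines += ["", f"Question: {question}"]
    return "\n".join(lines)


def cited_sources(answer: str, sources: list[Source]) -> list[Source]:
    """Return the sources referenced by [n] markers, in first-cited order, ignoring bad numbers."""
    seen: list[int] = []
    for match in re.finditer(r"\[(\d+)\]", answer):
        n = int(match.group(1))
        if 1 <= n <= len(sources) and n not in seen:
            seen.append(n)
    return [sources[n - 1] for n in seen]


sources = [
    Source("faq.md", "Refunds are issued within 14 days."),
    Source("policy.md", "Refunds require a receipt."),
]
prompt = build_grounded_prompt("How do refunds work?", sources)
answer = "Refunds take up to 14 days [1] and need a receipt [2]. [1]"
print([s.doc_id for s in cited_sources(answer, sources)])  # → ['faq.md', 'policy.md']
```

Validating markers against the source list, as cited_sources does, guards against the model citing a number that was never in the prompt.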


The pipeline retrieves context through RetrievalStrategyProtocol, so retrieval is pluggable. RetrievalStrategyRegistry.with_defaults() returns a registry pre-populated with the built-in strategies:

| Strategy | Description |
|---|---|
| similarity | Pure vector similarity search |
| hybrid | Combined vector + keyword (default) |
| mmr | Maximum marginal relevance for diversity |
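The mmr strategy trades relevance against redundancy. Each step greedily picks the candidate that maximizes lam · sim(query, doc) − (1 − lam) · max sim(doc, already picked). The following is a standalone sketch of that standard greedy algorithm over plain vectors, not lexigram's implementation. The parameter name lam and the vectors are illustrative.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mmr(query: list[float], docs: list[list[float]], top_k: int = 5, lam: float = 0.5) -> list[int]:
    """Greedy maximal marginal relevance; returns indices into docs in pick order."""
    selected: list[int] = []
    remaining = list(range(len(docs)))
    while remaining and len(selected) < top_k:
        best_i, best_score = remaining[0], -math.inf
        for i in remaining:
            relevance = cosine(query, docs[i])
            # Redundancy is 0.0 for the first pick, when nothing is selected yet.
            redundancy = max((cosine(docs[i], docs[j]) for j in selected), default=0.0)
            score = lam * relevance - (1 - lam) * redundancy
            if score > best_score:
                best_i, best_score = i, score
        selected.append(best_i)
        remaining.remove(best_i)
    return selected


query = [2.0, 1.0, 0.0]
docs = [[1.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]  # docs 0 and 1 are duplicates
print(mmr(query, docs, top_k=2, lam=0.5))  # → [0, 2]
print(mmr(query, docs, top_k=2, lam=1.0))  # → [0, 1]
```

With lam=1.0 the redundancy term vanishes and MMR reduces to plain similarity ranking, which returns the duplicate. At lam=0.5 it skips the duplicate in favour of the less similar but novel document.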

Reranking is handled separately, by RerankingStrategyProtocol implementations registered in RerankingStrategyRegistry. To look up a built-in retrieval strategy by name, use the retrieval registry:

```python
from lexigram.ai.rag import RetrievalStrategyRegistry

registry = RetrievalStrategyRegistry.with_defaults()
strategy = registry.get("hybrid")
```
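The hybrid strategy is described only as "combined vector + keyword", and this page does not say how the two result lists are merged. Reciprocal rank fusion (RRF) is one widely used method. Each document scores the sum of 1 / (k + rank) over every ranking it appears in, with k = 60 as the conventional constant. The following is a standalone sketch of RRF, shown as an assumption and not as lexigram's hybrid implementation.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge ranked id lists; ranks are 1-based, and higher fused score ranks first."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda d: scores[d], reverse=True)


vector_hits = ["a", "b", "c"]   # ranked by embedding similarity
keyword_hits = ["c", "a", "d"]  # ranked by a keyword scorer such as BM25
print(reciprocal_rank_fusion([vector_hits, keyword_hits]))  # → ['a', 'c', 'b', 'd']
```

RRF uses only rank positions, so it avoids normalizing cosine scores and keyword scores onto a common scale. Documents that appear near the top of both lists rise to the top of the fused list.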

Use RAGModule.stub() for isolated tests:

```python
from lexigram import Application
from lexigram.ai.rag import RAGModule
from lexigram.contracts.ai.rag import RAGPipelineProtocol


async def test_pipeline_resolves() -> None:
    async with Application.boot(modules=[RAGModule.stub()]) as app:
        pipeline = await app.container.resolve(RAGPipelineProtocol)
        assert pipeline is not None
```