API Reference
Protocols
ChunkerProtocol
Splits documents into chunks for embedding and indexing.
ContextCompressorProtocol
Compresses retrieved context to fit within token limits.
RerankerProtocol
Re-ranks retrieved documents by relevance.
Classes
A chunk of text with metadata.
Attributes:
- text: The chunk text content
- source: Source document identifier
- start_index: Starting character position in original document
- end_index: Ending character position in original document
- chunk_index: Sequential index of this chunk
- metadata: Optional metadata dictionary
ChunkingConfig
Configuration for chunking.
Example

    config = ChunkingConfig(
        strategy=ChunkingStrategy.FIXED_SIZE,
        chunk_size=1000,
        overlap=200,
    )

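
To make the `chunk_size` and `overlap` settings concrete, the sketch below splits a string into fixed-size, overlapping windows and records the chunk attributes listed earlier. `Chunk` and `fixed_size_chunks` are illustrative stand-ins written for this example, not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Chunk:
    """Stand-in mirroring the documented chunk attributes (hypothetical)."""
    text: str
    source: str
    start_index: int
    end_index: int
    chunk_index: int
    metadata: dict[str, Any] = field(default_factory=dict)


def fixed_size_chunks(text: str, source: str, chunk_size: int, overlap: int) -> list[Chunk]:
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far each window advances
    chunks: list[Chunk] = []
    for start in range(0, len(text), step):
        end = min(start + chunk_size, len(text))
        chunks.append(Chunk(text[start:end], source, start, end, len(chunks)))
        if end == len(text):
            break  # the last window already reached the end of the text
    return chunks


chunks = fixed_size_chunks("a" * 2500, "doc-1", chunk_size=1000, overlap=200)
print([(c.start_index, c.end_index) for c in chunks])
# → [(0, 1000), (800, 1800), (1600, 2500)]
```

With these settings each window advances by 800 characters, so consecutive chunks share 200 characters of context.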
Context
Retrieved context for RAG generation.
Example

    context = Context(
        query="What is Lexigram?",
        documents=[doc1, doc2],
        metadata={"retrieval_time": 0.123},
    )

IngestionConfig
Configuration for document ingestion stage.
PipelineBuilder
Builder for constructing RAG pipelines with a fluent API.
The builder provides a convenient way to configure and build pipelines programmatically or from configuration files.
Initialize the pipeline builder.
def with_name(name: str) -> PipelineBuilder
Set pipeline name.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Pipeline name |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_description(description: str) -> PipelineBuilder
Set pipeline description.
| Parameter | Type | Description |
|---|---|---|
| `description` | str | Pipeline description |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_ingestion(**kwargs: Any) -> PipelineBuilder
Configure ingestion stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Ingestion stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_query_processing(**kwargs: Any) -> PipelineBuilder
Configure query processing stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Query processing stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_retrieval(**kwargs: Any) -> PipelineBuilder
Configure retrieval stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Retrieval stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_context_optimization(**kwargs: Any) -> PipelineBuilder
Configure context optimization stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Context optimization stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_synthesis(**kwargs: Any) -> PipelineBuilder
Configure synthesis stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Synthesis stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_quality_assurance(**kwargs: Any) -> PipelineBuilder
Configure quality assurance stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Quality assurance stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_post_processing(**kwargs: Any) -> PipelineBuilder
Configure post-processing stage.
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | Any | Post-processing stage settings, passed as keyword arguments |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_error_strategy( strategy: ErrorStrategy, max_retries: int = 3, retry_delay: float = 1.0 ) -> PipelineBuilder
Configure global error handling.
| Parameter | Type | Description |
|---|---|---|
| `strategy` | ErrorStrategy | Default error handling strategy |
| `max_retries` | int | Maximum number of retries |
| `retry_delay` | float | Initial delay between retries |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
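
The `max_retries` and `retry_delay` parameters suggest a retry loop. The sketch below shows one plausible interpretation in which the delay doubles after each failed attempt. The doubling, the `retry_async` helper, and the `flaky` function are assumptions made for illustration; the library's actual retry policy may differ.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    op: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    retry_delay: float = 1.0,
) -> T:
    """Run op, retrying up to max_retries times with a doubling delay (assumed policy)."""
    delay = retry_delay
    for attempt in range(max_retries + 1):
        try:
            return await op()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: propagate the last error
            await asyncio.sleep(delay)
            delay *= 2  # assumed backoff factor
    raise AssertionError("unreachable for max_retries >= 0")


calls = {"count": 0}


async def flaky() -> str:
    """Hypothetical stage that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = asyncio.run(retry_async(flaky, max_retries=3, retry_delay=0.01))
```

Here the third attempt succeeds, so the call returns without using its final retry.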
def with_stages(stages: list[PipelineStageType]) -> PipelineBuilder
Set the ordered list of pipeline stages.
| Parameter | Type | Description |
|---|---|---|
| `stages` | list[PipelineStageType] | List of stages |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def with_custom_stage(stage: PipelineStageProtocol) -> PipelineBuilder
Add a custom pipeline stage.
| Parameter | Type | Description |
|---|---|---|
| `stage` | PipelineStageProtocol | Custom pipeline stage |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def from_dict(config_dict: dict[str, Any]) -> PipelineBuilder
Load configuration from dictionary.
| Parameter | Type | Description |
|---|---|---|
| `config_dict` | dict[str, Any] | Configuration dictionary |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
async def from_yaml(yaml_path: str | Path) -> PipelineBuilder
Load configuration from YAML file.
| Parameter | Type | Description |
|---|---|---|
| `yaml_path` | str \| Path | Path to YAML configuration file |
| Type | Description |
|---|---|
| PipelineBuilder | Self for chaining |
def retrieve( strategy: str = 'hybrid', top_k: int = 10, **kwargs: Any ) -> PipelineBuilder
Configure retrieval with high-level parameters.
| Parameter | Type | Description |
|---|---|---|
| `strategy` | str | Retrieval strategy name (``"hybrid"``, ``"dense"``, ``"sparse"``). |
| `top_k` | int | Number of chunks to retrieve. |
| `**kwargs` | Any | Additional retrieval parameters. |
def rerank( strategy: str = 'cross-encoder', top_k: int = 5, **kwargs: Any ) -> PipelineBuilder
Configure context optimization / reranking.
| Parameter | Type | Description |
|---|---|---|
| `strategy` | str | Reranking strategy name. |
| `top_k` | int | Number of chunks to keep after reranking. |
| `**kwargs` | Any | Additional reranking parameters. |
def synthesize( strategy: str = 'abstractive', model: str | None = None, **kwargs: Any ) -> PipelineBuilder
Configure synthesis with high-level parameters.
| Parameter | Type | Description |
|---|---|---|
| `strategy` | str | Synthesis strategy (``"abstractive"``, ``"extractive"``). |
| `model` | str \| None | LLM model identifier for generation. |
| `**kwargs` | Any | Additional synthesis parameters. |
def with_citations(required: bool = True) -> PipelineBuilder
Enable citation tracking in the pipeline.
| Parameter | Type | Description |
|---|---|---|
| `required` | bool | Whether citations are required (pipeline fails without them if set to ``True``). |
def with_evaluation( evaluator: RAGEvaluatorProtocol | None = None, every_n: int = 1 ) -> PipelineBuilder
Enable automatic evaluation of pipeline outputs.
| Parameter | Type | Description |
|---|---|---|
| `evaluator` | RAGEvaluatorProtocol \| None | Optional pre-built evaluator instance. |
| `every_n` | int | Evaluate every *n*-th request (default: every request). |
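
As a rough illustration of `every_n`, the sketch below counts requests and triggers evaluation on every n-th one. `EvaluationSampler` is a hypothetical helper, and whether the library starts counting at the first request is an assumption here.

```python
class EvaluationSampler:
    """Hypothetical helper: decides whether the current request is evaluated."""

    def __init__(self, every_n: int = 1) -> None:
        if every_n < 1:
            raise ValueError("every_n must be >= 1")
        self.every_n = every_n
        self._seen = 0

    def should_evaluate(self) -> bool:
        """Count one request and report whether it falls on a multiple of every_n."""
        self._seen += 1
        return self._seen % self.every_n == 0


sampler = EvaluationSampler(every_n=3)
decisions = [sampler.should_evaluate() for _ in range(6)]
print(decisions)  # → [False, False, True, False, False, True]
```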
def with_timeout(**stage_timeouts: float) -> PipelineBuilder
Set per-stage timeouts (in seconds).
Keyword arguments map stage names to their timeout values:

    builder.with_timeout(retrieval=10.0, synthesis=30.0)

| Parameter | Type | Description |
|---|---|---|
| `**stage_timeouts` | float | Timeout in seconds for each named stage |
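
One way to enforce per-stage timeouts like these is `asyncio.wait_for`. The sketch below is an assumption about the mechanism, not the library's implementation; `run_stage_with_timeout` and `slow_stage` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_stage_with_timeout(
    name: str,
    stage: Callable[[], Awaitable[Any]],
    stage_timeouts: dict[str, float],
) -> Any:
    """Await a stage, cancelling it if it exceeds its configured timeout."""
    timeout = stage_timeouts.get(name)  # None means no limit for this stage
    return await asyncio.wait_for(stage(), timeout=timeout)


async def slow_stage() -> str:
    """Hypothetical stage that takes about 0.2 seconds."""
    await asyncio.sleep(0.2)
    return "done"


async def main() -> tuple[str, str]:
    timeouts = {"retrieval": 0.05}
    try:
        first = await run_stage_with_timeout("retrieval", slow_stage, timeouts)
    except asyncio.TimeoutError:
        first = "timed out"
    # "synthesis" has no entry, so it runs without a limit.
    second = await run_stage_with_timeout("synthesis", slow_stage, timeouts)
    return first, second


outcome = asyncio.run(main())
print(outcome)  # → ('timed out', 'done')
```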
def with_working_memory(memory: WorkingMemoryProtocol) -> PipelineBuilder
Attach working memory for context enrichment.
| Parameter | Type | Description |
|---|---|---|
| `memory` | WorkingMemoryProtocol | Working memory instance. |
def build() -> RAGPipeline
Build the RAG pipeline.
| Type | Description |
|---|---|
| RAGPipeline | Configured RAG pipeline |
| Exception | Description |
|---|---|
| ValueError | If configuration is invalid |
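
Because each configuration method returns the builder itself, calls can be chained before `build()`. The sketch below uses a small stand-in class to show the pattern; `SketchBuilder`, its validation rule, and the dictionary it returns are hypothetical and only mimic the documented method names.

```python
from typing import Any


class SketchBuilder:
    """Hypothetical stand-in that mimics the fluent style of PipelineBuilder."""

    def __init__(self) -> None:
        self._config: dict[str, Any] = {}

    def with_name(self, name: str) -> "SketchBuilder":
        self._config["name"] = name
        return self  # returning self is what enables chaining

    def retrieve(self, strategy: str = "hybrid", top_k: int = 10, **kwargs: Any) -> "SketchBuilder":
        self._config["retrieval"] = {"strategy": strategy, "top_k": top_k, **kwargs}
        return self

    def with_citations(self, required: bool = True) -> "SketchBuilder":
        self._config["citations_required"] = required
        return self

    def build(self) -> dict[str, Any]:
        """Validate and return the collected settings (assumed rule: a name is required)."""
        if not self._config.get("name"):
            raise ValueError("pipeline name is required")
        return dict(self._config)


pipeline_config = (
    SketchBuilder()
    .with_name("docs-qa")
    .retrieve(strategy="dense", top_k=5)
    .with_citations()
    .build()
)
```

Validating only in `build()` keeps the chained calls cheap and reports configuration problems in one place.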
PipelineConfig
Complete pipeline configuration.
def from_dict( cls, config_dict: dict[str, Any] ) -> PipelineConfig
Create configuration from dictionary.
Convert configuration to dictionary.
RAGAnswerSynthesizedHook
Payload fired after the synthesis stage produces a final answer.
Attributes: pipeline_name: Name or identifier of the pipeline that synthesised the answer.
RAGConfig
Configuration for RAG (Retrieval Augmented Generation) pipeline.
Example

    config = RAGConfig(
        vector_store_type="chroma",
        collection_name="pet_knowledge",
        top_k=5,
        enable_citations=True,
    )

RAGDocumentsRetrievedHook
Payload fired after the retrieval stage returns candidate chunks.
Attributes: chunk_count: Number of chunks returned by the retrieval step.
RAGModule
Retrieval-Augmented Generation (RAG) pipeline integration.
Call configure to register the RAG pipeline, strategy registries, and supporting services (knowledge graph, HyDE, compression, reasoning) for injection.
Usage

    from lexigram.ai.rag.config import RAGConfig

    @module(
        imports=[
            RAGModule.configure(RAGConfig(chunk_size=512))
        ]
    )
    class AppModule(Module):
        pass

Error Handling
RAG pipeline failures surface as typed exceptions that can be caught directly or handled via the Result pattern:

    from lexigram.ai.rag.exceptions import (
        RAGError,            # base class, catch-all
        PreprocessingError,  # document preprocessing failure
        RetrievalError,      # retrieval / vector-store failure
        SynthesisError,      # response synthesis failure
        ChunkingError,       # document chunking failure
    )

Exports: RAGPipelineProtocol, RetrievalStrategyProtocol, RAGError, PreprocessingError, RetrievalError, SynthesisError, ChunkingError
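
Because the import comments mark `RAGError` as the catch-all base, callers can handle specific failures first and fall back to the base. The classes below are local stand-ins that only reproduce that implied hierarchy, and `answer` is a hypothetical function.

```python
class RAGError(Exception):
    """Stand-in base class (the real one lives in lexigram.ai.rag.exceptions)."""


class RetrievalError(RAGError):
    """Stand-in for a retrieval / vector-store failure."""


class SynthesisError(RAGError):
    """Stand-in for a response synthesis failure."""


def answer(query: str) -> str:
    """Hypothetical pipeline call that fails during retrieval for empty queries."""
    if not query:
        raise RetrievalError("empty query")
    return f"answer to {query!r}"


def safe_answer(query: str) -> str:
    try:
        return answer(query)
    except RetrievalError as exc:
        return f"retrieval failed: {exc}"  # most specific handler first
    except RAGError as exc:
        return f"pipeline failed: {exc}"  # catch-all for other RAG failures


print(safe_answer(""))
print(safe_answer("What is Lexigram?"))
```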
def configure( cls, config: RAGConfig | None = None ) -> DynamicModule
Create a RAGModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | RAGConfig \| None | RAGConfig or ``None`` to use defaults (reads from environment variables). |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def stub( cls, config: RAGConfig | None = None ) -> DynamicModule
Create a RAGModule suitable for unit and integration testing.
Uses in-memory or no-op implementations with minimal side effects.
| Parameter | Type | Description |
|---|---|---|
| `config` | RAGConfig \| None | Optional config override. Uses safe test defaults when None. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
RAGPipeline
Main RAG pipeline that orchestrates all stages.
This class provides a simple interface for executing the complete RAG pipeline with configurable stages and error handling.
def __init__( config: PipelineConfig, stages: list[PipelineStageProtocol], evaluator: RAGEvaluatorProtocol | None = None, working_memory: WorkingMemoryProtocol | None = None )
Initialize the RAG pipeline.
| Parameter | Type | Description |
|---|---|---|
| `config` | PipelineConfig | Pipeline configuration |
| `stages` | list[PipelineStageProtocol] | List of pipeline stages |
| `evaluator` | RAGEvaluatorProtocol \| None | Optional evaluator implementing RAGEvaluatorProtocol for automatic per-request quality evaluation. Evaluation frequency is controlled by auto_evaluate_every_n. |
| `working_memory` | WorkingMemoryProtocol \| None | Optional working memory for context enrichment. |
async def run( query: str, documents: list[str] | None = None, document_paths: list[str] | None = None, metadata: dict[str, Any] | None = None ) -> PipelineContext
Execute the RAG pipeline.
| Parameter | Type | Description |
|---|---|---|
| `query` | str | User query |
| `documents` | list[str] \| None | Optional list of document content strings |
| `document_paths` | list[str] \| None | Optional list of document file paths |
| `metadata` | dict[str, Any] \| None | Optional custom metadata |
| Type | Description |
|---|---|
| PipelineContext | Pipeline context with results |
async def execute(context: RAGContext) -> Result[RAGResponse, RAGError]
Execute the RAG pipeline per the contract protocol.
| Parameter | Type | Description |
|---|---|---|
| `context` | RAGContext | Pipeline context with query and optional config/filters. |
| Type | Description |
|---|---|
| Result[RAGResponse, RAGError] | Ok(RAGResponse) on success, Err(RAGError) on failure. |
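
A `Result` return value makes the failure path explicit instead of raising. The sketch below defines minimal `Ok` and `Err` stand-ins to show how a caller might branch on the outcome. The library's actual `Result` type and its accessors are not documented in this section, so every name here is an assumption.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


Result = Union[Ok[T], Err[E]]


def execute_sketch(query: str) -> Result[str, ValueError]:
    """Hypothetical stand-in for an execute call that returns errors instead of raising."""
    if not query.strip():
        return Err(ValueError("query must not be empty"))
    return Ok(f"response for {query!r}")


def describe(result: Result[str, ValueError]) -> str:
    """Branch on the variant rather than using try/except."""
    if isinstance(result, Ok):
        return f"success: {result.value}"
    return f"failure: {result.error}"


print(describe(execute_sketch("What is Lexigram?")))
print(describe(execute_sketch("   ")))
```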
async def run_parallel( query: str, stages: list[PipelineStageProtocol] | None = None, **kwargs: Any ) -> PipelineContext
Execute pipeline stages in parallel.
| Parameter | Type | Description |
|---|---|---|
| `query` | str | User query |
| `stages` | list[PipelineStageProtocol] \| None | Stages to execute in parallel (default: all stages) |
| `**kwargs` | Any | Additional context parameters |
| Type | Description |
|---|---|
| PipelineContext | Pipeline context with results |
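
Running independent stages concurrently is commonly done with `asyncio.gather`. The following sketch illustrates the idea only; the stage functions and the merged dictionary are hypothetical, and the library's `run_parallel` may schedule and combine stages differently.

```python
import asyncio
from typing import Any, Awaitable, Callable

Stage = Callable[[str], Awaitable[dict[str, Any]]]


async def dense_stage(query: str) -> dict[str, Any]:
    await asyncio.sleep(0.01)  # simulate I/O such as a vector-store lookup
    return {"dense": [f"dense hit for {query}"]}


async def sparse_stage(query: str) -> dict[str, Any]:
    await asyncio.sleep(0.01)  # simulate I/O such as a keyword index lookup
    return {"sparse": [f"sparse hit for {query}"]}


async def run_parallel_sketch(query: str, stages: list[Stage]) -> dict[str, Any]:
    """Start all stages at once and merge their outputs into one context dict."""
    outputs = await asyncio.gather(*(stage(query) for stage in stages))
    merged: dict[str, Any] = {"query": query}
    for output in outputs:
        merged.update(output)
    return merged


context = asyncio.run(run_parallel_sketch("lexigram", [dense_stage, sparse_stage]))
```

`asyncio.gather` returns results in the order the stages were passed, so the merge is deterministic even though the stages overlap in time.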
RAGPipelineStartedHook
Payload fired when a RAG pipeline begins processing a query.
Attributes: pipeline_name: Name or identifier of the pipeline that started.
RAGProvider
Registers RAG pipeline services and strategy registries with the DI container.
def __init__(config: RAGConfig | None = None) -> None
async def register(container: ContainerRegistrarProtocol) -> None
Boot RAG provider — wire optional integrations.
Check RAG provider health — verifies embedding service and vector store.
| Type | Description |
|---|---|
| HealthCheckResult | HealthCheckResult with status ``healthy`` when all configured dependencies are reachable, or ``degraded``/``unhealthy`` otherwise. |
RerankResult
Result of a reranking operation.
Attributes:
- documents: Reranked documents (most relevant first).
- scores: Relevance scores (parallel to documents).
- original_count: Number of documents passed to reranker.
- reranked_count: Number of documents returned (may be < original if top_k applied).
- model_name: Name of the reranking model used.
- metadata: Additional reranking metadata.
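
To show how these fields relate to one another, the sketch below fills a stand-in result from a toy term-overlap reranker. `RerankResultSketch`, `overlap_rerank`, and the scoring rule are assumptions for illustration; real rerankers such as cross-encoders score documents very differently.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RerankResultSketch:
    """Stand-in carrying the documented RerankResult fields (hypothetical)."""
    documents: list[str]
    scores: list[float]
    original_count: int
    reranked_count: int
    model_name: str
    metadata: dict[str, Any] = field(default_factory=dict)


def overlap_rerank(query: str, documents: list[str], top_k: int) -> RerankResultSketch:
    """Toy reranker: score each document by the fraction of query terms it contains."""
    terms = set(query.lower().split())
    scored = [
        (len(terms & set(doc.lower().split())) / len(terms) if terms else 0.0, doc)
        for doc in documents
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)  # stable: ties keep input order
    kept = scored[:top_k]
    return RerankResultSketch(
        documents=[doc for _, doc in kept],
        scores=[score for score, _ in kept],
        original_count=len(documents),
        reranked_count=len(kept),
        model_name="term-overlap-sketch",
    )


result = overlap_rerank(
    "vector store health",
    ["store opening hours", "vector store health check", "unrelated text"],
    top_k=2,
)
```

Three documents go in and two come out, so `original_count` is 3 while `reranked_count` is 2, and `scores` stays aligned index-by-index with `documents`.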
RerankingStrategyRegistry
Registry of reranking strategy handlers.
Reranking strategies reorder documents after initial retrieval using cross-encoders, LLM-based scoring, or fusion techniques.
Uses a handler-based dispatch pattern where handlers implement can_handle(strategy: str) and create_and_rerank() methods.
Usage

    registry = RerankingStrategyRegistry()
    registry.register(FlashRankStrategyHandler())
    handler = registry.get("flashrank")
    result = await handler.create_and_rerank(strategy="flashrank", ...)

Initialize an empty handler registry.
def with_defaults(cls) -> RerankingStrategyRegistry
Create a RerankingStrategyRegistry with default handlers.
| Type | Description |
|---|---|
| RerankingStrategyRegistry | A RerankingStrategyRegistry with no default handlers. Handlers are registered conditionally by the provider. |
Register a handler instance.
| Parameter | Type | Description |
|---|---|---|
| `handler` | object | A handler instance with can_handle(strategy) method. |
Get a handler that can handle the given strategy.
| Parameter | Type | Description |
|---|---|---|
| `strategy` | str | Strategy name to look up. |
| Type | Description |
|---|---|
| object \| None | First handler where can_handle(strategy) is True, or None. |
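
The handler-based dispatch described for this registry can be pictured as a linear scan over registered handlers. The sketch below is a simplified stand-in: `SketchRegistry` and `PrefixHandler` are hypothetical, and only the `can_handle(strategy)` contract and the first-match lookup come from the description above.

```python
from typing import Optional, Protocol


class Handler(Protocol):
    def can_handle(self, strategy: str) -> bool: ...


class PrefixHandler:
    """Hypothetical handler that accepts strategy names starting with a prefix."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def can_handle(self, strategy: str) -> bool:
        return strategy.startswith(self.prefix)


class SketchRegistry:
    """Stand-in registry: returns the first handler whose can_handle is True."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def register(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def get(self, strategy: str) -> Optional[Handler]:
        for handler in self._handlers:
            if handler.can_handle(strategy):
                return handler
        return None


registry = SketchRegistry()
flash = PrefixHandler("flash")
registry.register(flash)
registry.register(PrefixHandler("f"))  # also matches "flashrank", but is registered later
```

With first-match semantics, registration order decides which handler wins when several can handle the same strategy name.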
RetrievalCompletedEvent
Emitted when the retrieval stage of a RAG pipeline completes.
Consumed by: quality metrics, retrieval analytics, feedback loops.
RetrievalConfig
Configuration for retrieval stage.
RetrievalStrategyRegistry
Registry of retrieval strategy implementations.
Strategies take a query and a set of candidate documents and return an ordered subset ranked by relevance.
Usage

    registry = RetrievalStrategyRegistry.with_defaults()
    strategy = registry.instantiate("mmr", lambda_param=0.7)
    results = await strategy.retrieve(query, candidates, top_k=5)

def with_defaults(cls) -> RetrievalStrategyRegistry
Create a registry pre-loaded with built-in strategies.
Registered keys:
- "vector": VectorRetrievalStrategy
- "mmr": MMRRetrievalStrategy
| Type | Description |
|---|---|
| RetrievalStrategyRegistry | A new registry instance with default strategies registered. |
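
For readers unfamiliar with the `"mmr"` key: maximal marginal relevance greedily picks documents that are relevant to the query but dissimilar to those already chosen, with a parameter (here `lambda_param`) trading off the two. The sketch below implements the textbook formulation over plain vectors with cosine similarity. It is not `MMRRetrievalStrategy` itself, and the function names and toy vectors are illustrative assumptions.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(
    query: list[float],
    candidates: list[list[float]],
    top_k: int,
    lambda_param: float = 0.7,
) -> list[int]:
    """Return indices of up to top_k candidates chosen by maximal marginal relevance."""
    selected: list[int] = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < top_k:
        def score(i: int) -> float:
            relevance = cosine(query, candidates[i])
            # Penalise similarity to the closest already-selected candidate.
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected), default=0.0
            )
            return lambda_param * relevance - (1 - lambda_param) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 0.0]
candidates = [[1.0, 0.0], [1.0, 0.05], [0.6, 0.8]]
picked = mmr_select(query, candidates, top_k=2, lambda_param=0.3)
print(picked)  # → [0, 2]
```

A low `lambda_param` favours diversity, so the near-duplicate at index 1 is skipped; with `lambda_param=1.0` the selection reduces to plain relevance ranking and returns `[0, 1]`.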
SynthesisCompletedEvent
Emitted when the synthesis stage of a RAG pipeline completes.
Consumed by: quality metrics, answer analytics, audit.
SynthesisConfig
Configuration for response synthesis.
Attributes:
- strategy: Synthesis strategy to use
- max_context_length: Maximum context length in tokens
- max_response_length: Maximum response length in tokens
- include_citations: Whether to include citations
- output_format: Desired output format
- quality_check: Whether to run quality checks
- min_confidence: Minimum confidence threshold
- metadata: Additional configuration metadata
Functions
create_chunker
def create_chunker( strategy: ChunkingStrategy = ChunkingStrategy.FIXED_SIZE, config: ChunkingConfig | None = None, **kwargs: Any ) -> AbstractChunker
Create a chunker instance for the given strategy.
Convenience wrapper around ChunkingStrategyRegistry.
| Parameter | Type | Description |
|---|---|---|
| `strategy` | ChunkingStrategy | Which chunking strategy to use. |
| `config` | ChunkingConfig \| None | Optional chunking configuration. |
| `**kwargs` | Any | Additional keyword arguments forwarded to the chunker constructor (override config defaults). |
| Type | Description |
|---|---|
| AbstractChunker | A configured Chunker instance. |
| Exception | Description |
|---|---|
| ValueError | If no chunker is registered for the given *strategy*. |
Exceptions
RAGError
Base exception for RAG errors.