Skip to content
GitHubDiscord

API Reference

Splits documents into chunks for embedding and indexing.
def chunk(
    text: str,
    chunk_size: int,
    overlap: int
) -> list[str]

Compresses retrieved context to fit within token limits.
async def compress(
    query: str,
    context: str,
    max_tokens: int
) -> str

Re-ranks retrieved documents by relevance.
async def rerank(
    query: str,
    documents: list[str],
    top_k: int
) -> list[tuple[str, float]]

A chunk of text with metadata.

Attributes: text: The chunk text content source: Source document identifier start_index: Starting character position in original document end_index: Ending character position in original document chunk_index: Sequential index of this chunk metadata: Optional metadata dictionary


Configuration for chunking.

Example

config = ChunkingConfig( … strategy=ChunkingStrategy.FIXED_SIZE, … chunk_size=1000, … overlap=200 … )


Retrieved context for RAG generation.

Example

context = Context( … query=“What is Lexigram?”, … documents=[doc1, doc2], … metadata={“retrieval_time”: 0.123} … )


Configuration for document ingestion stage.

Builder for constructing RAG pipelines with fluent API.

The builder provides a convenient way to configure and build pipelines programmatically or from configuration files.

def __init__() -> Any

Initialize the pipeline builder.

def with_name(name: str) -> PipelineBuilder

Set pipeline name.

Parameters
ParameterTypeDescription
`name`strPipeline name
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_description(description: str) -> PipelineBuilder

Set pipeline description.

Parameters
ParameterTypeDescription
`description`strPipeline description
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_ingestion(**kwargs: Any) -> PipelineBuilder

Configure ingestion stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_query_processing(**kwargs: Any) -> PipelineBuilder

Configure query processing stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_retrieval(**kwargs: Any) -> PipelineBuilder

Configure retrieval stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_context_optimization(**kwargs: Any) -> PipelineBuilder

Configure context optimization stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_synthesis(**kwargs: Any) -> PipelineBuilder

Configure synthesis stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_quality_assurance(**kwargs: Any) -> PipelineBuilder

Configure quality assurance stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_post_processing(**kwargs: Any) -> PipelineBuilder

Configure post-processing stage.

Parameters
ParameterTypeDescription
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_error_strategy(
    strategy: ErrorStrategy,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> PipelineBuilder

Configure global error handling.

Parameters
ParameterTypeDescription
`strategy`ErrorStrategyDefault error handling strategy
`max_retries`intMaximum number of retries
`retry_delay`floatInitial delay between retries
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_stages(stages: list[PipelineStageType]) -> PipelineBuilder

Set the ordered list of pipeline stages.

Parameters
ParameterTypeDescription
`stages`list[PipelineStageType]List of stages
Returns
TypeDescription
PipelineBuilderSelf for chaining
def with_custom_stage(stage: PipelineStageProtocol) -> PipelineBuilder

Add a custom pipeline stage.

Parameters
ParameterTypeDescription
`stage`PipelineStageProtocolCustom pipeline stage
Returns
TypeDescription
PipelineBuilderSelf for chaining
def from_dict(config_dict: dict[str, Any]) -> PipelineBuilder

Load configuration from dictionary.

Parameters
ParameterTypeDescription
`config_dict`dict[str, Any]Configuration dictionary
Returns
TypeDescription
PipelineBuilderSelf for chaining
async def from_yaml(yaml_path: str | Path) -> PipelineBuilder

Load configuration from YAML file.

Parameters
ParameterTypeDescription
`yaml_path`str | PathPath to YAML configuration file
Returns
TypeDescription
PipelineBuilderSelf for chaining
def retrieve(
    strategy: str = 'hybrid',
    top_k: int = 10,
    **kwargs: Any
) -> PipelineBuilder

Configure retrieval with high-level parameters.

Parameters
ParameterTypeDescription
`strategy`strRetrieval strategy name (``"hybrid"``, ``"dense"``, ``"sparse"``).
`top_k`intNumber of chunks to retrieve. **kwargs: Additional retrieval parameters.
def rerank(
    strategy: str = 'cross-encoder',
    top_k: int = 5,
    **kwargs: Any
) -> PipelineBuilder

Configure context optimization / reranking.

Parameters
ParameterTypeDescription
`strategy`strReranking strategy name.
`top_k`intNumber of chunks to keep after reranking. **kwargs: Additional reranking parameters.
def synthesize(
    strategy: str = 'abstractive',
    model: str | None = None,
    **kwargs: Any
) -> PipelineBuilder

Configure synthesis with high-level parameters.

Parameters
ParameterTypeDescription
`strategy`strSynthesis strategy (``"abstractive"``, ``"extractive"``).
`model`str | NoneLLM model identifier for generation. **kwargs: Additional synthesis parameters.
def with_citations(required: bool = True) -> PipelineBuilder

Enable citation tracking in the pipeline.

Parameters
ParameterTypeDescription
`required`boolWhether citations are required (pipeline fails without them if set to ``True``).
def with_evaluation(
    evaluator: RAGEvaluatorProtocol | None = None,
    every_n: int = 1
) -> PipelineBuilder

Enable automatic evaluation of pipeline outputs.

Parameters
ParameterTypeDescription
`evaluator`RAGEvaluatorProtocol | NoneOptional pre-built evaluator instance.
`every_n`intEvaluate every *n*-th request (default: every request).
def with_timeout(**stage_timeouts: float) -> PipelineBuilder

Set per-stage timeouts (in seconds).

Keyword arguments map stage names to their timeout values

builder.with_timeout(retrieval=10.0, synthesis=30.0)
Parameters
ParameterTypeDescription
def with_working_memory(memory: WorkingMemoryProtocol) -> PipelineBuilder

Attach working memory for context enrichment.

Parameters
ParameterTypeDescription
`memory`WorkingMemoryProtocolWorking memory instance.
def build() -> RAGPipeline

Build the RAG pipeline.

Returns
TypeDescription
RAGPipelineConfigured RAG pipeline
Raises
ExceptionDescription
ValueErrorIf configuration is invalid

Complete pipeline configuration.
def from_dict(
    cls,
    config_dict: dict[str, Any]
) -> PipelineConfig

Create configuration from dictionary.

def to_dict() -> dict[str, Any]

Convert configuration to dictionary.


Payload fired after the synthesis stage produces a final answer.

Attributes: pipeline_name: Name or identifier of the pipeline that synthesised the answer.


Configuration for RAG (Retrieval Augmented Generation) pipeline.

Example

config = RAGConfig( … vector_store_type=“chroma”, … collection_name=“pet_knowledge”, … top_k=5, … enable_citations=True … )


Payload fired after the retrieval stage returns candidate chunks.

Attributes: chunk_count: Number of chunks returned by the retrieval step.


Retrieval-Augmented Generation (RAG) pipeline integration.

Call configure to register the RAG pipeline, strategy registries, and supporting services (knowledge graph, HyDE, compression, reasoning) for injection.

Usage

from lexigram.ai.rag.config import RAGConfig
@module(
imports=[
RAGModule.configure(RAGConfig(chunk_size=512))
]
)
class AppModule(Module):
pass

Error Handling

RAG pipeline failures surface as typed exceptions that can be caught
directly or handled via the Result pattern::
from lexigram.ai.rag.exceptions import (
RAGError, # base — catch-all
PreprocessingError, # document preprocessing failure
RetrievalError, # retrieval / vector-store failure
SynthesisError, # response synthesis failure
ChunkingError, # document chunking failure
)

Exports: RAGPipelineProtocol, RetrievalStrategyProtocol, RAGError, PreprocessingError, RetrievalError, SynthesisError, ChunkingError

def configure(
    cls,
    config: RAGConfig | None = None
) -> DynamicModule

Create a RAGModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`RAGConfig | NoneRAGConfig or ``None`` to use defaults (reads from environment variables).
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def stub(
    cls,
    config: RAGConfig | None = None
) -> DynamicModule

Create a RAGModule suitable for unit and integration testing.

Uses in-memory or no-op implementations with minimal side effects.

Parameters
ParameterTypeDescription
`config`RAGConfig | NoneOptional config override. Uses safe test defaults when None.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Main RAG pipeline that orchestrates all stages.

This class provides a simple interface for executing the complete RAG pipeline with configurable stages and error handling.

def __init__(
    config: PipelineConfig,
    stages: list[PipelineStageProtocol],
    evaluator: RAGEvaluatorProtocol | None = None,
    working_memory: WorkingMemoryProtocol | None = None
)

Initialize the RAG pipeline.

Parameters
ParameterTypeDescription
`config`PipelineConfigPipeline configuration
`stages`list[PipelineStageProtocol]List of pipeline stages
`evaluator`RAGEvaluatorProtocol | NoneOptional evaluator implementing RAGEvaluatorProtocol for automatic per-request quality evaluation. Evaluation frequency is controlled by auto_evaluate_every_n.
`working_memory`WorkingMemoryProtocol | NoneOptional working memory for context enrichment.
async def run(
    query: str,
    documents: list[str] | None = None,
    document_paths: list[str] | None = None,
    metadata: dict[str, Any] | None = None
) -> PipelineContext

Execute the RAG pipeline.

Parameters
ParameterTypeDescription
`query`strUser query
`documents`list[str] | NoneOptional list of document content strings
`document_paths`list[str] | NoneOptional list of document file paths
`metadata`dict[str, Any] | NoneOptional custom metadata
Returns
TypeDescription
PipelineContextPipeline context with results
async def execute(context: RAGContext) -> Result[RAGResponse, RAGError]

Execute the RAG pipeline per the contract protocol.

Parameters
ParameterTypeDescription
`context`RAGContextPipeline context with query and optional config/filters.
Returns
TypeDescription
Result[RAGResponse, RAGError]Ok(RAGResponse) on success, Err(RAGError) on failure.
async def run_parallel(
    query: str,
    stages: list[PipelineStageProtocol] | None = None,
    **kwargs: Any
) -> PipelineContext

Execute pipeline stages in parallel.

Parameters
ParameterTypeDescription
`query`strUser query
`stages`list[PipelineStageProtocol] | NoneStages to execute in parallel (default: all stages) **kwargs: Additional context parameters
Returns
TypeDescription
PipelineContextPipeline context with results

Payload fired when a RAG pipeline begins processing a query.

Attributes: pipeline_name: Name or identifier of the pipeline that started.


Registers RAG pipeline services and strategy registries with the DI container.
def __init__(config: RAGConfig | None = None) -> None
async def register(container: ContainerRegistrarProtocol) -> None
async def boot(container: BootContainerProtocol) -> None

Boot RAG provider — wire optional integrations.

async def shutdown() -> None
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check RAG provider health — verifies embedding service and vector store.

Returns
TypeDescription
HealthCheckResultHealthCheckResult with status ``healthy`` when all configured dependencies are reachable, or ``degraded``/``unhealthy`` otherwise.

Result of a reranking operation.

Attributes: documents: Reranked documents (most relevant first). scores: Relevance scores (parallel to documents). original_count: Number of documents passed to reranker. reranked_count: Number of documents returned (may be < original if top_k applied). model_name: Name of the reranking model used. metadata: Additional reranking metadata.


Registry of reranking strategy handlers.

Reranking strategies reorder documents after initial retrieval using cross-encoders, LLM-based scoring, or fusion techniques.

Uses a handler-based dispatch pattern where handlers implement can_handle(strategy: str) and create_and_rerank() methods.

Usage

registry = RerankingStrategyRegistry()
registry.register(FlashRankStrategyHandler())
handler = registry.get("flashrank")
result = await handler.create_and_rerank(strategy="flashrank", ...)
def __init__() -> None

Initialize an empty handler registry.

def with_defaults(cls) -> RerankingStrategyRegistry

Create a RerankingStrategyRegistry with default handlers.

Returns
TypeDescription
RerankingStrategyRegistryA RerankingStrategyRegistry with no default handlers. Handlers are registered conditionally by the provider.
def register(handler: object) -> None

Register a handler instance.

Parameters
ParameterTypeDescription
`handler`objectA handler instance with can_handle(strategy) method.
def get(strategy: str) -> object | None

Get a handler that can handle the given strategy.

Parameters
ParameterTypeDescription
`strategy`strStrategy name to look up.
Returns
TypeDescription
object | NoneFirst handler where can_handle(strategy) is True, or None.

Emitted when the retrieval stage of a RAG pipeline completes.

Consumed by: quality metrics, retrieval analytics, feedback loops.


Configuration for retrieval stage.

Registry of retrieval strategy implementations.

Strategies take a query and a set of candidate documents and return an ordered subset ranked by relevance.

Usage

registry = RetrievalStrategyRegistry.with_defaults()
strategy = registry.instantiate("mmr", lambda_param=0.7)
results = await strategy.retrieve(query, candidates, top_k=5)
def __init__() -> None
def with_defaults(cls) -> RetrievalStrategyRegistry

Create a registry pre-loaded with built-in strategies.

Registered keys: - "vector": VectorRetrievalStrategy - "mmr": MMRRetrievalStrategy

Returns
TypeDescription
RetrievalStrategyRegistryA new registry instance with default strategies registered.

Emitted when the synthesis stage of a RAG pipeline completes.

Consumed by: quality metrics, answer analytics, audit.


Configuration for response synthesis.

Attributes: strategy: Synthesis strategy to use max_context_length: Maximum context length in tokens max_response_length: Maximum response length in tokens include_citations: Whether to include citations output_format: Desired output format quality_check: Whether to run quality checks min_confidence: Minimum confidence threshold metadata: Additional configuration metadata


def create_chunker(
    strategy: ChunkingStrategy = ChunkingStrategy.FIXED_SIZE,
    config: ChunkingConfig | None = None,
    **kwargs: Any
) -> AbstractChunker

Create a chunker instance for the given strategy.

Convenience wrapper around ChunkingStrategyRegistry.

Parameters
ParameterTypeDescription
`strategy`ChunkingStrategyWhich chunking strategy to use.
`config`ChunkingConfig | NoneOptional chunking configuration. **kwargs: Additional keyword arguments forwarded to the chunker constructor (override config defaults).
Returns
TypeDescription
AbstractChunkerA configured Chunker instance.
Raises
ExceptionDescription
ValueErrorIf no chunker is registered for the given *strategy*.

Base exception for RAG errors.