
Architecture

Internal design of the lexigram-ai-rag package.


lexigram-ai-rag implements the RAG pipeline protocol defined in lexigram-contracts. It is an AI-subsystem extension consumed by lexigram-ai (the orchestrator) and usable standalone via RAGModule.

flowchart BT
    LC[lexigram-contracts<br/>RAGPipelineProtocol · RetrievalStrategyProtocol<br/>SynthesizerProtocol · DocumentLoaderProtocol]
    LR[lexigram-ai-rag<br/>Pipeline · Ingestion · Retrieval · Synthesis<br/>Chunking · Reranking · Evaluation]
    LO[lexigram-ai<br/>Orchestrator]
    LV[lexigram-vector<br/>Backends]
    LM[lexigram-ai-memory<br/>Working memory]

    LO --> LR
    LR --> LC
    LR -.-> LV
    LR -.-> LM

Import direction: Arrows point toward the dependency. lexigram-ai-rag imports only from lexigram and lexigram-contracts. Vector store and memory are resolved via DI — never imported directly.


The RAG pipeline follows three phases: ingestion → retrieval → synthesis.

flowchart LR
    subgraph Ingestion
        L[Load] --> C[Chunk]
        C --> E[Embed]
        E --> I[Index]
    end
    subgraph Retrieval
        QP[Query Process] --> VS[Vector Search]
        VS --> RR[Re-rank]
        RR --> CC[Compress]
    end
    subgraph Synthesis
        PC[Build Prompt] --> GEN[LLM Generate]
        GEN --> QA[Quality Check]
    end
    Ingestion --> Retrieval --> Synthesis

The pipeline is configured via PipelineConfig (config.py:289), which defines an ordered list of PipelineStageType values:

| Stage | Config Class | Purpose |
|---|---|---|
| INGESTION | IngestionConfig | Load, preprocess, OCR, table extraction |
| QUERY_PROCESSING | QueryProcessingConfig | Transform, HyDE, routing |
| RETRIEVAL | RetrievalConfig | Vector/hybrid search, KG, multi-hop |
| CONTEXT_OPTIMIZATION | ContextOptimizationConfig | Rerank, compress, deduplicate |
| SYNTHESIS | SynthesisConfig | LLM response generation |
| QUALITY_ASSURANCE | QualityAssuranceConfig | Faithfulness, hallucination checks |
| POST_PROCESSING | PostProcessingConfig | Cache, collect metrics |

Each stage implements PipelineStageProtocol (pipeline/base.py:10) and processes a shared PipelineContext (pipeline/types.py:74).


Documents enter through the ingestion pipeline and are transformed into indexed chunks.

Loading auto-detects the file format from the extension or URL and delegates to the matching loader:

| Loader | Formats | Dependencies |
|---|---|---|
| TextLoader | .txt, .rst | None |
| MarkdownLoader | .md, .markdown | None |
| JSONLoader | .json, .jsonl | None |
| CSVLoader | .csv, .tsv | None |
| PDFLoader | .pdf | pypdf (optional) |
| HTMLLoader | .html, .htm | beautifulsoup4 (optional) |
| WebScraperLoader | URLs | aiohttp (optional) |
| DocxLoader | .docx | python-docx (optional) |
| ExcelLoader | .xlsx, .xls | openpyxl (optional) |
| CodeLoader | 20+ code extensions | None |
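Here is a minimal sketch of extension- and URL-based dispatch that follows the table above. It returns loader names rather than loader instances. It also assumes that every http(s) URL goes to the web scraper, whatever its path suffix. `detect_loader` and `LOADERS_BY_EXTENSION` are hypothetical, and CodeLoader's extension list is left out because the document does not enumerate it.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse

# Hypothetical extension-to-loader table mirroring the rows above.
LOADERS_BY_EXTENSION: dict[str, str] = {
    ".txt": "TextLoader", ".rst": "TextLoader",
    ".md": "MarkdownLoader", ".markdown": "MarkdownLoader",
    ".json": "JSONLoader", ".jsonl": "JSONLoader",
    ".csv": "CSVLoader", ".tsv": "CSVLoader",
    ".pdf": "PDFLoader",
    ".html": "HTMLLoader", ".htm": "HTMLLoader",
    ".docx": "DocxLoader",
    ".xlsx": "ExcelLoader", ".xls": "ExcelLoader",
}


def detect_loader(source: str) -> str:
    """Return the loader name for a file path or URL."""
    if urlparse(source).scheme in ("http", "https"):
        # Assumption: remote sources always go to the web scraper.
        return "WebScraperLoader"
    suffix = PurePosixPath(source).suffix.lower()  # case-insensitive match
    if suffix not in LOADERS_BY_EXTENSION:
        raise ValueError(f"no loader registered for {suffix or 'extensionless'} source: {source}")
    return LOADERS_BY_EXTENSION[suffix]
```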

ChunkingStrategyRegistry (chunking/strategy_registry.py:23) maps strategy names to chunker classes:

| Strategy | Class | Method |
|---|---|---|
| fixed_size | FixedSizeChunker | Fixed character count |
| recursive | RecursiveChunker | Recursive separator splitting |
| semantic | SemanticChunker | Sentence/paragraph boundaries |
| sliding_window | SlidingWindowChunker | Overlapping windows |
| token | TokenChunker | Token count (cl100k_base) |

Chunkers implement ChunkerProtocol (protocols.py:7):

from typing import Protocol, runtime_checkable

@runtime_checkable  # enables isinstance() checks against the protocol
class ChunkerProtocol(Protocol):
    def chunk(self, text: str, chunk_size: int, overlap: int) -> list[str]: ...
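The sketch below shows a character-window chunker that satisfies the protocol's `chunk(text, chunk_size, overlap)` signature. `FixedCharChunker` is a hypothetical illustration of the fixed-size idea, not the package's FixedSizeChunker. The validation rules are assumptions.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ChunkerProtocol(Protocol):
    def chunk(self, text: str, chunk_size: int, overlap: int) -> list[str]: ...


class FixedCharChunker:
    """Fixed character-count chunks; consecutive chunks share `overlap` characters."""

    def chunk(self, text: str, chunk_size: int, overlap: int) -> list[str]:
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must be in [0, chunk_size)")
        step = chunk_size - overlap  # how far each window advances
        chunks: list[str] = []
        for start in range(0, len(text), step):
            chunks.append(text[start:start + chunk_size])
            if start + chunk_size >= len(text):
                break  # this window already reaches the end of the text
        return chunks
```

For example, `FixedCharChunker().chunk("abcdefghij", 4, 1)` yields windows starting at offsets 0, 3 and 6. Each window repeats the last character of the previous one.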

Chunks are embedded via EmbeddingClientProtocol and indexed into a DocumentVectorStoreProtocol backend (PGVector, Chroma, Qdrant). The adapter in index/vector_store_index.py handles batch upserts.

Optional preprocessing (preprocessing/): OCR (ocr.py), table extraction (tables.py), metadata enrichment (enricher.py). Controlled via IngestionConfig.preprocessing_enabled.


The retrieval phase converts a user query into ranked context chunks.

flowchart TD
    Q[User Query] --> QP[Query Processing]
    QP --> R{Strategy}
    R -->|vector| VS[Vector Search]
    R -->|mmr| MMR[MMR]
    R -->|hybrid| H{Hybrid}
    H --> VS
    H --> KS[Keyword Search]
    VS & KS --> F[RRF Fusion]
    VS & MMR & F --> RR{Re-rank?}
    RR -->|cross-encoder| CE[FlashRank]
    RR -->|none| SKIP
    CE & SKIP --> CC{Compress?}
    CC -->|extractive| EXT[Extractive]
    CC -->|abstractive| ABS[LLMLingua-2]
    CC -->|none| PASS
    EXT & ABS & PASS --> OUT[Context]
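The hybrid branch above merges vector and keyword results with Reciprocal Rank Fusion (RRF). This minimal sketch applies the standard RRF formula and assumes each retriever returns an ordered list of document IDs. The function name and the default k = 60, a value commonly used in the RRF literature, are assumptions. The document does not say which constant lexigram uses.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked lists of document IDs.

    A document scores sum(1 / (k + rank)) over every list it appears in,
    using 1-based ranks. Documents missing from a list get nothing from it.
    """
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID so the output is deterministic.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))
```

RRF works only from ranks, so it needs no calibration between cosine similarities and keyword scores, which live on different scales.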

| Strategy | Class | Behavior |
|---|---|---|
| vector | VectorRetrievalStrategy | Sort by pre-computed similarity score |
| mmr | MMRRetrievalStrategy | Maximal Marginal Relevance: balance relevance and diversity |

Both strategies are registered in RetrievalStrategyRegistry (retrieval/strategy_registry.py:18) and implement RetrievalStrategyProtocol.
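The mmr strategy trades relevance against diversity. Here is a minimal sketch of greedy Maximal Marginal Relevance over embedding vectors, assuming cosine similarity. `mmr_select` and its `lambda_mult` parameter are hypothetical names, not MMRRetrievalStrategy's confirmed interface.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(
    query: list[float],
    candidates: list[list[float]],
    top_k: int,
    lambda_mult: float = 0.5,
) -> list[int]:
    """Return candidate indices in MMR selection order.

    lambda_mult=1.0 ranks purely by relevance; lower values penalise
    candidates that resemble ones already selected.
    """
    relevance = [cosine(query, c) for c in candidates]
    selected: list[int] = []
    remaining = list(range(len(candidates)))

    def mmr_score(i: int) -> float:
        # Redundancy = similarity to the closest already-selected candidate.
        redundancy = max((cosine(candidates[i], candidates[j]) for j in selected), default=0.0)
        return lambda_mult * relevance[i] - (1.0 - lambda_mult) * redundancy

    while remaining and len(selected) < top_k:
        best = max(remaining, key=mmr_score)
        selected.append(best)
        remaining.remove(best)
    return selected
```

Take two near-duplicate candidates plus one distinct candidate. With `lambda_mult=1.0` both duplicates are returned. With 0.5, the distinct candidate displaces the second duplicate.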

  • Query expansion — generate variants (query/transformers.py)
  • HyDE (hyde/) — Hypothetical Document Embeddings via HyDEStrategyRegistry
  • Query routing (routing/) — dispatch to strategy (rule-based, semantic, LLM)

Reranking uses handler-based dispatch via RerankingStrategyRegistry (reranking/strategy_registry.py:10). The FlashRank handler is registered when its optional dependency is installed. Custom handlers implement can_handle() and create_and_rerank().
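Handler-based dispatch means the registry asks each handler whether it claims a strategy name, then delegates to the first one that does. In this sketch only the method names can_handle() and create_and_rerank() come from the text above. Their signatures, `RerankingRegistry`, the toy `KeywordOverlapHandler`, and the pass-through behavior for unclaimed strategies are all hypothetical.

```python
from typing import Protocol


class RerankHandler(Protocol):
    # Hypothetical signatures modelled on the method names above.
    def can_handle(self, strategy: str) -> bool: ...

    def create_and_rerank(self, query: str, documents: list[str], top_k: int) -> list[str]: ...


class KeywordOverlapHandler:
    """Toy reranker: orders documents by how many query terms they contain."""

    def can_handle(self, strategy: str) -> bool:
        return strategy == "keyword_overlap"

    def create_and_rerank(self, query: str, documents: list[str], top_k: int) -> list[str]:
        terms = set(query.lower().split())
        # sorted() is stable, so equally scored documents keep their input order.
        ranked = sorted(documents, key=lambda d: len(terms & set(d.lower().split())), reverse=True)
        return ranked[:top_k]


class RerankingRegistry:
    def __init__(self) -> None:
        self._handlers: list[RerankHandler] = []

    def register(self, handler: RerankHandler) -> None:
        self._handlers.append(handler)

    def rerank(self, strategy: str, query: str, documents: list[str], top_k: int) -> list[str]:
        # The first handler that claims the strategy wins.
        for handler in self._handlers:
            if handler.can_handle(strategy):
                return handler.create_and_rerank(query, documents, top_k)
        # No handler (e.g. "none"): keep retrieval order, as in the diagram's skip path.
        return documents[:top_k]
```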

Context Compression (context_compression/)


Strategies: extractive (sentence scoring), abstractive (LLMLingua-2, optional), hybrid, token-limit, and semantic deduplication, all dispatched via CompressionStrategyRegistry.
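This is a minimal sketch of extractive compression by sentence scoring, under stated assumptions. Each sentence is scored by the number of query words it shares. The highest-scoring sentences are kept within a budget, and then re-emitted in their original order. Word counts stand in for tokens. `extractive_compress` is a hypothetical helper, and the package's scoring function may differ.

```python
import re


def extractive_compress(context: str, query: str, max_words: int) -> str:
    """Keep the sentences that best overlap the query, within a word budget."""
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", context) if s.strip()]
    query_terms = set(re.findall(r"\w+", query.lower()))

    def overlap(sentence: str) -> int:
        return len(query_terms & set(re.findall(r"\w+", sentence.lower())))

    # Best-scoring sentences first; the stable sort keeps document order on ties.
    ranked = sorted(range(len(sentences)), key=lambda i: overlap(sentences[i]), reverse=True)
    kept: list[int] = []
    used = 0
    for i in ranked:
        length = len(sentences[i].split())
        if overlap(sentences[i]) == 0 or used + length > max_words:
            continue  # irrelevant, or would exceed the budget
        kept.append(i)
        used += length
    # Restore original order so the compressed context still reads naturally.
    return " ".join(sentences[i] for i in sorted(kept))
```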


The synthesis stage constructs a prompt from retrieved context and generates a response.

| Strategy | Method | Requires LLM |
|---|---|---|
| direct | Concatenate chunks | No |
| extractive | Extract relevant sentences | No |
| abstractive | LLM-generated response | Yes |
| hybrid | Extractive + abstractive | Yes |

SynthesisStrategyRegistry (pipeline/stages/synthesis_registry.py:132) dispatches each strategy to its handler and falls back to extractive synthesis if abstractive synthesis fails and no LLM is available.

SynthesisConfig (synthesis/types.py:201): max_context_length (4000), max_response_length (500), fallback_strategy, min_confidence. Context compression runs before synthesis to fit the LLM’s token budget.

Quality Assurance (pipeline/stages/quality.py:21)


Validates faithfulness, relevance, coherence, and confidence, and runs hallucination detection. Below-threshold responses can be rejected (reject_low_quality) or flagged (warn_low_quality).
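The sketch below shows a quality gate that either rejects or flags a response. It assumes every metric is a score in [0, 1] compared against a single threshold. `QualityScores`, `quality_gate` and the "reject"/"warn" mode strings are hypothetical stand-ins for QualityMetrics and the reject_low_quality / warn_low_quality options.

```python
from dataclasses import dataclass


@dataclass
class QualityScores:
    # Hypothetical score fields in [0, 1]; the real QualityMetrics may differ.
    faithfulness: float
    relevance: float
    coherence: float
    confidence: float


def quality_gate(scores: QualityScores, threshold: float, mode: str = "warn") -> tuple[bool, list[str]]:
    """Return (accepted, failing_metrics). mode is "reject" or "warn"."""
    failing = [name for name, value in vars(scores).items() if value < threshold]
    if failing and mode == "reject":
        return False, failing
    # In warn mode the response is kept, but failing metrics are reported for logging.
    return True, failing
```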


RAGProvider (di/provider.py:40) has priority DOMAIN. Its lifecycle:

| Phase | Action |
|---|---|
| `__init__(config)` | Store RAGConfig |
| `register(container)` | Register config, 6 strategy registries (Compression, HyDE, Reasoning, Synthesis, Chunking, Reranking), and the knowledge graph. Discover chunking/retrieval providers via the lexigram.chunking.strategies and lexigram.retrieval.strategies entry points. |
| `boot(container)` | Resolve optional WorkingMemoryProtocol and GraphStoreProtocol. Non-fatal if absent. |
| `shutdown()` | No-op |
| `health_check(timeout)` | Ping embedding and vector store. Returns DEGRADED if either is missing. |

Config key: "ai.rag" — maps to LEX_AI_RAG__* env vars (constants.py:14) or YAML.
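Here is a minimal sketch of how `LEX_AI_RAG__*` variables could map onto a nested config. It assumes that `__` separates nesting levels and that keys are lowercased. The `retrieval` / `top_k` keys in the example are hypothetical, and `rag_config_from_env` is not the package's loader.

```python
import os
from collections.abc import Mapping

ENV_PREFIX = "LEX_AI_RAG__"


def rag_config_from_env(environ: Mapping[str, str] = os.environ) -> dict:
    """Collect LEX_AI_RAG__* variables into a nested dict.

    Example: LEX_AI_RAG__RETRIEVAL__TOP_K=8 -> {"retrieval": {"top_k": "8"}}.
    Values stay strings; type coercion is left to the config model.
    """
    config: dict = {}
    for name, value in environ.items():
        if not name.startswith(ENV_PREFIX):
            continue  # unrelated variable
        path = name[len(ENV_PREFIX):].lower().split("__")
        node = config
        for key in path[:-1]:
            node = node.setdefault(key, {})  # create intermediate levels on demand
        node[path[-1]] = value
    return config
```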

sequenceDiagram
    participant P as RAGProvider
    participant C as Container
    P->>C: singleton(RAGConfig)
    P->>C: singleton(CompressionStrategyRegistry.with_defaults())
    P->>C: singleton(HyDEStrategyRegistry.with_defaults())
    P->>C: singleton(ReasoningStrategyRegistry.with_defaults())
    P->>C: singleton(SynthesisStrategyRegistry.with_defaults())
    P->>C: singleton(ChunkingStrategyRegistry.with_defaults())
    P->>C: singleton(RerankingStrategyRegistry.with_defaults())
    opt FlashRank available
        P->>C: register flashrank handler
    end
    opt LLMLingua-2 available
        P->>C: register llmlingua2 handler
    end
    P->>C: singleton(KnowledgeGraph)

Pipeline Executor (pipeline/executor.py:21)

| Error Strategy | Behavior |
|---|---|
| FAIL_FAST | Raise immediately |
| RETRY | Exponential backoff (max_retries, retry_delay) |
| SKIP | Skip stage, continue |
| GRACEFUL | Warning, continue with partial results |
| FALLBACK | Execute fallback |

Supports sequential (execute()) and parallel (execute_parallel()) execution.
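The RETRY strategy can be sketched as exponential backoff around a single stage call. This sketch assumes the delay doubles on each attempt, starting at `retry_delay`, and that the last error is re-raised once retries run out. `run_with_retry` is hypothetical. The injectable `sleep` exists only so the behavior can be tested without waiting.

```python
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def run_with_retry(
    stage: Callable[[], T],
    max_retries: int = 3,
    retry_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call stage(); after the n-th failure (n from 0) wait retry_delay * 2**n and retry."""
    for attempt in range(max_retries + 1):
        try:
            return stage()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(retry_delay * 2**attempt)
    raise AssertionError("unreachable")  # keeps type checkers satisfied
```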


steps/core.py provides fine-grained PipelineStep implementations for custom pipelines:

| Step | Input | Output |
|---|---|---|
| LoadDocumentsStep | Source path/URL | list[Chunk] |
| SplitDocumentsStep | Documents | list[Chunk] |
| IndexDocumentsStep | Chunks | int (count) |
| RetrieveContextStep | Query | list[RAGSearchResult] |
| GenerateAnswerStep | Query + context | Completion |
| TranslationStep | Chunks | list[Chunk] |

From lexigram-contracts:

| Protocol | Module | Usage |
|---|---|---|
| RAGPipelineProtocol | contracts.ai.rag | Pipeline entry point |
| RetrievalStrategyProtocol | contracts.ai.rag | Pluggable retrieval |
| RerankingStrategyProtocol | contracts.ai.rag | Cross-encoder reranking |
| SynthesizerProtocol | contracts.ai.rag | Answer synthesis |
| DocumentLoaderProtocol | contracts.ai.rag | Document loading |
| PromptCompressorProtocol | contracts.ai.rag | Context compression |
| RAGEvaluatorProtocol | contracts.ai.rag | Quality evaluation |
| ChunkProtocol | contracts.ai.rag | Chunk interface |
| DocumentProtocol / DocumentVectorStoreProtocol | contracts.ai.vector | Document + vector store |
| SearchResultProtocol / ChunkerProtocol | contracts.ai.vector | Search + chunking |
| EmbeddingClientProtocol / LLMClientProtocol | contracts.ai | Embedding + LLM (optional) |
| WorkingMemoryProtocol | contracts.ai.memory | Memory (optional) |
| GraphStoreProtocol | contracts.data.graph.protocols | Knowledge graph (optional) |
| RAGContext / RAGResponse | contracts.ai.rag | I/O DTOs |
RAGError (contracts)
├─ ChunkingError (contracts)
├─ RetrievalError (contracts)
├─ SynthesisError (contracts)
├─ PreprocessingError (package)
├─ MissingCitationsError (package)
└─ MultimodalError (package)
   ├─ AudioLoaderError
   ├─ VideoLoaderError
   ├─ ImageLoaderError
   └─ CLIPEmbeddingError

Domain errors are returned as Result[T, E] values; infrastructure errors propagate as exceptions.
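The split between returned and raised errors can be sketched with a two-variant result type. `Ok`, `Err`, `Result`, `split_fixed` and `describe` here are hypothetical. Lexigram's own Result[T, E] may use different names and helpers, and the local `ChunkingError` only stands in for the contracts class.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


Result = Union[Ok[T], Err[E]]


class ChunkingError(Exception):
    """Stand-in for the contracts ChunkingError."""


def split_fixed(text: str, size: int) -> Result[list[str], ChunkingError]:
    # Invalid input is a domain error: returned to the caller, not raised.
    if size <= 0:
        return Err(ChunkingError(f"chunk size must be positive, got {size}"))
    return Ok([text[i:i + size] for i in range(0, len(text), size)])


def describe(result: Result[list[str], ChunkingError]) -> str:
    # Callers must branch on the variant, so the failure path cannot be ignored silently.
    if isinstance(result, Ok):
        return f"{len(result.value)} chunks"
    return f"failed: {result.error}"
```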

Package-internal types (not in contracts): Chunk, Context, PipelineContext, SynthesisResult, QualityMetrics, ContextChunk, ChunkingStrategy, SynthesisStrategy.


Query results are cached with a configurable TTL (default: 3600 s), and embeddings have a separate cache (DEFAULT_EMBEDDING_CACHE_SIZE: 10_000). Metrics: ai.rag.cache.hits, ai.rag.pipeline.duration_ms, ai.rag.retrieved.chunks.

RetrievalCompletedEvent(query_id, documents_retrieved), SynthesisCompletedEvent(query_id, context_chunks) — published via EventBusProtocol.
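Subscribing by event class, as in event_bus.subscribe(EventClass, handler), can be sketched with an in-process bus. `EventBus` is a hypothetical stand-in for an EventBusProtocol implementation, and it dispatches on the exact event type only. The event's fields come from the text above, but typing `documents_retrieved` as an int is an assumption.

```python
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class RetrievalCompletedEvent:
    query_id: str
    documents_retrieved: int  # assumed to be a count


class EventBus:
    """Minimal synchronous bus keyed by event class."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = {}

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, event: object) -> None:
        # Exact-type dispatch: handlers registered for base classes are not called.
        for handler in self._handlers.get(type(event), []):
            handler(event)
```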

RAGPipelineStartedHook(pipeline_name), RAGDocumentsRetrievedHook(chunk_count), RAGAnswerSynthesizedHook(pipeline_name) — fired via HookRegistryProtocol.

Evaluation covers hallucination detection, answer quality, context relevance, and retrieval precision/recall. Automatic evaluation is configured via PipelineConfig.auto_evaluate_every_n.


| Extension point | Mechanism | Location |
|---|---|---|
| Custom document loader | Subclass AbstractDocumentLoader, register in LoaderRegistry | loaders/registry.py |
| Custom chunker | Register in ChunkingStrategyRegistry | chunking/strategy_registry.py |
| Custom retrieval strategy | Entry point lexigram.retrieval.strategies | retrieval/strategy_registry.py |
| Custom reranking handler | Register in RerankingStrategyRegistry | reranking/strategy_registry.py |
| Custom synthesizer | Register handler in SynthesisStrategyRegistry | pipeline/stages/synthesis_registry.py |
| Custom compressor | Register in CompressionStrategyRegistry | context_compression/strategy_registry.py |
| Custom HyDE generator | Register in HyDEStrategyRegistry | hyde/strategy_registry.py |
| Custom reasoning strategy | Register in ReasoningStrategyRegistry | reasoning/strategy_registry.py |
| Lifecycle hooks | Hook dataclass + HookRegistryProtocol | hooks.py |
| Event subscribers | event_bus.subscribe(EventClass, handler) | events.py |
| Custom pipeline stage | Implement PipelineStageProtocol | pipeline/base.py |
| Composable step | Subclass PipelineStep | steps/core.py |

All registries are populated with defaults via with_defaults(). Custom entries are additive.


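# RAGModule wires RAGProvider into the container and exports the public protocols.
@module()
class RAGModule(Module):
    @classmethod
    def configure(cls, config: RAGConfig | None = None) -> DynamicModule:
        # config is optional and is handed to RAGProvider as-is.
        return DynamicModule(
            module=cls,
            providers=[RAGProvider(config=config)],
            # Protocols visible to modules that import RAGModule.
            exports=[RAGPipelineProtocol, RetrievalStrategyProtocol],
        )


# An application module importing RAG with a custom chunk size.
@module(imports=[RAGModule.configure(RAGConfig(chunk_size=512))])
class AppModule(Module):
    pass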

constants.py: ENV_PREFIX (LEX_AI_RAG__), default chunk size/overlap (512/50), DEFAULT_TOP_K (5), similarity threshold (0.7), cache TTL (3600s), embedding cache size (10_000), metric names (ai.rag.*).