
Architecture

Internal design of the lexigram-ai-llm package — the LLM client abstraction layer.


The package provides a unified LLMClientProtocol across all supported LLM providers, sitting between application code and provider SDKs.

flowchart BT
    App[Application Code\nService · Agent · RAG Pipeline]
    AILLM[lexigram-ai-llm\nLLMClientProtocol · StructuredExtractor\nEmbeddingClient · LLMRouter]
    SDKs[Provider SDKs\nopenai · anthropic · google-genai\nboto3 · ollama · mistralai]

    App --> AILLM
    AILLM --> SDKs

It also provides structured output extraction, embedding generation, multi-provider routing, response caching, rate limiting, circuit breakers, and token counting, all behind protocol boundaries.


LLMClientProtocol is the central contract. Every provider client implements:

from typing import Protocol

# Result, AsyncStream, Completion, StreamChunk, LLMError, and HealthCheckResult
# are lexigram types; their imports are omitted here.
class LLMClientProtocol(Protocol):
    async def complete(self, messages, **kwargs) -> Result[Completion, LLMError]: ...
    def stream_chat(self, messages, **kwargs) -> AsyncStream[StreamChunk, LLMError]: ...
    async def chat(self, messages, tools=None, **kwargs) -> Result[Completion, LLMError]: ...
    async def health_check(self, timeout=5.0) -> HealthCheckResult: ...
    async def close(self) -> None: ...

Each provider maps to a concrete class, registered in ProviderRegistry, that extends AbstractLLMClient:

flowchart LR
    subgraph R[ProviderRegistry]
        Clients["OpenAI · Anthropic · Gemini · Ollama · Groq · Mistral\nCohere · OpenRouter · Bedrock · Vertex · Azure\nCloudflare · DeepSeek · Together · Fireworks"]
    end
    R -->|implements| LLMClientProtocol

DeepSeek, Together, and Fireworks extend OpenAICompatibleClient.

AbstractLLMClient implements the shared retry, circuit-breaker, and pre-flight logic in complete(), stream_chat(), and chat(). Subclasses implement only:

| Method | Purpose |
| --- | --- |
| _do_complete() | Non-streaming completion call |
| _do_stream_chat() | Return AsyncIterator[StreamChunk] |
| _do_chat() | Completion with tool calling |
| health_check() | Provider connectivity probe |
| _apply_thinking() | Inject thinking params into API payload |
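
The split between public methods and `_do_*` hooks is a template-method pattern. The sketch below is a minimal, self-contained illustration of that idea only. `SketchClient`, `EchoClient`, `Ok`, and `Err` are hypothetical stand-ins, not the package's real classes, and the empty-message check is an invented placeholder for the real pre-flight.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Ok:
    value: str


@dataclass
class Err:
    error: str


class SketchClient(ABC):
    """Hypothetical stand-in for AbstractLLMClient (not the real class)."""

    async def complete(self, messages: list[dict], **kwargs) -> Ok | Err:
        # Shared pre-flight lives in the base class, so subclasses never repeat it.
        # The empty-list check is a placeholder for the real checks.
        if not messages:
            return Err("empty message list")
        return await self._do_complete(messages, **kwargs)

    @abstractmethod
    async def _do_complete(self, messages: list[dict], **kwargs) -> Ok | Err:
        """Provider-specific call; the only hook this sketch requires."""


class EchoClient(SketchClient):
    async def _do_complete(self, messages: list[dict], **kwargs) -> Ok | Err:
        # A real subclass would call its provider SDK here.
        return Ok(f"echo: {messages[-1]['content']}")


result = asyncio.run(EchoClient().complete([{"role": "user", "content": "hi"}]))
empty = asyncio.run(EchoClient().complete([]))
```

Because callers only ever see `complete()`, a new provider in this sketch inherits the shared behaviour without reimplementing it.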

sequenceDiagram
    participant App as Application
    participant C as AbstractLLMClient
    participant CB as CircuitBreaker
    participant API as Provider API
    participant TC as TokenCounter

    App->>C: complete(messages, model="gpt-4o")
    C->>CB: is_available()?
    CB-->>C: True
    C->>TC: _exceeds_token_limit(messages)
    TC-->>C: False
    loop Retry (up to max_retries + 1)
        C->>API: _do_complete(messages)
        API-->>C: Result[Completion, LLMError]
        alt Ok
            C->>C: _normalize_thinking()
            C->>CB: record_success()
            C-->>App: Ok(Completion)
        else Err & retryable
            C->>CB: record_failure()
            C->>C: _backoff(attempt)
        else Err & not retryable
            C-->>App: Err(LLMError)
        end
    end

Pre-flight checks run in order: circuit breaker first, then token limit. Retryable errors (rate limits, 5xx) back off exponentially, waiting 2^attempt + 0.1*attempt before the next attempt. Non-retryable errors return immediately.
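
The retry loop and backoff formula can be sketched as follows. This is an illustration under assumptions, not the package's code: `call_with_retry` and the `(ok, value, retryable)` tuple are hypothetical, the delay is assumed to be in seconds, and the sketch assumes no sleep after the final attempt.

```python
import asyncio


def backoff_delay(attempt: int) -> float:
    """Delay from the formula in the text: 2^attempt + 0.1*attempt."""
    return 2 ** attempt + 0.1 * attempt


async def call_with_retry(call, max_retries: int, sleep=asyncio.sleep):
    """Try `call` up to max_retries + 1 times, backing off between attempts.

    `call` is a hypothetical coroutine function returning (ok, value, retryable).
    """
    last_error = None
    for attempt in range(max_retries + 1):
        ok, value, retryable = await call()
        if ok:
            return ("ok", value)
        last_error = value
        if not retryable:
            break  # non-retryable errors return immediately
        if attempt < max_retries:
            await sleep(backoff_delay(attempt))
    return ("err", last_error)


async def _demo():
    delays = []
    attempts = 0

    async def fake_sleep(seconds):
        delays.append(seconds)  # record the delay instead of waiting

    async def flaky():
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            return (False, "HTTP 503", True)
        return (True, "done", False)

    outcome = await call_with_retry(flaky, max_retries=3, sleep=fake_sleep)
    return outcome, delays


outcome, delays = asyncio.run(_demo())
```

Injecting `sleep` keeps the loop testable without real waiting; in the demo the call fails twice and succeeds on the third attempt.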


Streaming returns AsyncStream[StreamChunk, LLMError] — a typed async iterable with lazy setup and error coercion.

sequenceDiagram
    participant App as Application
    participant C as AbstractLLMClient
    participant CB as CircuitBreaker
    participant API as Provider SSE

    App->>C: stream_chat(messages)
    C-->>App: AsyncStream (immediate)
    App->>C: async for chunk in stream
    C->>CB: is_available()?
    C->>C: _exceeds_token_limit()?
    Note over C: Lazy — errors surface on first iteration
    loop Retry (up to max_retries + 1)
        C->>API: _do_stream_chat()
        API-->>C: AsyncIterator[StreamChunk]
        loop per-chunk timeout
            API-->>C: StreamChunk(delta="Hello")
            C-->>App: StreamChunk
            API-->>C: StreamChunk(finish_reason="stop")
            C-->>App: StreamChunk (last)
        end
    end

StreamingResponse aggregates chunks (separating answer from thinking content), exposing:

  • total_chunks, total_tokens — counts across the stream
  • total_latency_ms, time_to_first_chunk_ms, chunks_per_second — timing

SSEStreamAdapter converts SSE to StreamChunk. ParallelStreamAggregator merges multiple provider streams.
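
The lazy setup and chunk aggregation described above can be illustrated with a plain async generator. This is a sketch under assumptions: `Chunk`, `Aggregate`, `lazy_stream`, and `collect` are hypothetical names, and the sketch raises an exception where the real AsyncStream is described as coercing errors into its typed error channel.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Chunk:  # hypothetical stand-in for StreamChunk
    delta: str = ""
    finish_reason: str | None = None


async def provider_stream() -> AsyncIterator[Chunk]:
    # Fake provider: two text chunks, then a terminal chunk.
    for text in ("Hello", " world"):
        await asyncio.sleep(0)
        yield Chunk(delta=text)
    yield Chunk(finish_reason="stop")


async def lazy_stream(breaker_open: bool) -> AsyncIterator[Chunk]:
    # Nothing in this body runs until the caller starts iterating, so a
    # pre-flight failure surfaces on the first iteration, not at call time.
    if breaker_open:
        raise RuntimeError("circuit breaker is open")
    async for chunk in provider_stream():
        yield chunk


@dataclass
class Aggregate:
    text: str = ""
    total_chunks: int = 0
    time_to_first_chunk_ms: float | None = None


async def collect(stream: AsyncIterator[Chunk]) -> Aggregate:
    agg = Aggregate()
    start = time.monotonic()
    async for chunk in stream:
        if agg.time_to_first_chunk_ms is None:
            agg.time_to_first_chunk_ms = (time.monotonic() - start) * 1000
        agg.total_chunks += 1
        agg.text += chunk.delta
    return agg


agg = asyncio.run(collect(lazy_stream(breaker_open=False)))
blocked = lazy_stream(breaker_open=True)  # creating the stream does not raise
```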


Embedding providers are AbstractEmbeddingAdapter subclasses, registered in EmbeddingProviderRegistry:

flowchart LR
    App[Application] --> Registry[EmbeddingProviderRegistry]
    Registry --> O[OpenAI] & C[Cohere] & V[Voyage] & J[Jina] & L[Local\nsentence-transformers]

All implement embed(texts) -> list[list[float]], get_models(), health_check(). Default models: OpenAI text-embedding-3-small, Cohere embed-english-v3.0, Voyage voyage-3, Jina jina-embeddings-v3, Local all-MiniLM-L6-v2.
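
As a rough sketch of the adapter shape, the example below implements only `embed(texts) -> list[list[float]]` with a toy hash-based embedder and a dictionary standing in for the registry. `SketchEmbeddingAdapter` and `HashEmbeddingAdapter` are hypothetical, the method is written synchronously because the text does not say whether the real one is async, and the vectors carry no semantic meaning.

```python
import hashlib
from abc import ABC, abstractmethod


class SketchEmbeddingAdapter(ABC):
    """Hypothetical stand-in for AbstractEmbeddingAdapter."""

    @abstractmethod
    def embed(self, texts: list[str]) -> list[list[float]]:
        """Return one vector per input text."""


class HashEmbeddingAdapter(SketchEmbeddingAdapter):
    """Toy adapter: derives a fixed-size vector from a SHA-256 digest."""

    def __init__(self, dim: int = 8):
        if not 1 <= dim <= 32:  # a SHA-256 digest has 32 bytes
            raise ValueError("dim must be between 1 and 32")
        self.dim = dim

    def embed(self, texts: list[str]) -> list[list[float]]:
        vectors = []
        for text in texts:
            digest = hashlib.sha256(text.encode("utf-8")).digest()
            # Scale each byte into [0, 1].
            vectors.append([b / 255.0 for b in digest[: self.dim]])
        return vectors


# A plain dict stands in for the registry: name -> adapter class.
registry = {"hash": HashEmbeddingAdapter}
adapter = registry["hash"](dim=4)
vectors = adapter.embed(["hello", "world"])
```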


StructuredExtractor wraps any LLMClientProtocol and extracts validated, typed data:

extractor = StructuredExtractor(llm_client, default_model="gpt-4o")
result = await extractor.extract(prompt="Analyze sentiment", output_model=Sentiment)

flowchart LR
    Prompt --> Build["Build system prompt\nwith JSON schema"]
    Build --> LLM["LLMClientProtocol.complete()"]
    LLM --> Parse["extract_json_block()\nbracket-counting + fence strip"]
    Parse --> Validate["validate_against_model()"]
    Validate --> Retry{Retry?}
    Retry -->|yes| LLM
    Retry -->|no| Ok["Ok(T)"]
    Parse -->|fail| Err1[ExtractionParseError]
    Validate -->|fail| Err2[ExtractionValidationError]
    Retry -->|exhausted| Err3[ExtractionMaxRetriesError]

ParserRegistry default parsers: JSON, Pydantic, Enum, CSV.
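
The parse and validate steps can be sketched in a few lines. This is one plausible reading of "bracket-counting", not the package's implementation: the function below reuses the name `extract_json_block` for illustration only, `validate` and this `Sentiment` dataclass are hypothetical, and the retry step is left out. Scanning from the first bracket also steps over any Markdown fence around the payload.

```python
from __future__ import annotations

import json
from dataclasses import dataclass


def extract_json_block(text: str) -> str | None:
    """Return the first balanced {...} or [...] block in text, or None."""
    start = next((i for i, ch in enumerate(text) if ch in "{["), None)
    if start is None:
        return None
    depth = 0
    in_string = False
    escaped = False
    for i in range(start, len(text)):
        ch = text[i]
        if in_string:
            # Brackets inside JSON strings must not affect the depth count.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            depth += 1
        elif ch in "}]":
            depth -= 1
            if depth == 0:
                return text[start : i + 1]
    return None  # unbalanced: never returned to depth 0


@dataclass
class Sentiment:  # hypothetical output model
    label: str
    score: float


def validate(data: dict) -> Sentiment:
    if not isinstance(data.get("label"), str):
        raise ValueError("label must be a string")
    if not isinstance(data.get("score"), (int, float)):
        raise ValueError("score must be a number")
    return Sentiment(label=data["label"], score=float(data["score"]))


FENCE = "`" * 3  # built at runtime to keep a literal fence out of this example
reply = (
    "Here you go:\n" + FENCE + "json\n"
    '{"label": "positive", "score": 0.9, "note": "a } inside a string"}\n' + FENCE
)
block = extract_json_block(reply)
sentiment = validate(json.loads(block))
```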


sequenceDiagram
    participant C as DI Container
    participant P as LLMProvider
    participant F as create_llm_client
    participant Cl as LLMClient

    C->>P: LLMProvider(config)
    Note over P: __init__: store config, flags
    C->>P: register(container)
    P->>P: Create ProviderRegistry, TokenCounterRegistry, ParserRegistry
    P->>F: Look up client class by provider name
    F-->>P: Concrete client instance
    P->>C: singleton(LLMClientProtocol, client)
    P->>C: singleton(ProviderRegistry)
    alt enable_cache
        P->>C: singleton("llm_cache", cache)
    end
    C->>P: boot(container)
    P->>P: Validate API key format
    P->>Cl: register in ProviderRegistryProtocol
    C->>P: shutdown()
    P->>Cl: client.close()

__init__ stores config/flags. register() creates registries, client, and singletons. boot() validates API key. shutdown() closes connections.

LLMRoutingProvider registers LLMRouter with strategies:

| Strategy | Behavior |
| --- | --- |
| SequentialCascadeStrategy | Try providers in priority order |
| ParallelRaceStrategy | Fire all providers; return first success |
| CostOptimizedStrategy | Cheapest capable provider based on pricing data |
| LatencyOptimizedStrategy | Historically fastest provider |
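
The first two strategies can be sketched with plain asyncio. The functions below are illustrative only and do not use the package's RoutingStrategyProtocol. The `(name, call)` provider pairs and the fake providers are hypothetical, and cancelling the losers of a race is an assumption about sensible behaviour rather than something the source states.

```python
import asyncio


async def sequential_cascade(providers, prompt):
    """Try providers in priority order; return the first success."""
    errors = []
    for name, call in providers:
        try:
            return name, await call(prompt)
        except Exception as exc:  # sketch only: treat any error as a failure
            errors.append((name, exc))
    raise RuntimeError(f"all providers failed: {errors}")


async def parallel_race(providers, prompt):
    """Start every provider at once; return the first success."""
    tasks = {asyncio.create_task(call(prompt)): name for name, call in providers}
    pending = set(tasks)
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.exception() is None:
                    return tasks[task], task.result()
        raise RuntimeError("all providers failed")
    finally:
        for task in pending:  # cancel whatever is still running
            task.cancel()


async def failing(prompt):
    raise ConnectionError("provider down")


async def slow(prompt):
    await asyncio.sleep(0.05)
    return f"slow: {prompt}"


async def fast(prompt):
    await asyncio.sleep(0.01)
    return f"fast: {prompt}"


providers = [("a", failing), ("b", slow), ("c", fast)]
cascade_winner = asyncio.run(sequential_cascade(providers, "hi"))
race_winner = asyncio.run(parallel_race(providers, "hi"))
```

The cascade settles on the first provider in the list that works, while the race settles on whichever working provider answers first, at the cost of calling all of them.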

| Contract | Location | Implementations |
| --- | --- | --- |
| LLMClientProtocol | contracts/ai/llm.py | OpenAIClient, AnthropicClient, etc. |
| EmbeddingClientProtocol | contracts/ai/llm.py | OpenAIEmbeddingAdapter, etc. |
| LLMCacheProtocol | lexigram-ai-llm/protocols.py | LLMCache, RedisLLMCache |
| ProviderRegistryProtocol | contracts/ai/providers.py | ProviderRegistry |
| TokenCounterProtocol | contracts/ai/llm.py | TiktokenCounter, HuggingFaceCounter |
| CacheBackendProtocol | contracts/cache/protocols.py | RedisLLMCache |
| RoutingStrategyProtocol | lexigram-ai-llm/routing/strategies/ | 4 strategies |
| QuotaBackendProtocol | contracts/ai/routing.py | RateLimiter |
| InferenceLoggerProtocol | contracts/ai/routing.py | InferenceLogger |

| Module | Purpose |
| --- | --- |
| module.py | LLMModule entrypoint |
| config.py | ClientConfig |
| types.py | ChatMessage, Completion, StreamChunk |
| exceptions.py | Leaf exceptions |
| constants.py | Provider names, defaults, metrics |
| clients/ | 15 provider implementations |
| registry/ | ProviderRegistry |
| di/ | LLMProvider, factories |
| caching/ | LLMCache, RedisLLMCache |
| embedding/ | 5 embedding adapters |
| streaming/ | StreamingResponse, SSE |
| structured/ | StructuredExtractor |
| parsers/ | JSON, Pydantic, Enum, CSV |
| routing/ | LLMRouter, 4 strategies |
| pricing/ | PricingManager, token counting |
| rate_limiting/ | Token-bucket RateLimiter |
| health/ | ProviderCircuitBreaker |
| thinking/ | Thinking tag normalization |

| Point | Mechanism |
| --- | --- |
| Custom provider | Subclass AbstractLLMClient, register in ProviderRegistry |
| OpenAI-compatible | Subclass OpenAICompatibleClient |
| Custom embedding | Subclass AbstractEmbeddingAdapter |
| Custom token counter | Implement TokenCounterProtocol |
| Custom output parser | Implement parser, register in ParserRegistry |
| Custom streaming adapter | Subclass AbstractStreamingAdapter |
| Custom routing strategy | Implement RoutingStrategyProtocol |
| Custom LLM cache | Implement LLMCacheProtocol |
| Caching backend | Implement CacheBackendProtocol from contracts |
| Output filter | Subclass OutputFilter, wrap in SecureLLMClient |
| Pricing source | Implement PricingSourceProtocol |

Domain base exceptions live in lexigram-contracts; leaf exceptions live in lexigram-ai-llm. Infrastructure errors are raised; domain errors are returned as a Result.

flowchart LR
    subgraph C[contracts]
        AIError --> LLMErr[LLMError]
    end
    subgraph P[lexigram-ai-llm]
        LLMErr --> Auth[LLMAuthenticationError] & Rate[LLMRateLimitError]
        LLMErr --> NotFound[LLMModelNotFoundError] & Quota[LLMQuotaExceededError]
        LLMErr --> Filter[LLMContentFilterError] & Token[TokenLimitError]
        LLMErr --> Conn[ProviderConnectionError] & Stream[StreamError]
        LLMErr --> Extract[ExtractionError]
        Extract --> ParseErr[ExtractionParseError] & ValErr[ExtractionValidationError] & MaxRetry[ExtractionMaxRetriesError]
    end

| Error | Trigger | Recovery |
| --- | --- | --- |
| LLMAuthenticationError | Invalid API key | Config fix |
| LLMRateLimitError | 429 response | Backoff + retry |
| LLMModelNotFoundError | Model unavailable | Fallback |
| LLMQuotaExceededError | Billing limit | Switch provider |
| LLMContentFilterError | Safety filter | Reformulate |
| TokenLimitError | Context window | Reduce messages |
| ProviderConnectionError | Network failure | Retry |

Circuit breaker: ProviderCircuitBreaker opens after N failures, preventing network calls. Rate limiter: token-bucket per provider, returns LLMRateLimitError before any network call when empty.
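
Both guards reject a request before any network call is made. The sketch below shows the two mechanisms in isolation. `SketchCircuitBreaker` and `TokenBucket` are hypothetical classes, not the package's ProviderCircuitBreaker or RateLimiter. The breaker here counts consecutive failures and has no recovery timer, both of which are assumptions, since the text only says it opens after N failures.

```python
import time


class SketchCircuitBreaker:
    """Opens after `failure_threshold` consecutive failures (assumed semantics)."""

    def __init__(self, failure_threshold: int):
        self.failure_threshold = failure_threshold
        self.failures = 0

    def is_available(self) -> bool:
        return self.failures < self.failure_threshold

    def record_failure(self) -> None:
        self.failures += 1

    def record_success(self) -> None:
        self.failures = 0  # a success resets the count in this sketch


class TokenBucket:
    """Classic token bucket: refills continuously up to a fixed capacity."""

    def __init__(self, capacity: float, refill_per_second: float, clock=time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False  # caller would return a rate-limit error without any network call


breaker = SketchCircuitBreaker(failure_threshold=3)
for _ in range(3):
    breaker.record_failure()
open_after_three = not breaker.is_available()

now = [0.0]  # fake clock so the demo is deterministic
bucket = TokenBucket(capacity=2, refill_per_second=1.0, clock=lambda: now[0])
first, second, third = bucket.try_acquire(), bucket.try_acquire(), bucket.try_acquire()
now[0] += 1.0  # one second passes, one token refills
after_refill = bucket.try_acquire()
```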


@inject
class LLMProvider(Provider):
    name = "llm"
    priority = ProviderPriority.DOMAIN

    async def register(self, container):
        # registry, client, and default_counter are created earlier in
        # register(); that setup is omitted from this excerpt.
        container.singleton(ClientConfig, self.config)
        container.singleton(ProviderRegistry, registry)
        container.singleton(ProviderRegistryProtocol, registry)
        container.singleton(LLMClientProtocol, client)
        container.singleton(TokenCounterProtocol, default_counter)

    async def boot(self, container):
        # Validate API key format, register in shared ProviderRegistryProtocol
        ...

    async def shutdown(self):
        # Close the client's provider connections.
        await self._llm_client.close()

Usage: call LLMModule.configure(ClientConfig(provider="openai", model="gpt-4o")) for a single provider, or LLMModule.configure(routing=LLMConfig()) for multi-provider routing.