
API Reference

Protocol for LLM cache implementations.
async def get(key: str | dict[str, Any]) -> Any | None

Get value from cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key (string or structured dict).

Returns
  • Any | None: Cached value, or ``None`` if not present.
async def set(
    key: str | dict[str, Any],
    value: Any,
    ttl: float | None = None
) -> None

Set value in cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key (string or structured dict).
  • `value` (Any): Value to store.
  • `ttl` (float | None): Optional time-to-live in seconds.
async def delete(key: str | dict[str, Any]) -> bool

Delete entry from cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key to remove.

Returns
  • bool: ``True`` if the key existed and was removed, ``False`` otherwise.
async def clear() -> None

Clear all entries.

def get_stats() -> dict[str, Any]

Return cache statistics.

Returns
  • dict[str, Any]: Mapping of statistic name to value.
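
Below is a minimal in-memory sketch that satisfies this protocol via structural typing (illustrative only; everything except the five protocol methods is an assumption, and the package ships richer implementations such as LLMCache):

import time
from typing import Any

class DictCache:
    """Toy cache conforming to the cache protocol."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[Any, float | None]] = {}
        self._hits = 0
        self._misses = 0

    @staticmethod
    def _normalize(key: str | dict[str, Any]) -> str:
        # Structured dict keys are serialized to a stable string form.
        return key if isinstance(key, str) else repr(sorted(key.items()))

    async def get(self, key: str | dict[str, Any]) -> Any | None:
        k = self._normalize(key)
        entry = self._data.get(k)
        # Expired entries count as misses (a toy sketch: they are not evicted here).
        if entry is None or (entry[1] is not None and entry[1] < time.monotonic()):
            self._misses += 1
            return None
        self._hits += 1
        return entry[0]

    async def set(self, key: str | dict[str, Any], value: Any, ttl: float | None = None) -> None:
        expires = time.monotonic() + ttl if ttl is not None else None
        self._data[self._normalize(key)] = (value, expires)

    async def delete(self, key: str | dict[str, Any]) -> bool:
        return self._data.pop(self._normalize(key), None) is not None

    async def clear(self) -> None:
        self._data.clear()

    def get_stats(self) -> dict[str, Any]:
        return {"hits": self._hits, "misses": self._misses, "entries": len(self._data)}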

Pricing source from HTTP API endpoint.

Fetches pricing data from a remote API. Useful for getting the latest pricing updates, but requires network connectivity.

Attributes:
  • endpoint: API endpoint URL.
  • timeout: Request timeout in seconds.

Example

source = APIPricingSource(
    "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"
)
pricing = await source.get_pricing("gpt-4")

def __init__(
    endpoint: str,
    timeout: float = 10.0
)

Initialize API pricing source.

Parameters
  • `endpoint` (str): URL to fetch pricing from.
  • `timeout` (float): Request timeout in seconds (default: 10).
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
  • `model` (str): Model identifier.

Returns
  • ModelPricing | None: ModelPricing if found, None otherwise.
async def get_all_pricing() -> dict[str, ModelPricing]

Get all pricing data.

Returns
  • dict[str, ModelPricing]: All pricing data from the API.
property source_name() -> str

Get source name.

def invalidate_cache() -> None

Clear cached pricing data to force refresh.


Abstract base class for pricing data sources.

All pricing sources must implement get_pricing() to return ModelPricing for a given model name, or None if not found.

async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
  • `model` (str): Model identifier (e.g., "gpt-4-turbo").

Returns
  • ModelPricing | None: ModelPricing if found, None otherwise.
async def get_all_pricing() -> dict[str, ModelPricing]

Get all available pricing data.

Returns
  • dict[str, ModelPricing]: Dictionary mapping model names to pricing.
property source_name() -> str

Get the name of this pricing source.

Returns
  • str: Human-readable source name.

Anthropic Claude LLM client implementation.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Claude 3 (Opus, Sonnet, Haiku) models with:

  • Streaming responses
  • Tool calling
  • Vision capabilities
  • Automatic retry and error handling

Example

from lexigram.ai import ClientConfig

config = ClientConfig(provider="anthropic", model="claude-3-sonnet-20240229")
client = AnthropicClient(config)
completion = await client.complete([
    ChatMessage(role="user", content="Hello!")
])

def __init__(config: ClientConfig)

Initialize Anthropic client.

Parameters
  • `config` (ClientConfig): LLM configuration.

Raises
  • ImportError: If the anthropic package is not installed.
async def close() -> None

Close the Anthropic client.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
  • HealthCheckResult: Structured health check result.

Cache entry with metadata.

Attributes:
  • key: Cache key.
  • value: Cached value.
  • created_at: When entry was created.
  • expires_at: When entry expires (Unix timestamp).
  • hits: Number of cache hits.
  • size_bytes: Approximate size in bytes.


Cache statistics.

Attributes:
  • hits: Number of cache hits.
  • misses: Number of cache misses.
  • evictions: Number of evictions.
  • total_entries: Current number of entries.
  • total_size_bytes: Total cache size in bytes.

property hit_rate() -> float

Calculate cache hit rate.
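
For reference, the same figure computed by hand from the attributes above (a sketch; the zero-lookup guard is an assumption about the property's behavior):

stats = cache.get_stats()
lookups = stats.hits + stats.misses
hit_rate = stats.hits / lookups if lookups else 0.0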


Character-based token count estimator (~4 chars per token).

Always available without any optional dependencies. Suitable as a safe fallback counter.

Parameters
  • `model`: Model name (used for identification only).
def __init__(model: str = 'unknown') -> None

Initialize CharEstimateCounter.

Parameters
  • `model` (str): Model name for identification.
property model() -> str

The model this counter is calibrated for.

def count(text: str) -> int

Count tokens using character estimation.

def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages.
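
A quick usage sketch (the exact rounding of the ~4 chars/token heuristic is an assumption):

counter = CharEstimateCounter(model="gpt-4")
estimate = counter.count("Hello, how are you today?")  # 25 chars -> roughly 6 tokens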


A single chat message.

Implements ChatMessageProtocol with DomainModel semantics for validation.

Example

msg = ChatMessage(role="user", content="Hello, how are you?")


Configuration for LLM clients.

Example

config = ClientConfig(
    provider="openai",
    model="gpt-4-turbo",
    api_key="sk-...",
    temperature=0.7,
    max_tokens=2000,
)


Client for Cohere's enterprise NLP API.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Chat, Embeddings, and Reranking with:

  • RAG-optimized models (Command R/R+)
  • High-performance embeddings
  • Native reranking support
def __init__(config: ClientConfig) -> None

Initialize Cohere client.

Parameters
  • `config` (ClientConfig): LLM configuration.
property api_key() -> SecretStr

Get API key from config.

property base_url() -> str

Get base URL from config.

async def embed(
    texts: list[str] | str,
    model: str = 'embed-english-v3.0',
    input_type: str = 'search_document',
    **kwargs: Any
) -> list[list[float]]

Generate embeddings.

Parameters
  • `texts` (list[str] | str): Text or list of texts to embed.
  • `model` (str): Model ID (default: "embed-english-v3.0").
  • `input_type` (str): Type of input ("search_document", "search_query", "classification", "clustering").
  • `**kwargs`: Additional parameters.

Returns
  • list[list[float]]: List of embedding vectors.

Example

doc_embeddings = await client.embed(
    texts=["Doc 1", "Doc 2"],
    input_type="search_document",
)

query_embedding = await client.embed(
    texts="What is AI?",
    input_type="search_query",
)

async def rerank(
    query: str,
    documents: list[str] | list[dict[str, str]],
    model: str = 'rerank-english-v3.0',
    top_n: int | None = None,
    **kwargs: Any
) -> list[dict[str, Any]]

Rerank documents for a query.

Parameters
  • `query` (str): Search query.
  • `documents` (list[str] | list[dict[str, str]]): List of documents (strings or dicts with 'text' key).
  • `model` (str): Reranking model (default: "rerank-english-v3.0").
  • `top_n` (int | None): Return top N results (default: all).
  • `**kwargs`: Additional parameters.

Returns
  • list[dict[str, Any]]: List of ranked documents with scores.

Example

results = await client.rerank(
    query="What is machine learning?",
    documents=[
        "ML is a subset of AI...",
        "Unrelated document...",
        "Deep learning uses neural networks...",
    ],
    top_n=2,
)
for result in results:
    print(f"Score: {result['relevance_score']:.3f} - {result['document']['text']}")

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the provider.

async def close() -> None

Close the HTTP client.


LLM completion response.

Implements completion semantics with DomainModel for validation and additional fields.

Example

completion = Completion(
    content="Hello! I'm doing well, thank you.",
    model="gpt-4-turbo",
    usage=TokenUsage(prompt_tokens=10, completion_tokens=8, total_tokens=18),
)


Configuration for conversation management.

Example

config = ConversationConfig(
    max_tokens=4096,
    reserve_tokens=1000,
    trim_strategy="oldest",
)


Manage multi-turn conversations with automatic context window management.

This class handles:

  • Message history management
  • Automatic token counting
  • Context window trimming
  • System prompt handling
  • Conversation statistics

Example

from lexigram.ai.llm import OpenAIClient, ConversationManager

client = OpenAIClient(api_key="sk-...", model="gpt-4")
manager = ConversationManager(
    client=client,
    system_prompt="You are a helpful assistant.",
    max_tokens=4096,
)

response = await manager.chat("What is Python?")
print(response.content)

response = await manager.chat("Tell me more about it")
print(response.content)

history = manager.get_history()
stats = manager.get_stats()
print(f"Total messages: {stats.total_messages}")
print(f"Total tokens: {stats.total_tokens}")

def __init__(
    client: AbstractLLMClient,
    system_prompt: str | None = None,
    max_tokens: int = 4096,
    reserve_tokens: int = 1000,
    trim_strategy: str = 'oldest',
    metadata: Metadata | None = None,
    token_counter: TokenCounterProtocol | None = None
) -> None

Initialize conversation manager.

Parameters
  • `client` (AbstractLLMClient): LLM client for completions.
  • `system_prompt` (str | None): Optional system prompt (prepended to all conversations).
  • `max_tokens` (int): Maximum context window size.
  • `reserve_tokens` (int): Tokens to reserve for completion.
  • `trim_strategy` (str): Message trimming strategy ('oldest', 'middle', 'summary').
  • `metadata` (Metadata | None): Additional metadata for the conversation.
  • `token_counter` (TokenCounterProtocol | None): Optional TokenCounterProtocol implementation. If not provided, uses CharEstimateCounter.
async def chat(
    message: str,
    role: Role = Role.USER,
    **completion_kwargs: Any
) -> Completion

Send a message and get a response.

Parameters
  • `message` (str): Message content.
  • `role` (Role): Message role (default: USER).
  • `**completion_kwargs`: Additional kwargs for completion.

Returns
  • Completion: Completion response from the LLM.

Example

response = await manager.chat("Hello!")
print(response.content)

async def add_message(
    role: Role,
    content: str,
    update_stats: bool = True
) -> None

Add a message to conversation history without getting a response.

Parameters
  • `role` (Role): Message role.
  • `content` (str): Message content.
  • `update_stats` (bool): Whether to update statistics.

Example

await manager.add_message(Role.USER, "Hello")
await manager.add_message(Role.ASSISTANT, "Hi there!")

def get_history(
    include_system: bool = True,
    limit: int | None = None
) -> list[ChatMessage]

Get conversation history.

Parameters
  • `include_system` (bool): Include system message in history.
  • `limit` (int | None): Maximum number of messages to return (most recent).

Returns
  • list[ChatMessage]: List of chat messages.

Example

history = manager.get_history(limit=10)
for msg in history:
    print(f"{msg.role}: {msg.content}")

def get_stats() -> ConversationStats

Get conversation statistics.

Returns
  • ConversationStats: Conversation statistics.

Example

stats = manager.get_stats()
print(f"Total tokens: {stats.total_tokens}")

def clear_history(keep_system: bool = True) -> None

Clear conversation history.

Parameters
  • `keep_system` (bool): Keep system message when clearing.

Example

manager.clear_history()

def update_system_prompt(system_prompt: str) -> None

Update the system prompt.

Parameters
  • `system_prompt` (str): New system prompt.

Example

manager.update_system_prompt("You are a Python expert.")

def get_token_count() -> int

Get current total token count.

Returns
  • int: Total tokens in conversation.

Example

tokens = manager.get_token_count()
print(f"Current tokens: {tokens}")

def get_available_tokens() -> int

Get available tokens for completion.

Returns
  • int: Available tokens (max_tokens - current_tokens - reserve_tokens). Can be negative if the context window is exceeded.

Example

available = manager.get_available_tokens()
print(f"Available for completion: {available}")

def export_history() -> dict[str, Any]

Export conversation history to dictionary.

Returns
  • dict[str, Any]: Dictionary with conversation data (JSON-serializable).

Example

data = manager.export_history()
from lexigram import serialization as json
with open("conversation.json", "w") as f:
    json.dump(data, f)

def from_history(
    cls,
    client: AbstractLLMClient,
    history_data: dict[str, Any]
) -> ConversationManager

Create conversation manager from exported history.

Parameters
  • `client` (AbstractLLMClient): LLM client.
  • `history_data` (dict[str, Any]): Exported history data.

Returns
  • ConversationManager: ConversationManager instance.

Example

from lexigram import serialization as json
with open("conversation.json") as f:
    data = json.load(f)
manager = ConversationManager.from_history(client, data)


Statistics for a conversation.

Example

stats = ConversationStats(
    total_messages=10,
    total_tokens=2048,
    user_messages=5,
    assistant_messages=5,
)


Cost estimation result.

Attributes:
  • prompt_cost: Cost for prompt tokens.
  • completion_cost: Cost for completion tokens.
  • total_cost: Total estimated cost.
  • currency: Currency code (default: USD).
  • model: Model name.
  • rate_per_1k_prompt: Rate per 1000 prompt tokens.
  • rate_per_1k_completion: Rate per 1000 completion tokens.


Function call request from LLM.

Default generation parameters applied to every routing attempt.

Example

defaults = GenerationDefaults(temperature=0.3, max_tokens=2048)


Client for Groq's ultra-fast LLM inference API.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Chat, Stream, and Vision with:

  • Ultra-fast inference on LPU hardware
  • OpenAI-compatible API surface
  • Blazing-fast token generation
def __init__(config: ClientConfig)

Initialize Groq client.

Parameters
  • `config` (ClientConfig): LLM configuration.
property api_key() -> SecretStr

Get API key from config.

property base_url() -> str

Get base URL from config.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Run a lightweight provider health probe.

async def list_models() -> list[dict[str, Any]]

List models available from the Groq API.

async def close() -> None

Close the HTTP client.

Example

await client.close()


Token counter using HuggingFace AutoTokenizer (lazy-loaded).

When constructed without a model, uses character estimation (~4 chars/token). When constructed with a model name, lazy-loads that model’s tokenizer on first use.

Parameters
  • `model`: Optional HuggingFace model name. If None, uses char estimation fallback.
def __init__(model: str | None = None) -> None

Initialize HuggingFaceCounter.

Parameters
  • `model` (str | None): Optional HuggingFace model name for tokenizer loading.
property model() -> str

Backend identifier.

def count(text: str) -> int

Count tokens in a text string.

def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages.


An image pre-encoded as base64 in a multimodal message.

Attributes:
  • data: Raw base64-encoded bytes (no data: prefix).
  • media_type: MIME type, e.g. "image/jpeg".
  • type: Discriminator field, always "image_base64".


An image specified by URL in a multimodal message.

The framework passes the URL through to providers that support it natively (OpenAI, Anthropic, Gemini). For providers that require base64 (Ollama, Bedrock), the client fetches and converts.

Attributes:
  • url: Public or data-URI URL of the image.
  • detail: OpenAI vision detail level ("auto", "low", "high").
  • type: Discriminator field, always "image_url".


Structured extraction from LLM completions using instructor library.

Extracts typed Pydantic models from LLM responses by:

  1. Building a ChatMessage list with extraction instructions
  2. Calling llm_client.complete() to get a Completion
  3. Parsing the completion text as JSON
  4. Validating against the response_model
  5. Retrying on validation/parse failures up to max_retries

Unlike direct instructor usage, this implementation uses the standard LLMClientProtocol.complete() method, avoiding coupling to provider-specific client patching mechanisms.

Example

from pydantic import BaseModel

class UserInfo(BaseModel):
    name: str
    age: int

extractor = InstructorExtractor(llm_client)
result = await extractor.extract(
    prompt="Extract user info from: 'John is 30 years old'",
    response_model=UserInfo,
)
if result.is_ok():
    user = result.unwrap()
    print(user.name, user.age)
else:
    error = result.unwrap_err()
    # handle ExtractionError

def __init__(
    llm_client: LLMClientProtocol,
    mode: str = 'json',
    max_retries: int = 3
) -> None

Initialize InstructorExtractor.

Parameters
  • `llm_client` (LLMClientProtocol): LLMClientProtocol instance for making LLM calls.
  • `mode` (str): Instructor patching mode (reserved for future provider-level integration; currently unused).
  • `max_retries` (int): Maximum number of retries on validation/parse failure.
async def extract(
    prompt: str,
    response_model: type[T],
    context: list | None = None,
    **kwargs: Any
) -> Result[T, ExtractionError]

Extract a structured response_model instance from an LLM call.

Parameters
  • `prompt` (str): User prompt for extraction.
  • `response_model` (type[T]): Pydantic BaseModel class to extract and validate.
  • `context` (list | None): Optional list of additional ChatMessage objects for context.
  • `**kwargs`: Additional parameters passed to llm_client.complete().

Returns
  • Result[T, ExtractionError]: ``Ok(instance)`` on successful extraction and validation; ``Err(ExtractionError)`` on parse, validation, or max-retries failure.

Extract and parse JSON from LLM responses.
def extract(
    text: str,
    multiple: bool = False
) -> Any

Extract JSON from text.

def extract_array(text: str) -> list[Any]

Extract JSON array from text.
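
A usage sketch (the `extractor` instance is assumed; only extract and extract_array come from the signatures above, and the return shape of multiple=True is an assumption):

data = extractor.extract('Sure! {"answer": 42}')            # -> {"answer": 42}
rows = extractor.extract_array("Here you go: [1, 2, 3]")    # -> [1, 2, 3]
many = extractor.extract('{"a": 1} and {"b": 2}', multiple=True)  # assumed: returns both objects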


Pricing source from local JSON file.

This is the fastest and most reliable source as it doesn’t require network calls and works offline.

Attributes:
  • file_path: Path to JSON pricing file.
  • cache: In-memory cache of loaded pricing.

Example

source = JSONFilePricingSource(Path("custom_pricing.json"))
pricing = await source.get_pricing("gpt-4-turbo")

def __init__(file_path: Path)

Initialize JSON file pricing source.

Parameters
  • `file_path` (Path): Path to JSON file containing pricing data.
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
  • `model` (str): Model identifier.

Returns
  • ModelPricing | None: ModelPricing if found, None otherwise.
async def get_all_pricing() -> dict[str, ModelPricing]

Get all pricing data.

Returns
  • dict[str, ModelPricing]: All pricing data from the JSON file.
property source_name() -> str

Get source name.

def invalidate_cache() -> None

Clear cached pricing data to force reload.


In-memory cache for LLM responses with TTL.

Implements LRU eviction when max_size is reached.

Parameters
  • `ttl`: Time-to-live in seconds (default: 1 hour).
  • `max_size`: Maximum number of entries (default: 1000).
  • `max_size_bytes`: Maximum cache size in bytes (default: 100MB).

Example

cache = LLMCache(ttl=3600, max_size=500)
result = await cache.get("key")
await cache.set("key", "value")

def __init__(
    ttl: float = 3600,
    max_size: int = 1000,
    max_size_bytes: int = 100 * 1024 * 1024
)

Initialize LLM cache.

Parameters
  • `ttl` (float): Time-to-live in seconds.
  • `max_size` (int): Maximum number of entries.
  • `max_size_bytes` (int): Maximum total size in bytes.
async def get(key: str | dict[str, Any]) -> Any | None

Get value from cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key (string or dict).

Returns
  • Any | None: Cached value or None if not found/expired.
async def set(
    key: str | dict[str, Any],
    value: Any,
    ttl: float | None = None
) -> None

Set value in cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key (string or dict).
  • `value` (Any): Value to cache.
  • `ttl` (float | None): Optional TTL override.
async def get_or_compute(
    key: str | dict[str, Any],
    compute_fn: Callable[[], Any],
    ttl: float | None = None
) -> Any

Get from cache or compute and cache result.

Parameters
  • `key` (str | dict[str, Any]): Cache key.
  • `compute_fn` (Callable[[], Any]): Function to compute value if cache miss.
  • `ttl` (float | None): Optional TTL override.

Returns
  • Any: Cached or computed value.

Example

result = await cache.get_or_compute(
    key="greeting",
    compute_fn=lambda: llm.complete("Say hello"),
)

async def delete(key: str | dict[str, Any]) -> bool

Delete entry from cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key to delete.

Returns
  • bool: True if entry was deleted.
async def clear() -> None

Clear all cache entries.

def get_stats() -> CacheStats

Get cache statistics.

Returns
  • CacheStats: CacheStats object.

Emitted when an LLM completion is received.

Distinct from LLMCallStartedHook (which intercepts); this is the immutable record that a completion happened.

Consumed by: cost accounting, audit, safety review.


Root configuration object for the LLM routing system.

All providers are opt-in: a provider joins the cascade only when its credential environment variable is set. Use from_env to build from LEX_AI_LLM__ environment variables.

Example

config = LLMConfig(
    providers=[
        ProviderConfig(name="groq", model="llama-3.3-70b-versatile", api_key="gsk_..."),
        ProviderConfig(name="gemini", model="gemini-2.5-flash", api_key="AIza..."),
    ],
    defaults=GenerationDefaults(temperature=0.3),
)

Environment variables (prefix LEX_AI_LLM__)

Global:
  • LEX_AI_LLM__STRATEGY: sequential | parallel_race | cost_optimized | latency_optimized
  • LEX_AI_LLM__DEFAULTS__TEMPERATURE: float (default 0.2)
  • LEX_AI_LLM__DEFAULTS__MAX_TOKENS: int (default: provider default)
  • LEX_AI_LLM__QUOTA__BACKEND: memory | database (default memory)
  • LEX_AI_LLM__LOG__BACKEND: memory | database (default memory)
  • LEX_AI_LLM__LOG__MAX_ENTRIES: int (default 1000)

Per-provider (pattern: LEX_AI_LLM__PROVIDERS__{NAME}__{FIELD}):
  • __{NAME}__API_KEY (str): API key; activates key-auth providers.
  • __{NAME}__BASE_URL (str): Endpoint; activates local/custom providers.
  • __{NAME}__MODEL (str): Model override (has per-provider defaults).
  • __{NAME}__TIMEOUT (int): Request timeout in seconds (default 30).
  • __{NAME}__ENABLED (bool): Explicit enable/disable (default true).

Supported provider names and their activation:
  • OPENAI: API_KEY required; default model: gpt-4o
  • ANTHROPIC: API_KEY required; default model: claude-3-5-sonnet-20241022
  • GROQ: API_KEY required; default model: llama-3.3-70b-versatile
  • GEMINI: API_KEY required; default model: gemini-2.5-flash
  • MISTRAL: API_KEY required; default model: mistral-large-latest
  • COHERE: API_KEY required; default model: command-r-plus
  • OPENROUTER: API_KEY required; default model: openai/gpt-4o-mini
  • DEEPSEEK: API_KEY required; default model: deepseek-chat
  • TOGETHER: API_KEY required; default model: meta-llama/Llama-3-8b-chat-hf
  • FIREWORKS: API_KEY required; default model: accounts/fireworks/models/llama-v3-70b-instruct
  • OLLAMA: BASE_URL required; default model: llama3.2 (default base: http://localhost:11434)
  • LOCAL: BASE_URL + MODEL required (generic OpenAI-compatible: LM Studio, vLLM, etc.)

Azure-specific extras (activated by AZURE__API_KEY + AZURE__BASE_URL):
  • LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_RESOURCE
  • LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_DEPLOYMENT
  • LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_API_VERSION

Cloudflare-specific extras (activated by CLOUDFLARE__EXTRAS__CF_ACCOUNT_ID):
  • LEX_AI_LLM__PROVIDERS__CLOUDFLARE__EXTRAS__CF_ACCOUNT_ID (activates the provider)
  • LEX_AI_LLM__PROVIDERS__CLOUDFLARE__EXTRAS__CF_API_TOKEN
  • LEX_AI_LLM__PROVIDERS__CLOUDFLARE__MODEL

AWS Bedrock extras (activated by BEDROCK__EXTRAS__AWS_REGION):
  • LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_REGION (activates the provider)
  • LEX_AI_LLM__PROVIDERS__BEDROCK__MODEL
  • LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_ACCESS_KEY_ID
  • LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_SECRET_ACCESS_KEY

Google Vertex AI extras (activated by VERTEX__EXTRAS__VERTEX_PROJECT):
  • LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_PROJECT (activates the provider)
  • LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_LOCATION
  • LEX_AI_LLM__PROVIDERS__VERTEX__MODEL
  • LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_CREDENTIALS_FILE
def from_env(cls) -> LLMConfig

Build a routing config from LEX_AI_LLM__ environment variables.

Returns
  • LLMConfig: Populated LLMConfig.
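
A sketch of environment-driven construction (variable names follow the table above; the key value is a placeholder):

import os
from lexigram.ai.llm.routing import LLMConfig

os.environ["LEX_AI_LLM__STRATEGY"] = "sequential"
os.environ["LEX_AI_LLM__PROVIDERS__GROQ__API_KEY"] = "gsk_..."  # placeholder
os.environ["LEX_AI_LLM__PROVIDERS__GROQ__MODEL"] = "llama-3.3-70b-versatile"

config = LLMConfig.from_env()  # groq joins the cascade; providers without credentials stay out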

LLM client and model-management integration.

Call configure to register an LLMClientProtocol implementation and optional model manager for injection.

Usage

from lexigram.ai.llm.config import ClientConfig

@module(
    imports=[
        LLMModule.configure(
            ClientConfig(provider="openai", model="gpt-4o")
        )
    ]
)
class AppModule(Module):
    pass

Multi-provider routing

from lexigram.ai.llm import LLMModule

@module(
    imports=[LLMModule.configure(routing=LLMConfig())]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: ClientConfig | Any | None = None,
    *,
    routing: LLMConfig | Any | None = None,
    enable_model_manager: bool = False,
    enable_streaming: bool = True
) -> DynamicModule

Create an LLMModule with a single configured provider.

Parameters
  • `config` (ClientConfig | Any | None): ClientConfig or ``None`` to read configuration from environment variables.
  • `routing` (LLMConfig | Any | None): Optional LLMConfig enabling the multi-provider routing layer instead of the single-provider client.
  • `enable_model_manager` (bool): Register LLMModelManager for local model lifecycle control.
  • `enable_streaming` (bool): Enable streaming response support. Defaults to ``True``; set to ``False`` to restrict to non-streaming clients only.

Returns
  • DynamicModule: A DynamicModule descriptor.
def stub(
    cls,
    config: ClientConfig | Any | None = None
) -> DynamicModule

Create an LLMModule suitable for unit and integration testing.

Uses a no-op or stub LLM client with minimal external dependencies. Streaming is disabled by default to simplify test assertions.

Parameters
  • `config` (ClientConfig | Any | None): Optional ClientConfig override. Uses safe test defaults when ``None``.

Returns
  • DynamicModule: A DynamicModule descriptor.

Provider that registers LLM services with the Lexigram DI container.

Registers an LLMClientProtocol, optional LLM response cache, and an LLMModelManager so all three are injectable throughout the application.

Example

from lexigram.ai.llm.di.provider import LLMProvider
from lexigram.ai.llm.config import ClientConfig

app.use(LLMProvider(ClientConfig(provider="openai", model="gpt-4o")))

class MyService:
    def __init__(self, llm: LLMClientProtocol) -> None:
        self.llm = llm

def __init__(
    config: ClientConfig | None = None,
    enable_model_manager: bool = False,
    enable_streaming: bool = True,
    name: str = 'llm',
    cache_backend: CacheBackendProtocol | None = None
) -> None

Initialize the LLM Provider.

Parameters
  • `config` (ClientConfig | None): LLM client configuration; defaults to ClientConfig() (reads env).
  • `enable_model_manager` (bool): Register LLMModelManager for local model control.
  • `enable_streaming` (bool): Enable streaming response support.
  • `name` (str): Provider name used for identification.
  • `cache_backend` (CacheBackendProtocol | None): Injected cache backend for optional response caching.
async def register(container: ContainerRegistrarProtocol) -> None

Register LLM services with the DI container.

Parameters
  • `container` (ContainerRegistrarProtocol): The Lexigram DI container registrar.
async def boot(container: ContainerResolverProtocol) -> None

Boot the LLM provider: validates API key presence and format.

Parameters
  • `container` (ContainerResolverProtocol): The DI container resolver.
async def shutdown() -> None

Close client connections on application shutdown.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Return basic health information for the registered LLM client.


Payload fired when an LLM provider is registered in the provider registry.

Attributes:
  • provider: Identifier of the provider that was registered.


Payload fired when an LLM request is dispatched to a provider.

Attributes:
  • provider: Provider identifier (e.g. "openai").
  • model: Model name targeted by the request (e.g. "gpt-4o").


Payload fired when a complete LLM response is received from a provider.

Attributes:
  • provider: Provider identifier that returned the response.
  • model: Model name that produced the response.


Provider that registers the multi-provider LLM router with the DI container.

Builds the LLMRouter from an LLMConfig, chooses the appropriate quota backend and inference logger, and registers everything as singletons.

Example

from lexigram.ai.llm.module import LLMModule
from lexigram.ai.llm.routing import LLMConfig

app.use(LLMModule.configure(routing=LLMConfig.from_env()))

class MyService:
    def __init__(self, router: LLMRouterProtocol) -> None:
        self.router = router

def __init__(
    config: LLMConfig | None = None,
    database_provider: DatabaseProviderProtocol | None = None,
    model_selector: ModelSelector | None = None
) -> None

Initialise the LLM routing provider.

Parameters
  • `config` (LLMConfig | None): Routing configuration; defaults to ``LLMConfig.from_env()``.
  • `database_provider` (DatabaseProviderProtocol | None): Injected DB provider used when ``quota.backend`` or ``logging.backend`` is ``database``.
  • `model_selector` (ModelSelector | None): Optional model selector for capability-based routing. When provided, ``required_capabilities`` in route kwargs will filter providers whose models lack the requested capabilities.
async def register(container: ContainerRegistrarProtocol) -> None

Build and register the LLMRouter with the DI container.

Parameters
  • `container` (ContainerRegistrarProtocol): The Lexigram DI container registrar.
async def boot(container: ContainerResolverProtocol) -> None

Boot phase: no-op for this provider.

Parameters
  • `container` (ContainerResolverProtocol): The DI container resolver.
async def shutdown() -> None

Close all routing clients on application shutdown.

async def health_check(timeout: float = 5.0) -> dict[str, Any]

Return basic health information for the router.

Parameters
  • `timeout` (float): Unused; retained for interface compatibility.

Returns
  • dict[str, Any]: A dict with ``status`` and ``providers`` keys.

Configuration for inference attempt logging.

Example

cfg = LogConfig(backend="database", max_entries=5000)


Client for Mistral AI's LLM API.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Chat, Stream, and Embeddings with:

  • High-performance European LLMs
  • GDPR compliance and data sovereignty
  • Function calling and JSON mode
def __init__(config: ClientConfig)

Initialize Mistral client.

Parameters
  • `config` (ClientConfig): LLM configuration.
property api_key() -> SecretStr

Get API key from config.

property base_url() -> str

Get base URL from config.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the Mistral API.

Calls the models endpoint to verify the API key is valid and the service is reachable.

Parameters
  • `timeout` (float): Maximum seconds to wait for the response.

Returns
  • HealthCheckResult: The health check result.
async def embed(
    model: str = 'mistral-embed',
    input_texts: list[str] | str | None = None,
    **kwargs
) -> list[list[float]]

Generate embeddings.

Parameters
  • `model` (str): Model ID (default: "mistral-embed").
  • `input_texts` (list[str] | str | None): Text or list of texts to embed.
  • `**kwargs`: Additional parameters.

Returns
  • list[list[float]]: List of embedding vectors.

Example

embeddings = await client.embed(
    input_texts=["Hello world", "Bonjour monde"]
)
print(f"Embedding dimension: {len(embeddings[0])}")

async def close() -> None

Close the HTTP client.

Example

await client.close()


Token counter using mistral-common tokenizer (lazy-loaded).

Tokenizer is loaded on first use, not at construction time.

def __init__() -> None

Initialize MistralCounter.

property model() -> str

Backend identifier.

def count(text: str) -> int

Count tokens in a text string.

def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages.


Model capabilities and constraints.

Pricing information for a specific LLM model.

Attributes:
  • model: Model identifier (e.g., "gpt-4-turbo", "claude-3-opus").
  • prompt_per_1m: Cost per 1 million prompt tokens in USD.
  • completion_per_1m: Cost per 1 million completion tokens in USD.
  • provider: Provider name (e.g., "openai", "anthropic").
  • last_updated: When pricing was last updated.
  • source: Where pricing data came from (e.g., "json", "api", "static").

Example

pricing = ModelPricing(
    model="gpt-4-turbo",
    prompt_per_1m=10.00,
    completion_per_1m=30.00,
    provider="openai",
)
print(f"${pricing.prompt_per_1m} per 1M prompt tokens")
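
For orientation, the cost implied by these per-1M rates is plain arithmetic (not a library call):

prompt_tokens, completion_tokens = 1_000, 500
cost = (prompt_tokens / 1_000_000) * 10.00 + (completion_tokens / 1_000_000) * 30.00
# 0.01 + 0.015 = 0.025 USD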

def serialize_model(handler) -> Any

Custom serializer to handle datetime objects.


Intelligent model selector with fallback support.

Automatically selects the best model based on prompt characteristics and provides fallback chains for reliability.

Example

selector = ModelSelector(
    default_model="gpt-3.5-turbo",
    strategies=[
        SelectionStrategy(
            name="complex",
            model="gpt-4-turbo",
            conditions={"min_tokens": 1000},
        ),
        SelectionStrategy(
            name="simple",
            model="claude-3-haiku-20240307",
            conditions={"max_tokens": 500},
        ),
    ],
    fallback_chain=["gpt-4-turbo", "gpt-3.5-turbo"],
)

model = selector.select("Long prompt here...")
print(model)
# 'gpt-4-turbo'

fallback = selector.get_fallback("gpt-4-turbo")
print(fallback)
# 'gpt-3.5-turbo'

def __init__(
    default_model: str | None = None,
    strategies: list[SelectionStrategy] | None = None,
    fallback_chain: list[str] | None = None,
    model_capabilities: dict[str, ModelCapabilities] | None = None,
    token_counter: TokenCounterProtocol | None = None
)

Initialize model selector.

Parameters
  • `default_model` (str | None): Default model to use.
  • `strategies` (list[SelectionStrategy] | None): List of selection strategies.
  • `fallback_chain` (list[str] | None): Ordered list of fallback models.
  • `model_capabilities` (dict[str, ModelCapabilities] | None): Custom model capabilities.
  • `token_counter` (TokenCounterProtocol | None): Token counter for prompt analysis.

Example

selector = ModelSelector(
    default_model="gpt-3.5-turbo",
    fallback_chain=["gpt-4", "claude-3-sonnet-20240229"],
)

def select(
    prompt: str,
    context: dict[str, Any] | None = None,
    required_capabilities: list[str] | None = None
) -> str

Select the best model for the given prompt.

Parameters
  • `prompt` (str): The prompt text.
  • `context` (dict[str, Any] | None): Additional context for selection.
  • `required_capabilities` (list[str] | None): Required capabilities (e.g., ["supports_functions"]).

Returns
  • str: Selected model name.

Example

model = selector.select(
    "Analyze this image...",
    required_capabilities=["supports_vision"],
)
print(model)
# 'gpt-4-turbo'

def get_fallback(failed_model: str) -> str | None

Get the next model in the fallback chain.

Parameters
  • `failed_model` (str): The model that failed.

Returns
  • str | None: Next fallback model, or None if no fallback available.

Example

fallback = selector.get_fallback("gpt-4-turbo")
print(fallback)
# 'gpt-3.5-turbo'

def get_capabilities(model: str) -> ModelCapabilities | None

Get capabilities for a model.

Parameters
  • `model` (str): Model name.

Returns
  • ModelCapabilities | None: Model capabilities or None if unknown.

Example

caps = selector.get_capabilities("gpt-4-turbo")
print(caps.max_tokens)
# 128000

def estimate_cost(
    model: str,
    input_tokens: int,
    output_tokens: int
) -> float

Estimate cost for a model call.

Parameters
  • `model` (str): Model name.
  • `input_tokens` (int): Number of input tokens.
  • `output_tokens` (int): Number of output tokens.

Returns
  • float: Estimated cost in USD.

Example

cost = selector.estimate_cost("gpt-4-turbo", 1000, 500)
print(f"${cost:.4f}")
# $0.0250


Ollama LLM client for local models.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports running LLMs locally with Ollama:

  • Llama 3, Mistral, Phi, and other open models
  • Streaming responses
  • Zero API costs
  • Full data privacy

Example

from lexigram.ai import ClientConfig

config = ClientConfig(
    provider="ollama",
    model="llama3:8b",
    api_base="http://localhost:11434",
)
client = OllamaClient(config)
completion = await client.complete([
    ChatMessage(role="user", content="Hello!")
])

def __init__(config: ClientConfig)

Initialize Ollama client.

Parameters
  • `config` (ClientConfig): LLM configuration.

Raises
  • ImportError: If the ollama package is not installed.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the Ollama daemon.

Calls list() to verify the daemon is running and reachable.

Parameters
  • `timeout` (float): Maximum seconds to wait for the response.

Returns
  • HealthCheckResult: The health check result.
async def close() -> None

Close Ollama client.


OpenAI LLM client implementation.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports GPT-4, GPT-3.5-Turbo, and other OpenAI models with:

  • Streaming responses
  • Function/tool calling
  • Vision models
  • Automatic retry with exponential backoff
  • Error handling and rate limit management

Example

from lexigram.ai import ClientConfig

config = ClientConfig(provider="openai", model="gpt-4-turbo")
client = OpenAIClient(config)
completion = await client.complete([
    ChatMessage(role="user", content="Hello!")
])

def __init__(config: ClientConfig)

Initialize OpenAI client.

Parameters
  • `config` (ClientConfig): LLM configuration.

Raises
  • ImportError: If the openai package is not installed.
async def close() -> None

Close the OpenAI client and cleanup resources.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
  • HealthCheckResult: Structured health check result.

Client for OpenRouter (OpenAI-compatible) API.

Conforms to: LLMClientProtocol protocol via structural typing.

def __init__(config: ClientConfig)

Initialize OpenRouter client.

Parameters
  • `config` (ClientConfig): LLM configuration.
property api_key() -> SecretStr

Get API key from config.

property base_url() -> str

Get base URL from config.

property model() -> str

Get default model from config.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the OpenRouter API.

Calls the models listing endpoint to verify the API key is valid and the service is reachable.

Parameters
  • `timeout` (float): Maximum seconds to wait for the response.

Returns
  • HealthCheckResult: The health check result.
async def embeddings(
    texts: list[str],
    **kwargs: Any
) -> list[list[float]]

Generate embeddings for the given texts.

async def close() -> None

Close the HTTP client.

Filter LLM output for sensitive information.

Prevents leaking of system prompts, internal data, etc.

def filter_output(
    output: str,
    system_prompt: str
) -> str

Filter LLM output for leaks.

Parameters
  • `output` (str): LLM output.
  • `system_prompt` (str): System prompt (check if leaked).

Returns
  • str: Filtered output.
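
A usage sketch (`output_filter` is an assumed instance of this filter class):

safe_text = output_filter.filter_output(
    output=completion.content,
    system_prompt="You are a helpful assistant.",
)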

Manages pricing data from multiple sources with caching.

Sources are queried in order until pricing is found. Typical hierarchy:

  1. JSON file (fastest, most reliable)
  2. API endpoints (for updates)
  3. Static fallback (hardcoded)

Attributes:
  • sources: List of pricing sources in priority order.
  • cache: Pricing cache instance.
  • enable_fuzzy_match: Whether to enable fuzzy model name matching.

Example

manager = PricingManager.from_defaults()

manager = (
    PricingManager.builder()
    .add_json_source("pricing.json")
    .add_api_source("https://api.example.com/pricing")
    .with_cache_ttl(3600)
    .enable_fuzzy_matching()
    .build()
)

pricing = await manager.get_pricing(“gpt-4-turbo”)

def __init__(
    sources: Sequence[AbstractPricingSource],
    cache_ttl: int = 86400,
    enable_fuzzy_match: bool = True
)

Initialize pricing manager.

Parameters
  • `sources` (Sequence[AbstractPricingSource]): List of pricing sources in priority order.
  • `cache_ttl` (int): Cache TTL in seconds (default: 24 hours).
  • `enable_fuzzy_match` (bool): Enable fuzzy model name matching (default: True).
async def get_pricing(
    model: str,
    force_refresh: bool = False
) -> ModelPricing

Get pricing for a specific model.

Queries sources in order:

  1. Cache (if not force_refresh)
  2. Each source in priority order
  3. Fuzzy match if enabled
  4. Default fallback
Parameters
  • `model` (str): Model identifier (e.g., "gpt-4-turbo").
  • `force_refresh` (bool): Bypass cache and fetch fresh data.

Returns
  • ModelPricing: ModelPricing for the model.

Raises
  • ValueError: If model not found in any source.
async def list_models(provider: str | None = None) -> list[str]

List all available models.

Parameters
  • `provider` (str | None): Filter by provider (optional).

Returns
  • list[str]: List of model names.
async def clear_cache() -> None

Clear pricing cache.

def from_defaults(cls) -> PricingManager

Create manager with default configuration.

Uses LiteLLM API for dynamic, up-to-date pricing data. No static pricing files - always fetches current data.

Returns
  • PricingManager: PricingManager with API source.

Example

manager = PricingManager.from_defaults()
pricing = await manager.get_pricing("gpt-4")

def from_json(
    cls,
    file_path: str | Path,
    cache_ttl: int = 86400
) -> PricingManager

Create manager from JSON file only.

Useful for offline applications or when you want full control over pricing data.

Parameters
  • `file_path` (str | Path): Path to JSON pricing file.
  • `cache_ttl` (int): Cache TTL in seconds (default: 24 hours).

Returns
  • PricingManager: PricingManager with JSON source only.

Example

manager = PricingManager.from_json("my_pricing.json")
pricing = await manager.get_pricing("custom-model")

def from_api(
    cls,
    endpoint: str,
    cache_ttl: int = 86400
) -> PricingManager

Create manager from API endpoint only.

Parameters
  • `endpoint` (str): API endpoint URL.
  • `cache_ttl` (int): Cache TTL in seconds (default: 24 hours).

Returns
  • PricingManager: PricingManager with API source only.

Example

manager = PricingManager.from_api("https://api.example.com/pricing")
pricing = await manager.get_pricing("gpt-4")

def builder(cls) -> PricingManagerBuilder

Create a builder for custom configuration.

Returns
  • PricingManagerBuilder: PricingManagerBuilder instance.

Example

manager = (
    PricingManager.builder()
    .add_json_source("custom.json")
    .add_api_source("https://api.example.com")
    .with_cache_ttl(3600)
    .build()
)


Builder for PricingManager with validation.

Provides a fluent API for configuring pricing sources safely.

Example

manager = (
    PricingManager.builder()
    .add_json_source("pricing.json")
    .add_api_source("https://api.example.com/pricing")
    .add_fallback({"custom-model": ModelPricing(...)})
    .with_cache_ttl(3600)
    .enable_fuzzy_matching()
    .build()
)

def __init__() -> Any

Initialize builder.

def add_json_source(file_path: str | Path) -> PricingManagerBuilder

Add JSON file pricing source.

Parameters
  • `file_path` (str | Path): Path to JSON file.

Returns
  • PricingManagerBuilder: Self for chaining.
def add_api_source(
    endpoint: str,
    timeout: float = 10.0
) -> PricingManagerBuilder

Add API endpoint pricing source.

Parameters
  • `endpoint` (str): API endpoint URL.
  • `timeout` (float): Request timeout in seconds (default: 10).

Returns
  • PricingManagerBuilder: Self for chaining.
def add_fallback(pricing_map: dict[str, ModelPricing]) -> PricingManagerBuilder

Add static fallback pricing.

Parameters
  • `pricing_map` (dict[str, ModelPricing]): Dictionary of model to pricing.

Returns
  • PricingManagerBuilder: Self for chaining.
def add_source(source: AbstractPricingSource) -> PricingManagerBuilder

Add custom pricing source.

Parameters
  • `source` (AbstractPricingSource): Custom AbstractPricingSource implementation.

Returns
  • PricingManagerBuilder: Self for chaining.
def with_cache_ttl(seconds: int) -> PricingManagerBuilder

Set cache TTL.

Parameters
  • `seconds` (int): Cache TTL in seconds.

Returns
  • PricingManagerBuilder: Self for chaining.

Raises
  • ValueError: If seconds is negative.
def enable_fuzzy_matching(enabled: bool = True) -> PricingManagerBuilder

Enable or disable fuzzy model name matching.

Parameters
  • `enabled` (bool): Whether to enable fuzzy matching (default: True).

Returns
  • PricingManagerBuilder: Self for chaining.
def build() -> PricingManager

Build PricingManager instance.

Returns
  • PricingManager: Configured PricingManager.

Raises
  • ValueError: If no sources were added.

Configuration for a single provider in the routing cascade.

Every provider in the cascade has the same shape regardless of type. Provider-specific fields (Azure deployment, Cloudflare account ID, Bedrock region, Vertex project) go in extras.

Example

cfg = ProviderConfig(
    name="groq",
    model="llama-3.3-70b-versatile",
    api_key="gsk_...",
)


Information about an LLM provider.

Attributes:
  • name: Provider identifier (e.g., "openai", "anthropic").
  • client_class: LLMClientProtocol implementation class.
  • default_models: List of default/recommended models.
  • supports_streaming: Whether streaming is supported.
  • supports_tools: Whether function/tool calling is supported.
  • supports_vision: Whether vision/image inputs are supported.
  • base_url: Default base URL for API (optional).
  • docs_url: Documentation URL (optional).
  • pricing_url: Pricing page URL (optional).
  • description: Human-readable description.


Registry for LLM providers.

Singleton registry that maintains information about all available LLM providers, both built-in and custom.

def __init__() -> Any

Initialize provider registry.

def register(
    name: str,
    client_class: type[object],
    default_models: list[str] | None = None,
    supports_streaming: bool = True,
    supports_tools: bool = False,
    supports_vision: bool = False,
    base_url: str | None = None,
    docs_url: str | None = None,
    pricing_url: str | None = None,
    description: str = ''
) -> ProviderInfo

Register a new LLM provider.

def get_provider(name: str) -> ProviderInfo

Get provider information.

def list_providers() -> list[str]

List all registered provider names.

def search_providers(
    supports_streaming: bool | None = None,
    supports_tools: bool | None = None,
    supports_vision: bool | None = None
) -> list[ProviderInfo]

Search providers by capabilities.

def unregister(name: str) -> None

Unregister a provider.

async def register_provider(
    name: str,
    client: LLMClientProtocol,
    models: list[ModelInfo]
) -> None

Register a provider following the ProviderRegistryProtocol.

async def get_client(provider: str) -> LLMClientProtocol | None

Get an initialized client for a provider.

def list_models(capabilities: set[ModelCapability] | None = None) -> list[ModelInfo]

List all models matching capabilities.

def get_model_info(model_id: str) -> ModelInfo | None

Get information about a specific model.


Configuration for the quota tracking backend.

Example

cfg = QuotaConfig(backend="database")


Rate limiter for LLM requests (RPM and TPM).

Manages multiple buckets for different models and providers.

def __init__() -> Any

Initialize rate limiter.

async def check(
    provider: str,
    model: str,
    tpm_limit: int | None = None,
    rpm_limit: int | None = None,
    estimated_tokens: int = 0
) -> bool

Check if request is allowed under current limits.

Parameters
  • `provider` (str): AI provider name.
  • `model` (str): Model name.
  • `tpm_limit` (int | None): Tokens-per-minute limit.
  • `rpm_limit` (int | None): Requests-per-minute limit.
  • `estimated_tokens` (int): Estimated tokens in request.

Returns
  • bool: True if allowed, False if blocked.
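
A usage sketch built directly from the signature above (the limits shown are placeholders):

limiter = RateLimiter()
allowed = await limiter.check(
    provider="openai",
    model="gpt-4o",
    tpm_limit=100_000,
    rpm_limit=60,
    estimated_tokens=1_200,
)
if not allowed:
    ...  # back off, queue, or route to another provider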

Redis-backed cache for distributed deployments.

Requires redis package to be installed.

Parameters
  • `redis_url`: Redis connection URL.
  • `ttl`: Time-to-live in seconds.
  • `key_prefix`: Prefix for all cache keys.

Example

cache = RedisLLMCache(redis_url="redis://localhost:6379")
await cache.connect()
result = await cache.get("key")

def __init__(
    cache_backend: CacheBackendProtocol,
    ttl: float = 3600,
    key_prefix: str = 'llm_cache:'
)

Initialize Redis cache.

Parameters
  • `cache_backend` (CacheBackendProtocol): The platform's cache backend.
  • `ttl` (float): Time-to-live in seconds.
  • `key_prefix` (str): Prefix for cache keys.
async def connect() -> None

Compatibility method for lifecycle-managed cache.

async def disconnect() -> None

Compatibility method for lifecycle-managed cache.

async def get(key: str | dict[str, Any]) -> Any | None

Get value from Redis cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key.

Returns
  • Any | None: Cached value or None.
async def set(
    key: str | dict[str, Any],
    value: Any,
    ttl: float | None = None
) -> None

Set value in Redis cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key.
  • `value` (Any): Value to cache.
  • `ttl` (float | None): Optional TTL override.
async def get_or_compute(
    key: str | dict[str, Any],
    compute_fn: Callable[[], Any],
    ttl: float | None = None
) -> Any

Get from cache or compute and cache result.

Parameters
  • `key` (str | dict[str, Any]): Cache key.
  • `compute_fn` (Callable[[], Any]): Function to compute value if cache miss.
  • `ttl` (float | None): Optional TTL override.

Returns
  • Any: Cached or computed value.
async def delete(key: str | dict[str, Any]) -> bool

Delete entry from cache.

Parameters
  • `key` (str | dict[str, Any]): Cache key.

Returns
  • bool: True if deleted.
async def clear() -> None

Clear all cache entries (Warning: clears entire backend if not namespaced).

def get_stats() -> CacheStats

Get cache statistics.

Returns
  • CacheStats: CacheStats object.

Format and convert LLM responses to various types.

Example

formatter = ResponseFormatter()
completion = Completion(content="42", ...)
num = formatter.to_int(completion)
print(num)
# 42

def to_json(completion: Completion) -> JSON

Convert response to JSON.

Parameters
  • `completion` (Completion): LLM completion.

Returns
  • JSON: Parsed JSON.

Example

data = formatter.to_json(completion)

def to_string(
    completion: Completion,
    strip: bool = True
) -> str

Convert response to string.

Parameters
  • `completion` (Completion): LLM completion.
  • `strip` (bool): Whether to strip whitespace.

Returns
  • str: Response string.

Example

text = formatter.to_string(completion)

def to_int(completion: Completion) -> int

Convert response to integer.

Parameters
  • `completion` (Completion): LLM completion.

Returns
  • int: Parsed integer.

Raises
  • ParseError: If conversion fails.

Example

num = formatter.to_int(completion)

def to_float(completion: Completion) -> float

Convert response to float.

Parameters
  • `completion` (Completion): LLM completion.

Returns
  • float: Parsed float.

Raises
  • ParseError: If conversion fails.

Example

num = formatter.to_float(completion)

def to_bool(completion: Completion) -> bool

Convert response to boolean.

Parameters
  • `completion` (Completion): LLM completion.

Returns
  • bool: Parsed boolean.

Example

result = formatter.to_bool(completion)

def to_list(
    completion: Completion,
    separator: str = '\n'
) -> list[str]

Convert response to list of strings.

Parameters
  • `completion` (Completion): LLM completion.
  • `separator` (str): String separator (default: newline).

Returns
  • list[str]: List of strings.

Example

items = formatter.to_list(completion)


Concrete chat message role constants shared across AI packages.

LLM client with injection protection and safety features.
def __init__(
    llm_provider: Annotated[LLMClientProtocol, Inject],
    system_prompt: str = 'You are a helpful assistant.',
    enable_output_filtering: bool = True,
    rate_limiter: Annotated[RateLimiter | None, Inject] = None,
    rpm_limit: int = 60
) -> None

Initialize secure LLM client.

Parameters
  • `llm_provider` (Annotated[LLMClientProtocol, Inject]): Underlying LLM provider (injected).
  • `system_prompt` (str): System prompt template.
  • `enable_output_filtering` (bool): Enable output filtering.
  • `rate_limiter` (Annotated[RateLimiter | None, Inject]): Optional injected rate limiter.
  • `rpm_limit` (int): Requests-per-minute limit applied per user.
async def chat(
    user_input: str,
    user_id: str,
    context: Sequence[dict[str, str]] | None = None,
    strict_validation: bool = True
) -> str

Send chat message with safety protections.

Parameters
  • `user_input` (str): User message.
  • `user_id` (str): User identifier (for rate limiting).
  • `context` (Sequence[dict[str, str]] | None): Previous conversation context.
  • `strict_validation` (bool): Reject invalid input vs sanitize.

Returns
  • str: LLM response.

Raises
  • ValueError: If input invalid (strict mode).
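
A usage sketch (`secure_client` is an assumed instance of this client):

reply = await secure_client.chat(
    user_input="Summarize our refund policy.",
    user_id="user-123",
    strict_validation=True,  # raises ValueError on suspected injection
)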
def update_system_prompt(system_prompt: str) -> None

Update system prompt.

Parameters
  • `system_prompt` (str): New system prompt.

Structured prompt template with injection protection.

Uses clear delimiters to separate system instructions from user input. Implements multi-layered injection detection.

def detect_injection(prompt: str) -> tuple[bool, list[str]]

Multi-layered injection detection.

Parameters
  • `prompt` (str): Input to analyze.

Returns
  • tuple[bool, list[str]]: Tuple of (is_malicious, reasons).
def validate_input(user_input: str) -> tuple[bool, str | None]

Validate user input for injection attempts.

Parameters
  • `user_input` (str): User input to validate.

Returns
  • tuple[bool, str | None]: Tuple of (is_valid, error_message).
def sanitize_input(user_input: str) -> str

Sanitize user input by removing dangerous patterns.

Parameters
  • `user_input` (str): User input to sanitize.

Returns
  • str: Sanitized input.
def format(
    user_input: str,
    strict: bool = True
) -> str

Format prompt with user input.

Parameters
  • `user_input` (str): User input.
  • `strict` (bool): If True, reject invalid input. If False, sanitize.

Returns
  • str: Formatted prompt.

Raises
  • ValueError: If input invalid and strict=True.
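
A sketch of the detect/format flow (the `template` instance and its construction are assumptions; the methods are the ones documented above):

is_malicious, reasons = template.detect_injection(user_text)
if is_malicious:
    raise ValueError(f"Rejected input: {reasons}")

prompt = template.format(user_text, strict=True)   # reject invalid input
prompt = template.format(user_text, strict=False)  # or sanitize instead of rejecting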

Criteria for model selection.

Strategy for selecting models based on conditions.

Example

strategy = SelectionStrategy(
    name="long_context",
    model="gpt-4-turbo-preview",
    conditions={
        "min_tokens": 2000,
        "max_tokens": 100000,
    },
)

def matches(context: dict[str, Any]) -> bool

Check if this strategy matches the given context.

Parameters
  • `context` (dict[str, Any]): Context dictionary with prompt info.

Returns
  • bool: True if all conditions are met.

Example

context = {"token_count": 2500, "has_code": True}
strategy.matches(context)
# True


Pricing source from static dictionary.

Hardcoded pricing data as a fallback when other sources are unavailable. Useful for custom internal models or as ultimate fallback.

Attributes:
  • pricing_map: Dictionary of model name to pricing.

Example

source = StaticPricingSource({
    "my-model": ModelPricing(
        model="my-model",
        prompt_per_1m=5.0,
        completion_per_1m=10.0,
        provider="custom",
    )
})

def __init__(pricing_map: dict[str, ModelPricing])

Initialize static pricing source.

Parameters
ParameterTypeDescription
`pricing_map`dict[str, ModelPricing]Dictionary mapping model names to pricing.
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
ParameterTypeDescription
`model`strModel identifier.
Returns
TypeDescription
ModelPricing | NoneModelPricing if found, None otherwise.
async def get_all_pricing() -> dict[str, ModelPricing]

Get all pricing data.

Returns
TypeDescription
dict[str, ModelPricing]All static pricing data.
property source_name() -> str

Get source name.


A chunk of streamed completion.

Implements streaming semantics with DomainModel for validation.

Example

chunk = StreamChunk(delta="Hello", model="gpt-4-turbo", finish_reason=None)
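
A consumption sketch, assuming stream_chat() (referenced under the error classes below) yields StreamChunk values; client and messages are taken as already in scope:

async for chunk in client.stream_chat(messages):
    print(chunk.delta, end="", flush=True)   # incremental text delta
    if chunk.finish_reason is not None:      # non-None on the final chunk
        break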


Schema-aware parser that validates LLM responses against a model.

Wraps extract_json_block, validate_against_model, and build_json_schema into a convenient class-based API.

Parameters
ParameterTypeDescription
`output_model`type[Any]Model class for validation.
`strict`boolWhether to enforce strict validation (default ``True``).
def __init__(
    output_model: type[Any],
    *,
    strict: bool = True
) -> None

Initialize with model class.

def parse(completion: Any) -> Any

Parse and validate a completion into an output_model instance.

Parameters
ParameterTypeDescription
`completion`AnyCompletion object with ``.content`` attribute, or a string.
Returns
TypeDescription
AnyValidated model instance.
Raises
ExceptionDescription
ParseErrorWhen JSON cannot be extracted.
SchemaValidationErrorWhen validation fails.
def parse_array(completion: Any) -> list[Any]

Parse and validate an array of output_model instances.

Parameters
ParameterTypeDescription
`completion`AnyCompletion object with ``.content`` attribute.
Returns
TypeDescription
list[Any]List of validated model instances.
Raises
ExceptionDescription
ParseErrorWhen JSON is not an array.
SchemaValidationErrorWhen validation fails.
def get_json_schema() -> dict[str, Any]

Return JSON Schema dict for the output model.

def get_schema_prompt() -> str

Return a human-readable schema prompt string.
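
Example

A usage sketch. Person is a hypothetical Pydantic model, and SchemaParser is a stand-in for this class's real exported name:

from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int

parser = SchemaParser(Person, strict=True)
print(parser.get_schema_prompt())    # embed the schema description in the LLM prompt

# completion: any object with a .content attribute, or a raw string
person = parser.parse(completion)    # Person instance, or ParseError / SchemaValidationError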


A plain-text content part in a multimodal message.

Attributes: text: The text content. type: Discriminator field, always "text".


Token counter using tiktoken (OpenAI/compatible models).

Implements TokenCounterProtocol using tiktoken for precise counting. tiktoken is a required dependency for this counter.

Parameters
ParameterTypeDescription
`model`strModel name (e.g. 'gpt-4', 'gpt-3.5-turbo').
`encoding_name`str | NoneOptional tiktoken encoding name override.
def __init__(
    model: str = 'gpt-3.5-turbo',
    encoding_name: str | None = None
) -> None

Initialize TiktokenCounter.

Parameters
ParameterTypeDescription
`model`strModel name for token counting.
`encoding_name`str | NoneOptional tiktoken encoding name override.
Raises
ExceptionDescription
ImportErrorIf tiktoken is not installed.
property model() -> str

The model this counter is calibrated for.

def count(text: str) -> int

Count tokens in a text string.

def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages, including overhead.
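
Example

Both counting paths, using the ChatMessage role/content shape seen elsewhere in this reference:

counter = TiktokenCounter(model="gpt-4")
n = counter.count("Hello, world!")

messages = [ChatMessage(role="user", content="Hello!")]
total = counter.count_messages(messages)   # includes per-message formatting overhead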


Token count result with metadata.

Attributes:

  • total: Total number of tokens.
  • prompt_tokens: Number of tokens in the prompt.
  • completion_tokens: Number of tokens in the completion (if applicable).
  • model: Model name used for counting.
  • timestamp: When the count was performed.


Registry mapping model-name patterns to TokenCounterProtocol backends.

Uses named backend keys and regex patterns for flexible model mapping.

Usage

registry = TokenCounterRegistry.with_defaults()
counter = registry.for_model("gpt-4o")
tokens = counter.count("Hello!")
def __init__() -> None

Create an empty registry.

def with_defaults(cls) -> TokenCounterRegistry

Create registry with all available tokenizer backends.

Registers:

  • char_estimate (always available, fallback)
  • tiktoken (if installed, for OpenAI/Anthropic models)
  • huggingface (if installed, for HuggingFace models)
  • mistral (if installed, for Mistral models)
Returns
TypeDescription
TokenCounterRegistryTokenCounterRegistry pre-populated with default backends.
def register(
    key: str,
    counter: TokenCounterProtocol
) -> None

Register a counter backend under a named key.

Parameters
ParameterTypeDescription
`key`strBackend name (e.g., 'tiktoken', 'huggingface', 'char_estimate').
`counter`TokenCounterProtocolCounter implementing TokenCounterProtocol.
def map_models(
    pattern: str,
    counter_key: str
) -> None

Map a regex pattern of model names to a backend key.

Parameters
ParameterTypeDescription
`pattern`strRegex pattern matching model names (case-insensitive).
`counter_key`strBackend key (must be registered).
def for_model(model: str) -> TokenCounterProtocol

Get the best counter for the given model name.

Tries each registered regex pattern in _patterns first and falls back to 'char_estimate' if none match.

Parameters
ParameterTypeDescription
`model`strModel name.
Returns
TypeDescription
TokenCounterProtocolTokenCounterProtocol implementation.
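
Example

Extending the defaults with a custom backend; the backend key and pattern below are illustrative:

registry = TokenCounterRegistry.with_defaults()
registry.register("tiktoken-gpt4", TiktokenCounter(model="gpt-4"))
registry.map_models(r"^gpt-4", "tiktoken-gpt4")

counter = registry.for_model("gpt-4o")   # matched by ^gpt-4; unmapped models get 'char_estimate'
tokens = counter.count("Hello!")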

Token usage statistics.

Tool call request from LLM.

async def complete_with_json(
    client: LLMClientProtocol,
    prompt: str,
    system_prompt: str | None = None,
    **kwargs: Any
) -> JSON

Complete and parse response as JSON.

Parameters
ParameterTypeDescription
`client`LLMClientProtocolLLM client
`prompt`strUser prompt
`system_prompt`str | NoneOptional system prompt
`**kwargs`AnyAdditional completion arguments
Returns
TypeDescription
JSONParsed JSON

Example

data = await complete_with_json(
    client,
    "Generate a config with 3 fields"
)


async def complete_with_schema(
    client: LLMClientProtocol,
    prompt: str,
    schema: type[T],
    system_prompt: str | None = None,
    **kwargs: Any
) -> T

Complete with automatic schema parsing and validation.

Parameters
ParameterTypeDescription
`client`LLMClientProtocolLLM client
`prompt`strUser prompt
`schema`type[T]Pydantic model for validation
`system_prompt`str | NoneOptional system prompt
`**kwargs`AnyAdditional completion arguments
Returns
TypeDescription
TValidated schema instance

Example

from lexigram.ai.llm import OpenAIClient

client = OpenAIClient(api_key="sk-...")
person = await complete_with_schema(
    client,
    "Extract person from: John Doe, age 30",
    schema=Person
)


def create_assistant_template() -> SecurePromptTemplate

Create template for general assistant.

Returns
TypeDescription
SecurePromptTemplateConfigured template

def create_balanced_selector() -> ModelSelector

Create a balanced model selector.


def create_cost_optimized_selector(budget_per_1k_tokens: float = 2.0) -> ModelSelector

Create a cost-optimized model selector.


def create_data_extraction_template() -> SecurePromptTemplate

Create template for data extraction (high security).

Returns
TypeDescription
SecurePromptTemplateConfigured template

def create_json_mode_messages(
    prompt: str,
    schema: type[DomainModel] | None = None,
    system_prompt: str | None = None
) -> list[dict[str, str]]

Create messages for JSON mode with optional schema.

Parameters
ParameterTypeDescription
`prompt`strUser prompt
`schema`type[DomainModel] | NoneOptional Pydantic model for schema
`system_prompt`str | NoneOptional system prompt (default: JSON instruction)
Returns
TypeDescription
list[dict[str, str]]Messages list for LLM

Example

messages = create_json_mode_messages(
    "Extract person info",
    schema=Person
)


def create_quality_optimized_selector() -> ModelSelector

Create a quality-optimized model selector.


def create_token_counter(
    model: str = 'gpt-3.5-turbo',
    encoding_name: str | None = None
) -> TiktokenCounter

Factory function for creating token counters.

Parameters
ParameterTypeDescription
`model`strModel name.
`encoding_name`str | NoneOptional encoding name override.
Returns
TypeDescription
TiktokenCounterTiktokenCounter instance.

Example

from lexigram.ai.llm import create_token_counter

counter = create_token_counter("gpt-4")
count = counter.count("Hello!")
print(count)


def normalize_thinking_text(text: str) -> tuple[str, str | None]

Extract thinking text from raw LLM output.

Tries each pattern in THINKING_PATTERNS in order and returns (clean_content, thinking_text_or_None): clean_content is the response text with the thinking block removed and whitespace stripped; thinking_text is the raw thinking content (stripped), or None if no thinking was found.

Pattern matching is by substring presence of start_marker (and end_marker after it), NOT by model name. The bare-closing-tag pattern (an end_marker with no start_marker) matches only when the start_marker is absent but the end_marker is present; this covers models that emit the thinking text, then a bare closing tag, then the response.

Fallback: if clean_content is empty after removing a thinking block but thinking text was found, the function tries to extract from the first { or [ in the original text to recover any JSON that may have been embedded.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text, possibly containing inline thinking tags.
Returns
TypeDescription
tuple[str, str | None]A tuple of (clean_content, thinking_text_or_None). - clean_content: The response text with thinking stripped out, stripped of whitespace. - thinking_text_or_None: The thinking/reasoning text, or None if no thinking found.
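
Example

A sketch assuming a <think>...</think> pair is among THINKING_PATTERNS (the actual pattern list is an implementation detail):

raw = "<think>The user wants a greeting.</think>\nHello!"
clean, thinking = normalize_thinking_text(raw)
# clean == "Hello!", thinking == "The user wants a greeting."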

Base class for structured extraction errors in lexigram-ai-llm.

Error raised when extraction max retries are exhausted.

Error raised when extraction response cannot be parsed as JSON.

Error raised when parsed extraction response fails schema validation.

Error raised when a request to an LLM provider is invalid.

Invalid API key or credentials — infrastructure error, raised not wrapped.

Raised as an exception (NOT wrapped in Result). Indicates a misconfiguration the application cannot route around.


Content blocked by provider safety filter — recoverable via reformulation.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should reformulate the prompt or inform the user.


Base exception for all LLM-domain errors in lexigram-ai-llm.

Model unavailable or not found — recoverable via fallback routing.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should route to a different model or provider.


API quota or billing limit exceeded — recoverable by routing elsewhere.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should route the request to a different provider or account.


Rate limit exceeded — recoverable via backoff/retry.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should implement exponential backoff or route to another provider.


Raised when response cannot be parsed.

Error raised when connection to an LLM provider fails.

Raised when parsed response fails validation.

Error raised during LLM response streaming.

Base exception for structured output errors.

Error raised when the token limit for a request is exceeded.
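
Since recoverable failures come back as Err values while infrastructure failures raise, callers typically handle both paths. A sketch: RateLimitError and ModelUnavailableError are guesses at the class names behind the docstrings above, and is_err()/unwrap_err()/unwrap() assume a rustedpy-style Result, which this reference does not show:

result = await client.complete(messages)
if result.is_err():
    error = result.unwrap_err()
    if isinstance(error, RateLimitError):
        ...   # back off exponentially, then retry or reroute
    elif isinstance(error, ModelUnavailableError):
        ...   # fall back to a different model or provider
else:
    completion = result.unwrap()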