Skip to content
GitHub

API Reference

Protocol for LLM cache implementations.
get
async def get(key: str | dict[str, Any]) -> Any | None

Get value from cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key (string or structured dict).
Returns
TypeDescription
Any | NoneCached value, or ``None`` if not present.
set
async def set(
    key: str | dict[str, Any],
    value: Any,
    ttl: float | None = None
) -> None

Set value in cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key (string or structured dict).
`value`AnyValue to store.
`ttl`float | NoneOptional time-to-live in seconds.
delete
async def delete(key: str | dict[str, Any]) -> bool

Delete entry from cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key to remove.
Returns
TypeDescription
bool``True`` if the key existed and was removed, ``False`` otherwise.
clear
async def clear() -> None

Clear all entries.

get_stats
def get_stats() -> dict[str, Any]

Return cache statistics.

Returns
TypeDescription
dict[str, Any]Mapping of statistic name to value.

Pricing source from HTTP API endpoint.

Fetches pricing data from a remote API. Useful for getting the latest pricing updates, but requires network connectivity.

Attributes: endpoint: API endpoint URL. timeout: Request timeout in seconds.

Example

source = APIPricingSource(
"https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"
)
pricing = await source.get_pricing("gpt-4")
__init__
def __init__(
    endpoint: str,
    timeout: float = 10.0
)

Initialize API pricing source.

Parameters
ParameterTypeDescription
`endpoint`strURL to fetch pricing from.
`timeout`floatRequest timeout in seconds (default: 10).
get_pricing
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
ParameterTypeDescription
`model`strModel identifier.
Returns
TypeDescription
ModelPricing | NoneModelPricing if found, None otherwise.
get_all_pricing
async def get_all_pricing() -> dict[str, ModelPricing]

Get all pricing data.

Returns
TypeDescription
dict[str, ModelPricing]All pricing data from API.
source_name
property source_name() -> str

Get source name.

invalidate_cache
def invalidate_cache() -> None

Clear cached pricing data to force refresh.


Abstract base class for pricing data sources.

All pricing sources must implement get_pricing() to return ModelPricing for a given model name, or None if not found.

get_pricing
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
ParameterTypeDescription
`model`strModel identifier (e.g., "gpt-4-turbo").
Returns
TypeDescription
ModelPricing | NoneModelPricing if found, None otherwise.
get_all_pricing
async def get_all_pricing() -> dict[str, ModelPricing]

Get all available pricing data.

Returns
TypeDescription
dict[str, ModelPricing]Dictionary mapping model names to pricing.
source_name
property source_name() -> str

Get the name of this pricing source.

Returns
TypeDescription
strHuman-readable source name.

Anthropic Claude LLM client implementation.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Claude 3 (Opus, Sonnet, Haiku) models with:

  • Streaming responses
  • Tool calling
  • Vision capabilities
  • Automatic retry and error handling

Example

from lexigram.ai import ClientConfig
config = ClientConfig(provider="anthropic", model="claude-3-sonnet-20240229")
client = AnthropicClient(config)
completion = await client.complete([
ChatMessage(role="user", content="Hello!")
])
__init__
def __init__(config: ClientConfig)

Initialize Anthropic client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
Raises
ExceptionDescription
ImportErrorIf anthropic package is not installed
close
async def close() -> None

Close the Anthropic client.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Parse LLM responses into lists of dictionaries (CSV format).

Extracts JSON array from the response and converts to list of dicts, where each dict represents a CSV row with column names as keys.

Example

parser = CSVOutputParser()
result = parser.parse('[{"name": "John", "age": 30}, {"name": "Jane", "age": 25}]')
assert len(result) == 2
assert result[0]["name"] == "John"
parse
def parse(text: str) -> list[dict[str, Any]]

Parse text into a list of dictionaries.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text that may contain JSON array.
Returns
TypeDescription
list[dict[str, Any]]List of dictionaries, each representing a CSV row.
Raises
ExceptionDescription
ParseErrorWhen JSON cannot be extracted or is not an array.
parse_csv_string
def parse_csv_string(text: str) -> list[dict[str, Any]]

Parse raw CSV text (not JSON) into list of dictionaries.

Parameters
ParameterTypeDescription
`text`strRaw CSV text with header row.
Returns
TypeDescription
list[dict[str, Any]]List of dictionaries, each representing a CSV row.
Raises
ExceptionDescription
ParseErrorWhen CSV cannot be parsed.
get_format_instructions
def get_format_instructions() -> str

Return format instructions for the LLM.

Returns
TypeDescription
strFormat instruction string telling the model to output a valid JSON array of objects.

Cache entry with metadata.

Attributes: key: Cache key. value: Cached value. created_at: When entry was created. expires_at: When entry expires (Unix timestamp). hits: Number of cache hits. size_bytes: Approximate size in bytes.


Cache statistics.

Attributes: hits: Number of cache hits. misses: Number of cache misses. evictions: Number of evictions. total_entries: Current number of entries. total_size_bytes: Total cache size in bytes.

hit_rate
property hit_rate() -> float

Calculate cache hit rate.


Character-based token count estimator (~4 chars per token).

Always available without any optional dependencies. Suitable as a safe fallback counter.

Parameters
ParameterTypeDescription
`model`Model name (used for identification only).
__init__
def __init__(model: str = 'unknown') -> None

Initialize CharEstimateCounter.

Parameters
ParameterTypeDescription
`model`strModel name for identification.
model
property model() -> str

The model this counter is calibrated for.

count
def count(text: str) -> int

Count tokens using character estimation.

count_messages
def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages.


A single chat message.

Implements ChatMessageProtocol with DomainModel semantics for validation.

Example

msg = ChatMessage(role="user", content="Hello, how are you?")

Configuration for LLM clients.

Example

config = ClientConfig(
provider="openai",
model="gpt-4-turbo",
api_key="sk-...",
temperature=0.7,
max_tokens=2000,
)

Client for Cohere's enterprise NLP API.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Chat, Embeddings, and Reranking with:

  • RAG-optimized models (Command R/R+)
  • High-performance embeddings
  • Native reranking support
__init__
def __init__(config: ClientConfig) -> None

Initialize Cohere client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
api_key
property api_key() -> SecretStr

Get API key from config.

base_url
property base_url() -> str

Get base URL from config.

embed
async def embed(
    texts: list[str] | str,
    model: str = 'embed-english-v3.0',
    input_type: str = 'search_document',
    **kwargs: Any
) -> list[list[float]]

Generate embeddings.

Parameters
ParameterTypeDescription
`texts`list[str] | strText or list of texts to embed.
`model`strModel ID (default: "embed-english-v3.0").
`input_type`strType of input ("search_document", "search_query", "classification", "clustering"). **kwargs: Additional parameters.
Returns
TypeDescription
list[list[float]]List of embedding vectors.

Example

# Embed documents
doc_embeddings = await client.embed(
texts=["Doc 1", "Doc 2"],
input_type="search_document"
)
# Embed query
query_embedding = await client.embed(
texts="What is AI?",
input_type="search_query"
)
rerank
async def rerank(
    query: str,
    documents: list[str] | list[dict[str, str]],
    model: str = 'rerank-english-v3.0',
    top_n: int | None = None,
    **kwargs: Any
) -> list[dict[str, Any]]

Rerank documents for a query.

Parameters
ParameterTypeDescription
`query`strSearch query.
`documents`list[str] | list[dict[str, str]]List of documents (strings or dicts with 'text' key).
`model`strReranking model (default: "rerank-english-v3.0").
`top_n`int | NoneReturn top N results (default: all). **kwargs: Additional parameters.
Returns
TypeDescription
list[dict[str, Any]]List of ranked documents with scores.

Example

results = await client.rerank(
query="What is machine learning?",
documents=[
"ML is a subset of AI...",
"Unrelated document...",
"Deep learning uses neural networks..."
],
top_n=2
)
for result in results:
print(f"Score: {result['relevance_score']:.3f} - {result['document']['text']}")
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the provider.

close
async def close() -> None

Close the HTTP client.


LLM completion response.

Implements completion semantics with DomainModel for validation and additional fields.

Example

completion = Completion(
content="Hello! I'm doing well, thank you.",
model="gpt-4-turbo",
usage=TokenUsage(prompt_tokens=10, completion_tokens=8, total_tokens=18)
)

Configuration for conversation management.

Example

config = ConversationConfig(
max_tokens=4096,
reserve_tokens=1000,
trim_strategy="oldest"
)

Manage multi-turn conversations with automatic context window management.

This class handles:

  • Message history management
  • Automatic token counting
  • Context window trimming
  • System prompt handling
  • Conversation statistics

Example

from lexigram.ai.llm import OpenAIClient, ConversationManager
client = OpenAIClient(api_key="sk-...", model="gpt-4")
manager = ConversationManager(
client=client,
system_prompt="You are a helpful assistant.",
max_tokens=4096
)
# Add user message and get response
response = await manager.chat("What is Python?")
print(response.content)
# Continue conversation
response = await manager.chat("Tell me more about it")
print(response.content)
# Get conversation history
history = manager.get_history()
stats = manager.get_stats()
print(f"Total messages: {stats.total_messages}")
print(f"Total tokens: {stats.total_tokens}")
__init__
def __init__(
    client: AbstractLLMClient,
    system_prompt: str | None = None,
    max_tokens: int = 4096,
    reserve_tokens: int = 1000,
    trim_strategy: str = 'oldest',
    metadata: Metadata | None = None,
    token_counter: TokenCounterProtocol | None = None
) -> None

Initialize conversation manager.

Parameters
ParameterTypeDescription
`client`AbstractLLMClientLLM client for completions
`system_prompt`str | NoneOptional system prompt (prepended to all conversations)
`max_tokens`intMaximum context window size
`reserve_tokens`intTokens to reserve for completion
`trim_strategy`strMessage trimming strategy ('oldest', 'middle', 'summary')
`metadata`Metadata | NoneAdditional metadata for the conversation
`token_counter`TokenCounterProtocol | NoneOptional TokenCounterProtocol implementation. If not provided, uses CharEstimateCounter.
chat
async def chat(
    message: str,
    role: Role = Role.USER,
    **completion_kwargs: Any
) -> Completion

Send a message and get a response.

Parameters
ParameterTypeDescription
`message`strMessage content
`role`RoleMessage role (default: USER) **completion_kwargs: Additional kwargs for completion
Returns
TypeDescription
CompletionCompletion response from LLM

Example

response = await manager.chat("Hello!")
print(response.content)
add_message
async def add_message(
    role: Role,
    content: str,
    update_stats: bool = True
) -> None

Add a message to conversation history without getting a response.

Parameters
ParameterTypeDescription
`role`RoleMessage role
`content`strMessage content
`update_stats`boolWhether to update statistics

Example

await manager.add_message(Role.USER, "Hello")
await manager.add_message(Role.ASSISTANT, "Hi there!")
get_history
def get_history(
    include_system: bool = True,
    limit: int | None = None
) -> list[ChatMessage]

Get conversation history.

Parameters
ParameterTypeDescription
`include_system`boolInclude system message in history
`limit`int | NoneMaximum number of messages to return (most recent)
Returns
TypeDescription
list[ChatMessage]List of chat messages

Example

history = manager.get_history(limit=10)
for msg in history:
print(f"{msg.role}: {msg.content}")
get_stats
def get_stats() -> ConversationStats

Get conversation statistics.

Returns
TypeDescription
ConversationStatsConversation statistics

Example

stats = manager.get_stats()
print(f"Total tokens: {stats.total_tokens}")
clear_history
def clear_history(keep_system: bool = True) -> None

Clear conversation history.

Parameters
ParameterTypeDescription
`keep_system`boolKeep system message when clearing

Example

manager.clear_history()
update_system_prompt
def update_system_prompt(system_prompt: str) -> None

Update the system prompt.

Parameters
ParameterTypeDescription
`system_prompt`strNew system prompt

Example

manager.update_system_prompt("You are a Python expert.")
get_token_count
def get_token_count() -> int

Get current total token count.

Returns
TypeDescription
intTotal tokens in conversation

Example

tokens = manager.get_token_count()
print(f"Current tokens: {tokens}")
get_available_tokens
def get_available_tokens() -> int

Get available tokens for completion.

Returns
TypeDescription
intAvailable tokens (max_tokens - current_tokens - reserve_tokens) Can be negative if context window is exceeded

Example

available = manager.get_available_tokens()
print(f"Available for completion: {available}")
export_history
def export_history() -> dict[str, Any]

Export conversation history to dictionary.

Returns
TypeDescription
dict[str, Any]Dictionary with conversation data (JSON-serializable)

Example

data = manager.export_history()
from lexigram import serialization as json
with open("conversation.json", "w") as f:
json.dump(data, f)
from_history
def from_history(
    cls,
    client: AbstractLLMClient,
    history_data: dict[str, Any]
) -> ConversationManager

Create conversation manager from exported history.

Parameters
ParameterTypeDescription
`client`AbstractLLMClientLLM client
`history_data`dict[str, Any]Exported history data
Returns
TypeDescription
ConversationManagerConversationManager instance

Example

from lexigram import serialization as json
with open("conversation.json") as f:
data = json.load(f)
manager = ConversationManager.from_history(client, data)

Statistics for a conversation.

Example

stats = ConversationStats(
total_messages=10,
total_tokens=2048,
user_messages=5,
assistant_messages=5
)

Cost estimation result.

Attributes: prompt_cost: Cost for prompt tokens. completion_cost: Cost for completion tokens. total_cost: Total estimated cost. currency: Currency code (default: USD). model: Model name. rate_per_1k_prompt: Rate per 1000 prompt tokens. rate_per_1k_completion: Rate per 1000 completion tokens.


Parse LLM responses into Enum members.

Extracts JSON from the response and maps it to an Enum member. Supports both string values and integer values.

Example

from enum import Enum
class Status(Enum):
ACTIVE = "active"
INACTIVE = "inactive"
parser = EnumOutputParser(Status)
result = parser.parse('"active"')
assert result == Status.ACTIVE
__init__
def __init__(enum: type[Enum]) -> None

Initialize with an Enum class.

Parameters
ParameterTypeDescription
`enum`type[Enum]Enum subclass to parse into.
parse
def parse(text: str) -> Enum

Parse text into an Enum member.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text that may contain JSON with enum value.
Returns
TypeDescription
EnumCorresponding Enum member.
Raises
ExceptionDescription
ParseErrorWhen JSON cannot be extracted or enum value is invalid.
get_format_instructions
def get_format_instructions() -> str

Return format instructions for the LLM.

Returns
TypeDescription
strFormat instruction string telling the model to output a valid enum value.

Parser that retries with LLM-assisted fixing on parse failure.

Wraps a base parser and, on parse failure, calls the LLM with a fixing prompt that includes the original output and the parse error. Retries are bounded by the retry_budget.

Example

parser = FormatFixingParser(
base_parser=JSONOutputParser(),
llm_client=llm_client,
retry_budget=3
)
result = parser.parse('not valid json')
__init__
def __init__(
    base_parser: Any,
    llm_client: Any,
    *,
    retry_budget: int = 3,
    guard_check: Callable[[str], bool] | None = None
) -> None

Initialize the format fixing parser.

Parameters
ParameterTypeDescription
`base_parser`AnyThe underlying parser to use for parsing.
`llm_client`AnyLLM client to use for fixing attempts.
`retry_budget`intMaximum number of fix attempts (default 3).
`guard_check`Callable[[str], bool] | NoneOptional guard function to validate malformed input before sending to LLM. Should return True if safe.
parse
async def parse(text: str) -> Any

Parse text, attempting fixes on failure.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text to parse.
Returns
TypeDescription
AnyParsed output from the base parser.
Raises
ExceptionDescription
ParseErrorWhen all fix attempts fail or guard check fails.
get_format_instructions
def get_format_instructions() -> str

Return format instructions from the base parser.

Returns
TypeDescription
strFormat instructions from the wrapped parser.

Function call request from LLM.

Default generation parameters applied to every routing attempt.

Example

defaults = GenerationDefaults(temperature=0.3, max_tokens=2048)

Client for Groq's ultra-fast LLM inference API.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Chat, Stream, and Vision with:

  • Ultra-fast LPU hardware synergy
  • OpenAI-compatible API surface
  • Blazing-fast token generation
__init__
def __init__(config: ClientConfig)

Initialize Groq client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
api_key
property api_key() -> SecretStr

Get API key from config.

base_url
property base_url() -> str

Get base URL from config.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Run a lightweight provider health probe.

list_models
async def list_models() -> list[dict[str, Any]]

List models available from the Groq API.

close
async def close() -> None

Close the HTTP client.

Example

await client.close()

Token counter using HuggingFace AutoTokenizer (lazy-loaded).

When constructed without a model, uses character estimation (~4 chars/token). When constructed with a model name, lazy-loads that model’s tokenizer on first use.

Parameters
ParameterTypeDescription
`model`Optional HuggingFace model name. If None, uses char estimation fallback.
__init__
def __init__(model: str | None = None) -> None

Initialize HuggingFaceCounter.

Parameters
ParameterTypeDescription
`model`str | NoneOptional HuggingFace model name for tokenizer loading.
model
property model() -> str

Backend identifier.

count
def count(text: str) -> int

Count tokens in a text string.

count_messages
def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages.


An image pre-encoded as base64 in a multimodal message.

Attributes: data: Raw base64-encoded bytes (no data: prefix). media_type: MIME type, e.g. "image/jpeg". type: Discriminator field, always "image_base64".


An image specified by URL in a multimodal message.

The framework passes the URL through to providers that support it natively (OpenAI, Anthropic, Gemini). For providers that require base64 (Ollama, Bedrock), the client fetches and converts.

Attributes: url: Public or data-URI URL of the image. detail: OpenAI vision detail level ("auto", "low", "high"). type: Discriminator field, always "image_url".


Structured extraction from LLM completions using instructor library.

Extracts typed Pydantic models from LLM responses by:

  1. Building a ChatMessage list with extraction instructions
  2. Calling llm_client.complete() to get a Completion
  3. Parsing the completion text as JSON
  4. Validating against the response_model
  5. Retrying on validation/parse failures up to max_retries

Unlike direct instructor usage, this implementation uses the standard LLMClientProtocol.complete() method, avoiding coupling to provider-specific client patching mechanisms.

Example

from pydantic import BaseModel
class UserInfo(BaseModel):
name: str
age: int
extractor = InstructorExtractor(llm_client)
result = await extractor.extract(
prompt="Extract user info from: 'John is 30 years old'",
response_model=UserInfo,
)
if result.is_ok():
user = result.unwrap()
print(user.name, user.age)
else:
error = result.unwrap_err()
# handle ExtractionError
from pydantic import BaseModel
class UserInfo(BaseModel):
name: str
age: int
extractor = InstructorExtractor(llm_client)
result = await extractor.extract(
prompt="Extract user info from: 'John is 30 years old'",
response_model=UserInfo,
)
if result.is_ok():
user = result.unwrap()
print(user.name, user.age)
else:
error = result.unwrap_err()
# handle ExtractionError
__init__
def __init__(
    llm_client: LLMClientProtocol,
    mode: str = 'json',
    max_retries: int = 3
) -> None

Initialize InstructorExtractor.

Parameters
ParameterTypeDescription
`llm_client`LLMClientProtocolLLMClientProtocol instance for making LLM calls.
`mode`strInstructor patching mode (reserved for future provider-level integration; currently unused).
`max_retries`intMaximum number of retries on validation/parse failure.
extract
async def extract(
    prompt: str,
    response_model: type[T],
    context: list | None = None,
    **kwargs: Any
) -> Result[T, ExtractionError]

Extract a structured response_model instance from an LLM call.

Parameters
ParameterTypeDescription
`prompt`strUser prompt for extraction.
`response_model`type[T]Pydantic BaseModel class to extract and validate.
`context`list | NoneOptional list of additional ChatMessage objects for context. **kwargs: Additional parameters passed to llm_client.complete().
Returns
TypeDescription
Result[T, ExtractionError]``Ok(instance)`` on successful extraction and validation. ``Err(ExtractionError)`` on parse, validation, or max retries failure.
Raises
ExceptionDescription

Extract and parse JSON from LLM responses.
extract
def extract(
    text: str,
    multiple: bool = False
) -> Any

Extract JSON from text.

extract_array
def extract_array(text: str) -> list[Any]

Extract JSON array from text.


Pricing source from local JSON file.

This is the fastest and most reliable source as it doesn’t require network calls and works offline.

Attributes: file_path: Path to JSON pricing file. cache: In-memory cache of loaded pricing.

Example

source = JSONFilePricingSource(Path("custom_pricing.json"))
pricing = await source.get_pricing("gpt-4-turbo")
__init__
def __init__(file_path: Path)

Initialize JSON file pricing source.

Parameters
ParameterTypeDescription
`file_path`PathPath to JSON file containing pricing data.
get_pricing
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
ParameterTypeDescription
`model`strModel identifier.
Returns
TypeDescription
ModelPricing | NoneModelPricing if found, None otherwise.
get_all_pricing
async def get_all_pricing() -> dict[str, ModelPricing]

Get all pricing data.

Returns
TypeDescription
dict[str, ModelPricing]All pricing data from JSON file.
source_name
property source_name() -> str

Get source name.

invalidate_cache
def invalidate_cache() -> None

Clear cached pricing data to force reload.


Parse LLM responses into JSON dicts.

Handles common LLM output patterns like markdown code fences, prose before/after JSON, and malformed JSON.

Example

parser = JSONOutputParser()
result = parser.parse('{"key": "value"}')
assert result == {"key": "value"}
parse
def parse(text: str) -> dict[str, Any]

Parse text into a JSON dict.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text that may contain JSON.
Returns
TypeDescription
dict[str, Any]Parsed JSON as a dict.
Raises
ExceptionDescription
ParseErrorWhen JSON cannot be extracted or parsed.
get_format_instructions
def get_format_instructions() -> str

Return format instructions for the LLM.

Returns
TypeDescription
strFormat instruction string telling the model to output valid JSON.

In-memory cache for LLM responses with TTL.

Implements LRU eviction when max_size is reached.

Parameters
ParameterTypeDescription
`ttl`Time-to-live in seconds (default: 1 hour).
`max_size`Maximum number of entries (default: 1000).
`max_size_bytes`Maximum cache size in bytes (default: 100MB).

Example

cache = LLMCache(ttl=3600, max_size=500)
result = await cache.get("key")
await cache.set("key", "value")
__init__
def __init__(
    ttl: float = 3600,
    max_size: int = 1000,
    max_size_bytes: int = 100 * 1024 * 1024
)

Initialize LLM cache.

Parameters
ParameterTypeDescription
`ttl`floatTime-to-live in seconds.
`max_size`intMaximum number of entries.
`max_size_bytes`intMaximum total size in bytes.
get
async def get(key: str | dict[str, Any]) -> Any | None

Get value from cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key (string or dict).
Returns
TypeDescription
Any | NoneCached value or None if not found/expired.
set
async def set(
    key: str | dict[str, Any],
    value: Any,
    ttl: float | None = None
) -> None

Set value in cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key (string or dict).
`value`AnyValue to cache.
`ttl`float | NoneOptional TTL override.
get_or_compute
async def get_or_compute(
    key: str | dict[str, Any],
    compute_fn: Callable[[], Any],
    ttl: float | None = None
) -> Any

Get from cache or compute and cache result.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key.
`compute_fn`Callable[[], Any]Function to compute value if cache miss.
`ttl`float | NoneOptional TTL override.
Returns
TypeDescription
AnyCached or computed value.

Example

result = await cache.get_or_compute(
key="greeting",
compute_fn=lambda: llm.complete("Say hello")
)
delete
async def delete(key: str | dict[str, Any]) -> bool

Delete entry from cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key to delete.
Returns
TypeDescription
boolTrue if entry was deleted.
clear
async def clear() -> None

Clear all cache entries.

get_stats
def get_stats() -> CacheStats

Get cache statistics.

Returns
TypeDescription
CacheStatsCacheStats object.

Emitted when an LLM completion is received.

Distinct from LLMCallStartedHook (which intercepts); this is the immutable record that a completion happened.

Consumed by: cost accounting, audit, safety review.


Root configuration object for the LLM routing system.

All providers are opt-in: a provider joins the cascade only when its credential environment variable is set. Use from_env to build from LEX_AI_LLM__ environment variables.

Example

config = LLMConfig(
providers=[
ProviderConfig(name="groq", model="llama-3.3-70b-versatile", api_key="gsk_..."),
ProviderConfig(name="gemini", model="gemini-2.5-flash", api_key="AIza..."),
],
defaults=GenerationDefaults(temperature=0.3),
)
config = LLMConfig(
providers=[
ProviderConfig(name="groq", model="llama-3.3-70b-versatile", api_key="gsk_..."),
ProviderConfig(name="gemini", model="gemini-2.5-flash", api_key="AIza..."),
],
defaults=GenerationDefaults(temperature=0.3),
)

Environment variables (prefix LEX_AI_LLM__)

Global:
LEX_AI_LLM__STRATEGY sequential | parallel_race |
cost_optimized | latency_optimized
LEX_AI_LLM__DEFAULTS__TEMPERATURE float (default 0.2)
LEX_AI_LLM__DEFAULTS__MAX_TOKENS int (default: provider default)
LEX_AI_LLM__QUOTA__BACKEND memory | database (default memory)
LEX_AI_LLM__LOG__BACKEND memory | database (default memory)
LEX_AI_LLM__LOG__MAX_ENTRIES int (default 1000)
Per-provider (pattern: LEX_AI_LLM__PROVIDERS__{NAME}__{FIELD}):
__{NAME}__API_KEY str API key -- activates key-auth providers
__{NAME}__BASE_URL str Endpoint -- activates local/custom providers
__{NAME}__MODEL str Model override (has per-provider defaults)
__{NAME}__TIMEOUT int Request timeout in seconds (default 30)
__{NAME}__ENABLED bool Explicit enable/disable (default true)
Supported provider names and their activation:
OPENAI API_KEY required default model: gpt-4o
ANTHROPIC API_KEY required default model: claude-3-5-sonnet-20241022
GROQ API_KEY required default model: llama-3.3-70b-versatile
GEMINI API_KEY required default model: gemini-2.5-flash
MISTRAL API_KEY required default model: mistral-large-latest
COHERE API_KEY required default model: command-r-plus
OPENROUTER API_KEY required default model: openai/gpt-4o-mini
DEEPSEEK API_KEY required default model: deepseek-chat
TOGETHER API_KEY required default model: meta-llama/Llama-3-8b-chat-hf
FIREWORKS API_KEY required default model: accounts/fireworks/models/llama-v3-70b-instruct
OLLAMA BASE_URL required default model: llama3.2 (default base: http://localhost:11434)
OPENAI_COMPATIBLE BASE_URL + MODEL required (generic OpenAI-compatible: LM Studio, VLLM, etc.)
Azure-specific extras (activated by AZURE__API_KEY + AZURE__BASE_URL):
LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_RESOURCE
LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_DEPLOYMENT
LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_API_VERSION
Cloudflare-specific extras (activated by CLOUDFLARE__EXTRAS__CF_ACCOUNT_ID):
LEX_AI_LLM__PROVIDERS__CLOUDFLARE__EXTRAS__CF_ACCOUNT_ID <- activates
LEX_AI_LLM__PROVIDERS__CLOUDFLARE__EXTRAS__CF_API_TOKEN
LEX_AI_LLM__PROVIDERS__CLOUDFLARE__MODEL
AWS Bedrock extras (activated by BEDROCK__EXTRAS__AWS_REGION):
LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_REGION <- activates
LEX_AI_LLM__PROVIDERS__BEDROCK__MODEL
LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_ACCESS_KEY_ID
LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_SECRET_ACCESS_KEY
Google Vertex AI extras (activated by VERTEX__EXTRAS__VERTEX_PROJECT):
LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_PROJECT <- activates
LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_LOCATION
LEX_AI_LLM__PROVIDERS__VERTEX__MODEL
LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_CREDENTIALS_FILE
Global:
LEX_AI_LLM__STRATEGY sequential | parallel_race |
cost_optimized | latency_optimized
LEX_AI_LLM__DEFAULTS__TEMPERATURE float (default 0.2)
LEX_AI_LLM__DEFAULTS__MAX_TOKENS int (default: provider default)
LEX_AI_LLM__QUOTA__BACKEND memory | database (default memory)
LEX_AI_LLM__LOG__BACKEND memory | database (default memory)
LEX_AI_LLM__LOG__MAX_ENTRIES int (default 1000)
Per-provider (pattern: LEX_AI_LLM__PROVIDERS__{NAME}__{FIELD}):
__{NAME}__API_KEY str API key -- activates key-auth providers
__{NAME}__BASE_URL str Endpoint -- activates local/custom providers
__{NAME}__MODEL str Model override (has per-provider defaults)
__{NAME}__TIMEOUT int Request timeout in seconds (default 30)
__{NAME}__ENABLED bool Explicit enable/disable (default true)
Supported provider names and their activation:
OPENAI API_KEY required default model: gpt-4o
ANTHROPIC API_KEY required default model: claude-3-5-sonnet-20241022
GROQ API_KEY required default model: llama-3.3-70b-versatile
GEMINI API_KEY required default model: gemini-2.5-flash
MISTRAL API_KEY required default model: mistral-large-latest
COHERE API_KEY required default model: command-r-plus
OPENROUTER API_KEY required default model: openai/gpt-4o-mini
DEEPSEEK API_KEY required default model: deepseek-chat
TOGETHER API_KEY required default model: meta-llama/Llama-3-8b-chat-hf
FIREWORKS API_KEY required default model: accounts/fireworks/models/llama-v3-70b-instruct
OLLAMA BASE_URL required default model: llama3.2 (default base: http://localhost:11434)
OPENAI_COMPATIBLE BASE_URL + MODEL required (generic OpenAI-compatible: LM Studio, VLLM, etc.)
Azure-specific extras (activated by AZURE__API_KEY + AZURE__BASE_URL):
LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_RESOURCE
LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_DEPLOYMENT
LEX_AI_LLM__PROVIDERS__AZURE__EXTRAS__AZURE_API_VERSION
Cloudflare-specific extras (activated by CLOUDFLARE__EXTRAS__CF_ACCOUNT_ID):
LEX_AI_LLM__PROVIDERS__CLOUDFLARE__EXTRAS__CF_ACCOUNT_ID <- activates
LEX_AI_LLM__PROVIDERS__CLOUDFLARE__EXTRAS__CF_API_TOKEN
LEX_AI_LLM__PROVIDERS__CLOUDFLARE__MODEL
AWS Bedrock extras (activated by BEDROCK__EXTRAS__AWS_REGION):
LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_REGION <- activates
LEX_AI_LLM__PROVIDERS__BEDROCK__MODEL
LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_ACCESS_KEY_ID
LEX_AI_LLM__PROVIDERS__BEDROCK__EXTRAS__AWS_SECRET_ACCESS_KEY
Google Vertex AI extras (activated by VERTEX__EXTRAS__VERTEX_PROJECT):
LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_PROJECT <- activates
LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_LOCATION
LEX_AI_LLM__PROVIDERS__VERTEX__MODEL
LEX_AI_LLM__PROVIDERS__VERTEX__EXTRAS__VERTEX_CREDENTIALS_FILE
from_env
def from_env(cls) -> LLMConfig

Build a routing config from LEX_AI_LLM__ environment variables.

Returns
TypeDescription
LLMConfigPopulated LLMConfig.

LLM client and model-management integration.

Call configure to register an LLMClientProtocol implementation and optional model manager for injection.

Usage

from lexigram.ai.llm.config import ClientConfig
@module(
imports=[
LLMModule.configure(
ClientConfig(provider="openai", model="gpt-4o")
)
]
)
class AppModule(Module):
pass
from lexigram.ai.llm.config import ClientConfig
@module(
imports=[
LLMModule.configure(
ClientConfig(provider="openai", model="gpt-4o")
)
]
)
class AppModule(Module):
pass

Multi-provider routing

from lexigram.ai.llm import LLMModule
@module(
imports=[LLMModule.configure(routing=LLMConfig())]
)
class AppModule(Module):
pass
from lexigram.ai.llm import LLMModule
@module(
imports=[LLMModule.configure(routing=LLMConfig())]
)
class AppModule(Module):
pass
configure
def configure(
    cls,
    config: ClientConfig | Any | None = None,
    *,
    routing: LLMConfig | Any | None = None,
    enable_model_manager: bool = False,
    enable_streaming: bool = True
) -> DynamicModule

Create an LLMModule with a single configured provider.

Parameters
ParameterTypeDescription
`config`ClientConfig | Any | NoneClientConfig or ``None`` to read configuration from environment variables.
`routing`LLMConfig | Any | NoneOptional LLMConfig enabling the multi-provider routing layer instead of the single-provider client.
`enable_model_manager`boolRegister LLMModelManager for local model lifecycle control.
`enable_streaming`boolEnable streaming response support. Defaults to ``True``; set to ``False`` to restrict to non-streaming clients only.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
stub
def stub(
    cls,
    config: ClientConfig | Any | None = None
) -> DynamicModule

Create an LLMModule suitable for unit and integration testing.

Uses a no-op or stub LLM client with minimal external dependencies. Streaming is disabled by default to simplify test assertions.

Parameters
ParameterTypeDescription
`config`ClientConfig | Any | NoneOptional ClientConfig override. Uses safe test defaults when ``None``.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Provider that registers LLM services with the Lexigram DI container.

Registers an LLMClientProtocol, optional LLM response cache, and an LLMModelManager so all three are injectable throughout the application.

Example

from lexigram.ai.llm.di.provider import LLMProvider
from lexigram.ai.llm.config import ClientConfig
app.use(LLMProvider(ClientConfig(provider="openai", model="gpt-4o")))
# LLMClientProtocol is now injectable:
class MyService:
def __init__(self, llm: LLMClientProtocol) -> None:
self.llm = llm
__init__
def __init__(
    config: ClientConfig | None = None,
    enable_model_manager: bool = False,
    enable_streaming: bool = True,
    name: str = 'llm',
    cache_backend: CacheBackendProtocol | None = None
) -> None

Initialize the LLM Provider.

Parameters
ParameterTypeDescription
`config`ClientConfig | NoneLLM client configuration; defaults to ClientConfig() (reads env).
`enable_model_manager`boolRegister LLMModelManager for local model control.
`enable_streaming`boolEnable streaming response support.
`name`strProvider name used for identification.
`cache_backend`CacheBackendProtocol | NoneInjected cache backend for optional response caching.
register
async def register(container: ContainerRegistrarProtocol) -> None

Register LLM services with the DI container.

Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolThe Lexigram DI container registrar.
boot
async def boot(container: ContainerResolverProtocol) -> None

Boot the LLM provider — validates API key presence and format.

Parameters
ParameterTypeDescription
`container`ContainerResolverProtocolThe DI container resolver.
shutdown
async def shutdown() -> None

Close client connections on application shutdown.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Return basic health information for the registered LLM client.


Payload fired when an LLM provider is registered in the provider registry.

Attributes: provider: Identifier of the provider that was registered.


Payload fired when an LLM request is dispatched to a provider.

Attributes: provider: Provider identifier (e.g. "openai"). model: Model name targeted by the request (e.g. "gpt-4o").


Payload fired when a complete LLM response is received from a provider.

Attributes: provider: Provider identifier that returned the response. model: Model name that produced the response.


Provider that registers the multi-provider LLM router with the DI container.

Builds the LLMRouter from a LLMConfig, chooses the appropriate quota backend and inference logger, and registers everything as singletons.

Example

from lexigram.ai.llm.module import LLMModule
from lexigram.ai.llm.routing import LLMConfig
app.use(LLMModule.configure(routing=LLMConfig.from_env()))
# LLMRouterProtocol is now injectable:
class MyService:
def __init__(self, router: LLMRouterProtocol) -> None:
self.router = router
__init__
def __init__(
    config: LLMConfig | None = None,
    database_provider: DatabaseProviderProtocol | None = None,
    model_selector: ModelSelector | None = None
) -> None

Initialise the LLM routing provider.

Parameters
ParameterTypeDescription
`config`LLMConfig | NoneRouting configuration; defaults to ``LLMConfig.from_env()``.
`database_provider`DatabaseProviderProtocol | NoneInjected DB provider used when ``quota.backend`` or ``logging.backend`` is ``database``.
`model_selector`ModelSelector | NoneOptional model selector for capability-based routing. When provided, ``required_capabilities`` in route kwargs will filter providers whose models lack the requested capabilities.
register
async def register(container: ContainerRegistrarProtocol) -> None

Build and register the LLMRouter with the DI container.

Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolThe Lexigram DI container registrar.
boot
async def boot(container: ContainerResolverProtocol) -> None

Boot phase — no-op for this provider.

Parameters
ParameterTypeDescription
`container`ContainerResolverProtocolThe DI container resolver.
shutdown
async def shutdown() -> None

Close all routing clients on application shutdown.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Return basic health information for the router.

Parameters
ParameterTypeDescription
`timeout`floatUnused; retained for interface compatibility.
Returns
TypeDescription
HealthCheckResultA dict with ``status`` and ``providers`` keys.

Configuration for inference attempt logging.

Example

cfg = LogConfig(backend="database", max_entries=5000)

Client for Mistral AI's LLM API.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports Chat, Stream, and Embeddings with:

  • High-performance European LLMs
  • GDPR compliance and data sovereignty
  • Function calling and JSON mode
__init__
def __init__(config: ClientConfig)

Initialize Mistral client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
api_key
property api_key() -> SecretStr

Get API key from config.

base_url
property base_url() -> str

Get base URL from config.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the Mistral API.

Calls the models endpoint to verify the API key is valid and the service is reachable.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for the response.
Returns
TypeDescription
HealthCheckResultHealthCheckResult.
embed
async def embed(
    model: str = 'mistral-embed',
    input_texts: list[str] | str | None = None,
    **kwargs
) -> list[list[float]]

Generate embeddings.

Parameters
ParameterTypeDescription
`model`strModel ID (default: "mistral-embed").
`input_texts`list[str] | str | NoneText or list of texts to embed. **kwargs: Additional parameters.
Returns
TypeDescription
list[list[float]]List of embedding vectors.

Example

embeddings = await client.embed(
input_texts=["Hello world", "Bonjour monde"]
)
print(f"Embedding dimension: {len(embeddings[0])}")
close
async def close() -> None

Close the HTTP client.

Example

await client.close()

Token counter using mistral-common tokenizer (lazy-loaded).

Tokenizer is loaded on first use, not at construction time.

__init__
def __init__() -> None

Initialize MistralCounter.

model
property model() -> str

Backend identifier.

count
def count(text: str) -> int

Count tokens in a text string.

count_messages
def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages.


Model capabilities and constraints.

Pricing information for a specific LLM model.

Attributes: model: Model identifier (e.g., “gpt-4-turbo”, “claude-3-opus”). prompt_per_1m: Cost per 1 million prompt tokens in USD. completion_per_1m: Cost per 1 million completion tokens in USD. provider: Provider name (e.g., “openai”, “anthropic”). last_updated: When pricing was last updated. source: Where pricing data came from (e.g., “json”, “api”, “static”).

Example

pricing = ModelPricing(
model="gpt-4-turbo",
prompt_per_1m=10.00,
completion_per_1m=30.00,
provider="openai"
)
print(f"${pricing.prompt_per_1m} per 1M prompt tokens")
serialize_model
def serialize_model(handler: Callable[[ModelPricing], dict[str, Any]]) -> dict[str, Any]

Custom serializer to handle datetime objects.


Intelligent model selector with fallback support.

Automatically selects the best model based on prompt characteristics and provides fallback chains for reliability.

Example

selector = ModelSelector(
default_model="gpt-3.5-turbo",
strategies=[
SelectionStrategy(
name="complex",
model="gpt-4-turbo",
conditions={"min_tokens": 1000}
),
SelectionStrategy(
name="simple",
model="claude-3-haiku-20240307",
conditions={"max_tokens": 500}
)
],
fallback_chain=["gpt-4-turbo", "gpt-3.5-turbo"]
)
# Select model for a prompt
model = selector.select("Long prompt here...")
print(model)
'gpt-4-turbo'
>>>
>>> # Get next fallback on error
>>> fallback = selector.get_fallback("gpt-4-turbo")
>>> print(fallback)
'gpt-3.5-turbo'
__init__
def __init__(
    default_model: str | None = None,
    strategies: list[SelectionStrategy] | None = None,
    fallback_chain: list[str] | None = None,
    model_capabilities: dict[str, ModelCapabilities] | None = None,
    token_counter: TokenCounterProtocol | None = None
)

Initialize model selector.

Parameters
ParameterTypeDescription
`default_model`str | NoneDefault model to use
`strategies`list[SelectionStrategy] | NoneList of selection strategies
`fallback_chain`list[str] | NoneOrdered list of fallback models
`model_capabilities`dict[str, ModelCapabilities] | NoneCustom model capabilities
`token_counter`TokenCounterProtocol | NoneToken counter for prompt analysis

Example

selector = ModelSelector(
default_model="gpt-3.5-turbo",
fallback_chain=["gpt-4", "claude-3-sonnet-20240229"]
)
select
def select(
    prompt: str,
    context: dict[str, Any] | None = None,
    required_capabilities: list[str] | None = None
) -> str

Select the best model for the given prompt.

Parameters
ParameterTypeDescription
`prompt`strThe prompt text
`context`dict[str, Any] | NoneAdditional context for selection
`required_capabilities`list[str] | NoneRequired capabilities (e.g., ["supports_functions"])
Returns
TypeDescription
strSelected model name

Example

model = selector.select(
"Analyze this image...",
required_capabilities=["supports_vision"]
)
print(model)
'gpt-4-turbo'
get_fallback
def get_fallback(failed_model: str) -> str | None

Get the next model in the fallback chain.

Parameters
ParameterTypeDescription
`failed_model`strThe model that failed
Returns
TypeDescription
str | NoneNext fallback model, or None if no fallback available

Example

fallback = selector.get_fallback("gpt-4-turbo")
print(fallback)
'gpt-3.5-turbo'
get_capabilities
def get_capabilities(model: str) -> ModelCapabilities | None

Get capabilities for a model.

Parameters
ParameterTypeDescription
`model`strModel name
Returns
TypeDescription
ModelCapabilities | NoneModel capabilities or None if unknown

Example

caps = selector.get_capabilities("gpt-4-turbo")
print(caps.max_tokens)
128000
estimate_cost
def estimate_cost(
    model: str,
    input_tokens: int,
    output_tokens: int
) -> float

Estimate cost for a model call.

Parameters
ParameterTypeDescription
`model`strModel name
`input_tokens`intNumber of input tokens
`output_tokens`intNumber of output tokens
Returns
TypeDescription
floatEstimated cost in USD

Example

cost = selector.estimate_cost("gpt-4-turbo", 1000, 500)
print(f"${cost:.4f}")
$0.0250

Ollama LLM client for local models.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports running LLMs locally with Ollama:

  • Llama 3, Mistral, Phi, and other open models
  • Streaming responses
  • Zero API costs
  • Full data privacy

Example

from lexigram.ai import ClientConfig
config = ClientConfig(
provider="ollama",
model="llama3:8b",
api_base="http://localhost:11434"
)
client = OllamaClient(config)
completion = await client.complete([
ChatMessage(role="user", content="Hello!")
])
__init__
def __init__(config: ClientConfig)

Initialize Ollama client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
Raises
ExceptionDescription
ImportErrorIf ollama package is not installed
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the Ollama daemon.

Calls list() to verify the daemon is running and reachable.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for the response.
Returns
TypeDescription
HealthCheckResultHealthCheckResult.
close
async def close() -> None

Close Ollama client.


OpenAI LLM client implementation.

Conforms to: LLMClientProtocol protocol via structural typing.

Supports GPT-4, GPT-3.5-Turbo, and other OpenAI models with:

  • Streaming responses
  • Function/tool calling
  • Vision models
  • Automatic retry with exponential backoff
  • Error handling and rate limit management

Example

from lexigram.ai import ClientConfig
config = ClientConfig(provider="openai", model="gpt-4-turbo")
client = OpenAIClient(config)
completion = await client.complete([
ChatMessage(role="user", content="Hello!")
])
__init__
def __init__(config: ClientConfig)

Initialize OpenAI client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
Raises
ExceptionDescription
ImportErrorIf openai package is not installed
close
async def close() -> None

Close the OpenAI client and cleanup resources.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Client for OpenRouter (OpenAI-compatible) API.

Conforms to: LLMClientProtocol protocol via structural typing.

__init__
def __init__(config: ClientConfig)

Initialize OpenRouter client.

Parameters
ParameterTypeDescription
`config`ClientConfigLLM configuration
api_key
property api_key() -> SecretStr

Get API key from config.

base_url
property base_url() -> str

Get base URL from config.

model
property model() -> str

Get default model from config.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight health check against the OpenRouter API.

Calls the models listing endpoint to verify the API key is valid and the service is reachable.

Parameters
ParameterTypeDescription
`timeout`floatMaximum seconds to wait for the response.
Returns
TypeDescription
HealthCheckResultHealthCheckResult.
embeddings
async def embeddings(
    texts: list[str],
    **kwargs: Any
) -> list[list[float]]
close
async def close() -> None

Filter LLM output for sensitive information.

Prevents leaking of system prompts, internal data, etc.

filter_output
def filter_output(
    output: str,
    system_prompt: str
) -> str

Filter LLM output for leaks.

Parameters
ParameterTypeDescription
`output`strLLM output
`system_prompt`strSystem prompt (check if leaked)
Returns
TypeDescription
strFiltered output

Registry for managing output parsers by name.

Provides a central registry for looking up parsers by name, similar to LangChain’s parser registry.

Example

registry = ParserRegistry()
registry.register("json", JSONOutputParser())
parser = registry.get("json")
assert parser is not None
__init__
def __init__() -> None

Initialize an empty registry.

register
def register(
    name: str,
    parser: Any
) -> None

Register a parser with a name.

Parameters
ParameterTypeDescription
`name`strUnique name for the parser.
`parser`AnyParser instance to register.
get
def get(name: str) -> Any

Get a parser by name.

Parameters
ParameterTypeDescription
`name`strName of the parser to retrieve.
Returns
TypeDescription
AnyThe registered parser.
Raises
ExceptionDescription
KeyErrorIf no parser is registered with that name.
get_or_none
def get_or_none(name: str) -> Any | None

Get a parser by name, returning None if not found.

Parameters
ParameterTypeDescription
`name`strName of the parser to retrieve.
Returns
TypeDescription
Any | NoneThe registered parser, or None if not found.
list_parsers
def list_parsers() -> list[str]

List all registered parser names.

Returns
TypeDescription
list[str]List of registered parser names.
unregister
def unregister(name: str) -> None

Unregister a parser by name.

Parameters
ParameterTypeDescription
`name`strName of the parser to unregister.
Raises
ExceptionDescription
KeyErrorIf no parser is registered with that name.
with_defaults
def with_defaults(cls) -> ParserRegistry

Create a registry with default parsers pre-registered.

Returns
TypeDescription
ParserRegistryA new ParserRegistry with default parsers.

Manages pricing data from multiple sources with caching.

Sources are queried in order until pricing is found. Typical hierarchy:

  1. JSON file (fastest, most reliable)
  2. API endpoints (for updates)
  3. Static fallback (hardcoded)

Attributes: sources: List of pricing sources in priority order. cache: Pricing cache instance. enable_fuzzy_match: Whether to enable fuzzy model name matching.

Example

# Use defaults
manager = PricingManager.from_defaults()
# Custom configuration
manager = (
PricingManager.builder()
.add_json_source("pricing.json")
.add_api_source("https://api.example.com/pricing")
.with_cache_ttl(3600)
.enable_fuzzy_matching()
.build()
)
pricing = await manager.get_pricing("gpt-4-turbo")
__init__
def __init__(
    sources: Sequence[AbstractPricingSource],
    cache_ttl: int = 86400,
    enable_fuzzy_match: bool = True
)

Initialize pricing manager.

Parameters
ParameterTypeDescription
`sources`Sequence[AbstractPricingSource]List of pricing sources in priority order.
`cache_ttl`intCache TTL in seconds (default: 24 hours).
`enable_fuzzy_match`boolEnable fuzzy model name matching (default: True).
get_pricing
async def get_pricing(
    model: str,
    force_refresh: bool = False
) -> ModelPricing

Get pricing for a specific model.

Queries sources in order:

  1. Cache (if not force_refresh)
  2. Each source in priority order
  3. Fuzzy match if enabled
  4. Default fallback
Parameters
ParameterTypeDescription
`model`strModel identifier (e.g., "gpt-4-turbo").
`force_refresh`boolBypass cache and fetch fresh data.
Returns
TypeDescription
ModelPricingModelPricing for the model.
Raises
ExceptionDescription
ValueErrorIf model not found in any source.
list_models
async def list_models(provider: str | None = None) -> list[str]

List all available models.

Parameters
ParameterTypeDescription
`provider`str | NoneFilter by provider (optional).
Returns
TypeDescription
list[str]List of model names.
clear_cache
async def clear_cache() -> None

Clear pricing cache.

from_defaults
def from_defaults(cls) -> PricingManager

Create manager with default configuration.

Uses LiteLLM API for dynamic, up-to-date pricing data. No static pricing files - always fetches current data.

Returns
TypeDescription
PricingManagerPricingManager with API source.

Example

manager = PricingManager.from_defaults()
pricing = await manager.get_pricing("gpt-4")
from_json
def from_json(
    cls,
    file_path: str | Path,
    cache_ttl: int = 86400
) -> PricingManager

Create manager from JSON file only.

Useful for offline applications or when you want full control over pricing data.

Parameters
ParameterTypeDescription
`file_path`str | PathPath to JSON pricing file.
`cache_ttl`intCache TTL in seconds (default: 24 hours).
Returns
TypeDescription
PricingManagerPricingManager with JSON source only.

Example

manager = PricingManager.from_json("my_pricing.json")
pricing = await manager.get_pricing("custom-model")
from_api
def from_api(
    cls,
    endpoint: str,
    cache_ttl: int = 86400
) -> PricingManager

Create manager from API endpoint only.

Parameters
ParameterTypeDescription
`endpoint`strAPI endpoint URL.
`cache_ttl`intCache TTL in seconds (default: 24 hours).
Returns
TypeDescription
PricingManagerPricingManager with API source only.

Example

manager = PricingManager.from_api("https://api.example.com/pricing")
pricing = await manager.get_pricing("gpt-4")
builder
def builder(cls) -> PricingManagerBuilder

Create a builder for custom configuration.

Returns
TypeDescription
PricingManagerBuilderPricingManagerBuilder instance.

Example

manager = (
PricingManager.builder()
.add_json_source("custom.json")
.add_api_source("https://api.example.com")
.with_cache_ttl(3600)
.build()
)

Builder for PricingManager with validation.

Provides a fluent API for configuring pricing sources safely.

Example

manager = (
PricingManager.builder()
.add_json_source("pricing.json")
.add_api_source("https://api.example.com/pricing")
.add_fallback({"custom-model": ModelPricing(...)})
.with_cache_ttl(3600)
.enable_fuzzy_matching()
.build()
)
__init__
def __init__() -> None

Initialize builder.

add_json_source
def add_json_source(file_path: str | Path) -> PricingManagerBuilder

Add JSON file pricing source.

Parameters
ParameterTypeDescription
`file_path`str | PathPath to JSON file.
Returns
TypeDescription
PricingManagerBuilderSelf for chaining.
add_api_source
def add_api_source(
    endpoint: str,
    timeout: float = 10.0
) -> PricingManagerBuilder

Add API endpoint pricing source.

Parameters
ParameterTypeDescription
`endpoint`strAPI endpoint URL.
`timeout`floatRequest timeout in seconds (default: 10).
Returns
TypeDescription
PricingManagerBuilderSelf for chaining.
add_fallback
def add_fallback(pricing_map: dict[str, ModelPricing]) -> PricingManagerBuilder

Add static fallback pricing.

Parameters
ParameterTypeDescription
`pricing_map`dict[str, ModelPricing]Dictionary of model to pricing.
Returns
TypeDescription
PricingManagerBuilderSelf for chaining.
add_source
def add_source(source: AbstractPricingSource) -> PricingManagerBuilder

Add custom pricing source.

Parameters
ParameterTypeDescription
`source`AbstractPricingSourceCustom AbstractPricingSource implementation.
Returns
TypeDescription
PricingManagerBuilderSelf for chaining.
with_cache_ttl
def with_cache_ttl(seconds: int) -> PricingManagerBuilder

Set cache TTL.

Parameters
ParameterTypeDescription
`seconds`intCache TTL in seconds.
Returns
TypeDescription
PricingManagerBuilderSelf for chaining.
Raises
ExceptionDescription
ValueErrorIf seconds is negative.
enable_fuzzy_matching
def enable_fuzzy_matching(enabled: bool = True) -> PricingManagerBuilder

Enable or disable fuzzy model name matching.

Parameters
ParameterTypeDescription
`enabled`boolWhether to enable fuzzy matching (default: True).
Returns
TypeDescription
PricingManagerBuilderSelf for chaining.
build
def build() -> PricingManager

Build PricingManager instance.

Returns
TypeDescription
PricingManagerConfigured PricingManager.
Raises
ExceptionDescription
ValueErrorIf no sources were added.

Configuration for a single provider in the routing cascade.

Every provider in the cascade has the same shape regardless of type. Provider-specific fields (Azure deployment, Cloudflare account ID, Bedrock region, Vertex project) go in extras.

Example

cfg = ProviderConfig(
name="groq",
model="llama-3.3-70b-versatile",
api_key="gsk_...",
)

Information about an LLM provider.

Attributes: name: Provider identifier (e.g., “openai”, “anthropic”). client_class: LLMClientProtocol implementation class. default_models: List of default/recommended models. supports_streaming: Whether streaming is supported. supports_tools: Whether function/tool calling is supported. supports_vision: Whether vision/image inputs are supported. base_url: Default base URL for API (optional). docs_url: Documentation URL (optional). pricing_url: Pricing page URL (optional). description: Human-readable description.


Registry for LLM providers.

Singleton registry that maintains information about all available LLM providers, both built-in and custom.

__init__
def __init__() -> None

Initialize provider registry.

register
def register(
    name: str,
    client_class: type[object],
    default_models: list[str] | None = None,
    supports_streaming: bool = True,
    supports_tools: bool = False,
    supports_vision: bool = False,
    base_url: str | None = None,
    docs_url: str | None = None,
    pricing_url: str | None = None,
    description: str = ''
) -> ProviderInfo

Register a new LLM provider.

get_provider
def get_provider(name: str) -> ProviderInfo

Get provider information.

list_providers
def list_providers() -> list[str]

List all registered provider names.

search_providers
def search_providers(
    supports_streaming: bool | None = None,
    supports_tools: bool | None = None,
    supports_vision: bool | None = None
) -> list[ProviderInfo]

Search providers by capabilities.

unregister
def unregister(name: str) -> None

Unregister a provider.

register_provider
async def register_provider(
    name: str,
    client: LLMClientProtocol,
    models: list[ModelInfo]
) -> None

Register a provider following the ProviderRegistryProtocol.

get_client
async def get_client(provider: str) -> LLMClientProtocol | None

Get an initialized client for a provider.

list_models
def list_models(capabilities: set[ModelCapability] | None = None) -> list[ModelInfo]

List all models matching capabilities.

get_model_info
def get_model_info(model_id: str) -> ModelInfo | None

Get information about a specific model.


Parse LLM responses into Pydantic models.

Uses the existing structured parser’s validation logic to parse and validate against a Pydantic model.

Example

from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
parser = PydanticOutputParser(User)
result = parser.parse('{"name": "John", "age": 30}')
assert result.name == "John"
__init__
def __init__(model: type[BaseModel]) -> None

Initialize with a Pydantic model class.

Parameters
ParameterTypeDescription
`model`type[BaseModel]Pydantic BaseModel subclass to parse into.
parse
def parse(text: str) -> BaseModel

Parse text into a Pydantic model instance.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text that may contain JSON.
Returns
TypeDescription
BaseModelValidated Pydantic model instance.
Raises
ExceptionDescription
ParseErrorWhen JSON cannot be extracted.
SchemaValidationErrorWhen validation fails.
get_format_instructions
def get_format_instructions() -> str

Return format instructions for the LLM.

Returns
TypeDescription
strFormat instruction string telling the model to output valid JSON that matches the Pydantic model schema.

Configuration for the quota tracking backend.

Example

cfg = QuotaConfig(backend="database")

Rate limiter for LLM requests (RPM and TPM).

Manages multiple buckets for different models and providers.

__init__
def __init__() -> None

Initialize rate limiter.

check
async def check(
    provider: str,
    model: str,
    tpm_limit: int | None = None,
    rpm_limit: int | None = None,
    estimated_tokens: int = 0
) -> bool

Check if request is allowed under current limits.

Parameters
ParameterTypeDescription
`provider`strAI provider name
`model`strModel name
`tpm_limit`int | NoneTokens Per Minute limit
`rpm_limit`int | NoneRequests Per Minute limit
`estimated_tokens`intEstimated tokens in request
Returns
TypeDescription
boolTrue if allowed, False if blocked

Redis-backed cache for distributed deployments.

Requires redis package to be installed.

Parameters
ParameterTypeDescription
`redis_url`Redis connection URL.
`ttl`Time-to-live in seconds.
`key_prefix`Prefix for all cache keys.

Example

cache = RedisLLMCache(redis_url="redis://localhost:6379")
await cache.connect()
result = await cache.get("key")
__init__
def __init__(
    cache_backend: CacheBackendProtocol,
    ttl: float = 3600,
    key_prefix: str = 'llm_cache:'
)

Initialize Redis cache.

Parameters
ParameterTypeDescription
`cache_backend`CacheBackendProtocolThe platform's cache backend.
`ttl`floatTime-to-live in seconds.
`key_prefix`strPrefix for cache keys.
connect
async def connect() -> None

Compatibility method for lifecycle-managed cache.

disconnect
async def disconnect() -> None

Compatibility method for lifecycle-managed cache.

get
async def get(key: str | dict[str, Any]) -> Any | None

Get value from Redis cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key.
Returns
TypeDescription
Any | NoneCached value or None.
set
async def set(
    key: str | dict[str, Any],
    value: Any,
    ttl: float | None = None
) -> None

Set value in Redis cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key.
`value`AnyValue to cache.
`ttl`float | NoneOptional TTL override.
get_or_compute
async def get_or_compute(
    key: str | dict[str, Any],
    compute_fn: Callable[[], Any],
    ttl: float | None = None
) -> Any

Get from cache or compute and cache result.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key.
`compute_fn`Callable[[], Any]Function to compute value if cache miss.
`ttl`float | NoneOptional TTL override.
Returns
TypeDescription
AnyCached or computed value.
delete
async def delete(key: str | dict[str, Any]) -> bool

Delete entry from cache.

Parameters
ParameterTypeDescription
`key`str | dict[str, Any]Cache key.
Returns
TypeDescription
boolTrue if deleted.
clear
async def clear() -> None

Clear all cache entries (Warning: clears entire backend if not namespaced).

get_stats
def get_stats() -> CacheStats

Get cache statistics.

Returns
TypeDescription
CacheStatsCacheStats object.

Format and convert LLM responses to various types.

Example

formatter = ResponseFormatter()
completion = Completion(content="42", ...)
num = formatter.to_int(completion)
print(num)
42
to_json
def to_json(completion: Completion) -> JSON

Convert response to JSON.

Parameters
ParameterTypeDescription
`completion`CompletionLLM completion
Returns
TypeDescription
JSONParsed JSON

Example

data = formatter.to_json(completion)
to_string
def to_string(
    completion: Completion,
    strip: bool = True
) -> str

Convert response to string.

Parameters
ParameterTypeDescription
`completion`CompletionLLM completion
`strip`boolWhether to strip whitespace
Returns
TypeDescription
strResponse string

Example

text = formatter.to_string(completion)
to_int
def to_int(completion: Completion) -> int

Convert response to integer.

Parameters
ParameterTypeDescription
`completion`CompletionLLM completion
Returns
TypeDescription
intParsed integer
Raises
ExceptionDescription
ParseErrorIf conversion fails

Example

num = formatter.to_int(completion)
to_float
def to_float(completion: Completion) -> float

Convert response to float.

Parameters
ParameterTypeDescription
`completion`CompletionLLM completion
Returns
TypeDescription
floatParsed float
Raises
ExceptionDescription
ParseErrorIf conversion fails

Example

num = formatter.to_float(completion)
to_bool
def to_bool(completion: Completion) -> bool

Convert response to boolean.

Parameters
ParameterTypeDescription
`completion`CompletionLLM completion
Returns
TypeDescription
boolParsed boolean

Example

result = formatter.to_bool(completion)
to_list
def to_list(
    completion: Completion,
    separator: str = '\n'
) -> list[str]

Convert response to list of strings.

Parameters
ParameterTypeDescription
`completion`CompletionLLM completion
`separator`strString separator (default: newline)
Returns
TypeDescription
list[str]List of strings

Example

items = formatter.to_list(completion)

Concrete chat message role constants shared across AI packages.

Route input to different runnables based on a predicate.

Like LangChain’s RunnableBranch, this evaluates predicates to select which branch to execute.

__init__
def __init__(
    branches: list[tuple[Callable[[Any], bool], RunnableProtocol]],
    default: RunnableProtocol | None = None
) -> None
invoke
def invoke(input: Any) -> Any

Synchronously route to matching branch.

Parameters
ParameterTypeDescription
`input`AnyInput to evaluate against predicates.
Returns
TypeDescription
AnyOutput from the first matching branch, or default if no match.
ainvoke
async def ainvoke(input: Any) -> Any

Asynchronously route to matching branch.

Parameters
ParameterTypeDescription
`input`AnyInput to evaluate against predicates.
Returns
TypeDescription
AnyOutput from the first matching branch, or default if no match.

Wrap a function as a runnable.

Accepts sync or async functions and wraps them to satisfy RunnableProtocol. Failures become Err(RunnableError(…)).

Parameters
ParameterTypeDescription
`func`A sync or async function to wrap.
__init__
def __init__(func: Callable[[Any], Any]) -> None
invoke
def invoke(input: Any) -> Any

Synchronously invoke the wrapped function.

Parameters
ParameterTypeDescription
`input`AnyInput to the function.
Returns
TypeDescription
AnyFunction output or Err on failure.
ainvoke
async def ainvoke(input: Any) -> Any

Asynchronously invoke the wrapped function.

Parameters
ParameterTypeDescription
`input`AnyInput to the function.
Returns
TypeDescription
AnyFunction output or Err on failure.

Mixin that adds pipe operator to runnables.

Provides the | operator that composes runnables into RunnableSequence. Analogous to LangChain’s RunnableBinding.

invoke
def invoke(input: Any) -> Any

Process input synchronously. Override in subclass.

ainvoke
async def ainvoke(input: Any) -> Any

Process input asynchronously. Override in subclass.


Run multiple runnables concurrently.

Each runnable receives the same input and results are returned as a dict.

__init__
def __init__(**runnables: RunnableProtocol) -> None
invoke
def invoke(input: Any) -> dict[str, Any]

Synchronously invoke all runnables.

Parameters
ParameterTypeDescription
`input`AnyInput to pass to all runnables.
Returns
TypeDescription
dict[str, Any]Dict mapping names to outputs.
ainvoke
async def ainvoke(input: Any) -> dict[str, Any]

Asynchronously invoke all runnables concurrently.

Parameters
ParameterTypeDescription
`input`AnyInput to pass to all runnables.
Returns
TypeDescription
dict[str, Any]Dict mapping names to outputs.

Pass input through with optional key assignment.

Returns the input unchanged but can assign it to a key in the output dict. Useful for combining with RunnableParallel.

__init__
def __init__(name: str | None = None) -> None
invoke
def invoke(input: Any) -> Any

Return input, optionally wrapped in dict with named key.

Parameters
ParameterTypeDescription
`input`AnyInput to pass through.
Returns
TypeDescription
AnyInput as-is, or dict with named key if name is set.
ainvoke
async def ainvoke(input: Any) -> Any

Return input, optionally wrapped in dict with named key.

Parameters
ParameterTypeDescription
`input`AnyInput to pass through.
Returns
TypeDescription
AnyInput as-is, or dict with named key if name is set.

Chain multiple runnables in sequence.

The output of each runnable becomes the input to the next. Short-circuits on Err results.

__init__
def __init__(
    first: RunnableProtocol,
    second: RunnableProtocol
) -> None
invoke
def invoke(input: Any) -> Any

Synchronously invoke the chain.

Parameters
ParameterTypeDescription
`input`AnyInput to the first runnable.
Returns
TypeDescription
AnyOutput from the last runnable, or Err if any step fails.
ainvoke
async def ainvoke(input: Any) -> Any

Asynchronously invoke the chain.

Parameters
ParameterTypeDescription
`input`AnyInput to the first runnable.
Returns
TypeDescription
AnyOutput from the last runnable, or Err if any step fails.

LLM client with injection protection and safety features.
__init__
def __init__(
    llm_provider: Annotated[LLMClientProtocol, Inject],
    system_prompt: str = 'You are a helpful assistant.',
    enable_output_filtering: bool = True,
    rate_limiter: Annotated[RateLimiter | None, Inject] = None,
    rpm_limit: int = 60
) -> None

Initialize secure LLM client.

Parameters
ParameterTypeDescription
`llm_provider`Annotated[LLMClientProtocol, Inject]Underlying LLM provider (injected)
`system_prompt`strSystem prompt template
`enable_output_filtering`boolEnable output filtering
chat
async def chat(
    user_input: str,
    user_id: str,
    context: Sequence[dict[str, str]] | None = None,
    strict_validation: bool = True
) -> str

Send chat message with safety protections.

Parameters
ParameterTypeDescription
`user_input`strUser message
`user_id`strUser identifier (for rate limiting)
`context`Sequence[dict[str, str]] | NonePrevious conversation context
`strict_validation`boolReject invalid input vs sanitize
Returns
TypeDescription
strLLM response
Raises
ExceptionDescription
ValueErrorIf input invalid (strict mode)
update_system_prompt
def update_system_prompt(system_prompt: str) -> None

Update system prompt.

Parameters
ParameterTypeDescription
`system_prompt`strNew system prompt

Structured prompt template with injection protection.

Uses clear delimiters to separate system instructions from user input. Implements multi-layered injection detection.

detect_injection
def detect_injection(prompt: str) -> tuple[bool, list[str]]

Multi-layered injection detection.

Parameters
ParameterTypeDescription
`prompt`strInput to analyze
Returns
TypeDescription
tuple[bool, list[str]]Tuple of (is_malicious, reasons)
validate_input
def validate_input(user_input: str) -> tuple[bool, str | None]

Validate user input for injection attempts.

Parameters
ParameterTypeDescription
`user_input`strUser input to validate
Returns
TypeDescription
tuple[bool, str | None]Tuple of (is_valid, error_message)
sanitize_input
def sanitize_input(user_input: str) -> str

Sanitize user input by removing dangerous patterns.

Parameters
ParameterTypeDescription
`user_input`strUser input to sanitize
Returns
TypeDescription
strSanitized input
format
def format(
    user_input: str,
    strict: bool = True
) -> str

Format prompt with user input.

Parameters
ParameterTypeDescription
`user_input`strUser input
`strict`boolIf True, reject invalid input. If False, sanitize.
Returns
TypeDescription
strFormatted prompt
Raises
ExceptionDescription
ValueErrorIf input invalid and strict=True

Criteria for model selection.

Strategy for selecting models based on conditions.

Example

strategy = SelectionStrategy(
name="long_context",
model="gpt-4-turbo-preview",
conditions={
"min_tokens": 2000,
"max_tokens": 100000
}
)
matches
def matches(context: dict[str, Any]) -> bool

Check if this strategy matches the given context.

Parameters
ParameterTypeDescription
`context`dict[str, Any]Context dictionary with prompt info
Returns
TypeDescription
boolTrue if all conditions are met

Example

context = {"token_count": 2500, "has_code": True}
strategy.matches(context)
True

Pricing source from static dictionary.

Hardcoded pricing data as a fallback when other sources are unavailable. Useful for custom internal models or as ultimate fallback.

Attributes: pricing_map: Dictionary of model name to pricing.

Example

source = StaticPricingSource({
"my-model": ModelPricing(
model="my-model",
prompt_per_1m=5.0,
completion_per_1m=10.0,
provider="custom"
)
})
__init__
def __init__(pricing_map: dict[str, ModelPricing])

Initialize static pricing source.

Parameters
ParameterTypeDescription
`pricing_map`dict[str, ModelPricing]Dictionary mapping model names to pricing.
get_pricing
async def get_pricing(model: str) -> ModelPricing | None

Get pricing for a specific model.

Parameters
ParameterTypeDescription
`model`strModel identifier.
Returns
TypeDescription
ModelPricing | NoneModelPricing if found, None otherwise.
get_all_pricing
async def get_all_pricing() -> dict[str, ModelPricing]

Get all pricing data.

Returns
TypeDescription
dict[str, ModelPricing]All static pricing data.
source_name
property source_name() -> str

Get source name.


A chunk of streamed completion.

Implements streaming semantics with DomainModel for validation.

Example

chunk = StreamChunk(delta="Hello", model="gpt-4-turbo", finish_reason=None)

Schema-aware parser that validates LLM responses against a model.

Wraps extract_json_block, validate_against_model, and build_json_schema into a convenient class-based API.

Parameters
ParameterTypeDescription
`output_model`Model class for validation.
`strict`Whether to enforce strict validation (default ``True``).
__init__
def __init__(
    output_model: type[Any],
    *,
    strict: bool = True
) -> None

Initialise with model class.

parse
def parse(completion: Any) -> Any

Parse and validate a completion into an output_model instance.

Parameters
ParameterTypeDescription
`completion`AnyCompletion object with ``.content`` attribute, or a string.
Returns
TypeDescription
AnyValidated model instance.
Raises
ExceptionDescription
ParseErrorWhen JSON cannot be extracted.
SchemaValidationErrorWhen validation fails.
parse_array
def parse_array(completion: Any) -> list[Any]

Parse and validate an array of output_model instances.

Parameters
ParameterTypeDescription
`completion`AnyCompletion object with ``.content`` attribute.
Returns
TypeDescription
list[Any]List of validated model instances.
Raises
ExceptionDescription
ParseErrorWhen JSON is not an array.
SchemaValidationErrorWhen validation fails.
get_json_schema
def get_json_schema() -> dict[str, Any]

Return JSON Schema dict for the output model.

get_schema_prompt
def get_schema_prompt() -> str

Return a human-readable schema prompt string.


A plain-text content part in a multimodal message.

Attributes: text: The text content. type: Discriminator field, always "text".


Token counter using tiktoken (OpenAI/compatible models).

Implements TokenCounterProtocol using tiktoken for precise counting. tiktoken is a required dependency for this counter.

Parameters
ParameterTypeDescription
`model`Model name (e.g. 'gpt-4', 'gpt-3.5-turbo').
`encoding_name`Optional tiktoken encoding name override.
__init__
def __init__(
    model: str = 'gpt-3.5-turbo',
    encoding_name: str | None = None
) -> None

Initialize TiktokenCounter.

Parameters
ParameterTypeDescription
`model`strModel name for token counting.
`encoding_name`str | NoneOptional tiktoken encoding name override.
Raises
ExceptionDescription
ImportErrorIf tiktoken is not installed.
model
property model() -> str

The model this counter is calibrated for.

count
def count(text: str) -> int

Count tokens in a text string.

count_messages
def count_messages(messages: list[ChatMessage]) -> int

Count tokens in a list of chat messages, including overhead.


Token count result with metadata.

Attributes: total: Total number of tokens. prompt_tokens: Number of tokens in the prompt. completion_tokens: Number of tokens in the completion (if applicable). model: Model name used for counting. timestamp: When the count was performed.


Registry mapping model-name patterns to TokenCounterProtocol backends.

Uses named backend keys and regex patterns for flexible model mapping.

Usage

registry = TokenCounterRegistry.with_defaults()
counter = registry.for_model("gpt-4o")
tokens = counter.count("Hello!")
registry = TokenCounterRegistry.with_defaults()
counter = registry.for_model("gpt-4o")
tokens = counter.count("Hello!")
__init__
def __init__() -> None

Create an empty registry.

with_defaults
def with_defaults(cls) -> TokenCounterRegistry

Create registry with all available tokenizer backends.

Registers:

  • char_estimate (always available, fallback)
  • tiktoken (if installed, for OpenAI/Anthropic models)
  • huggingface (if installed, for HuggingFace models)
  • mistral (if installed, for Mistral models)
Returns
TypeDescription
TokenCounterRegistryTokenCounterRegistry pre-populated with default backends.
register
def register(
    key: str,
    counter: TokenCounterProtocol
) -> None

Register a counter backend under a named key.

Parameters
ParameterTypeDescription
`key`strBackend name (e.g., 'tiktoken', 'huggingface', 'char_estimate').
`counter`TokenCounterProtocolCounter implementing TokenCounterProtocol.
map_models
def map_models(
    pattern: str,
    counter_key: str
) -> None

Map a regex pattern of model names to a backend key.

Parameters
ParameterTypeDescription
`pattern`strRegex pattern matching model names (case-insensitive).
`counter_key`strBackend key (must be registered).
for_model
def for_model(model: str) -> TokenCounterProtocol

Get the best counter for the given model name.

Tries exact regex match in _patterns first, falls back to ‘char_estimate’.

Parameters
ParameterTypeDescription
`model`strModel name.
Returns
TypeDescription
TokenCounterProtocolTokenCounterProtocol implementation.

Token usage statistics.

Tool call request from LLM.

complete_with_json
async def complete_with_json(
    client: LLMClientProtocol,
    prompt: str,
    system_prompt: str | None = None,
    **kwargs: Any
) -> JSON
Complete and parse response as JSON.
Parameters
ParameterTypeDescription
`client`LLMClientProtocolLLM client
`prompt`strUser prompt
`system_prompt`str | NoneOptional system prompt **kwargs: Additional completion arguments
Returns
TypeDescription
JSONParsed JSON

Example

data = await complete_with_json(
client,
"Generate a config with 3 fields"
)

complete_with_schema
async def complete_with_schema(
    client: LLMClientProtocol,
    prompt: str,
    schema: type[T],
    system_prompt: str | None = None,
    **kwargs: Any
) -> T
Complete with automatic schema parsing and validation.
Parameters
ParameterTypeDescription
`client`LLMClientProtocolLLM client
`prompt`strUser prompt
`schema`type[T]Pydantic model for validation
`system_prompt`str | NoneOptional system prompt **kwargs: Additional completion arguments
Returns
TypeDescription
TValidated schema instance

Example

from lexigram.ai.llm import OpenAIClient
client = OpenAIClient(api_key="sk-...")
person = await complete_with_schema(
client,
"Extract person from: John Doe, age 30",
schema=Person
)

create_assistant_template
def create_assistant_template() -> SecurePromptTemplate
Create template for general assistant.
Returns
TypeDescription
SecurePromptTemplateConfigured template

create_balanced_selector
def create_balanced_selector() -> ModelSelector
Create a balanced model selector.

create_cost_optimized_selector
def create_cost_optimized_selector(budget_per_1k_tokens: float = 2.0) -> ModelSelector
Create a cost-optimized model selector.

create_data_extraction_template
def create_data_extraction_template() -> SecurePromptTemplate
Create template for data extraction (high security).
Returns
TypeDescription
SecurePromptTemplateConfigured template

create_json_mode_messages
def create_json_mode_messages(
    prompt: str,
    schema: type[DomainModel] | None = None,
    system_prompt: str | None = None
) -> list[dict[str, str]]
Create messages for JSON mode with optional schema.
Parameters
ParameterTypeDescription
`prompt`strUser prompt
`schema`type[DomainModel] | NoneOptional Pydantic model for schema
`system_prompt`str | NoneOptional system prompt (default: JSON instruction)
Returns
TypeDescription
list[dict[str, str]]Messages list for LLM

Example

messages = create_json_mode_messages(
"Extract person info",
schema=Person
)

create_quality_optimized_selector
def create_quality_optimized_selector() -> ModelSelector
Create a quality-optimized model selector.

create_token_counter
def create_token_counter(
    model: str = 'gpt-3.5-turbo',
    encoding_name: str | None = None
) -> TiktokenCounter
Factory function for creating token counters.
Parameters
ParameterTypeDescription
`model`strModel name.
`encoding_name`str | NoneOptional encoding name override.
Returns
TypeDescription
TiktokenCounterTiktokenCounter instance.

Example

from lexigram.ai.llm import create_token_counter
counter = create_token_counter("gpt-4")
count = counter.count("Hello!")
print(count)

normalize_thinking_text
def normalize_thinking_text(text: str) -> tuple[str, str | None]
Extract thinking text from raw LLM output.

Tries each pattern in THINKING_PATTERNS order. Returns (clean_content, thinking_text_or_None). clean_content has thinking block removed and is stripped. thinking_text is the raw thinking content (stripped), or None if not found.

Pattern matching is by substring presence of start_marker (and end_marker after it), NOT by model name. The bare-closing-tag pattern (end_marker="", no start) matches only when start_marker is NOT found but end_marker IS found — this covers models that output …thinking…\nresponse.

Falls back: after removing a thinking block, if clean_content is empty but thinking text was found, tries to extract from the first { or [ in the original text to recover any JSON that may have been embedded.

Parameters
ParameterTypeDescription
`text`strRaw LLM response text, possibly containing inline thinking tags.
Returns
TypeDescription
tuple[str, str | None]A tuple of (clean_content, thinking_text_or_None). - clean_content: The response text with thinking stripped out, stripped of whitespace. - thinking_text_or_None: The thinking/reasoning text, or None if no thinking found.

Base class for structured extraction errors in lexigram-ai-llm.

Error raised when extraction max retries are exhausted.

Error raised when extraction response cannot be parsed as JSON.

Error raised when parsed extraction response fails schema validation.

Error raised when a request to an LLM provider is invalid.

Invalid API key or credentials — infrastructure error, raised not wrapped.

Raised as an exception (NOT wrapped in Result). Indicates a misconfiguration the application cannot route around.


Content blocked by provider safety filter — recoverable via reformulation.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should reformulate the prompt or inform the user.


Base exception for all LLM-domain errors in lexigram-ai-llm.

Model unavailable or not found — recoverable via fallback routing.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should route to a different model or provider.


API quota or billing limit exceeded — recoverable by routing elsewhere.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should route the request to a different provider or account.


Rate limit exceeded — recoverable via backoff/retry.

Returned as Err from LLMClientProtocol.complete() / stream_chat(). The caller should implement exponential backoff or route to another provider.


Model unavailable or not found — recoverable via fallback routing.

Raised when response cannot be parsed.

Error raised when connection to an LLM provider fails.

Raised when parsed response fails validation.

Error raised during LLM response streaming.

Base exception for structured output errors.

Error raised when the token limit for a request is exceeded.