API Reference
Protocols
AIHealthMonitorProtocol
Protocol for AI health monitoring.
Allows the DI container to resolve health status of AI sub-systems (LLM availability, vector store connectivity, etc.) through a stable interface.
Run health checks and return a HealthCheckResult-like object.
| Type | Description |
|---|---|
| Any | An object with at least a ``status`` attribute. |
Register a named health-check callable.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique check name. |
| `check` | Any | An async callable returning a health status. |
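
The two operations above (register a named async check, then run the checks and return an object with a `status` attribute) can be sketched as follows. This is a minimal illustration, not the library's definition: the method names `check_health` and `register_check`, the `SimpleResult` class, and the `"healthy"`/`"unhealthy"` strings are all assumptions made for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Protocol, runtime_checkable


@runtime_checkable
class HealthMonitorLike(Protocol):
    """Hypothetical protocol shape; the method names are assumptions."""

    async def check_health(self) -> Any: ...

    def register_check(self, name: str, check: Any) -> None: ...


class SimpleResult:
    """Minimal result object exposing the required ``status`` attribute."""

    def __init__(self, status: str) -> None:
        self.status = status


class InMemoryHealthMonitor:
    """Toy implementation that aggregates named async checks."""

    def __init__(self) -> None:
        self._checks: dict[str, Callable[[], Awaitable[str]]] = {}

    def register_check(self, name: str, check: Any) -> None:
        # Names must be unique, mirroring the "Unique check name" requirement.
        if name in self._checks:
            raise ValueError(f"duplicate check name: {name}")
        self._checks[name] = check

    async def check_health(self) -> SimpleResult:
        # Run every registered check and fold the outcomes into one status.
        statuses = [await check() for check in self._checks.values()]
        overall = "healthy" if all(s == "healthy" for s in statuses) else "unhealthy"
        return SimpleResult(overall)


async def llm_ok() -> str:
    """Stand-in check that always reports a healthy LLM endpoint."""
    return "healthy"


monitor = InMemoryHealthMonitor()
monitor.register_check("llm", llm_ok)
result = asyncio.run(monitor.check_health())
print(result.status)
```

Because the consumer only relies on the `status` attribute, any result type that exposes it would satisfy this sketch.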
AIMetricsProtocol
Protocol for AI metrics collection.
Implementations record counters, histograms, and gauges for LLM/vector operations without coupling to a specific metrics backend (Prometheus, StatsD, etc.).
Record a successful LLM completion.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Provider name. |
| `model` | str | Model identifier. |
| `tokens` | int | Total tokens consumed. |
| `cost` | float | Estimated dollar cost. |
Record an LLM or vector store error.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Provider name. |
| `error_type` | str | Short error category string. |
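
To illustrate how an implementation can stay backend-agnostic, here is a small in-memory sketch of the two recording operations. The method names `record_completion` and `record_error` are assumptions (the reference above lists only their parameters), and `InMemoryMetrics` is a hypothetical class written for this example.

```python
from collections import Counter, defaultdict
from typing import Protocol


class MetricsLike(Protocol):
    """Hypothetical protocol shape; the method names are assumptions."""

    def record_completion(self, provider: str, model: str, tokens: int, cost: float) -> None: ...

    def record_error(self, provider: str, error_type: str) -> None: ...


class InMemoryMetrics:
    """Keeps totals in plain dictionaries instead of a metrics backend."""

    def __init__(self) -> None:
        self.completions: Counter = Counter()
        self.tokens: Counter = Counter()
        self.cost: defaultdict = defaultdict(float)
        self.errors: Counter = Counter()

    def record_completion(self, provider: str, model: str, tokens: int, cost: float) -> None:
        key = (provider, model)
        self.completions[key] += 1
        self.tokens[key] += tokens
        self.cost[key] += cost

    def record_error(self, provider: str, error_type: str) -> None:
        self.errors[(provider, error_type)] += 1


def run_demo(metrics: MetricsLike) -> None:
    """Caller code depends only on the protocol, not on the backend."""
    metrics.record_completion("openai", "gpt-4", tokens=1500, cost=0.25)
    metrics.record_completion("openai", "gpt-4", tokens=500, cost=0.5)
    metrics.record_error("openai", "rate_limit")


metrics = InMemoryMetrics()
run_demo(metrics)
print(metrics.tokens[("openai", "gpt-4")], metrics.cost[("openai", "gpt-4")])
```

Swapping `InMemoryMetrics` for a Prometheus- or StatsD-backed class would leave `run_demo` unchanged, which is the point of routing calls through a protocol.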
AITracerProtocol
Protocol for AI distributed tracing.
Implementations wrap individual LLM/vector calls in named spans, allowing distributed trace propagation without coupling to a specific tracing backend (OpenTelemetry, Jaeger, etc.).
Return a context manager that wraps an LLM call in a trace span.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Provider name (e.g. ``"openai"``). |
| `model` | str | Model identifier. |
| `streaming` | bool | Whether this is a streaming call. |
| Type | Description |
|---|---|
| Any | A synchronous context manager. |
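
A synchronous context manager of the kind described above could look like the sketch below. It is an illustration only: the method name `trace_llm_call` is borrowed from `AITracer` and assumed to apply here, spans are plain dictionaries, and `RecordingTracer` is a hypothetical class that stores spans in a list instead of exporting them.

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator


class RecordingTracer:
    """Toy tracer that keeps finished spans in memory."""

    def __init__(self) -> None:
        self.finished: list[dict[str, Any]] = []

    @contextmanager
    def trace_llm_call(
        self, provider: str, model: str, streaming: bool = False
    ) -> Iterator[dict[str, Any]]:
        span: dict[str, Any] = {
            "name": "llm.call",
            "provider": provider,
            "model": model,
            "streaming": streaming,
            "error": None,
        }
        start = time.perf_counter()
        try:
            yield span
        except Exception as exc:
            # Record the failure on the span, then let the error propagate.
            span["error"] = type(exc).__name__
            raise
        finally:
            span["duration_s"] = time.perf_counter() - start
            self.finished.append(span)


tracer = RecordingTracer()

# Successful call: the caller can attach attributes inside the block.
with tracer.trace_llm_call("openai", "gpt-4", streaming=False) as span:
    span["tokens.total"] = 42

# Failing call: the span is still closed and recorded.
try:
    with tracer.trace_llm_call("openai", "gpt-4"):
        raise TimeoutError("simulated provider timeout")
except TimeoutError:
    pass

print([s["error"] for s in tracer.finished])
```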
ObservabilityProtocol
Protocol for AI-specific metrics and tracing.
Record a single LLM generation event.
Start a trace block, returning a trace ID.
End a trace block.
Classes
AIHealthMonitor
Health monitoring for intelligence components.
Performs health checks on:
- LLM endpoints
- Vector stores
- Cache services
- Embedding services
Example
```python
from lexigram.logging import get_logger

logger = get_logger(__name__)
monitor = AIHealthMonitor()

# Add health checks
monitor.add_llm_check("openai", check_openai_health)
monitor.add_vector_check("pgvector", check_pgvector_health)

# Run all checks
results = await monitor.check_all()
if all(r.is_healthy() for r in results.values()):
    logger.info("health_check", status="all_systems_healthy")
```
Initialize health monitor.
Add LLM health check.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Add vector store health check.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Vector store provider name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Add cache service health check.
| Parameter | Type | Description |
|---|---|---|
| `service` | str | Cache service name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Add embedding service health check.
| Parameter | Type | Description |
|---|---|---|
| `model` | str | Embedding model name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Check LLM endpoint health.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name |
| Type | Description |
|---|---|
| HealthCheckResult | Health check result |
Check vector store health.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Vector store provider name |
| Type | Description |
|---|---|
| HealthCheckResult | Health check result |
Check cache service health.
| Parameter | Type | Description |
|---|---|---|
| `service` | str | Cache service name |
| Type | Description |
|---|---|
| HealthCheckResult | Health check result |
Run all health checks.
| Type | Description |
|---|---|
| dict[str, HealthCheckResult] | Dictionary mapping component names to health check results |
Check if all components are ready (healthy or degraded).
| Type | Description |
|---|---|
| bool | True if all components are ready, False otherwise |
Check if service is alive (at least one component healthy).
| Type | Description |
|---|---|
| bool | True if at least one component is healthy, False otherwise |
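
The difference between the readiness and liveness rules above is easiest to see side by side. The sketch below restates both rules over a plain mapping of component names to statuses; the `Status` enum, its `UNHEALTHY` member, and the component names are assumptions made for illustration and are not taken from the library.

```python
from enum import Enum


class Status(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def is_ready(statuses: dict[str, Status]) -> bool:
    """Ready when every component is healthy or degraded."""
    return all(s in (Status.HEALTHY, Status.DEGRADED) for s in statuses.values())


def is_alive(statuses: dict[str, Status]) -> bool:
    """Alive when at least one component is healthy."""
    return any(s is Status.HEALTHY for s in statuses.values())


# One failing component blocks readiness but not liveness.
mixed = {
    "llm:openai": Status.HEALTHY,
    "vector:pgvector": Status.DEGRADED,
    "cache:redis": Status.UNHEALTHY,
}

# Degraded-only components count as ready, yet none is fully healthy.
degraded_only = {"llm:openai": Status.DEGRADED}

print(is_ready(mixed), is_alive(mixed))
print(is_ready(degraded_only), is_alive(degraded_only))
```

Note that with no registered components this sketch reports ready (vacuously) and not alive; how the real monitor treats that case is not stated in this reference.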
AIMetrics
Centralized metrics collection for intelligence operations.
Provides counters, gauges, and histograms for tracking:
- LLM API calls, tokens, costs, and latency
- Vector store operations and performance
- Embedding cache hit rates
- RAG pipeline end-to-end performance
Example
```python
metrics = AIMetrics()

# Track LLM request
metrics.llm_requests_total.increment(
    labels={"provider": "openai", "model": "gpt-4", "status": "success"}
)

# Track tokens
metrics.llm_tokens_total.increment(
    amount=1500,
    labels={"provider": "openai", "model": "gpt-4", "type": "completion"},
)

# Track duration
metrics.llm_duration_seconds.observe(
    value=0.523,
    labels={"provider": "openai", "model": "gpt-4"},
)
```
Initialize intelligence metrics.
| Parameter | Type | Description |
|---|---|---|
| `collector` | Annotated[MetricsCollectorProtocol, Inject] \| None | Metrics collector to use (DI-injected). |
Get the underlying metrics collector.
| Type | Description |
|---|---|
| MetricsCollectorProtocol | The MetricsCollectorProtocol instance for advanced usage. |
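
The `increment(amount=..., labels=...)` and `observe(value=..., labels=...)` calls in the example above imply metrics that keep one series per label combination. A minimal sketch of that idea follows, assuming an in-memory store; `LabeledCounter`, `LabeledHistogram`, and `label_key` are hypothetical names and say nothing about how the real collector is built.

```python
from collections import defaultdict
from typing import Mapping, Optional

LabelKey = tuple[tuple[str, str], ...]


def label_key(labels: Optional[Mapping[str, str]]) -> LabelKey:
    """Normalise a label mapping into a hashable, order-independent key."""
    return tuple(sorted((labels or {}).items()))


class LabeledCounter:
    """Monotonic counter with one running total per label combination."""

    def __init__(self) -> None:
        self._totals: dict[LabelKey, float] = defaultdict(float)

    def increment(self, amount: float = 1, labels: Optional[Mapping[str, str]] = None) -> None:
        if amount < 0:
            raise ValueError("counters can only increase")
        self._totals[label_key(labels)] += amount

    def value(self, labels: Optional[Mapping[str, str]] = None) -> float:
        return self._totals.get(label_key(labels), 0.0)


class LabeledHistogram:
    """Stores raw observations per label combination (no bucketing)."""

    def __init__(self) -> None:
        self._observations: dict[LabelKey, list[float]] = defaultdict(list)

    def observe(self, value: float, labels: Optional[Mapping[str, str]] = None) -> None:
        self._observations[label_key(labels)].append(value)

    def count(self, labels: Optional[Mapping[str, str]] = None) -> int:
        return len(self._observations.get(label_key(labels), []))


llm_tokens_total = LabeledCounter()
labels = {"provider": "openai", "model": "gpt-4", "type": "completion"}
llm_tokens_total.increment(amount=1500, labels=labels)
# Same labels in a different order land in the same series.
llm_tokens_total.increment(
    amount=500, labels={"type": "completion", "model": "gpt-4", "provider": "openai"}
)

llm_duration_seconds = LabeledHistogram()
llm_duration_seconds.observe(value=0.523, labels={"provider": "openai", "model": "gpt-4"})

print(llm_tokens_total.value(labels))
```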
AIObservabilityStartedHook
Payload fired after the AI observability subsystem has initialised.
AITracer
Distributed tracer for intelligence operations.
Provides span management and context propagation for:
- LLM completions and streaming
- Vector store operations
- RAG pipeline execution
- Embedding generation
Also implements CallbackHandlerProtocol for event-driven tracing.
Example
```python
tracer = AITracer()

# trace_llm_call returns a synchronous context manager, so use plain `with`.
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
    span.set_attribute("cost", response.cost)
```
Initialize intelligence tracer.
| Parameter | Type | Description |
|---|---|---|
| `tracer` | Tracer | Tracer instance to use for tracing. |
Create a span for LLM API call.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name (e.g., "openai", "anthropic") |
| `model` | str | Model name (e.g., "gpt-4", "claude-3-opus") |
| `**attributes` | | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example
```python
tracer = AITracer()

with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
```
Generic operation tracing helper.
This mirrors the Tracer.trace_operation API and is used by
worker code that needs a generic operation span (e.g., document
parsing/chunking).
Create a span for vector store operation.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider (e.g., "pgvector", "chroma") |
| `collection` | str \| None | Optional collection/table name |
| `**attributes` | | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example
```python
with tracer.trace_vector_operation("search", "pgvector", "documents") as span:
    results = await store.search(query, limit=10)
    span.set_attribute("results.count", len(results))
```
Create a span for embedding generation.
| Parameter | Type | Description |
|---|---|---|
| `model` | str | Embedding model name |
| `batch_size` | int \| None | Optional number of texts being embedded |
| `**attributes` | | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example
```python
with tracer.trace_embedding_operation("text-embedding-ada-002", 5) as span:
    embeddings = await embedder.embed(texts)
    span.set_attribute("embeddings.dimensions", len(embeddings[0]))
```
Create a span for RAG pipeline stage.
| Parameter | Type | Description |
|---|---|---|
| `stage` | str | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `pipeline` | str | Pipeline name |
| `**attributes` | | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example
```python
with tracer.trace_rag_stage("retrieval", "default") as span:
    documents = await retriever.retrieve(query)
    span.set_attribute("documents.count", len(documents))
```
Create a span for complete RAG query.
| Parameter | Type | Description |
|---|---|---|
| `query` | str | Query text |
| `pipeline` | str | Pipeline name |
| `**attributes` | | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example
```python
query = "What is Python?"

with tracer.trace_rag_query(query) as span:
    result = await rag_pipeline.query(query)
    span.set_attribute("answer.length", len(result.answer))
```
Get the currently active span.
| Type | Description |
|---|---|
| Span \| None | Current span or None |
Called when an LLM call starts.
Called for each new token in a streaming LLM response.
Called when an LLM call completes successfully.
Called when an LLM call fails.
Called when a chain/pipeline starts executing.
Called when a chain/pipeline completes.
Called when a tool starts executing.
Called when a tool finishes executing.
Called when an agent takes an action.
Called when an agent finishes executing.
Called when a retriever starts a search.
Called when a retriever completes a search.
HealthCheckRunHook
Payload fired after an AI health check completes.
Attributes:
component: Name of the component that was checked (e.g. "llm").
healthy: True if the component reported a healthy state.
LLMCallTracedHook
Payload fired when a completed LLM call is recorded by the tracer.
Attributes:
provider: Provider identifier whose call was traced (e.g. "openai").
model: Model name that was traced (e.g. "gpt-4o").
ObservabilityConfig
Configuration for AI observability.
Loaded from the ai_observability: key in application.yaml, with environment
variable overrides via LEX_AI_OBSERVABILITY__* prefix.
Check config is safe for the target environment.
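The environment-variable override mechanism mentioned above can be pictured as a layering step over the YAML section. The sketch below is only an illustration under several assumptions: that `__` also separates nested keys after the prefix, that keys are lower-cased, and that values stay as strings. The field names (`enabled`, `tracing`, `sample_rate`) are hypothetical and not taken from `ObservabilityConfig`.

```python
import copy
from typing import Any, Mapping

PREFIX = "LEX_AI_OBSERVABILITY__"


def apply_env_overrides(base: Mapping[str, Any], environ: Mapping[str, str]) -> dict[str, Any]:
    """Return a copy of ``base`` with prefixed environment variables layered on top."""
    merged = copy.deepcopy(dict(base))
    for name, raw in environ.items():
        # Ignore unrelated variables and a bare prefix with no key after it.
        if not name.startswith(PREFIX) or name == PREFIX:
            continue
        path = name[len(PREFIX):].lower().split("__")
        node = merged
        for key in path[:-1]:
            child = node.get(key)
            if not isinstance(child, dict):
                child = {}
                node[key] = child
            node = child
        # Values stay strings here; a real loader would coerce types.
        node[path[-1]] = raw
    return merged


# Stand-in for the parsed ``ai_observability:`` section of application.yaml.
yaml_section = {"enabled": True, "tracing": {"sample_rate": 1.0}}

env = {
    "LEX_AI_OBSERVABILITY__ENABLED": "false",
    "LEX_AI_OBSERVABILITY__TRACING__SAMPLE_RATE": "0.1",
    "PATH": "/usr/bin",
}

config = apply_env_overrides(yaml_section, env)
print(config)
```

Passing `os.environ` as the second argument would apply the same layering to the real process environment.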
ObservabilityModule
AI Observability module that registers ObservabilityProvider.
Create an ObservabilityModule with explicit configuration.
Return a no-op ObservabilityModule for testing.
Registers observability infrastructure with noop tracing and metrics. No external telemetry systems are connected.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule with noop observability configuration. |
ObservabilityProvider
Provider for AI Observability.
Registers AIMetrics, AITracer, and AIHealthMonitor.
During boot(), self-wires observability decorators around any
LLMClientProtocol and VectorStoreProtocol that are already
registered in the container, so the wrapping is transparent to callers.
Factory method for DI container setup.
Register the observability services.
Boot phase — self-wire observability wrappers into the container.
If LLMClientProtocol or VectorStoreProtocol are registered,
they are replaced with instrumented wrappers. Both AITracer and
AIMetrics must be available; if either is missing the wrapping is
skipped gracefully.
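The boot-time self-wiring described above amounts to a conditional replace-in-place. The sketch below shows that control flow with a toy container; `Container`, `Instrumented`, and the string keys are hypothetical stand-ins, and the real provider's container API is not shown in this reference.

```python
from typing import Any, Optional


class Container:
    """Tiny stand-in for a DI container keyed by service name."""

    def __init__(self) -> None:
        self._services: dict[str, Any] = {}

    def register(self, key: str, instance: Any) -> None:
        self._services[key] = instance

    def resolve(self, key: str) -> Optional[Any]:
        return self._services.get(key)


class Instrumented:
    """Hypothetical wrapper that remembers its delegate and instrumentation."""

    def __init__(self, delegate: Any, tracer: Any, metrics: Any) -> None:
        self.delegate = delegate
        self.tracer = tracer
        self.metrics = metrics


def boot(container: Container) -> list[str]:
    """Wrap registered clients and stores; return the keys that were wrapped."""
    tracer = container.resolve("AITracer")
    metrics = container.resolve("AIMetrics")
    if tracer is None or metrics is None:
        return []  # Missing dependency: skip wrapping without raising.

    wrapped: list[str] = []
    for key in ("LLMClientProtocol", "VectorStoreProtocol"):
        delegate = container.resolve(key)
        # Skip unregistered services and avoid wrapping twice.
        if delegate is None or isinstance(delegate, Instrumented):
            continue
        container.register(key, Instrumented(delegate, tracer, metrics))
        wrapped.append(key)
    return wrapped


container = Container()
container.register("AITracer", object())
container.register("AIMetrics", object())
container.register("LLMClientProtocol", "raw-llm-client")

first_boot = boot(container)
second_boot = boot(container)

incomplete = Container()
incomplete.register("LLMClientProtocol", "raw-llm-client")
skipped = boot(incomplete)

print(first_boot, second_boot, skipped)
```

Callers that resolve `LLMClientProtocol` after boot receive the wrapper without changing their own code, which is what makes the wrapping transparent.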
Shutdown phase.
Health check — always healthy (in-process domain provider).
No external backend to ping.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Ignored for in-process providers. |
| Type | Description |
|---|---|
| HealthCheckResult | Always HEALTHY — no external backend to ping. |
ObservableLLMClient
Decorator that adds tracing and metrics to any LLMClientProtocol.
Wraps the delegate client so callers interact with the same LLMClientProtocol interface while every complete() and stream_chat() call is traced and recorded in metrics.
Either dependency (tracer or metrics) may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying client.
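The None-tolerant delegation described above can be sketched with a small wrapper that keeps the same `complete()` surface as its delegate. Everything here is a stand-in written for illustration: `EchoClient`, `ListTracer`, `CallCounter`, and `ObservedClient` are hypothetical, and the real wrapper's internals are not documented in this reference.

```python
import asyncio
from contextlib import contextmanager, nullcontext
from typing import Any, Iterator, Optional


class EchoClient:
    """Stand-in for a real LLM client."""

    async def complete(self, messages: list[str]) -> str:
        return "echo: " + messages[-1]


class ListTracer:
    """Records span names in a list."""

    def __init__(self) -> None:
        self.spans: list[str] = []

    @contextmanager
    def span(self, name: str) -> Iterator[None]:
        self.spans.append(name)
        yield


class CallCounter:
    """Counts successful and failed calls."""

    def __init__(self) -> None:
        self.calls = 0
        self.errors = 0


class ObservedClient:
    """Same ``complete()`` surface as the delegate, with optional instrumentation."""

    def __init__(
        self,
        delegate: Any,
        tracer: Optional[ListTracer] = None,
        metrics: Optional[CallCounter] = None,
    ) -> None:
        self._delegate = delegate
        self._tracer = tracer
        self._metrics = metrics

    async def complete(self, messages: list[str]) -> str:
        # Without a tracer, fall back to a no-op context manager.
        span_cm = self._tracer.span("llm.complete") if self._tracer is not None else nullcontext()
        with span_cm:
            try:
                response = await self._delegate.complete(messages)
            except Exception:
                if self._metrics is not None:
                    self._metrics.errors += 1
                raise
            if self._metrics is not None:
                self._metrics.calls += 1
            return response


tracer, metrics = ListTracer(), CallCounter()
observed = ObservedClient(EchoClient(), tracer=tracer, metrics=metrics)
bare = ObservedClient(EchoClient())  # no tracer, no metrics

observed_reply = asyncio.run(observed.complete(["hi"]))
bare_reply = asyncio.run(bare.complete(["hi"]))
print(observed_reply, bare_reply, tracer.spans, metrics.calls)
```

Both clients return the same reply; only the instrumented one leaves a span and a counter increment behind.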
Example
```python
from lexigram.ai.observability.observable_llm import ObservableLLMClient

client = ObservableLLMClient(
    raw_client,
    provider="openai",
    model="gpt-4o",
    tracer=tracer,
    metrics=metrics,
)
response = await client.complete(messages)
```
Complete with tracing and metrics.
Stream with tracing — returns AsyncStream directly.
The stream is established lazily when iteration begins. Tracing context is captured synchronously and applied during iteration.
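One way to picture "captured synchronously, applied during iteration" is a plain function that records the tracing context at call time and returns an async generator that opens the span only when the consumer starts iterating. The sketch below is an assumption-laden illustration: `traced_stream`, `raw_stream`, and the `events` log are hypothetical and do not reflect the library's AsyncStream type.

```python
import asyncio
from typing import AsyncIterator, Callable

events: list[str] = []


async def raw_stream() -> AsyncIterator[str]:
    """Stand-in for a provider stream; 'connects' on first iteration."""
    events.append("connected")
    for token in ("Hel", "lo"):
        yield token


def traced_stream(
    source_factory: Callable[[], AsyncIterator[str]], span_name: str
) -> AsyncIterator[str]:
    # Runs synchronously at call time: capture whatever context is needed.
    events.append(f"captured:{span_name}")

    async def iterate() -> AsyncIterator[str]:
        # Runs only once the consumer starts iterating.
        events.append(f"span_start:{span_name}")
        try:
            async for token in source_factory():
                yield token
        finally:
            events.append(f"span_end:{span_name}")

    return iterate()


stream = traced_stream(raw_stream, "llm.stream")
events_before_iteration = list(events)  # nothing has connected yet


async def consume() -> list[str]:
    return [token async for token in stream]


tokens = asyncio.run(consume())
print(events_before_iteration, tokens, events)
```

The event log shows that the underlying connection is made only after the span has started, and that the span closes when the stream is exhausted.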
Delegate health check.
Delegate close.
ObservableVectorStore
Proxy that adds tracing and metrics to any VectorStoreProtocol.
Wraps the delegate store so callers interact with the same VectorStoreProtocol interface while every add(), search(), and delete() call is traced and recorded in metrics.
Either dependency (tracer or metrics) may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying store.
Example
```python
store = ObservableVectorStore(
    raw_store,
    backend="pgvector",
    collection="documents",
    tracer=tracer,
    metrics=metrics,
)
results = await store.search(query="find similar docs", k=5)
```
Add documents with tracing and metrics.
Search with tracing and metrics.
Delete documents with tracing and metrics.
Delegate health check transparently.
Functions
trace_llm
Decorator to automatically trace LLM calls.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name |
| `model` | str | Model name |
| `tracer` | AITracer | AITracer instance to use for tracing |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]] | Decorator function |
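
The return type above describes a decorator factory: a function that takes configuration and returns a decorator. A minimal sketch of how such a factory can wrap an async function in a span is shown below. `trace_llm_sketch` and `ListTracer` are hypothetical names for this example, and the sketch assumes the tracer exposes a synchronous `trace_llm_call(provider, model)` context manager.

```python
import asyncio
import functools
from contextlib import contextmanager
from typing import Any, Awaitable, Callable, Iterator


class ListTracer:
    """Stand-in tracer that appends finished spans to a list."""

    def __init__(self) -> None:
        self.spans: list[dict[str, str]] = []

    @contextmanager
    def trace_llm_call(self, provider: str, model: str) -> Iterator[dict[str, str]]:
        span = {"provider": provider, "model": model, "status": "ok"}
        try:
            yield span
        except Exception:
            span["status"] = "error"
            raise
        finally:
            self.spans.append(span)


def trace_llm_sketch(
    provider: str, model: str, tracer: ListTracer
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    """Return a decorator that runs the wrapped coroutine inside a span."""

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)  # keep the original name and docstring
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            with tracer.trace_llm_call(provider, model):
                return await func(*args, **kwargs)

        return wrapper

    return decorator


tracer = ListTracer()


@trace_llm_sketch(provider="openai", model="gpt-4", tracer=tracer)
async def complete(messages: list[str]) -> str:
    return f"{len(messages)} message(s)"


reply = asyncio.run(complete(["hi"]))
print(reply, tracer.spans)
```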
Example
```python
tracer = AITracer(some_tracer)

@trace_llm(provider="openai", model="gpt-4", tracer=tracer)
async def complete(messages):
    response = await client.complete(messages)
    return response
```
trace_rag
Decorator to automatically trace RAG pipeline stages.
| Parameter | Type | Description |
|---|---|---|
| `stage` | str | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `tracer` | AITracer | AITracer instance to use for tracing |
| `pipeline` | str | Pipeline name |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]] | Decorator function |
Example
```python
tracer = AITracer(some_tracer)

@trace_rag(stage="retrieval", tracer=tracer, pipeline="default")
async def retrieve(query):
    documents = await retriever.retrieve(query)
    return documents
```
trace_vector
Decorator to automatically trace vector operations.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider |
| `tracer` | AITracer | AITracer instance to use for tracing |
| `collection` | str \| None | Optional collection name |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]] | Decorator function |
Example
```python
tracer = AITracer(some_tracer)

@trace_vector(operation="search", provider="pgvector", tracer=tracer, collection="docs")
async def search(query, limit=10):
    results = await store.search(query, limit)
    return results
```
track_embedding_operation
Decorator to automatically track embedding operation metrics.
| Parameter | Type | Description |
|---|---|---|
| `model` | str | Embedding model name (e.g., "text-embedding-ada-002") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]] | Decorator function |
Example
```python
@track_embedding_operation(model="text-embedding-ada-002")
async def embed_batch(texts):
    embeddings = await embedder.embed(texts)
    return embeddings
```
track_llm_call
Decorator to automatically track LLM call metrics.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name (e.g., "openai", "anthropic") |
| `model` | str | Model name (e.g., "gpt-4", "claude-3-opus") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]] | Decorator function |
Example
```python
@track_llm_call(provider="openai", model="gpt-4")
async def complete(messages):
    response = await client.complete(messages)
    return response
```
track_vector_operation
Decorator to automatically track vector store operation metrics.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider (e.g., "pgvector", "chroma", "qdrant") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]] | Decorator function |
Example
```python
@track_vector_operation(operation="search", provider="pgvector")
async def search(query_embedding, limit=10):
    results = await store.search(query_embedding, limit)
    return results
```
Exceptions
HealthCheckError
Raised when a health check infrastructure operation fails.
MetricsError
Raised when a metrics recording or retrieval operation fails.
ObservabilityError
Base exception for all observability-related errors.
TracingError
Raised when a tracing operation fails.