API Reference
Protocols
AIHealthMonitorProtocol
Protocol for AI health monitoring.
Allows the DI container to resolve health status of AI sub-systems (LLM availability, vector store connectivity, etc.) through a stable interface.
Run health checks and return a HealthCheckResult-like object.
| Type | Description |
|---|---|
| Any | An object with at least a `status` attribute. |
Register a named health-check callable.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique check name. |
| `check` | Any | An async callable returning a health status. |
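Implementations only need the two operations above. A minimal in-memory sketch, assuming the run method is named `check` and using a hypothetical `SimpleStatus` stand-in for a HealthCheckResult-like object:

```python
# Sketch only: `check` as the run method and SimpleStatus are assumptions.
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

@dataclass
class SimpleStatus:
    status: str  # e.g. "healthy" or "unhealthy"

class InMemoryHealthMonitor:
    def __init__(self) -> None:
        self._checks: dict[str, Callable[[], Awaitable[Any]]] = {}

    def register(self, name: str, check: Callable[[], Awaitable[Any]]) -> None:
        """Register a named async health-check callable."""
        self._checks[name] = check

    async def check(self) -> SimpleStatus:
        """Run all registered checks; report the first failing one."""
        for name, fn in self._checks.items():
            try:
                result = await fn()
            except Exception:
                return SimpleStatus(status=f"unhealthy:{name}")
            if getattr(result, "status", "healthy") != "healthy":
                return SimpleStatus(status=f"unhealthy:{name}")
        return SimpleStatus(status="healthy")
```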
AIMetricsProtocol
Protocol for AI metrics collection.
Implementations record counters, histograms, and gauges for LLM/vector operations without coupling to a specific metrics backend (Prometheus, StatsD, etc.).
Record a successful LLM completion.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Provider name. |
| `model` | str | Model identifier. |
| `tokens` | int | Total tokens consumed. |
| `cost` | float | Estimated dollar cost. |
Record an LLM or vector store error.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Provider name. |
| `error_type` | str | Short error category string. |
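A minimal in-memory implementation sketch; the method names `record_completion` and `record_error` are assumed from the operation descriptions above:

```python
# Sketch: in-memory counters, no external metrics backend required.
from collections import Counter

class InMemoryAIMetrics:
    def __init__(self) -> None:
        self.completions: Counter = Counter()
        self.tokens: Counter = Counter()
        self.errors: Counter = Counter()
        self.cost: dict[tuple[str, str], float] = {}

    def record_completion(self, provider: str, model: str,
                          tokens: int, cost: float) -> None:
        key = (provider, model)
        self.completions[key] += 1
        self.tokens[key] += tokens
        self.cost[key] = self.cost.get(key, 0.0) + cost

    def record_error(self, provider: str, error_type: str) -> None:
        self.errors[(provider, error_type)] += 1
```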
AITracerProtocol
Protocol for AI distributed tracing.
Implementations wrap individual LLM/vector calls in named spans, allowing distributed trace propagation without coupling to a specific tracing backend (OpenTelemetry, Jaeger, etc.).
Return a context manager that wraps an LLM call in a trace span.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Provider name (e.g. `"openai"`). |
| `model` | str | Model identifier. |
| `streaming` | bool | Whether this is a streaming call. |
| Type | Description |
|---|---|
| Any | A synchronous context manager. |
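For tests, a no-op implementation is often enough. A sketch, with `NullSpan` as a hypothetical stand-in for the span type:

```python
# Sketch: no-op AITracerProtocol implementation for tests.
from contextlib import contextmanager
from typing import Any, Iterator

class NullSpan:
    """Stand-in span that discards attributes."""
    def set_attribute(self, key: str, value: Any) -> None:
        pass

class NullAITracer:
    @contextmanager
    def trace_llm_call(self, provider: str, model: str,
                       streaming: bool = False) -> Iterator[NullSpan]:
        # Synchronous context manager, matching the documented return type.
        yield NullSpan()
```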
ObservabilityProtocol
Protocol for AI-specific metrics and tracing.
async def record_generation( model: str, provider: str, tokens_prompt: int, tokens_completion: int, latency_ms: float, successful: bool ) -> None
Record a single LLM generation event.
Start a trace block, returning a trace ID.
End a trace block.
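A sketch tying the three operations together; the method names `start_trace`/`end_trace`, the uuid4 trace IDs, and the print() sink are all illustrative assumptions:

```python
# Sketch only: method names and logging sink are assumptions.
import time
import uuid

class LoggingObservability:
    def __init__(self) -> None:
        self._open: dict[str, float] = {}

    async def record_generation(self, model: str, provider: str,
                                tokens_prompt: int, tokens_completion: int,
                                latency_ms: float, successful: bool) -> None:
        print(f"generation model={model} provider={provider} "
              f"tokens={tokens_prompt + tokens_completion} ok={successful}")

    def start_trace(self, name: str) -> str:
        trace_id = str(uuid.uuid4())
        self._open[trace_id] = time.monotonic()
        return trace_id

    def end_trace(self, trace_id: str) -> None:
        started = self._open.pop(trace_id, None)
        if started is not None:
            print(f"trace {trace_id[:8]} took {time.monotonic() - started:.3f}s")
```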
Classes
AIHealthMonitor
Health monitoring for intelligence components.
Performs health checks on:
- LLM endpoints
- Vector stores
- Cache services
- Embedding services
Example

```python
from lexigram.logging import get_logger

logger = get_logger(__name__)
monitor = AIHealthMonitor()

# Add health checks
monitor.add_llm_check("openai", check_openai_health)
monitor.add_vector_check("pgvector", check_pgvector_health)

# Run all checks
results = await monitor.check_all()
if all(r.is_healthy() for r in results.values()):
    logger.info("health_check", status="all_systems_healthy")
```
Initialize health monitor.
Add LLM health check.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Add vector store health check.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Vector store provider name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Add cache service health check.
| Parameter | Type | Description |
|---|---|---|
| `service` | str | Cache service name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Add embedding service health check.
| Parameter | Type | Description |
|---|---|---|
| `model` | str | Embedding model name |
| `check_func` | Any | Async function that returns HealthCheckResult |
Check LLM endpoint health.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name |
| Type | Description |
|---|---|
| HealthCheckResult | Health check result |
Check vector store health.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | Vector store provider name |
| Type | Description |
|---|---|
| HealthCheckResult | Health check result |
Check cache service health.
| Parameter | Type | Description |
|---|---|---|
| `service` | str | Cache service name |
| Type | Description |
|---|---|
| HealthCheckResult | Health check result |
Run all health checks.
| Type | Description |
|---|---|
| dict[str, HealthCheckResult] | Dictionary mapping component names to health check results |
Check if all components are ready (healthy or degraded).
| Type | Description |
|---|---|
| bool | True if all components are ready, False otherwise |
Check if service is alive (at least one component healthy).
| Type | Description |
|---|---|
| bool | True if at least one component is healthy, False otherwise |
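These two checks map naturally onto readiness and liveness probes. A sketch of exposing them over HTTP, assuming FastAPI (not part of this library) and that both checks are awaitable like the rest of the monitor API:

```python
# Sketch: FastAPI and the awaitable is_ready/is_alive are assumptions.
from fastapi import FastAPI, Response

app = FastAPI()
monitor = AIHealthMonitor()

@app.get("/readyz")
async def readyz() -> Response:
    # Ready when every component is healthy or degraded.
    return Response(status_code=200 if await monitor.is_ready() else 503)

@app.get("/livez")
async def livez() -> Response:
    # Alive when at least one component is healthy.
    return Response(status_code=200 if await monitor.is_alive() else 503)
```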
AIMetrics
Centralized metrics collection for intelligence operations.
Provides counters, gauges, and histograms for tracking:
- LLM API calls, tokens, costs, and latency
- Vector store operations and performance
- Embedding cache hit rates
- RAG pipeline end-to-end performance
Example

```python
metrics = AIMetrics()

# Track LLM request
metrics.llm_requests_total.increment(
    labels={"provider": "openai", "model": "gpt-4", "status": "success"}
)

# Track tokens
metrics.llm_tokens_total.increment(
    amount=1500,
    labels={"provider": "openai", "model": "gpt-4", "type": "completion"}
)

# Track duration
metrics.llm_duration_seconds.observe(
    value=0.523,
    labels={"provider": "openai", "model": "gpt-4"}
)
```
Initialize intelligence metrics.
| Parameter | Type | Description |
|---|---|---|
| `collector` | Annotated[MetricsCollectorProtocol, Inject] \| None | Metrics collector to use (DI-injected). |
Get the underlying metrics collector.
| Type | Description |
|---|---|
| MetricsCollectorProtocol | The MetricsCollectorProtocol instance for advanced usage. |
AIObservabilityStartedHook
Payload fired after the AI observability subsystem has initialised.
AITracer
Distributed tracer for intelligence operations.
Provides span management and context propagation for:
- LLM completions and streaming
- Vector store operations
- RAG pipeline execution
- Embedding generation
Example

```python
tracer = AITracer()
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
    span.set_attribute("cost", response.cost)
```
def __init__(tracer: Tracer) -> None
Initialize intelligence tracer.
| Parameter | Type | Description |
|---|---|---|
| `tracer` | Tracer | Tracer instance to use for tracing. |
def trace_llm_call( provider: str, model: str, **attributes: Any ) -> ContextManager[Span]
Create a span for LLM API call.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name (e.g., "openai", "anthropic") |
| `model` | str | Model name (e.g., "gpt-4", "claude-3-opus") |
| `**attributes` | Any | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example

```python
tracer = AITracer()
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
```
def trace_operation( name: str, **attrs: Any ) -> ContextManager[Span]
Generic operation tracing helper.
This mirrors the Tracer.trace_operation API and is used by
worker code that needs a generic operation span (e.g., document
parsing/chunking).
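For example, a worker might wrap a chunking step like this (the span name, `doc_id`, and `chunk_document` are illustrative):

```python
# Sketch: span name and helper names are hypothetical.
with tracer.trace_operation("document.chunking", document_id=doc_id) as span:
    chunks = chunk_document(document)
    span.set_attribute("chunks.count", len(chunks))
```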
def trace_vector_operation( operation: str, provider: str, collection: str | None = None, **attributes: Any ) -> ContextManager[Span]
Create a span for vector store operation.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider (e.g., "pgvector", "chroma") |
| `collection` | str \| None | Optional collection/table name |
| `**attributes` | Any | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example

```python
with tracer.trace_vector_operation("search", "pgvector", "documents") as span:
    results = await store.search(query, limit=10)
    span.set_attribute("results.count", len(results))
```
def trace_embedding_operation( model: str, batch_size: int | None = None, **attributes: Any ) -> ContextManager[Span]
Create a span for embedding generation.
| Parameter | Type | Description |
|---|---|---|
| `model` | str | Embedding model name |
| `batch_size` | int \| None | Optional number of texts being embedded |
| `**attributes` | Any | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example

```python
with tracer.trace_embedding_operation("text-embedding-ada-002", 5) as span:
    embeddings = await embedder.embed(texts)
    span.set_attribute("embeddings.dimensions", len(embeddings[0]))
```
def trace_rag_stage( stage: str, pipeline: str = 'default', **attributes: Any ) -> ContextManager[Span]
Create a span for RAG pipeline stage.
| Parameter | Type | Description |
|---|---|---|
| `stage` | str | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `pipeline` | str | Pipeline name |
| `**attributes` | Any | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example

```python
with tracer.trace_rag_stage("retrieval", "default") as span:
    documents = await retriever.retrieve(query)
    span.set_attribute("documents.count", len(documents))
```
def trace_rag_query( query: str, pipeline: str = 'default', **attributes: Any ) -> ContextManager[Span]
Create a span for complete RAG query.
| Parameter | Type | Description |
|---|---|---|
| `query` | str | Query text |
| `pipeline` | str | Pipeline name |
| `**attributes` | Any | Additional span attributes |
| Type | Description |
|---|---|
| ContextManager[Span] | Span context manager |
Example

```python
with tracer.trace_rag_query("What is Python?") as span:
    result = await rag_pipeline.query(query)
    span.set_attribute("answer.length", len(result.answer))
```
def get_current_span() -> Span | None
Get the currently active span.
| Type | Description |
|---|---|
| Span \| None | Current span or None |
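This is useful for annotating the active span from helper code without threading a span through every call. A minimal sketch (`retries` is illustrative):

```python
span = tracer.get_current_span()
if span is not None:
    span.set_attribute("retry.count", retries)  # `retries` is hypothetical
```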
HealthCheckRunHook
Payload fired after an AI health check completes.
Attributes:
component: Name of the component that was checked (e.g. "llm").
healthy: True if the component reported a healthy state.
LLMCallTracedHook
Payload fired when a completed LLM call is recorded by the tracer.
Attributes:
provider: Provider identifier whose call was traced (e.g. "openai").
model: Model name that was traced (e.g. "gpt-4o").
ObservabilityConfig
Configuration for AI observability.
Loaded from the `ai_observability:` key in `application.yaml`, with environment variable overrides via the `LEX_AI_OBSERVABILITY__*` prefix.
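As a sketch of how the override mapping might look, assuming double underscores delimit nested keys and using a hypothetical `enabled` key (not a documented schema):

```python
import os

# Hypothetical: would override ai_observability.enabled in application.yaml, i.e.
#
#   ai_observability:
#     enabled: true
#
os.environ["LEX_AI_OBSERVABILITY__ENABLED"] = "true"
```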
Check config is safe for the target environment.
ObservabilityModule
AI Observability module — registers ObservabilityProvider.
def configure( cls, config: Any | None = None ) -> DynamicModule
Create an ObservabilityModule with explicit configuration.
def stub( cls, config: Any = None ) -> DynamicModule
Return a no-op ObservabilityModule for testing.
Registers observability infrastructure with noop tracing and metrics. No external telemetry systems are connected.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule with noop observability configuration. |
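In tests, the stub keeps module wiring intact without connecting telemetry backends. A sketch; how the module list is handed to the application is an assumption here:

```python
# Hypothetical test wiring; only ObservabilityModule.stub() is documented above.
modules = [
    ObservabilityModule.stub(),  # noop tracing and metrics
    # ...modules under test
]
```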
ObservabilityProvider
Provider for AI Observability.
Registers AIMetrics, AITracer, and AIHealthMonitor.
During boot(), self-wires observability decorators around any
LLMClientProtocol and VectorStoreProtocol that are already
registered in the container, so the wrapping is transparent to callers.
def __init__(config: ObservabilityConfig | None = None) -> None
def from_config( cls, config: ObservabilityConfig, **context ) -> ObservabilityProvider
Factory method for DI container setup.
async def register(container: ContainerRegistrarProtocol) -> None
Register the observability services.
Boot phase — self-wire observability wrappers into the container.
If LLMClientProtocol or VectorStoreProtocol are registered,
they are replaced with instrumented wrappers. Both AITracer and
AIMetrics must be available; if either is missing the wrapping is
skipped gracefully.
Shutdown phase.
Health check — always healthy (in-process domain provider).
No external backend to ping.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Ignored for in-process providers. |
| Type | Description |
|---|---|
| HealthCheckResult | Always HEALTHY — no external backend to ping. |
ObservableLLMClient
Decorator that adds tracing and metrics to any LLMClientProtocol.
Wraps the delegate client so callers interact with the same LLMClientProtocol interface while every complete() and stream_chat() call is traced and recorded in metrics.
Either dependency may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying client.
Example

```python
from lexigram.ai.observability.observable_llm import ObservableLLMClient

client = ObservableLLMClient(
    raw_client,
    provider="openai",
    model="gpt-4o",
    tracer=tracer,
    metrics=metrics,
)
response = await client.complete(messages)
```
def __init__( delegate: LLMClientProtocol, *, provider: str, model: str, tracer: AITracer | None = None, metrics: AIMetrics | None = None, audit_store: AIAuditStoreProtocol | None = None ) -> None
Complete with tracing and metrics.
Stream with tracing — returns AsyncStream directly.
The stream is established lazily when iteration begins. Tracing context is captured synchronously and applied during iteration.
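Because of this laziness, the returned stream can be passed around before any request is made. A usage sketch (the `messages` argument is an assumption about stream_chat's signature):

```python
stream = client.stream_chat(messages)  # no request yet; trace context captured here
async for chunk in stream:             # request is made on first iteration
    handle(chunk)                      # handle() is illustrative
```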
Delegate health check.
Delegate close.
ObservableVectorStore
Proxy that adds tracing and metrics to any VectorStoreProtocol.
Wraps the delegate store so callers interact with the same VectorStoreProtocol interface while every add(), search(), and delete() call is traced and recorded in metrics.
Either dependency may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying store.
Example

```python
store = ObservableVectorStore(
    raw_store,
    backend="pgvector",
    collection="documents",
    tracer=tracer,
    metrics=metrics,
)
results = await store.search(query="find similar docs", k=5)
```
def __init__( delegate: Any, *, backend: str, collection: str | None = None, tracer: AITracer | None = None, metrics: AIMetrics | None = None ) -> None
async def add(documents: list[Any]) -> Result[list[str], VectorError]
Add documents with tracing and metrics.
async def search( query_vector: list[float] | None = None, query: Any = None, k: int | None = None, top_k: int | None = None, filter: dict[str, Any] | None = None, **kwargs: Any ) -> Result[list[SearchResultProtocol], VectorError]
Search with tracing and metrics.
async def delete(ids: list[str]) -> Result[int, VectorError]
Delete documents with tracing and metrics.
Delegate health check transparently.
Functions
trace_llm
def trace_llm( provider: str, model: str, tracer: AITracer ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Decorator to automatically trace LLM calls.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name |
| `model` | str | Model name |
| `tracer` | AITracer | AITracer instance to use for tracing |
| Type | Description |
|---|---|
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |
Example

```python
tracer = AITracer(some_tracer)

@trace_llm(provider="openai", model="gpt-4", tracer=tracer)
async def complete(messages):
    response = await client.complete(messages)
    return response
```
trace_rag
def trace_rag( stage: str, tracer: AITracer, pipeline: str = 'default' ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Decorator to automatically trace RAG pipeline stages.
| Parameter | Type | Description |
|---|---|---|
| `stage` | str | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `tracer` | AITracer | AITracer instance to use for tracing |
| `pipeline` | str | Pipeline name |
| Type | Description |
|---|---|
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |
Example

```python
tracer = AITracer(some_tracer)

@trace_rag(stage="retrieval", tracer=tracer, pipeline="default")
async def retrieve(query):
    documents = await retriever.retrieve(query)
    return documents
```
trace_vector
def trace_vector( operation: str, provider: str, tracer: AITracer, collection: str | None = None ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Decorator to automatically trace vector operations.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider |
| `tracer` | AITracer | AITracer instance to use for tracing |
| `collection` | str \| None | Optional collection name |
| Type | Description |
|---|---|
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |
Example

```python
tracer = AITracer(some_tracer)

@trace_vector(operation="search", provider="pgvector", tracer=tracer, collection="docs")
async def search(query, limit=10):
    results = await store.search(query, limit)
    return results
```
track_embedding_operation
def track_embedding_operation( model: str, metrics: AIMetrics | None = None ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Decorator to automatically track embedding operation metrics.
| Parameter | Type | Description |
|---|---|---|
| `model` | str | Embedding model name (e.g., "text-embedding-ada-002") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |
| Type | Description |
|---|---|
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |
Example

```python
@track_embedding_operation(model="text-embedding-ada-002")
async def embed_batch(texts):
    embeddings = await embedder.embed(texts)
    return embeddings
```
track_llm_call
def track_llm_call( provider: str, model: str, metrics: AIMetrics | None = None ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Decorator to automatically track LLM call metrics.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | LLM provider name (e.g., "openai", "anthropic") |
| `model` | str | Model name (e.g., "gpt-4", "claude-3-opus") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |
| Type | Description |
|---|---|
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |
Example

```python
@track_llm_call(provider="openai", model="gpt-4")
async def complete(messages):
    response = await client.complete(messages)
    return response
```
track_vector_operation
def track_vector_operation( operation: str, provider: str, metrics: AIMetrics | None = None ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Decorator to automatically track vector store operation metrics.
| Parameter | Type | Description |
|---|---|---|
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider (e.g., "pgvector", "chroma", "qdrant") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |
| Type | Description |
|---|---|
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |
Example

```python
@track_vector_operation(operation="search", provider="pgvector")
async def search(query_embedding, limit=10):
    results = await store.search(query_embedding, limit)
    return results
```
Exceptions
HealthCheckError
Raised when a health check infrastructure operation fails.
MetricsError
Raised when a metrics recording or retrieval operation fails.
ObservabilityError
Base exception for all observability-related errors.
TracingError
Raised when a tracing operation fails.