API Reference

Protocol for AI health monitoring.

Allows the DI container to resolve the health status of AI subsystems (LLM availability, vector store connectivity, etc.) through a stable interface.

check
async def check() -> Any

Run health checks and return a HealthCheckResult-like object.

Returns

| Type | Description |
| --- | --- |
| `Any` | An object with at least a `status` attribute. |

register_check
def register_check(
    name: str,
    check: Any
) -> None

Register a named health-check callable.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `str` | Unique check name. |
| `check` | `Any` | An async callable returning a health status. |
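
A minimal sketch of a class that satisfies the two methods above. `SimpleHealthResult`, `InMemoryHealthMonitor`, and the string status values are hypothetical names chosen for illustration; the protocol itself only requires that `check()` return an object with a `status` attribute.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class SimpleHealthResult:
    """Hypothetical result type; only the ``status`` attribute is required."""

    status: str
    details: dict


class InMemoryHealthMonitor:
    """Illustrative implementation, not the library's own class."""

    def __init__(self) -> None:
        self._checks: dict[str, Callable[[], Awaitable[str]]] = {}

    def register_check(self, name: str, check: Any) -> None:
        # Check names are documented as unique, so duplicates are rejected here.
        if name in self._checks:
            raise ValueError(f"duplicate health check: {name}")
        self._checks[name] = check

    async def check(self) -> Any:
        details = {}
        for name, fn in self._checks.items():
            try:
                details[name] = await fn()
            except Exception:
                # A check that raises is reported as unhealthy (an assumption).
                details[name] = "unhealthy"
        healthy = all(value == "healthy" for value in details.values())
        return SimpleHealthResult("healthy" if healthy else "unhealthy", details)


async def llm_ok() -> str:
    return "healthy"


async def vector_down() -> str:
    raise ConnectionError("vector store unreachable")


monitor = InMemoryHealthMonitor()
monitor.register_check("llm", llm_ok)
monitor.register_check("vector", vector_down)
result = asyncio.run(monitor.check())
print(result.status, result.details)
```

Treating a raising check as unhealthy keeps one failing sub-system from aborting the whole report; a real implementation may choose differently.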

Protocol for AI metrics collection.

Implementations record counters, histograms, and gauges for LLM/vector operations without coupling to a specific metrics backend (Prometheus, StatsD, etc.).

record_completion
def record_completion(
    provider: str,
    model: str,
    tokens: int,
    cost: float
) -> None

Record a successful LLM completion.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | Provider name. |
| `model` | `str` | Model identifier. |
| `tokens` | `int` | Total tokens consumed. |
| `cost` | `float` | Estimated dollar cost. |

record_error
def record_error(
    provider: str,
    error_type: str
) -> None

Record an LLM or vector store error.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | Provider name. |
| `error_type` | `str` | Short error category string. |
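
As a rough illustration of a backend-agnostic implementation, the sketch below keeps everything in process memory. `InMemoryAIMetrics` and its attribute names are assumptions; a real implementation would forward these values to Prometheus, StatsD, or a similar backend.

```python
from collections import Counter, defaultdict


class InMemoryAIMetrics:
    """Illustrative implementation of record_completion / record_error."""

    def __init__(self) -> None:
        self.completions = Counter()      # (provider, model) -> call count
        self.tokens = Counter()           # (provider, model) -> total tokens
        self.cost = defaultdict(float)    # (provider, model) -> total cost
        self.errors = Counter()           # (provider, error_type) -> count

    def record_completion(
        self, provider: str, model: str, tokens: int, cost: float
    ) -> None:
        key = (provider, model)
        self.completions[key] += 1
        self.tokens[key] += tokens
        self.cost[key] += cost

    def record_error(self, provider: str, error_type: str) -> None:
        self.errors[(provider, error_type)] += 1


metrics = InMemoryAIMetrics()
metrics.record_completion("openai", "gpt-4o", tokens=1500, cost=0.03)
metrics.record_completion("openai", "gpt-4o", tokens=500, cost=0.01)
metrics.record_error("openai", "timeout")
print(metrics.tokens[("openai", "gpt-4o")])
```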

Protocol for AI distributed tracing.

Implementations wrap individual LLM/vector calls in named spans, allowing distributed trace propagation without coupling to a specific tracing backend (OpenTelemetry, Jaeger, etc.).

trace_llm_call
def trace_llm_call(
    provider: str,
    model: str,
    *,
    streaming: bool = False
) -> Any

Return a context manager that wraps an LLM call in a trace span.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | Provider name (e.g. `"openai"`). |
| `model` | `str` | Model identifier. |
| `streaming` | `bool` | Whether this is a streaming call. |

Returns

| Type | Description |
| --- | --- |
| `Any` | A synchronous context manager. |
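
One way to return a synchronous context manager from `trace_llm_call` is `contextlib.contextmanager`. The sketch below is an assumption-laden illustration: `RecordingTracer` and the dictionary-based span are hypothetical, and no real tracing backend is involved.

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator


class RecordingTracer:
    """Illustrative tracer that stores finished spans in a list."""

    def __init__(self) -> None:
        self.spans: list = []

    @contextmanager
    def trace_llm_call(
        self, provider: str, model: str, *, streaming: bool = False
    ) -> Iterator[dict]:
        span: dict = {
            "provider": provider,
            "model": model,
            "streaming": streaming,
            "error": None,
        }
        start = time.perf_counter()
        try:
            yield span
        except Exception as exc:
            # Record the failure category, then let the error propagate.
            span["error"] = type(exc).__name__
            raise
        finally:
            span["duration_s"] = time.perf_counter() - start
            self.spans.append(span)


tracer = RecordingTracer()
with tracer.trace_llm_call("openai", "gpt-4o", streaming=True) as span:
    span["tokens"] = 42  # attributes can be attached while the span is open

print(tracer.spans[0]["provider"], tracer.spans[0]["tokens"])
```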

Protocol for AI-specific metrics and tracing.
record_generation
async def record_generation(
    model: str,
    provider: str,
    tokens_prompt: int,
    tokens_completion: int,
    latency_ms: float,
    successful: bool
) -> None

Record a single LLM generation event.

start_trace
async def start_trace(
    name: str,
    metadata: dict[str, Any] | None = None
) -> str

Start a trace block, returning a trace ID.

end_trace
async def end_trace(
    trace_id: str,
    metadata: dict[str, Any] | None = None
) -> None

End a trace block.
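
The three methods above could be backed by something as simple as the following sketch. `InMemoryTraceRecorder`, its attributes, and the choice of `uuid4` for trace IDs are all assumptions made for illustration; the protocol only fixes the method signatures.

```python
from __future__ import annotations

import asyncio
import time
import uuid
from typing import Any


class InMemoryTraceRecorder:
    """Illustrative recorder; storage layout is hypothetical."""

    def __init__(self) -> None:
        self.generations: list[dict[str, Any]] = []
        self.open_traces: dict[str, dict[str, Any]] = {}
        self.finished: list[dict[str, Any]] = []

    async def record_generation(
        self,
        model: str,
        provider: str,
        tokens_prompt: int,
        tokens_completion: int,
        latency_ms: float,
        successful: bool,
    ) -> None:
        self.generations.append(
            {
                "model": model,
                "provider": provider,
                "tokens_total": tokens_prompt + tokens_completion,
                "latency_ms": latency_ms,
                "successful": successful,
            }
        )

    async def start_trace(
        self, name: str, metadata: dict[str, Any] | None = None
    ) -> str:
        trace_id = uuid.uuid4().hex
        self.open_traces[trace_id] = {
            "name": name,
            "metadata": dict(metadata or {}),
            "started": time.perf_counter(),
        }
        return trace_id

    async def end_trace(
        self, trace_id: str, metadata: dict[str, Any] | None = None
    ) -> None:
        trace = self.open_traces.pop(trace_id)
        trace["metadata"].update(metadata or {})
        trace["duration_ms"] = (time.perf_counter() - trace.pop("started")) * 1000
        self.finished.append(trace)


async def demo() -> InMemoryTraceRecorder:
    recorder = InMemoryTraceRecorder()
    trace_id = await recorder.start_trace("rag_query", {"user": "alice"})
    await recorder.record_generation("gpt-4o", "openai", 120, 80, 523.0, True)
    await recorder.end_trace(trace_id, {"documents": 3})
    return recorder


recorder = asyncio.run(demo())
print(recorder.finished[0]["name"], recorder.finished[0]["metadata"])
```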


Health monitoring for intelligence components.

Performs health checks on:

  • LLM endpoints
  • Vector stores
  • Cache services
  • Embedding services

Example

from lexigram.logging import get_logger

logger = get_logger(__name__)
monitor = AIHealthMonitor()

# Add health checks
monitor.add_llm_check("openai", check_openai_health)
monitor.add_vector_check("pgvector", check_pgvector_health)

# Run all checks (must be awaited inside an async function)
results = await monitor.check_all()
if all(r.is_healthy() for r in results.values()):
    logger.info("health_check", status="all_systems_healthy")
__init__
def __init__() -> None

Initialize health monitor.

add_llm_check
def add_llm_check(
    provider: str,
    check_func: Any
) -> None

Add LLM health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | LLM provider name |
| `check_func` | `Any` | Async function that returns HealthCheckResult |

add_vector_check
def add_vector_check(
    provider: str,
    check_func: Any
) -> None

Add vector store health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | Vector store provider name |
| `check_func` | `Any` | Async function that returns HealthCheckResult |

add_cache_check
def add_cache_check(
    service: str,
    check_func: Any
) -> None

Add cache service health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `service` | `str` | Cache service name |
| `check_func` | `Any` | Async function that returns HealthCheckResult |

add_embedding_check
def add_embedding_check(
    model: str,
    check_func: Any
) -> None

Add embedding service health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `model` | `str` | Embedding model name |
| `check_func` | `Any` | Async function that returns HealthCheckResult |

check_llm
async def check_llm(provider: str) -> HealthCheckResult

Check LLM endpoint health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | LLM provider name |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | Health check result |

check_vector
async def check_vector(provider: str) -> HealthCheckResult

Check vector store health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | Vector store provider name |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | Health check result |

check_cache
async def check_cache(service: str) -> HealthCheckResult

Check cache service health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `service` | `str` | Cache service name |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | Health check result |

check_all
async def check_all() -> dict[str, HealthCheckResult]

Run all health checks.

Returns

| Type | Description |
| --- | --- |
| `dict[str, HealthCheckResult]` | Dictionary mapping component names to health check results |

is_ready
async def is_ready() -> bool

Check if all components are ready (healthy or degraded).

Returns

| Type | Description |
| --- | --- |
| `bool` | True if all components are ready, False otherwise |

is_live
async def is_live() -> bool

Check if service is alive (at least one component healthy).

Returns

| Type | Description |
| --- | --- |
| `bool` | True if at least one component is healthy, False otherwise |
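
The readiness and liveness rules described for `is_ready` and `is_live` can be sketched as two pure functions. The `Status` enum and its member names are assumptions made for illustration; the real `HealthCheckResult` may expose its state differently.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical component states."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def is_ready(statuses: dict) -> bool:
    # Ready: every component is either healthy or degraded.
    return all(s in (Status.HEALTHY, Status.DEGRADED) for s in statuses.values())


def is_live(statuses: dict) -> bool:
    # Live: at least one component is fully healthy.
    return any(s is Status.HEALTHY for s in statuses.values())


partial_outage = {
    "llm:openai": Status.HEALTHY,
    "vector:pgvector": Status.UNHEALTHY,
}
degraded_only = {
    "llm:openai": Status.DEGRADED,
    "vector:pgvector": Status.DEGRADED,
}

print(is_ready(partial_outage), is_live(partial_outage))
print(is_ready(degraded_only), is_live(degraded_only))
```

Under these rules a system can be ready without being live (everything degraded) or live without being ready (one healthy component, one failed), which is why the two probes are kept separate.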

Centralized metrics collection for intelligence operations.

Provides counters, gauges, and histograms for tracking:

  • LLM API calls, tokens, costs, and latency
  • Vector store operations and performance
  • Embedding cache hit rates
  • RAG pipeline end-to-end performance

Example

metrics = AIMetrics()

# Track LLM request
metrics.llm_requests_total.increment(
    labels={"provider": "openai", "model": "gpt-4", "status": "success"}
)

# Track tokens
metrics.llm_tokens_total.increment(
    amount=1500,
    labels={"provider": "openai", "model": "gpt-4", "type": "completion"}
)

# Track duration
metrics.llm_duration_seconds.observe(
    value=0.523,
    labels={"provider": "openai", "model": "gpt-4"}
)
__init__
def __init__(collector: Annotated[MetricsCollectorProtocol, Inject] | None = None) -> None

Initialize intelligence metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collector` | `Annotated[MetricsCollectorProtocol, Inject] \| None` | Metrics collector to use (DI-injected). |

get_collector
def get_collector() -> MetricsCollectorProtocol

Get the underlying metrics collector.

Returns

| Type | Description |
| --- | --- |
| `MetricsCollectorProtocol` | The MetricsCollectorProtocol instance for advanced usage. |

Payload fired after the AI observability subsystem has initialised.

Distributed tracer for intelligence operations.

Provides span management and context propagation for:

  • LLM completions and streaming
  • Vector store operations
  • RAG pipeline execution
  • Embedding generation

Also implements CallbackHandlerProtocol for event-driven tracing.

Example

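# AITracer requires a Tracer instance; some_tracer is assumed to exist.
tracer = AITracer(some_tracer)
# trace_llm_call returns a synchronous context manager, so use plain `with`.
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
    span.set_attribute("cost", response.cost)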
__init__
def __init__(tracer: Tracer) -> None

Initialize intelligence tracer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `tracer` | `Tracer` | Tracer instance to use for tracing. |

trace_llm_call
def trace_llm_call(
    provider: str,
    model: str,
    **attributes: Any
) -> ContextManager[Span]

Create a span for LLM API call.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | LLM provider name (e.g., "openai", "anthropic") |
| `model` | `str` | Model name (e.g., "gpt-4", "claude-3-opus") |
| `**attributes` | `Any` | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| `ContextManager[Span]` | Span context manager |

Example

tracer = AITracer(some_tracer)
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
trace_operation
def trace_operation(
    name: str,
    **attrs: Any
) -> ContextManager[Span]

Generic operation tracing helper.

This mirrors the Tracer.trace_operation API and is used by worker code that needs a generic operation span (e.g., document parsing/chunking).

trace_vector_operation
def trace_vector_operation(
    operation: str,
    provider: str,
    collection: str | None = None,
    **attributes: Any
) -> ContextManager[Span]

Create a span for vector store operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | `str` | Operation type (e.g., "add", "search", "delete") |
| `provider` | `str` | Vector store provider (e.g., "pgvector", "chroma") |
| `collection` | `str \| None` | Optional collection/table name |
| `**attributes` | `Any` | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| `ContextManager[Span]` | Span context manager |

Example

with tracer.trace_vector_operation("search", "pgvector", "documents") as span:
    results = await store.search(query, limit=10)
    span.set_attribute("results.count", len(results))
trace_embedding_operation
def trace_embedding_operation(
    model: str,
    batch_size: int | None = None,
    **attributes: Any
) -> ContextManager[Span]

Create a span for embedding generation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `model` | `str` | Embedding model name |
| `batch_size` | `int \| None` | Optional number of texts being embedded |
| `**attributes` | `Any` | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| `ContextManager[Span]` | Span context manager |

Example

with tracer.trace_embedding_operation("text-embedding-ada-002", 5) as span:
    embeddings = await embedder.embed(texts)
    span.set_attribute("embeddings.dimensions", len(embeddings[0]))
trace_rag_stage
def trace_rag_stage(
    stage: str,
    pipeline: str = 'default',
    **attributes: Any
) -> ContextManager[Span]

Create a span for RAG pipeline stage.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stage` | `str` | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `pipeline` | `str` | Pipeline name |
| `**attributes` | `Any` | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| `ContextManager[Span]` | Span context manager |

Example

with tracer.trace_rag_stage("retrieval", "default") as span:
    documents = await retriever.retrieve(query)
    span.set_attribute("documents.count", len(documents))
trace_rag_query
def trace_rag_query(
    query: str,
    pipeline: str = 'default',
    **attributes: Any
) -> ContextManager[Span]

Create a span for complete RAG query.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `str` | Query text |
| `pipeline` | `str` | Pipeline name |
| `**attributes` | `Any` | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| `ContextManager[Span]` | Span context manager |

Example

query = "What is Python?"
with tracer.trace_rag_query(query) as span:
    result = await rag_pipeline.query(query)
    span.set_attribute("answer.length", len(result.answer))
get_current_span
def get_current_span() -> Span | None

Get the currently active span.

Returns

| Type | Description |
| --- | --- |
| `Span \| None` | Current span or None |

on_llm_start
async def on_llm_start(
    messages: list[ChatMessage],
    model: str,
    **kwargs: Any
) -> None

Called when an LLM call starts.

on_llm_new_token
async def on_llm_new_token(
    token: str,
    **kwargs: Any
) -> None

Called for each new token in a streaming LLM response.

on_llm_end
async def on_llm_end(
    response: Completion,
    **kwargs: Any
) -> None

Called when an LLM call completes successfully.

on_llm_error
async def on_llm_error(
    error: Exception,
    **kwargs: Any
) -> None

Called when an LLM call fails.

on_chain_start
async def on_chain_start(
    name: str,
    inputs: dict[str, Any],
    **kwargs: Any
) -> None

Called when a chain/pipeline starts executing.

on_chain_end
async def on_chain_end(
    name: str,
    outputs: dict[str, Any],
    **kwargs: Any
) -> None

Called when a chain/pipeline completes.

on_tool_start
async def on_tool_start(
    tool_name: str,
    arguments: dict[str, Any],
    **kwargs: Any
) -> None

Called when a tool starts executing.

on_tool_end
async def on_tool_end(
    tool_name: str,
    result: Any,
    **kwargs: Any
) -> None

Called when a tool finishes executing.

on_agent_action
async def on_agent_action(
    action: dict[str, Any],
    **kwargs: Any
) -> None

Called when an agent takes an action.

on_agent_finish
async def on_agent_finish(
    response: dict[str, Any],
    **kwargs: Any
) -> None

Called when an agent finishes executing.

on_retriever_start
async def on_retriever_start(
    query: str,
    **kwargs: Any
) -> None

Called when a retriever starts a search.

on_retriever_end
async def on_retriever_end(
    documents: list[Any],
    **kwargs: Any
) -> None

Called when a retriever completes a search.


Payload fired after an AI health check completes.

Attributes:

  • component: Name of the component that was checked (e.g. "llm").
  • healthy: True if the component reported a healthy state.


Payload fired when a completed LLM call is recorded by the tracer.

Attributes:

  • provider: Provider identifier whose call was traced (e.g. "openai").
  • model: Model name that was traced (e.g. "gpt-4o").


Configuration for AI observability.

Loaded from the ai_observability: key in application.yaml, with environment variable overrides via LEX_AI_OBSERVABILITY__* prefix.
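
To illustrate how prefixed environment variables could map onto configuration keys, here is a small sketch. It assumes that a double underscore separates nesting levels and that keys are lower-cased; the option names (`ENABLED`, `TRACING__SAMPLE_RATE`) are invented for the example and are not documented fields of this configuration class.

```python
PREFIX = "LEX_AI_OBSERVABILITY__"


def collect_overrides(environ: dict) -> dict:
    """Turn PREFIX-ed variables into a nested dict of string overrides."""
    overrides: dict = {}
    for key, value in environ.items():
        if not key.startswith(PREFIX):
            continue
        # Assumption: "__" separates nesting levels, keys are lower-cased.
        path = key[len(PREFIX):].lower().split("__")
        node = overrides
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return overrides


# Hypothetical environment; real option names may differ.
env = {
    "LEX_AI_OBSERVABILITY__ENABLED": "false",
    "LEX_AI_OBSERVABILITY__TRACING__SAMPLE_RATE": "0.1",
    "PATH": "/usr/bin",
}
overrides = collect_overrides(env)
print(overrides)
```

Values stay strings in this sketch; type coercion would normally be handled by the configuration model that consumes them.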

validate_for_environment
def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check config is safe for the target environment.


AI Observability module — registers ObservabilityProvider.
configure
def configure(
    cls,
    config: Any | None = None
) -> DynamicModule

Create an ObservabilityModule with explicit configuration.

stub
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return a no-op ObservabilityModule for testing.

Registers observability infrastructure with noop tracing and metrics. No external telemetry systems are connected.

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule with noop observability configuration. |

Provider for AI Observability.

Registers AIMetrics, AITracer, and AIHealthMonitor.

During boot(), self-wires observability decorators around any LLMClientProtocol and VectorStoreProtocol that are already registered in the container, so the wrapping is transparent to callers.

__init__
def __init__(config: ObservabilityConfig | None = None) -> None
from_config
def from_config(
    cls,
    config: ObservabilityConfig,
    **context
) -> ObservabilityProvider

Factory method for DI container setup.

register
async def register(container: ContainerRegistrarProtocol) -> None

Register the observability services.

boot
async def boot(container: BootContainerProtocol) -> None

Boot phase — self-wire observability wrappers into the container.

If LLMClientProtocol or VectorStoreProtocol are registered, they are replaced with instrumented wrappers. Both AITracer and AIMetrics must be available; if either is missing the wrapping is skipped gracefully.
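
The wrap-or-skip decision described above might look roughly like the sketch below. The dictionary standing in for the container, the string keys, and `TracedClient` are all hypothetical; the real container API is not shown in this reference.

```python
from typing import Any


class TracedClient:
    """Hypothetical instrumented wrapper standing in for the real one."""

    def __init__(self, delegate: Any, tracer: Any, metrics: Any) -> None:
        self.delegate = delegate
        self.tracer = tracer
        self.metrics = metrics


def wire_observability(services: dict) -> bool:
    """Swap the registered client for a wrapper; return True if wrapped."""
    tracer = services.get("tracer")
    metrics = services.get("metrics")
    if tracer is None or metrics is None:
        # Either dependency missing: skip wrapping and leave services alone.
        return False
    client = services.get("llm_client")
    if client is None:
        # Nothing registered to wrap.
        return False
    services["llm_client"] = TracedClient(client, tracer, metrics)
    return True


full = {"tracer": object(), "metrics": object(), "llm_client": "raw-client"}
missing_metrics = {"tracer": object(), "llm_client": "raw-client"}

print(wire_observability(full), wire_observability(missing_metrics))
```

Because the replacement happens under the same key, callers that resolve the client afterwards receive the wrapper without changing their own code.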

shutdown
async def shutdown() -> None

Shutdown phase.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check — always healthy (in-process domain provider).

No external backend to ping.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | `float` | Ignored for in-process providers. |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | Always HEALTHY, since there is no external backend to ping. |

Decorator that adds tracing and metrics to any LLMClientProtocol.

Wraps the delegate client so callers interact with the same LLMClientProtocol interface while every complete() and stream_chat() call is traced by the supplied tracer and recorded by the supplied metrics.

Either dependency may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying client.
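
A minimal sketch of this pattern, assuming a tracer that exposes a synchronous `trace_llm_call` context manager. `InstrumentedClient`, `FakeLLM`, and `CallStats` are hypothetical stand-ins, not the library's classes; `contextlib.nullcontext` supplies the no-op path when no tracer is given.

```python
import asyncio
import time
from contextlib import nullcontext
from typing import Any


class FakeLLM:
    """Stand-in delegate; a real client would call a provider API."""

    async def complete(self, messages: list, **kwargs: Any) -> dict:
        return {"text": "hello", "tokens": 7}


class CallStats:
    """Hypothetical metrics sink, not the AIMetrics API."""

    def __init__(self) -> None:
        self.calls = 0
        self.errors = 0
        self.seconds = 0.0


class InstrumentedClient:
    def __init__(self, delegate, *, provider, model, tracer=None, stats=None):
        self._delegate = delegate
        self._provider = provider
        self._model = model
        self._tracer = tracer
        self._stats = stats

    async def complete(self, messages: list, **kwargs: Any) -> Any:
        # Without a tracer, fall back to a no-op context manager.
        span_cm = (
            self._tracer.trace_llm_call(self._provider, self._model)
            if self._tracer is not None
            else nullcontext()
        )
        start = time.perf_counter()
        with span_cm:
            try:
                return await self._delegate.complete(messages, **kwargs)
            except Exception:
                if self._stats is not None:
                    self._stats.errors += 1
                raise
            finally:
                if self._stats is not None:
                    self._stats.calls += 1
                    self._stats.seconds += time.perf_counter() - start


stats = CallStats()
observed = InstrumentedClient(FakeLLM(), provider="openai", model="gpt-4o", stats=stats)
plain = InstrumentedClient(FakeLLM(), provider="openai", model="gpt-4o")

observed_result = asyncio.run(observed.complete(["hi"]))
plain_result = asyncio.run(plain.complete(["hi"]))
print(observed_result == plain_result, stats.calls)
```

With both dependencies omitted the wrapper behaves like the delegate, which is the transparent fallback the description refers to.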

Example

from lexigram.ai.observability.observable_llm import ObservableLLMClient

client = ObservableLLMClient(
    raw_client, provider="openai", model="gpt-4o", tracer=tracer, metrics=metrics
)
response = await client.complete(messages)
__init__
def __init__(
    delegate: LLMClientProtocol,
    *,
    provider: str,
    model: str,
    tracer: AITracer | None = None,
    metrics: AIMetrics | None = None,
    audit_store: AIAuditStoreProtocol | None = None
) -> None
complete
async def complete(
    messages: list[Any],
    **kwargs: Any
) -> Any

Complete with tracing and metrics.

stream_chat
def stream_chat(
    messages: list[Any],
    **kwargs: Any
) -> Any

Stream with tracing — returns AsyncStream directly.

The stream is established lazily when iteration begins. Tracing context is captured synchronously and applied during iteration.
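
The lazy behaviour can be illustrated with a plain function that returns an async generator: the function body runs immediately, while the generator body runs only once iteration starts. Everything here (`traced_stream`, `raw_stream`, the `events` log) is a hypothetical sketch rather than the library's implementation.

```python
import asyncio
from typing import AsyncIterator

events: list = []


async def raw_stream() -> AsyncIterator[str]:
    """Stand-in for the delegate's token stream."""
    events.append("delegate started")
    for token in ("Hel", "lo"):
        yield token


def traced_stream(provider: str, model: str) -> AsyncIterator[str]:
    # Runs synchronously at call time: only the tracing context is captured.
    context = {"provider": provider, "model": model}
    events.append("context captured")

    async def iterate() -> AsyncIterator[str]:
        # Runs on first iteration: the span opens and the delegate starts.
        events.append(f"span opened for {context['provider']}/{context['model']}")
        try:
            async for token in raw_stream():
                yield token
        finally:
            events.append("span closed")

    return iterate()


async def main() -> tuple:
    stream = traced_stream("openai", "gpt-4o")
    before_iteration = list(events)  # snapshot before any token is pulled
    tokens = [token async for token in stream]
    return before_iteration, tokens


before_iteration, tokens = asyncio.run(main())
print(before_iteration, tokens, events)
```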

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Delegate health check.

close
async def close() -> None

Delegate close.


Proxy that adds tracing and metrics to any VectorStoreProtocol.

Wraps the delegate store so callers interact with the same VectorStoreProtocol interface while every add(), search(), and delete() call is traced by the supplied tracer and recorded by the supplied metrics.

Either dependency may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying store.

Example

store = ObservableVectorStore(
    raw_store, backend="pgvector", collection="documents", tracer=tracer, metrics=metrics
)
results = await store.search(query="find similar docs", k=5)
__init__
def __init__(
    delegate: Any,
    *,
    backend: str,
    collection: str | None = None,
    tracer: AITracer | None = None,
    metrics: AIMetrics | None = None
) -> None
add
async def add(documents: list[Any]) -> Result[list[str], VectorError]

Add documents with tracing and metrics.

search
async def search(
    query_vector: list[float] | None = None,
    query: Any = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict[str, Any] | None = None,
    **kwargs: Any
) -> Result[list[SearchResultProtocol], VectorError]

Search with tracing and metrics.

delete
async def delete(ids: list[str]) -> Result[int, VectorError]

Delete documents with tracing and metrics.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Delegate health check transparently.


trace_llm
def trace_llm(
    provider: str,
    model: str,
    tracer: AITracer
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically trace LLM calls.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | LLM provider name |
| `model` | `str` | Model name |
| `tracer` | `AITracer` | AITracer instance to use for tracing |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator function |

Example

tracer = AITracer(some_tracer)

@trace_llm(provider="openai", model="gpt-4", tracer=tracer)
async def complete(messages):
    response = await client.complete(messages)
    return response
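
For readers curious how a decorator factory of this shape can be built, here is a hedged sketch. `trace_llm_sketch` and `ListTracer` are hypothetical; the sketch only assumes that the tracer offers a synchronous `trace_llm_call(provider, model)` context manager, as documented for AITracer.

```python
import asyncio
import functools
from contextlib import contextmanager
from typing import Any, Callable


class ListTracer:
    """Hypothetical tracer that appends finished spans to a list."""

    def __init__(self) -> None:
        self.spans: list = []

    @contextmanager
    def trace_llm_call(self, provider: str, model: str, **attributes: Any):
        span = {"provider": provider, "model": model, **attributes}
        try:
            yield span
        finally:
            self.spans.append(span)


def trace_llm_sketch(
    provider: str, model: str, tracer: ListTracer
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # The span stays open for the whole awaited call.
            with tracer.trace_llm_call(provider, model):
                return await func(*args, **kwargs)

        return wrapper

    return decorator


tracer = ListTracer()


@trace_llm_sketch(provider="openai", model="gpt-4", tracer=tracer)
async def complete(messages: list) -> str:
    return f"echo: {messages[-1]}"


answer = asyncio.run(complete(["hello"]))
print(answer, tracer.spans)
```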

trace_rag
def trace_rag(
    stage: str,
    tracer: AITracer,
    pipeline: str = 'default'
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically trace RAG pipeline stages.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stage` | `str` | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `tracer` | `AITracer` | AITracer instance to use for tracing |
| `pipeline` | `str` | Pipeline name |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator function |

Example

tracer = AITracer(some_tracer)

@trace_rag(stage="retrieval", tracer=tracer, pipeline="default")
async def retrieve(query):
    documents = await retriever.retrieve(query)
    return documents

trace_vector
def trace_vector(
    operation: str,
    provider: str,
    tracer: AITracer,
    collection: str | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically trace vector operations.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | `str` | Operation type (e.g., "add", "search", "delete") |
| `provider` | `str` | Vector store provider |
| `tracer` | `AITracer` | AITracer instance to use for tracing |
| `collection` | `str \| None` | Optional collection name |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator function |

Example

tracer = AITracer(some_tracer)

@trace_vector(operation="search", provider="pgvector", tracer=tracer, collection="docs")
async def search(query, limit=10):
    results = await store.search(query, limit)
    return results

track_embedding_operation
def track_embedding_operation(
    model: str,
    metrics: AIMetrics | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically track embedding operation metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `model` | `str` | Embedding model name (e.g., "text-embedding-ada-002") |
| `metrics` | `AIMetrics \| None` | AIMetrics instance to use. If None, creates a new one. |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator function |

Example

@track_embedding_operation(model="text-embedding-ada-002")
async def embed_batch(texts):
    embeddings = await embedder.embed(texts)
    return embeddings

track_llm_call
def track_llm_call(
    provider: str,
    model: str,
    metrics: AIMetrics | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically track LLM call metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | `str` | LLM provider name (e.g., "openai", "anthropic") |
| `model` | `str` | Model name (e.g., "gpt-4", "claude-3-opus") |
| `metrics` | `AIMetrics \| None` | AIMetrics instance to use. If None, creates a new one. |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator function |

Example

@track_llm_call(provider="openai", model="gpt-4")
async def complete(messages):
    response = await client.complete(messages)
    return response

track_vector_operation
def track_vector_operation(
    operation: str,
    provider: str,
    metrics: AIMetrics | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically track vector store operation metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | `str` | Operation type (e.g., "add", "search", "delete") |
| `provider` | `str` | Vector store provider (e.g., "pgvector", "chroma", "qdrant") |
| `metrics` | `AIMetrics \| None` | AIMetrics instance to use. If None, creates a new one. |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator function |

Example

@track_vector_operation(operation="search", provider="pgvector")
async def search(query_embedding, limit=10):
    results = await store.search(query_embedding, limit)
    return results

Raised when a health check infrastructure operation fails.

Raised when a metrics recording or retrieval operation fails.

Base exception for all observability-related errors.

Raised when a tracing operation fails.