API Reference

Protocol for AI health monitoring.

Allows the DI container to resolve health status of AI sub-systems (LLM availability, vector store connectivity, etc.) through a stable interface.

async def check() -> Any

Run health checks and return a HealthCheckResult-like object.

Returns

| Type | Description |
| --- | --- |
| Any | An object with at least a `status` attribute. |

def register_check(
    name: str,
    check: Any
) -> None

Register a named health-check callable.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `name` | str | Unique check name. |
| `check` | Any | An async callable returning a health status. |
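
A minimal sketch of an implementation satisfying this protocol. The class name and the aggregate-status shape are illustrative, not part of the API:

from types import SimpleNamespace
from typing import Any


class SimpleHealthChecker:
    """Illustrative only: runs registered async checks and aggregates them."""

    def __init__(self) -> None:
        self._checks: dict[str, Any] = {}

    def register_check(self, name: str, check: Any) -> None:
        self._checks[name] = check  # async callable returning a health status

    async def check(self) -> Any:
        results = {name: await fn() for name, fn in self._checks.items()}
        # Healthy only if every registered check reports "healthy".
        ok = all(getattr(r, "status", r) == "healthy" for r in results.values())
        return SimpleNamespace(status="healthy" if ok else "unhealthy", checks=results)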

Protocol for AI metrics collection.

Implementations record counters, histograms, and gauges for LLM/vector operations without coupling to a specific metrics backend (Prometheus, StatsD, etc.).

def record_completion(
    provider: str,
    model: str,
    tokens: int,
    cost: float
) -> None

Record a successful LLM completion.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | Provider name. |
| `model` | str | Model identifier. |
| `tokens` | int | Total tokens consumed. |
| `cost` | float | Estimated dollar cost. |

def record_error(
    provider: str,
    error_type: str
) -> None

Record an LLM or vector store error.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | Provider name. |
| `error_type` | str | Short error category string. |
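
A hedged sketch of a backend-free implementation that simply logs; a real collector would forward to Prometheus, StatsD, or another backend (the class name is made up):

import logging

logger = logging.getLogger(__name__)


class LoggingMetricsCollector:
    """Illustrative only: records metrics as structured log lines."""

    def record_completion(self, provider: str, model: str, tokens: int, cost: float) -> None:
        logger.info(
            "llm_completion provider=%s model=%s tokens=%d cost=%.4f",
            provider, model, tokens, cost,
        )

    def record_error(self, provider: str, error_type: str) -> None:
        logger.warning("llm_error provider=%s error_type=%s", provider, error_type)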

Protocol for AI distributed tracing.

Implementations wrap individual LLM/vector calls in named spans, allowing distributed trace propagation without coupling to a specific tracing backend (OpenTelemetry, Jaeger, etc.).

def trace_llm_call(
    provider: str,
    model: str,
    *,
    streaming: bool = False
) -> Any

Return a context manager that wraps an LLM call in a trace span.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | Provider name (e.g. `"openai"`). |
| `model` | str | Model identifier. |
| `streaming` | bool | Whether this is a streaming call. |

Returns

| Type | Description |
| --- | --- |
| Any | A synchronous context manager. |
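
Note that the returned context manager is synchronous even though the wrapped call is async. A hedged call-site sketch (client and messages are placeholders):

async def complete_with_span(tracer, client, messages):
    # The span is entered synchronously; the LLM call itself is awaited inside it.
    with tracer.trace_llm_call("openai", "gpt-4o", streaming=False):
        return await client.complete(messages)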

Protocol for AI-specific metrics and tracing.
async def record_generation(
    model: str,
    provider: str,
    tokens_prompt: int,
    tokens_completion: int,
    latency_ms: float,
    successful: bool
) -> None

Record a single LLM generation event.

async def start_trace(
    name: str,
    metadata: dict[str, Any] | None = None
) -> str

Start a trace block, returning a trace ID.

async def end_trace(
    trace_id: str,
    metadata: dict[str, Any] | None = None
) -> None

End a trace block.
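
start_trace() and end_trace() pair naturally with try/finally so the trace is closed even when the traced work fails. A hedged sketch (telemetry is an implementation of this protocol; run_pipeline is a placeholder):

async def traced_query(telemetry, run_pipeline):
    trace_id = await telemetry.start_trace("rag_query", metadata={"source": "api"})
    try:
        return await run_pipeline()
    finally:
        await telemetry.end_trace(trace_id, metadata={"stage": "finished"})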


Health monitoring for intelligence components.

Performs health checks on:

  • LLM endpoints
  • Vector stores
  • Cache services
  • Embedding services

Example

from lexigram.logging import get_logger

logger = get_logger(__name__)
monitor = AIHealthMonitor()

monitor.add_llm_check("openai", check_openai_health)
monitor.add_vector_check("pgvector", check_pgvector_health)

results = await monitor.check_all()
if all(r.is_healthy() for r in results.values()):
    logger.info("health_check", status="all_systems_healthy")

def __init__() -> None

Initialize health monitor.

def add_llm_check(
    provider: str,
    check_func: Any
) -> None

Add LLM health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | LLM provider name |
| `check_func` | Any | Async function that returns HealthCheckResult |

def add_vector_check(
    provider: str,
    check_func: Any
) -> None

Add vector store health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | Vector store provider name |
| `check_func` | Any | Async function that returns HealthCheckResult |

def add_cache_check(
    service: str,
    check_func: Any
) -> None

Add cache service health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `service` | str | Cache service name |
| `check_func` | Any | Async function that returns HealthCheckResult |

def add_embedding_check(
    model: str,
    check_func: Any
) -> None

Add embedding service health check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `model` | str | Embedding model name |
| `check_func` | Any | Async function that returns HealthCheckResult |

async def check_llm(provider: str) -> HealthCheckResult

Check LLM endpoint health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | LLM provider name |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | Health check result |

async def check_vector(provider: str) -> HealthCheckResult

Check vector store health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | Vector store provider name |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | Health check result |

async def check_cache(service: str) -> HealthCheckResult

Check cache service health.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `service` | str | Cache service name |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | Health check result |

async def check_all() -> dict[str, HealthCheckResult]

Run all health checks.

Returns

| Type | Description |
| --- | --- |
| dict[str, HealthCheckResult] | Dictionary mapping component names to health check results |

async def is_ready() -> bool

Check if all components are ready (healthy or degraded).

Returns

| Type | Description |
| --- | --- |
| bool | True if all components are ready, False otherwise |

async def is_live() -> bool

Check if service is alive (at least one component healthy).

Returns

| Type | Description |
| --- | --- |
| bool | True if at least one component is healthy, False otherwise |
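
is_ready() and is_live() map directly onto readiness and liveness probes. A hedged sketch of probe handlers; the (status, body) return convention is illustrative and should be adapted to the hosting web framework:

async def readiness_probe(monitor: AIHealthMonitor) -> tuple[int, dict]:
    # Ready: every component is healthy or degraded.
    ready = await monitor.is_ready()
    return (200 if ready else 503, {"ready": ready})


async def liveness_probe(monitor: AIHealthMonitor) -> tuple[int, dict]:
    # Alive: at least one component is healthy.
    alive = await monitor.is_live()
    return (200 if alive else 503, {"alive": alive})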

Centralized metrics collection for intelligence operations.

Provides counters, gauges, and histograms for tracking:

  • LLM API calls, tokens, costs, and latency
  • Vector store operations and performance
  • Embedding cache hit rates
  • RAG pipeline end-to-end performance

Example

metrics = AIMetrics()

metrics.llm_requests_total.increment(
    labels={"provider": "openai", "model": "gpt-4", "status": "success"}
)

metrics.llm_tokens_total.increment(
    amount=1500,
    labels={"provider": "openai", "model": "gpt-4", "type": "completion"}
)

metrics.llm_duration_seconds.observe(
    value=0.523,
    labels={"provider": "openai", "model": "gpt-4"}
)

def __init__(collector: Annotated[MetricsCollectorProtocol, Inject] | None = None) -> None

Initialize intelligence metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collector` | Annotated[MetricsCollectorProtocol, Inject] \| None | Metrics collector to use (DI-injected). |

def get_collector() -> MetricsCollectorProtocol

Get the underlying metrics collector.

Returns

| Type | Description |
| --- | --- |
| MetricsCollectorProtocol | The MetricsCollectorProtocol instance for advanced usage. |

Payload fired after the AI observability subsystem has initialised.

Distributed tracer for intelligence operations.

Provides span management and context propagation for:

  • LLM completions and streaming
  • Vector store operations
  • RAG pipeline execution
  • Embedding generation

Example

tracer = AITracer()
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)
    span.set_attribute("cost", response.cost)

def __init__(tracer: Tracer) -> None

Initialize intelligence tracer.

Parameters
ParameterTypeDescription
`tracer`TracerTracer instance to use for tracing.
def trace_llm_call(
    provider: str,
    model: str,
    **attributes: Any
) -> ContextManager[Span]

Create a span for LLM API call.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | LLM provider name (e.g., "openai", "anthropic") |
| `model` | str | Model name (e.g., "gpt-4", "claude-3-opus") |
| `**attributes` | Any | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| ContextManager[Span] | Span context manager |

Example

tracer = AITracer()
with tracer.trace_llm_call("openai", "gpt-4") as span:
    response = await client.complete(messages)
    span.set_attribute("tokens.total", response.usage.total_tokens)

def trace_operation(
    name: str,
    **attrs: Any
) -> ContextManager[Span]

Generic operation tracing helper.

This mirrors the Tracer.trace_operation API and is used by worker code that needs a generic operation span (e.g., document parsing/chunking).
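
Following the pattern of the other helpers, a hedged usage sketch (parse_document and raw_bytes are placeholders):

with tracer.trace_operation("document.parse", filename="report.pdf") as span:
    chunks = parse_document(raw_bytes)
    span.set_attribute("chunks.count", len(chunks))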

def trace_vector_operation(
    operation: str,
    provider: str,
    collection: str | None = None,
    **attributes: Any
) -> ContextManager[Span]

Create a span for vector store operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider (e.g., "pgvector", "chroma") |
| `collection` | str \| None | Optional collection/table name |
| `**attributes` | Any | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| ContextManager[Span] | Span context manager |

Example

with tracer.trace_vector_operation("search", "pgvector", "documents") as span:
    results = await store.search(query, limit=10)
    span.set_attribute("results.count", len(results))

def trace_embedding_operation(
    model: str,
    batch_size: int | None = None,
    **attributes: Any
) -> ContextManager[Span]

Create a span for embedding generation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `model` | str | Embedding model name |
| `batch_size` | int \| None | Optional number of texts being embedded |
| `**attributes` | Any | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| ContextManager[Span] | Span context manager |

Example

with tracer.trace_embedding_operation("text-embedding-ada-002", 5) as span:
    embeddings = await embedder.embed(texts)
    span.set_attribute("embeddings.dimensions", len(embeddings[0]))

def trace_rag_stage(
    stage: str,
    pipeline: str = 'default',
    **attributes: Any
) -> ContextManager[Span]

Create a span for RAG pipeline stage.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stage` | str | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `pipeline` | str | Pipeline name |
| `**attributes` | Any | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| ContextManager[Span] | Span context manager |

Example

with tracer.trace_rag_stage("retrieval", "default") as span:
    documents = await retriever.retrieve(query)
    span.set_attribute("documents.count", len(documents))

def trace_rag_query(
    query: str,
    pipeline: str = 'default',
    **attributes: Any
) -> ContextManager[Span]

Create a span for complete RAG query.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | str | Query text |
| `pipeline` | str | Pipeline name |
| `**attributes` | Any | Additional span attributes |

Returns

| Type | Description |
| --- | --- |
| ContextManager[Span] | Span context manager |

Example

with tracer.trace_rag_query("What is Python?") as span:
    result = await rag_pipeline.query(query)
    span.set_attribute("answer.length", len(result.answer))

def get_current_span() -> Span | None

Get the currently active span.

Returns

| Type | Description |
| --- | --- |
| Span \| None | Current span or None |
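
A hedged sketch: attach an attribute to whatever span is currently active without threading a span reference through the call stack (tenant_id is a placeholder):

span = tracer.get_current_span()
if span is not None:
    span.set_attribute("tenant.id", tenant_id)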

Payload fired after an AI health check completes.

Attributes

  • component: Name of the component that was checked (e.g. "llm").
  • healthy: True if the component reported a healthy state.


Payload fired when a completed LLM call is recorded by the tracer.

Attributes

  • provider: Provider identifier whose call was traced (e.g. "openai").
  • model: Model name that was traced (e.g. "gpt-4o").


Configuration for AI observability.

Loaded from the `ai_observability:` key in `application.yaml`, with environment variable overrides via the `LEX_AI_OBSERVABILITY__*` prefix.

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check config is safe for the target environment.
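
A hedged usage sketch; the exact Environment values and ConfigIssue fields are not documented here, so the example only iterates and reports:

issues = config.validate_for_environment(env)  # env: an Environment value
for issue in issues:
    print(issue)
if issues:
    raise RuntimeError("ai_observability config is unsafe for this environment")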


AI Observability module — registers ObservabilityProvider.
def configure(
    cls,
    config: Any | None = None
) -> DynamicModule

Create an ObservabilityModule with explicit configuration.

def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return a no-op ObservabilityModule for testing.

Registers observability infrastructure with noop tracing and metrics. No external telemetry systems are connected.

Returns

| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule with noop observability configuration. |
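
A hedged sketch of how the two factory methods might be used; the surrounding module-registration call is whatever the host application uses:

# Production wiring: explicit configuration.
module = ObservabilityModule.configure(config=observability_config)

# Tests: noop tracing and metrics, no external telemetry.
test_module = ObservabilityModule.stub()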

Provider for AI Observability.

Registers AIMetrics, AITracer, and AIHealthMonitor.

During boot(), self-wires observability decorators around any LLMClientProtocol and VectorStoreProtocol that are already registered in the container, so the wrapping is transparent to callers.

def __init__(config: ObservabilityConfig | None = None) -> None
def from_config(
    cls,
    config: ObservabilityConfig,
    **context
) -> ObservabilityProvider

Factory method for DI container setup.

async def register(container: ContainerRegistrarProtocol) -> None

Register the observability services.

async def boot(container: BootContainerProtocol) -> None

Boot phase — self-wire observability wrappers into the container.

If LLMClientProtocol or VectorStoreProtocol are registered, they are replaced with instrumented wrappers. Both AITracer and AIMetrics must be available; if either is missing the wrapping is skipped gracefully.

async def shutdown() -> None

Shutdown phase.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check — always healthy (in-process domain provider).

No external backend to ping.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float | Ignored for in-process providers. |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | Always HEALTHY — no external backend to ping. |

Decorator that adds tracing and metrics to any LLMClientProtocol.

Wraps the delegate client so callers interact with the same LLMClientProtocol interface while every complete() and stream_chat() call is traced and recorded as metrics.

Either dependency may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying client.

Example

from lexigram.ai.observability.observable_llm import ObservableLLMClient

client = ObservableLLMClient(
    raw_client,
    provider="openai",
    model="gpt-4o",
    tracer=tracer,
    metrics=metrics,
)
response = await client.complete(messages)

def __init__(
    delegate: LLMClientProtocol,
    *,
    provider: str,
    model: str,
    tracer: AITracer | None = None,
    metrics: AIMetrics | None = None,
    audit_store: AIAuditStoreProtocol | None = None
) -> None
async def complete(
    messages: list[Any],
    **kwargs: Any
) -> Any

Complete with tracing and metrics.

def stream_chat(
    messages: list[Any],
    **kwargs: Any
) -> Any

Stream with tracing — returns AsyncStream directly.

The stream is established lazily when iteration begins. Tracing context is captured synchronously and applied during iteration.
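
Because the stream is established lazily, there is no await on the stream_chat() call itself. A hedged consumption sketch (handle is a placeholder):

async def consume(client, messages):
    stream = client.stream_chat(messages)  # returns the stream directly, no await
    async for chunk in stream:             # tracing context is applied here
        handle(chunk)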

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Delegate health check.

async def close() -> None

Delegate close.


Proxy that adds tracing and metrics to any VectorStoreProtocol.

Wraps the delegate store so callers interact with the same VectorStoreProtocol interface while every add(), search(), and delete() call is traced and recorded as metrics.

Either dependency may be None (e.g. when the monitoring module is not installed), in which case the wrapper transparently delegates to the underlying store.

Example

store = ObservableVectorStore(
    raw_store,
    backend="pgvector",
    collection="documents",
    tracer=tracer,
    metrics=metrics,
)
results = await store.search(query="find similar docs", k=5)

def __init__(
    delegate: Any,
    *,
    backend: str,
    collection: str | None = None,
    tracer: AITracer | None = None,
    metrics: AIMetrics | None = None
) -> None
async def add(documents: list[Any]) -> Result[list[str], VectorError]

Add documents with tracing and metrics.

async def search(
    query_vector: list[float] | None = None,
    query: Any = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict[str, Any] | None = None,
    **kwargs: Any
) -> Result[list[SearchResultProtocol], VectorError]

Search with tracing and metrics.

async def delete(ids: list[str]) -> Result[int, VectorError]

Delete documents with tracing and metrics.
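
These methods return Result values instead of raising VectorError. A hedged sketch, assuming a Result type with is_ok()/unwrap() accessors (not confirmed by this page):

async def search_or_raise(store, query: str):
    result = await store.search(query=query, k=5)
    if result.is_ok():         # assumed Result API
        return result.unwrap()
    raise RuntimeError(f"vector search failed: {result}")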

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Delegate health check transparently.


def trace_llm(
    provider: str,
    model: str,
    tracer: AITracer
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically trace LLM calls.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | LLM provider name |
| `model` | str | Model name |
| `tracer` | AITracer | AITracer instance to use for tracing |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |

Example

tracer = AITracer(some_tracer)

@trace_llm(provider="openai", model="gpt-4", tracer=tracer)
async def complete(messages):
    response = await client.complete(messages)
    return response


def trace_rag(
    stage: str,
    tracer: AITracer,
    pipeline: str = 'default'
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically trace RAG pipeline stages.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stage` | str | Stage name (e.g., "retrieval", "ranking", "synthesis") |
| `tracer` | AITracer | AITracer instance to use for tracing |
| `pipeline` | str | Pipeline name |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |

Example

tracer = AITracer(some_tracer)

@trace_rag(stage="retrieval", tracer=tracer, pipeline="default")
async def retrieve(query):
    documents = await retriever.retrieve(query)
    return documents


def trace_vector(
    operation: str,
    provider: str,
    tracer: AITracer,
    collection: str | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically trace vector operations.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider |
| `tracer` | AITracer | AITracer instance to use for tracing |
| `collection` | str \| None | Optional collection name |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |

Example

tracer = AITracer(some_tracer)

@trace_vector(operation="search", provider="pgvector", tracer=tracer, collection="docs")
async def search(query, limit=10):
    results = await store.search(query, limit)
    return results


def track_embedding_operation(
    model: str,
    metrics: AIMetrics | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically track embedding operation metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `model` | str | Embedding model name (e.g., "text-embedding-ada-002") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |

Example

@track_embedding_operation(model="text-embedding-ada-002")
async def embed_batch(texts):
    embeddings = await embedder.embed(texts)
    return embeddings


def track_llm_call(
    provider: str,
    model: str,
    metrics: AIMetrics | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically track LLM call metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `provider` | str | LLM provider name (e.g., "openai", "anthropic") |
| `model` | str | Model name (e.g., "gpt-4", "claude-3-opus") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |

Example

@track_llm_call(provider="openai", model="gpt-4")
async def complete(messages):
    response = await client.complete(messages)
    return response


def track_vector_operation(
    operation: str,
    provider: str,
    metrics: AIMetrics | None = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator to automatically track vector store operation metrics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `operation` | str | Operation type (e.g., "add", "search", "delete") |
| `provider` | str | Vector store provider (e.g., "pgvector", "chroma", "qdrant") |
| `metrics` | AIMetrics \| None | AIMetrics instance to use. If None, creates a new one. |

Returns

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], Callable[..., Any]] | Decorator function |

Example

@track_vector_operation(operation="search", provider="pgvector")
async def search(query_embedding, limit=10):
    results = await store.search(query_embedding, limit)
    return results


Raised when a health check infrastructure operation fails.

Raised when a metrics recording or retrieval operation fails.

Base exception for all observability-related errors.

Raised when a tracing operation fails.