API Reference
Protocols
AlertDispatcherProtocol
Protocol for dispatching operational alerts.
Implementations route alerts to notification channels (logging,
PagerDuty, Slack, etc.). lexigram-monitor ships a built-in
LoggingAlertDispatcher that writes
alerts to the structured logger.
Example
```python
class SlackAlertDispatcher:
    async def send_alert(
        self,
        title: str,
        message: str,
        severity: str,
        context: dict[str, Any] | None = None,
    ) -> None:
        await self._slack.post(
            channel="#ops",
            text=f"[{severity}] {title}: {message}",
        )
```
async def send_alert( title: str, message: str, severity: str, context: dict[str, Any] | None = None ) -> None
Dispatch a free-form operational alert.
| Parameter | Type | Description |
|---|---|---|
| `title` | str | Short, human-readable alert title. |
| `message` | str | Detailed alert message. |
| `severity` | str | Severity level string, e.g. ``"low"``, ``"high"``, ``"critical"``. |
| `context` | dict[str, Any] \| None | Optional free-form mapping of additional metadata. |
async def send_metric_alert( metric_name: str, current_value: float, threshold: float, context: dict[str, Any] | None = None ) -> None
Dispatch an alert triggered by a metric threshold breach.
| Parameter | Type | Description |
|---|---|---|
| `metric_name` | str | Name of the metric that breached its threshold. |
| `current_value` | float | Observed metric value at the time of the alert. |
| `threshold` | float | The configured threshold that was exceeded. |
| `context` | dict[str, Any] \| None | Optional free-form mapping of additional metadata. |
HealthCheckerProtocol
Structural contract for a single health-check component.
Any object implementing check() satisfies this protocol, regardless
of inheritance from any concrete class.
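A minimal standalone sketch of this structural contract using `typing.Protocol`; the `PingCheck` class is an illustrative consumer, not part of the library:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class HealthCheckerProtocol(Protocol):
    """Structural contract: anything with a check() method qualifies."""

    def check(self) -> Any: ...


class PingCheck:
    """Plain class with no inheritance, yet it satisfies the protocol."""

    def check(self) -> bool:
        return True
```

Note that `runtime_checkable` `isinstance` checks only verify that a `check` attribute exists; they do not validate its signature.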
MetricProtocol
Protocol for metric implementations.
Metrics track numeric measurements over time.
Metric name.
Metric description.
Record a metric value.
| Parameter | Type | Description |
|---|---|---|
| `value` | float | Numeric value to record. |
| `labels` | dict[str, str] \| None | Optional labels/tags. |
MetricsBackendProtocol
Protocol for metrics backend implementations (metrics only).
Backends export metrics to external systems.
Initialize the metrics backend.
Shutdown the metrics backend.
def record_metric( name: str, value: Any, metric_type: str, labels: dict[str, str] | None = None ) -> None
Record a metric value.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Metric name. |
| `value` | Any | Metric value. |
| `metric_type` | str | Type of metric (counter, gauge, histogram). |
| `labels` | dict[str, str] \| None | Optional labels. |
SpanExporter
def export(spans: list[Span]) -> None
Classes
AlertFiredHook
Payload fired when a monitoring alert is triggered.
Attributes:
alert_name: Name of the alert rule that fired.
severity: Severity level (e.g. "warning", "critical").
BackendType
Supported monitoring backend types.
BufferedMetricEntry
A single buffered metric recording.
Attributes:
name: Metric name.
value: Numeric value to record.
labels: Optional label key-value pairs.
timestamp: Monotonic timestamp when the entry was created.
BufferedMetricRecorder
Buffers metric recordings and flushes them in configurable batches.
Reduces metric backend overhead for high-throughput code paths by accumulating entries and flushing at intervals or when the buffer is full.
| Parameter | Type | Description |
|---|---|---|
| `backend` | Any | Metrics backend instance with metric attributes exposing ``record()``. |
| `flush_interval` | float | Seconds between automatic periodic flushes. Default: ``5.0``. |
| `max_buffer_size` | int | Maximum entries before a forced flush. Default: ``1000``. |
Example
```python
recorder = BufferedMetricRecorder(backend=monitor_backend, flush_interval=2.0)
await recorder.start()
recorder.record("request_count", 1.0, {"method": "GET"})
# ... many more records ...
await recorder.stop()  # flushes remaining entries
```
Buffer a metric entry for deferred recording.
If the buffer reaches max_buffer_size, schedules an immediate flush.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Metric name (must match an attribute on the backend). |
| `value` | float | Numeric value to record. |
| `labels` | dict[str, str] \| None | Optional label key-value pairs. |
Flush all buffered entries to the backend.
| Type | Description |
|---|---|
| int | Number of entries successfully flushed. |
Start the periodic background flush task.
Stop the background flush task and flush remaining entries.
ConsoleMetricsExporterHandler
Handler for console metrics exporter.
Creates a ConsoleMetricExporter for outputting metrics to the console.
Check if this handler can handle the exporter type.
| Parameter | Type | Description |
|---|---|---|
| `exporter_type` | str | The type of exporter to check. |
| Type | Description |
|---|---|
| bool | True if exporter_type is "console", False otherwise. |
Create a console metrics exporter instance.
| Parameter | Type | Description |
|---|---|---|
| `exp_config` | Any | Configuration for the exporter (unused for console). |
| Type | Description |
|---|---|
| Any | A ConsoleMetricExporter instance. |
ConsoleSpanExporter
def export(spans: list[Span]) -> None
ConsoleTracingExporterHandler
Handler for console tracing exporter.
Creates a ConsoleSpanExporter for outputting traces to the console.
Check if this handler can handle the exporter type.
| Parameter | Type | Description |
|---|---|---|
| `exporter_type` | str | The type of exporter to check. |
| Type | Description |
|---|---|
| bool | True if exporter_type is "console", False otherwise. |
Create a console tracing exporter instance.
| Parameter | Type | Description |
|---|---|---|
| `exp_config` | Any | Configuration for the exporter (unused for console). |
| Type | Description |
|---|---|
| Any | A ConsoleSpanExporter instance. |
Counter
Monotonically increasing counter.
Alias for record — delegates to the canonical implementation.
Reset the counter to zero and clear the observation history.
FunctionHealthCheck
Health check implemented as a function.
FunctionProfileResult
Result of profiling a function.
Gauge
Gauge that can go up and down.
Alias for record — delegates to the canonical implementation.
Alias for record — increments gauge by amount via canonical implementation.
Alias for record — decrements gauge by amount via canonical implementation.
Reset the gauge to zero and clear the observation history.
HealthCheck
Base class for health checks.
HealthCheckCategory
Categorises health checks by their role in the Kubernetes health model.
Attributes:
LIVENESS: Checks that the application is alive (not deadlocked).
Used by /health/live endpoints. Failure triggers a restart.
READINESS: Checks that the application is ready to accept traffic.
Used by /health/ready endpoints. Failure removes the instance
from the load-balancer rotation.
STARTUP: Checks that the application has finished its startup sequence.
Used by /health/startup endpoints. Runs until it succeeds once; after
that the probe switches to liveness/readiness.
HealthCheckConfig
Configuration for health checks.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
Attributes:
enabled: Whether health checks are enabled.
path: HTTP path for health endpoint.
include_details: Include detailed component health in response.
timeout: Timeout for health check operations (seconds).
checks: List of health check names to include.
HealthCheckProvider
ASGI middleware that exposes a ``GET /health`` liveness/readiness endpoint.
Intercepts requests to path and returns a JSON health-status payload.
All other requests are passed through with a 404 response (or forwarded
to the next ASGI app when composed in a middleware stack).
The HTTP status code reflects the overall application health:
200 — all checks healthy
207 — at least one check degraded
503 — at least one check unhealthy
To add custom checks, subclass HealthCheckProvider and override check_health.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | URL path for the health endpoint. Defaults to ``"/health"``. |
Example
```python
from lexigram.monitor.middleware import HealthCheckProvider

app = HealthCheckProvider(path="/ready")
# Mount *app* in your ASGI server / router.
```
Return the current application health as a serialisable mapping.
Override this method in a subclass to perform real dependency checks (database ping, cache ping, etc.).
| Type | Description |
|---|---|
| dict | A ``dict`` with at minimum a ``"status"`` key (one of ``"healthy"``, ``"degraded"``, or ``"unhealthy"``) and a ``"timestamp"`` float. Additional ``"checks"`` entries can be added by subclasses. |
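The documented status-to-HTTP-code mapping and the minimal payload shape can be sketched as small helpers; the function names are illustrative, not the library's API:

```python
import time


def status_to_http(status: str) -> int:
    """Map the aggregate health status to the documented HTTP code."""
    codes = {"healthy": 200, "degraded": 207, "unhealthy": 503}
    return codes.get(status, 503)  # treat unknown statuses as unhealthy


def example_payload(status: str = "healthy") -> dict:
    """Minimal response body: a status string plus a float timestamp."""
    return {"status": status, "timestamp": time.time()}
```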
HealthCheckRegistry
Registry for managing health checks.
Extends Registry for unified introspection. Sidecar lists track which checks participate in liveness vs readiness probes.
def add( name: str, check: Callable[[], Any], *, timeout: float | None = None, critical: bool = True, category: Any = None ) -> None
Add a health check to satisfy HealthCheckRegistryProtocol.
Satisfy HealthCheckProtocol.
HealthCheckResult
Result of a health check.
Convert to dictionary format.
Check if status is healthy.
Check if status is degraded.
HealthCheckRunHook
Payload fired after a health check probe completes.
Attributes:
check_name: Name of the health check that ran.
healthy: True if the check passed.
HealthChecker
Aggregates and runs named health checks with optional per-check timeouts.
Checks can be tagged with a HealthCheckCategory
(LIVENESS, READINESS, or STARTUP) and run selectively via
run_liveness, run_readiness, or run_startup.
Methods that omit a category default to READINESS.
def add( name: str, check: HealthCheckProtocol | Callable[[], Any], *, timeout: float | None = None, category: HealthCheckCategory = HealthCheckCategory.READINESS ) -> None
Add a named health check with an optional timeout and category.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique identifier for this check. |
| `check` | HealthCheckProtocol \| Callable[[], Any] | A HealthCheckProtocol instance or a callable returning a health indicator (bool, dict, or HealthCheckResult). |
| `timeout` | float \| None | Per-check timeout in seconds. If the check exceeds this duration it is reported as UNHEALTHY. |
| `category` | HealthCheckCategory | Whether this check is a liveness, readiness, or startup check. Defaults to ``READINESS``. |
Remove a named health check.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | The check name to remove. |
| Exception | Description |
|---|---|
| KeyError | If no check with this name is registered. |
Check whether a health check with the given name is registered.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | The check name. |
| Type | Description |
|---|---|
| bool | True if the check exists. |
Sorted list of all registered health check names.
| Type | Description |
|---|---|
| list[str] | A sorted list of check name strings. |
Register a callable decorated with health_checker.
The check name is read from the _health_check_name attribute
stamped by the health_checker decorator.
| Parameter | Type | Description |
|---|---|---|
| `check` | Callable[[], Any] | A decorated health check callable. |
| Exception | Description |
|---|---|
| ValueError | If *check* was not decorated with health_checker. |
Run all health checks and return unified results.
Per-check timeouts are enforced when configured. Timed-out checks are reported as UNHEALTHY.
| Type | Description |
|---|---|
| dict[str, HealthCheckResult] | A dict mapping check names to their ``HealthCheckResult``. |
Determine the overall health status from individual check results.
Returns HEALTHY only if every check is HEALTHY. Returns DEGRADED if any check is DEGRADED but none are UNHEALTHY. Returns UNHEALTHY otherwise.
| Parameter | Type | Description |
|---|---|---|
| `results` | dict[str, HealthCheckResult] | The dict returned by run_all. |
| Type | Description |
|---|---|
| HealthStatus | The aggregate ``HealthStatus``. |
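The aggregation rule above can be sketched standalone; this sketch maps check names to bare statuses rather than full `HealthCheckResult` objects, which is a simplification:

```python
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def aggregate_status(results: dict[str, HealthStatus]) -> HealthStatus:
    """HEALTHY only if every check is healthy; DEGRADED if the worst is degraded."""
    statuses = set(results.values())
    if statuses <= {HealthStatus.HEALTHY}:
        return HealthStatus.HEALTHY
    if HealthStatus.UNHEALTHY not in statuses:
        return HealthStatus.DEGRADED
    return HealthStatus.UNHEALTHY
```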
Run all health checks and return the aggregate status together with details.
Convenience wrapper that calls run_all and aggregate_status in one step.
| Type | Description |
|---|---|
| tuple[HealthStatus, dict[str, HealthCheckResult]] | A tuple of ``(aggregate_status, per_check_results)``. |
Run only LIVENESS checks and return aggregate status and details.
| Type | Description |
|---|---|
| tuple[HealthStatus, dict[str, HealthCheckResult]] | A tuple of ``(aggregate_status, per_check_results)``. |
Run only READINESS checks and return aggregate status and details.
| Type | Description |
|---|---|
| tuple[HealthStatus, dict[str, HealthCheckResult]] | A tuple of ``(aggregate_status, per_check_results)``. |
Run only STARTUP checks and return aggregate status and details.
| Type | Description |
|---|---|
| tuple[HealthStatus, dict[str, HealthCheckResult]] | A tuple of ``(aggregate_status, per_check_results)``. |
HealthCheckerRegistry
Singleton-style registry for named CachedHealthChecker instances.
Manages the lifecycle of zero or more named checkers. The registry is registered as a container singleton by MonitorProvider; application code should resolve it via constructor injection rather than instantiating it directly.
Lifecycle
```python
# At application startup — wired by MonitorProvider automatically.
registry = HealthCheckerRegistry()
checker = registry.get_or_create("api", db_provider=db)

# At application shutdown — MonitorProvider calls this.
await registry.cleanup()
```
def get_or_create( key: str = 'default', db_provider: Any = None, cache_ttl: float = 5.0, check_timeout: float = 2.0 ) -> CachedHealthChecker
Return the named checker, creating it if it does not yet exist.
Subsequent calls with the same key always return the same instance regardless of db_provider, cache_ttl, or check_timeout — those arguments are ignored after the first call for a given key.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Logical name for this checker. Use distinct keys to manage independent groups of health checks (e.g. ``"api"`` vs ``"worker"``). |
| `db_provider` | Any | Optional database provider injected into the checker for database connectivity verification. |
| `cache_ttl` | float | Seconds to cache the last health-check result before re-running the underlying checks. Defaults to ``5.0``. |
| `check_timeout` | float | Per-check execution timeout in seconds. Defaults to ``2.0``. |
| Type | Description |
|---|---|
| CachedHealthChecker | The CachedHealthChecker registered under *key*. |
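The first-call-wins memoisation described above can be sketched as follows; a plain dict stands in for `CachedHealthChecker`, and the class name is illustrative:

```python
from typing import Any


class CheckerRegistrySketch:
    """Illustrative registry: the first call per key wins, later options are ignored."""

    def __init__(self) -> None:
        self._checkers: dict[str, dict[str, Any]] = {}

    def get_or_create(self, key: str = "default", **options: Any) -> dict[str, Any]:
        # Only create on the first call for a given key; subsequent calls
        # return the cached instance regardless of the options passed.
        if key not in self._checkers:
            self._checkers[key] = {"key": key, **options}
        return self._checkers[key]
```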
Return the checker registered under key, or None if absent.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | The logical name used when the checker was created. |
| Type | Description |
|---|---|
| CachedHealthChecker \| None | The CachedHealthChecker or ``None``. |
Return a snapshot of all registered checkers keyed by name.
| Type | Description |
|---|---|
| dict[str, CachedHealthChecker] | A shallow copy of the internal registry mapping. |
Stop background refresh tasks for every checker and clear the registry.
Called automatically by MonitorProvider during application shutdown. It is safe to call multiple times.
HealthStatus
Unified health status.
Histogram
Histogram for measuring distributions.
def __init__( name: str, description: str = '', labels: dict[str, str] | None = None, buckets: list[float] | None = None )
Alias for record — delegates to the canonical implementation.
Clear all observations and values for metric rotation.
InMemoryTraceProvider
In-memory trace provider using Tracer.
def __init__( service_name: str = 'lexigram-service', max_spans: int = DEFAULT_MAX_SPANS, exporter: SpanExporter | None = None ) -> None
def get_current_span() -> Span | None
def set_current_span(span: Span | None) -> None
def get_all_spans() -> list[Span]
LoggingAlertDispatcher
Dispatches alerts to the structured logger.
Implements AlertDispatcherProtocol.
Low and medium-severity alerts are logged at WARNING level;
high and critical alerts are logged at ERROR level.
This implementation is always available with no external dependencies and is registered by MonitorProvider as the default AlertDispatcherProtocol binding when no other dispatcher is configured.
async def send_alert( title: str, message: str, severity: str, context: dict[str, Any] | None = None ) -> None
Dispatch a free-form operational alert to the structured logger.
| Parameter | Type | Description |
|---|---|---|
| `title` | str | Short, human-readable alert title. |
| `message` | str | Detailed alert message. |
| `severity` | str | Severity level string, e.g. ``"low"``, ``"high"``, ``"critical"``. |
| `context` | dict[str, Any] \| None | Optional free-form mapping of additional metadata included as structured key-value pairs on the log record. |
async def send_metric_alert( metric_name: str, current_value: float, threshold: float, context: dict[str, Any] | None = None ) -> None
Dispatch a metric-threshold alert to the structured logger.
The alert is always logged at WARNING level unless the breach
exceeds the threshold by more than 100 %, in which case ERROR
is used.
| Parameter | Type | Description |
|---|---|---|
| `metric_name` | str | Name of the metric that breached its threshold. |
| `current_value` | float | Observed metric value at the time of the alert. |
| `threshold` | float | The configured threshold that was exceeded. |
| `context` | dict[str, Any] \| None | Optional free-form mapping of additional metadata. |
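The escalation rule above (WARNING by default, ERROR when the observed value exceeds the threshold by more than 100 %, i.e. is more than double it) can be sketched as a standalone helper; the function name is an assumption, not the library's API:

```python
import logging


def metric_alert_level(current_value: float, threshold: float) -> int:
    """ERROR when the value is more than double the threshold, else WARNING."""
    if threshold > 0 and current_value > 2 * threshold:
        return logging.ERROR
    return logging.WARNING
```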
LoggingConfig
Configuration for structured logging.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
Attributes:
enabled: Whether structured logging is enabled.
level: Default log level.
format: Log format (json, text).
include_trace_context: Include trace context in logs.
redact_fields: Fields to redact from logs.
MetricProxy
Proxy for a specific metric that provides a stateful API.
Bridges the gap between the MetricsCollectorProtocol (which is name-based)
and consumers who expect a persistent metric object.
Metric name.
Increment the counter.
Record a histogram value.
Current counter value (tracked locally).
MetricRecordedHook
Payload fired when a metric data point is recorded.
Attributes:
metric_name: Name of the metric being recorded.
value: Numeric value of the data point.
MetricValue
Represents a metric measurement.
MetricsCollectorProtocol
Central metrics collection and reporting.
def register_metric(metric: MetricProtocol) -> None
Register an existing metric instrument.
def create_counter( name: str, description: str = '', labels: dict[str, str] | None = None ) -> Counter
def create_gauge( name: str, description: str = '', labels: dict[str, str] | None = None ) -> Gauge
def create_summary( name: str, description: str = '', labels: dict[str, str] | None = None ) -> Summary
def create_histogram( name: str, description: str = '', labels: dict[str, str] | None = None, buckets: list[float] | None = None ) -> Histogram
def get_metric(name: str) -> MetricProtocol | None
def get_all_metrics() -> dict[str, MetricProtocol]
Increment a counter metric.
Set a gauge metric.
Record a histogram value.
MetricsConfig
Configuration for metrics collection.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
Attributes:
enabled: Whether metrics collection is enabled.
prefix: Prefix for all metric names.
default_labels: Default labels to add to all metrics.
histogram_buckets: Default bucket boundaries for histograms.
collection_interval: Interval for periodic metrics collection (seconds).
Create a prefixed metric name.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | The metric name. |
| Type | Description |
|---|---|
| str | Prefixed metric name. |
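The prefixing behaviour can be sketched as a pure function; the underscore separator is an assumption for illustration, since the docs do not specify the join character:

```python
def prefixed_name(name: str, prefix: str = "", separator: str = "_") -> str:
    """Join the configured prefix and the metric name; no-op without a prefix."""
    return f"{prefix}{separator}{name}" if prefix else name
```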
MetricsExporterRegistry
Central registry for metrics exporters.
Manages a collection of handlers for creating metrics exporters. Handlers are checked in order until one is found that can handle the exporter type.
Example
```python
registry = MetricsExporterRegistry()
registry.register(CustomMetricsExporterHandler())
exporter = registry.create_exporter(config)
```
Initialize the registry without any handlers.
def with_defaults(cls) -> MetricsExporterRegistry
Create a registry pre-populated with the built-in default handlers.
| Type | Description |
|---|---|
| MetricsExporterRegistry | A new registry instance with all default metrics exporter handlers registered. |
Register a new metrics exporter handler.
| Parameter | Type | Description |
|---|---|---|
| `handler` | MetricsExporterHandler | The handler to register. Added at the beginning of the handler list. |
Create an exporter for the given config.
| Parameter | Type | Description |
|---|---|---|
| `exp_config` | Any | Configuration object with a 'type' attribute. |
| Type | Description |
|---|---|
| Any | An exporter instance, or None if no handler can handle the type. |
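The ordered handler chain described above (handlers checked in order, newly registered handlers take precedence, `None` when nothing matches) can be sketched as follows; the class and handler names are illustrative:

```python
from typing import Any, Protocol


class ExporterHandler(Protocol):
    def can_handle(self, exporter_type: str) -> bool: ...
    def create(self, exp_config: Any) -> Any: ...


class RegistrySketch:
    """First matching handler wins; register() prepends so new handlers override."""

    def __init__(self) -> None:
        self._handlers: list[ExporterHandler] = []

    def register(self, handler: ExporterHandler) -> None:
        # Added at the beginning of the list, mirroring the documented behaviour.
        self._handlers.insert(0, handler)

    def create_exporter(self, exp_config: Any) -> Any:
        for handler in self._handlers:
            if handler.can_handle(exp_config.type):
                return handler.create(exp_config)
        return None  # no handler can handle this exporter type
```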
MonitorConfig
Hierarchical root configuration for Lexigram Monitor.
Attributes:
enabled: Whether monitoring is enabled.
name: Configuration name (default: "monitor").
backend_type: Monitoring backend type.
metrics: Metrics configuration.
tracing: Tracing configuration.
health: Health check configuration.
logging: Logging configuration.
opentelemetry: OpenTelemetry configuration.
prometheus: Prometheus configuration.
environment: Environment name.
debug: Debug mode.
Validate monitoring configuration for the given environment.
| Parameter | Type | Description |
|---|---|---|
| `env` | Environment \| None | Target environment; resolved from ``LEX_ENV`` when ``None``. |
| Type | Description |
|---|---|
| list[ConfigIssue] | List of ConfigIssue instances. |
def get_backend_config() -> OpenTelemetryConfig | PrometheusConfig | None
Get the configuration for the selected backend.
| Type | Description |
|---|---|
| OpenTelemetryConfig \| PrometheusConfig \| None | Backend-specific configuration or None for memory backend. |
Construct an optional metrics exporter appropriate for the backend.
| Type | Description |
|---|---|
| Any \| None | MetricsExporter instance or None if no exporter is available. |
MonitorModule
Metrics collection, distributed tracing, health checks, and profiling.
Call configure to set up the monitoring subsystem.
Usage (no-op / development)
```python
from lexigram.monitor.backends.noop import NoopMetricsBackend

@module(
    imports=[MonitorModule.configure(backend=NoopMetricsBackend())]
)
class AppModule(Module):
    pass
```
Usage (Prometheus)
```python
from lexigram.monitor.backends.exporters.prometheus import PrometheusMetricsExporter
from lexigram.monitor.backends.prometheus import PrometheusBackend

@module(
    imports=[MonitorModule.configure(backend=PrometheusBackend())]
)
class AppModule(Module):
    pass
```
def configure( cls, backend: Any = None, config: Any | None = None ) -> DynamicModule
Create a MonitorModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `backend` | Any | ``MetricsBackendProtocol`` implementation. Defaults to ``NoOpMetricsBackend`` when omitted. |
| `config` | Any \| None | ``MonitorConfig`` for advanced tracing and profiling settings. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def stub(cls) -> DynamicModule
Return a no-op MonitorModule for unit testing.
Registers a NoOpMetricsBackend that discards all metrics. No external telemetry systems are connected.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule with noop metrics and tracing. |
MonitorProvider
Monitoring and observability provider for the Lexigram Framework.
def from_config( cls, config: MonitorConfig, **context: Any ) -> MonitorProvider
Create a MonitorProvider from config.
Delegates to the create_provider_from_config factory.
async def register(container: ContainerRegistrarProtocol) -> None
Register monitoring services with the container.
Binds all monitoring singletons into the DI container. DB-backed
exporter wiring is deferred to boot() where the full container
graph is available.
Start the monitoring provider and wire the observability facade.
After all providers are registered the container graph is complete, so this is the correct place to resolve optional cross-provider dependencies such as the database exporter.
Shut down the monitoring provider.
Check the monitoring provider's health.
Record an HTTP request.
Record a connection count change.
Create a counter metric.
Create a gauge metric.
def create_histogram( name: str, description: str = '', labels: dict[str, str] | None = None, buckets: list[float] | None = None )
Create a histogram metric
NoOpHealthCheckRegistry
No-op registry for categorised health checks.
def add( name: str, check: Callable[[], Any], *, timeout: float | None = None, critical: bool = True, category: Any = None ) -> None
Ignore health check registration.
Return an empty aggregate result.
Return an empty liveness result.
Return an empty readiness result.
Return an empty startup result.
Report the no-op registry as healthy.
NoOpMetricsCollector
Metrics collector that drops all observations.
Return a placeholder metric name.
Return a placeholder metric description.
def register_metric(metric: MetricProtocol) -> None
Ignore pre-created metrics.
Ignore counter increments.
Ignore gauge values.
Ignore histogram samples.
def create_counter( name: str, description: str = '', labels: dict[str, str] | None = None ) -> NoOpMetricsCollector
Return the collector itself.
def create_gauge( name: str, description: str = '', labels: dict[str, str] | None = None ) -> NoOpMetricsCollector
Return the collector itself.
def create_histogram( name: str, description: str = '', labels: dict[str, str] | None = None, buckets: list[float] | None = None ) -> NoOpMetricsCollector
Return the collector itself.
Ignore recorded metric values.
NoOpSpan
No-op span that silently drops all tracing operations.
Store span attributes for introspection in tests.
Ignore span events.
Ignore recorded exceptions.
Ignore span status updates.
Mark the span as ended without side effects.
NoOpTracer
No-op tracer that returns NoOpSpan instances.
def start_span( name: str, attributes: dict[str, Any] | None = None, context: Any | None = None ) -> NoOpSpan
Start a no-op span.
Return no active span.
Leave the carrier unchanged.
Return no extracted context.
OTLPMetricsExporterHandler
Handler for OTLP metrics exporter.
Creates an OTLPMetricExporter for sending metrics to an OTLP-compatible backend.
Check if this handler can handle the exporter type.
| Parameter | Type | Description |
|---|---|---|
| `exporter_type` | str | The type of exporter to check. |
| Type | Description |
|---|---|
| bool | True if exporter_type is "otlp", False otherwise. |
Create an OTLP metrics exporter instance.
| Parameter | Type | Description |
|---|---|---|
| `exp_config` | Any | Configuration with endpoint and headers. |
| Type | Description |
|---|---|
| Any | An OTLPMetricExporter instance. |
OTLPTracingExporterHandler
Handler for OTLP tracing exporter.
Creates an OTLPSpanExporter for sending traces to an OTLP-compatible backend.
Check if this handler can handle the exporter type.
| Parameter | Type | Description |
|---|---|---|
| `exporter_type` | str | The type of exporter to check. |
| Type | Description |
|---|---|
| bool | True if exporter_type is "otlp", False otherwise. |
Create an OTLP tracing exporter instance.
| Parameter | Type | Description |
|---|---|---|
| `exp_config` | Any | Configuration with endpoint and headers. |
| Type | Description |
|---|---|
| Any | An OTLPSpanExporter instance. |
OTelMiddleware
Middleware for OpenTelemetry tracing and metrics in Lexigram-Web.
Provides automatic tracing and metrics collection for HTTP requests. Gracefully degrades when OpenTelemetry is not available or has import issues.
Features:
- Automatic span creation for each HTTP request
- Extracts parent trace context from incoming headers
- Records request count and duration metrics
- Captures HTTP status codes and error conditions
Example
```python
from lexigram.monitor import OTelMiddleware

app = OTelMiddleware(my_asgi_app)
```
Initialize the OTel middleware.
| Parameter | Type | Description |
|---|---|---|
| `app` | ASGIApp | The ASGI application to wrap. |
ObservabilityProvider
Provides observability: metrics, tracing, and health checks.
Registers lightweight no-op stubs for every observability contract so
that application code can always resolve MetricsCollectorProtocol,
TracerProtocol, and HealthCheckRegistryProtocol via DI — even
when lexigram-monitor is not installed.
When lexigram-monitor is installed, its MonitorProvider runs
at a higher priority and overrides these registrations with the full
implementations. This provider must not import from lexigram.monitor
directly; doing so would violate the core ↔ extension hierarchy.
async def register(container: ContainerRegistrarProtocol) -> None
Register no-op observability stubs into the container.
lexigram-monitor’s own provider will override these at its
higher priority if the package is present.
async def boot(container: ContainerResolverProtocol) -> None
No boot-time work required for the observability module.
No resources to release for the observability module.
ObservabilityService
Unified service for observability (traces, metrics, health).
Automatically detects if the lexigram-monitor extension is
available and delegates to it. Otherwise, uses NoOp implementations
that still allow application code to instrument without errors.
| Parameter | Type | Description |
|---|---|---|
| `tracer` | TracerProtocol \| None | Optional tracer backend. If ``None``, uses NoOpTracer. |
| `meter` | MetricsCollectorProtocol \| None | Optional meter backend. If ``None``, uses NoOpMetricsCollector. |
def __init__( tracer: TracerProtocol | None = None, meter: MetricsCollectorProtocol | None = None ) -> None
def register_metric(metric: MetricProtocol) -> None
Register an existing metric instrument.
| Parameter | Type | Description |
|---|---|---|
| `metric` | MetricProtocol | Pre-defined metric instance. |
Create a tracing span.
If a real tracer is configured, delegates to its start_span
method. Otherwise yields a NoOpSpan.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Span name. |
| `**attributes` | Any | Initial span attributes. |
| Type | Description |
|---|---|
| Iterator[SpanProtocol] | A span object with ``set_attribute()`` and ``add_event()`` methods. |
def counter(name: str) -> MetricProxy
Get or create a counter metric.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Counter name. |
| Type | Description |
|---|---|
| MetricProxy | A counter with ``increment()`` method. |
def histogram(name: str) -> MetricProxy
Get or create a histogram metric.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Histogram name. |
| Type | Description |
|---|---|
| MetricProxy | A histogram with ``record()`` method. |
Context manager that records execution duration as a histogram.
| Parameter | Type | Description |
|---|---|---|
| `metric_name` | str | The histogram metric name. |
| Type | Description |
|---|---|
| Iterator[None] | None — timing is recorded on exit. |
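The `counter()`/`histogram()` get-or-create surface and the duration context manager described above can be sketched in memory. This is a sketch only: `ObservabilitySketch`, the list-backed `MetricProxy`, and the method name `timed` are assumptions (the reference does not name the timing method), not the library's implementation.

```python
import time
from contextlib import contextmanager
from typing import Iterator


class MetricProxy:
    """Minimal stand-in for the documented MetricProxy."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.values: list[float] = []

    def increment(self, amount: float = 1.0) -> None:
        self.values.append(amount)

    def record(self, value: float) -> None:
        self.values.append(value)


class ObservabilitySketch:
    """In-memory sketch of counter()/histogram() plus a timing context manager."""

    def __init__(self) -> None:
        self._metrics: dict[str, MetricProxy] = {}

    def counter(self, name: str) -> MetricProxy:
        # Get-or-create: repeated calls return the same instrument.
        return self._metrics.setdefault(name, MetricProxy(name))

    histogram = counter  # same get-or-create behaviour in this sketch

    @contextmanager
    def timed(self, metric_name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Timing is recorded on exit, even if the body raised.
            self.histogram(metric_name).record(time.perf_counter() - start)


obs = ObservabilitySketch()
obs.counter("requests").increment()
with obs.timed("work.duration"):
    pass
```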
OpenTelemetryBackend
Section titled “OpenTelemetryBackend”OpenTelemetry monitoring backend.
def __init__( service_name: str = 'lexigram-app', endpoint: str | None = None, config: Any | None = None, tracing_exporter_registry: TracingExporterRegistry | None = None, metrics_exporter_registry: MetricsExporterRegistry | None = None )
Initialize OpenTelemetry.
def record_metric( name: str, value: Any, metric_type: str, labels: dict[str, str] | None = None ) -> None
def create_span( name: str, parent_context: SpanContext | None = None ) -> Span
OpenTelemetryConfig
Section titled “OpenTelemetryConfig”Configuration for OpenTelemetry backend.
Attributes:
endpoint: OTLP endpoint URL.
headers: Headers to send with OTLP requests.
insecure: Use insecure connection (no TLS).
timeout: Export timeout in seconds.
compression: Compression type (none, gzip).
batch_size: Batch size for exports.
export_interval: Export interval in seconds.
PerformanceMetrics
Section titled “PerformanceMetrics”Aggregated performance metrics.
PerformanceMonitor
Section titled “PerformanceMonitor”Async performance monitor with periodic CPU/memory sampling.
Collects snapshots at config.sampling_interval intervals while active.
Use start_monitoring / stop_monitoring or the async
monitor_context context manager to bracket monitored regions.
def __init__(config: PerformanceMonitorConfig | None = None) -> None
Initialise monitor with optional configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | PerformanceMonitorConfig | None | Performance monitoring configuration (defaults apply). |
property state() -> PerformanceMonitorState
Return the current monitoring state.
property metrics() -> PerformanceMetrics
Return the accumulated performance metrics.
Start periodic metrics collection.
| Exception | Description |
|---|---|
| PerformanceMonitorError | If monitoring is already active. |
Stop metrics collection and finalise metrics.
async def monitor_context() -> AsyncGenerator[PerformanceMonitor, None]
Async context manager that starts and stops monitoring automatically.
| Type | Description |
|---|---|
| AsyncGenerator[PerformanceMonitor, None] | This monitor instance. |
async def profile_function( func: Any, *args: Any, **kwargs: Any ) -> FunctionProfileResult
Profile a synchronous or asynchronous function.
| Parameter | Type | Description |
|---|---|---|
| `func` | Any | Callable to profile (sync or async). |
| `*args` | Any | Positional arguments for *func*. |
| `**kwargs` | Any | Keyword arguments for *func*. |
| Type | Description |
|---|---|
| FunctionProfileResult | FunctionProfileResult with timing and memory data. |
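The start/stop lifecycle described above can be sketched with a plain asyncio task. `SamplingMonitor` is illustrative only: it records loop timestamps where the real monitor samples CPU and memory, and its names are assumptions, not the library's API.

```python
import asyncio


class SamplingMonitor:
    """Toy monitor: collects a snapshot every `interval` seconds while active."""

    def __init__(self, interval: float = 0.01) -> None:
        self.interval = interval
        self.snapshots: list[float] = []
        self._task: asyncio.Task | None = None

    async def start_monitoring(self) -> None:
        if self._task is not None:
            # Mirrors the documented "already active" error behaviour.
            raise RuntimeError("monitoring is already active")
        self._task = asyncio.create_task(self._sample_loop())

    async def stop_monitoring(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _sample_loop(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            # Real code would sample CPU/RSS here instead of a timestamp.
            self.snapshots.append(loop.time())
            await asyncio.sleep(self.interval)


async def main() -> list[float]:
    monitor = SamplingMonitor()
    await monitor.start_monitoring()
    await asyncio.sleep(0.05)  # the monitored region
    await monitor.stop_monitoring()
    return monitor.snapshots


snapshots = asyncio.run(main())
```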
PerformanceMonitorConfig
Section titled “PerformanceMonitorConfig”Configuration for performance monitoring.
PerformanceMonitorState
Section titled “PerformanceMonitorState”State of the performance monitor.
PerformanceSnapshot
Section titled “PerformanceSnapshot”A single performance metrics snapshot.
PrometheusBackend
Section titled “PrometheusBackend”Prometheus monitoring backend.
Initialize Prometheus backend.
| Parameter | Type | Description |
|---|---|---|
| `port` | int | Port for metrics HTTP server |
| Exception | Description |
|---|---|
| BackendNotAvailableError | If prometheus-client is not installed |
Initialize Prometheus metrics server.
If the configured port is already in use, log a warning and continue without failing the entire application startup.
Shutdown Prometheus server.
def record_metric( name: str, value: Any, metric_type: str, labels: dict[str, str] | None = None ) -> None
Record a metric using Prometheus.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Metric name |
| `value` | Any | Metric value |
| `metric_type` | str | Type of metric (counter, gauge, histogram) |
| `labels` | dict[str, str] | None | Optional metric labels |
def create_span( name: str, parent_context: SpanContext | None = None ) -> Span
Create a span (Prometheus doesn’t do tracing, so this is a no-op).
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Span name |
| `parent_context` | SpanContext | None | Optional parent span context |
| Type | Description |
|---|---|
| Span | Dummy span instance |
PrometheusConfig
Section titled “PrometheusConfig”Configuration for Prometheus backend.
Attributes: port: Port for metrics HTTP server. path: Path for metrics endpoint. enable_default_metrics: Enable default process metrics. pushgateway_url: Optional Pushgateway URL for push-based metrics. push_interval: Interval for pushing metrics to Pushgateway (seconds). store_in_db: Whether to persist metric observations to the database. metrics_table: Name of the metrics table to write samples to.
PrometheusMetricsExporter
Section titled “PrometheusMetricsExporter”Async-compatible adapter for prometheus_client.
Wraps prometheus_client.Counter, Gauge, and Histogram objects
with a thread-safe, async-friendly API. Blocking calls are offloaded to a
thread pool via asyncio.to_thread so the event loop is never stalled.
The label schema for each metric name is locked in on first use; subsequent calls must supply the same label keys. If a call omits a previously seen label key, its value defaults to an empty string so the series remains valid.
A Prometheus-client ASGI application (/metrics endpoint) is
available via metrics_app. Mount it in your web application:

```python
# Inside a WebProvider or application startup
metrics_route = await container.resolve("prometheus_metrics_app")
app.mount("/metrics", metrics_route)
```

| Parameter | Type | Description |
|---|---|---|
| `registry` | CollectorRegistry | None | Optional custom ``CollectorRegistry``. Defaults to a fresh isolated registry so multiple exporters in the same process do not collide. |
ASGI application that exposes the Prometheus /metrics endpoint.
| Type | Description |
|---|---|
| Any | An ASGI-compatible callable (from ``prometheus_client.make_asgi_app``). |
| Exception | Description |
|---|---|
| RuntimeError | If ``prometheus-client`` is not installed. |
Increment a counter by value.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Metric name. |
| `value` | int | Amount to increment. |
| `tags` | dict[str, str] | None | Label key/value pairs. |
Set a gauge to value.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Metric name. |
| `value` | float | New gauge value. |
| `tags` | dict[str, str] | None | Label key/value pairs. |
Observe value on a histogram.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Metric name. |
| `value` | float | Observation value. |
| `tags` | dict[str, str] | None | Label key/value pairs. |
No-op: prometheus_client exposition is pull-based, so there is nothing to push.
PrometheusMiddleware
Section titled “PrometheusMiddleware”ASGI middleware that automatically collects HTTP request metrics.
Wraps any ASGI application and records per-request counters, duration
histograms, and an active-request gauge. When prometheus_client is
installed the metrics are exposed in the standard Prometheus text format at
path (default /metrics);
otherwise the framework’s MetricsCollectorProtocol
is used so the application degrades gracefully without the optional
dependency.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | URL path at which to serve the Prometheus scrape endpoint. Defaults to ``"/metrics"``. |
| `metrics_collector` | MetricsCollectorProtocol | None | Optional pre-configured MetricsCollectorProtocol. A new default collector is created when ``None`` is passed. |
Example
```python
from lexigram.monitor.middleware import PrometheusMiddleware

app = PrometheusMiddleware(my_asgi_app, path="/metrics")
# Mount *app* in your ASGI server of choice.
```

SLO
Section titled “SLO”Defines an acceptable performance target for a monitored metric.
An SLO specifies the maximum tolerable value for a percentile of a
metric (e.g. p99 latency < 200 ms) and the error-budget burn-rate
threshold above which an alert should be raised.
Attributes:
name: Unique name for this SLO (e.g. "api.p99_latency").
metric: Name of the metric to evaluate (e.g.
"http.request.duration").
percentile: Target percentile expressed as a fraction between
0.0 and 1.0 (e.g. 0.99 for p99).
threshold_ms: Maximum acceptable measured value in milliseconds.
window: Measurement window for rolling evaluation.
burn_rate_threshold: Fire an alert when the measured/threshold ratio
equals or exceeds this value. A value of 1.0 means “alert
the moment the threshold is exceeded”.
description: Optional human-readable description of the SLO.
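The percentile-and-burn-rate arithmetic above can be sketched as follows. The nearest-rank percentile and the helper names are illustrative assumptions; the library may use a different interpolation method.

```python
import math


def percentile(samples: list[float], fraction: float) -> float:
    """Nearest-rank percentile of `samples` (fraction between 0.0 and 1.0)."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(fraction * len(ordered)))
    return ordered[rank - 1]


def burn_rate(samples: list[float], fraction: float, threshold_ms: float) -> float:
    """Measured percentile divided by the SLO threshold."""
    return percentile(samples, fraction) / threshold_ms


latencies = [120.0, 150.0, 180.0, 250.0]  # ms
rate = burn_rate(latencies, 0.99, threshold_ms=200.0)
# With burn_rate_threshold = 1.0, any breach of the threshold alerts.
breached = rate >= 1.0
```

Here the p99 of the samples is 250 ms against a 200 ms threshold, giving a burn rate of 1.25, so a `burn_rate_threshold` of 1.0 would fire.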
SLOMonitor
Section titled “SLOMonitor”Tracks SLO compliance and fires alerts on error-budget exhaustion.
Records metric samples in memory, then evaluates each registered SLO when evaluate is called. Violations are returned as a list of SLOViolation objects and also emitted as structured log warnings.
Example
```python
from datetime import timedelta
from lexigram.monitor.slo import SLO, SLOMonitor

monitor = SLOMonitor()
monitor.register(SLO(
    name="api.p99_latency",
    metric="http.request.duration",
    percentile=0.99,
    threshold_ms=200.0,
    window=timedelta(hours=1),
))

monitor.record_sample("http.request.duration", 150.0)
monitor.record_sample("http.request.duration", 350.0)

violations = await monitor.evaluate()
```

def register(slo: SLO) -> None
Register an SLO for monitoring.
| Exception | Description |
|---|---|
| ValueError | If an SLO with the same name is already registered. |
Record a measurement sample for a metric.
| Parameter | Type | Description |
|---|---|---|
| `metric` | str | The metric name (matches registered SLO ``metric`` fields). |
| `value_ms` | float | The measurement value in milliseconds. |
async def evaluate() -> list[SLOViolation]
Evaluate all registered SLOs against recorded samples.
| Type | Description |
|---|---|
| list[SLOViolation] | A list of SLOViolation instances for every SLO that is in breach. Returns an empty list when all SLOs are within bounds or when no samples have been recorded yet. |
property registered_slos() -> dict[str, SLO]
Return registered SLOs keyed by name.
Remove recorded samples.
| Parameter | Type | Description |
|---|---|---|
| `metric` | str | None | If given, only clears samples for that metric. When ``None``, clears all recorded samples. |
SLOViolation
Section titled “SLOViolation”Represents a detected SLO threshold breach.
Attributes:
slo: The SLO that was violated.
measured_value: The percentile value that exceeded the threshold (ms).
burn_rate: Ratio of measured_value to slo.threshold_ms at the
time of detection.
detected_at: UTC timestamp when the violation was detected.
message: Human-readable summary, auto-generated when empty.
SamplerType
Section titled “SamplerType”Supported tracing sampler types.
Span
Section titled “Span”Distributed tracing span.
def set_status( status: SpanStatus, message: str | None = None ) -> None
Alias for end() for compatibility.
SpanContext
Section titled “SpanContext”Span context for propagation.
def from_traceparent( cls, traceparent: str ) -> SpanContext | None
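`from_traceparent` presumably parses the W3C `traceparent` header (`version-traceid-spanid-flags`). A minimal sketch of that parsing, using a hypothetical `SketchSpanContext` rather than the library's class:

```python
import re
from dataclasses import dataclass


@dataclass
class SketchSpanContext:
    trace_id: str
    span_id: str
    sampled: bool

    # W3C Trace Context: "00-<32 hex>-<16 hex>-<2 hex flags>"
    _PATTERN = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")

    @classmethod
    def from_traceparent(cls, traceparent: str) -> "SketchSpanContext | None":
        match = cls._PATTERN.match(traceparent)
        if match is None:
            return None  # malformed headers yield None, not an exception
        trace_id, span_id, flags = match.groups()
        return cls(trace_id, span_id, sampled=bool(int(flags, 16) & 0x01))


ctx = SketchSpanContext.from_traceparent(
    "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
)
bad = SketchSpanContext.from_traceparent("not-a-traceparent")
```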
SpanKind
Section titled “SpanKind”Span kind following OpenTelemetry specification.
SpanStatus
Section titled “SpanStatus”Span status following OpenTelemetry specification.
Summary
Section titled “Summary”Summary metric for calculating quantiles.
Alias for record — delegates to the canonical implementation.
Clear all observations and values for metric rotation.
Tracer
Section titled “Tracer”Distributed tracer logic.
def __init__( service_name: str, exporter: SpanExporter, max_spans: int | None = None ) -> None
def start_span( name: str, context: SpanContext | None = None, attributes: dict[str, Any] | None = None, kind: SpanKind = SpanKind.INTERNAL ) -> Span
def get_all_spans() -> list[Span]
def get_current_span() -> Span | None
def inject_context( carrier: dict[str, str], context: SpanContext | None = None ) -> None
Inject trace context into carrier dict.
def extract_context(carrier: Mapping[str, str]) -> SpanContext | None
Extract trace context from carrier.
TracingConfig
Section titled “TracingConfig”Configuration for distributed tracing.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
Attributes:
enabled: Whether tracing is enabled.
service_name: Name of the service for traces.
sampler_type: Type of sampling strategy.
sample_rate: Sampling rate (0.0 to 1.0) for probability sampler.
max_traces_per_second: Max traces/sec for rate limiting sampler.
propagation_formats: Trace context propagation formats.
max_attributes: Maximum attributes per span.
max_events: Maximum events per span.
max_links: Maximum links per span.
TracingExporterRegistry
Section titled “TracingExporterRegistry”Central registry for tracing exporters.
Manages a collection of handlers for creating tracing exporters. Handlers are checked in order until one is found that can handle the exporter type.
Example
```python
registry = TracingExporterRegistry()
registry.register(CustomTracingExporterHandler())
exporter = registry.create_exporter(config)
```
Initialize the registry without any handlers.
def with_defaults(cls) -> TracingExporterRegistry
Create a registry pre-populated with the built-in default handlers.
| Type | Description |
|---|---|
| TracingExporterRegistry | A new registry instance with all default tracing exporter handlers registered. |
Register a new tracing exporter handler.
| Parameter | Type | Description |
|---|---|---|
| `handler` | TracingExporterHandler | The handler to register. Added at the beginning of the handler list. |
Create an exporter for the given config.
| Parameter | Type | Description |
|---|---|---|
| `exp_config` | Any | Configuration object with a 'type' attribute. |
| Type | Description |
|---|---|
| Any | An exporter instance, or None if no handler can handle the type. |
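The chain-of-responsibility behaviour described above (handlers checked in order, newly registered handlers taking precedence) can be sketched like this. All names here are illustrative, and a dict stands in for a config object with a 'type' attribute.

```python
from typing import Any, Protocol


class Handler(Protocol):
    def can_handle(self, exporter_type: str) -> bool: ...
    def create(self, config: Any) -> Any: ...


class ConsoleHandler:
    def can_handle(self, exporter_type: str) -> bool:
        return exporter_type == "console"

    def create(self, config: Any) -> str:
        return "console-exporter"


class RegistrySketch:
    """Handlers are tried in order; register() prepends so custom handlers win."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def register(self, handler: Handler) -> None:
        self._handlers.insert(0, handler)  # newest handler checked first

    def create_exporter(self, exp_config: Any) -> Any:
        # A dict stands in for a config object with a 'type' attribute.
        for handler in self._handlers:
            if handler.can_handle(exp_config["type"]):
                return handler.create(exp_config)
        return None  # no handler matched


registry = RegistrySketch()
registry.register(ConsoleHandler())
exporter = registry.create_exporter({"type": "console"})
missing = registry.create_exporter({"type": "otlp"})
```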
Functions
Section titled “Functions”get_performance_summary
Section titled “get_performance_summary”
async def get_performance_summary(metrics: PerformanceMetrics) -> dict[str, Any]
Return a dictionary summary of performance metrics.
| Parameter | Type | Description |
|---|---|---|
| `metrics` | PerformanceMetrics | Accumulated PerformanceMetrics to summarise. |
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary containing duration, sample counts, CPU/memory averages, task counts, and a ``has_profile_data`` flag. |
health_checker
Section titled “health_checker”
Decorator that marks a callable as a named health check.
The decorated function can later be registered on a HealthChecker via HealthChecker.register.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Human-readable name for this check (e.g. ``"database"``). |
Example
```python
@health_checker("database")
async def check_db() -> bool:
    return await db.ping()

checker = HealthChecker()
checker.register(check_db)
```

inject_trace_context
Section titled “inject_trace_context”
Inject current trace context into carrier for propagation.
Call this before sending a message to propagate trace context.
instrument_database
Section titled “instrument_database”
Instrument a Lexigram DatabaseService with OpenTelemetry tracing.
Wraps the provider’s execute and execute_query methods to automatically create spans and record metrics for each database operation.
Features:
- Automatic span creation for each query
- Query duration tracking
- Error recording and status tracking
- Metrics for query count and duration
| Parameter | Type | Description |
|---|---|---|
| `provider` | Any | The database provider to instrument. Must have 'execute' and optionally 'execute_query' methods. |
Example
```python
from lexigram.monitor import instrument_database
from lexigram.sql import DatabaseService

provider = DatabaseService(config)
instrument_database(provider)

# Queries are now automatically traced
```
metered
Section titled “metered”
def metered( metric_name: str | None = None, *, service: ObservabilityService | None = None ) -> Callable[[F], F]
Decorator that records function execution time as a histogram metric.
| Parameter | Type | Description |
|---|---|---|
| `metric_name` | str | None | Explicit metric name. Defaults to ``{module}.{qualname}.duration``. |
| `service` | ObservabilityService | None | Specific service instance. Uses the module default if ``None``. |
monitor
Section titled “monitor”
Wrap a function with combined structured logging and wall-clock timing.
Convenience alternative to stacking @traced and @metered. Emits
operation_start, operation_end, and operation_error log events
with the elapsed duration in milliseconds. Works for both async and sync
callables.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | None | Metric/span name. Defaults to ``"{module}.{qualname}"``. |
| `log_args` | bool | When ``True``, the number of positional arguments is included in the log context. Defaults to ``False`` to avoid inadvertently logging sensitive data. |
| Type | Description |
|---|---|
| Callable[[F], F] | Decorator that wraps the target function with monitoring machinery. |
Example
```python
@monitor("user_service.create")
async def create_user(email: str) -> User:
    return await repo.save(User(email=email))


@monitor()
def compute_score(data: list[float]) -> float:
    return sum(data) / len(data)
```

monitor_async_operation
Section titled “monitor_async_operation”
async def monitor_async_operation(config: PerformanceMonitorConfig | None = None) -> AsyncGenerator[PerformanceMonitor, None]
Async context manager that monitors a region of async code.
Creates a PerformanceMonitor, starts it before entering the body, and stops it on exit.
| Parameter | Type | Description |
|---|---|---|
| `config` | PerformanceMonitorConfig | None | Optional PerformanceMonitorConfig. Default config is used when not provided. |
| Type | Description |
|---|---|
| AsyncGenerator[PerformanceMonitor, None] | Running PerformanceMonitor instance. |
Example
```python
async with monitor_async_operation() as monitor:
    await do_some_work()
print(monitor.metrics.duration)
```
profile_async_function
Section titled “profile_async_function”
async def profile_async_function( func: Any, *args: Any, **kwargs: Any ) -> tuple[Any, FunctionProfileResult]
Profile a single async (or sync) function call.
| Parameter | Type | Description |
|---|---|---|
| `func` | Any | Callable to profile. |
| `*args` | Any | Positional arguments forwarded to *func*. |
| `**kwargs` | Any | Keyword arguments forwarded to *func*. |
| Type | Description |
|---|---|
| tuple[Any, FunctionProfileResult] | A ``(result, profile)`` tuple where *result* is the return value of *func* and *profile* is a FunctionProfileResult. |
Example
```python
result, profile = await profile_async_function(my_async_fn, arg1)
print(profile.execution_time)
```
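The "async or sync" dispatch can be sketched with `inspect.isawaitable`: call the function, and await the result only if it is awaitable. `profile_sketch` is a hypothetical helper showing the timing mechanics, not the library's profiler (which also collects memory data).

```python
import asyncio
import inspect
import time
from typing import Any


async def profile_sketch(func: Any, *args: Any, **kwargs: Any) -> tuple[Any, float]:
    """Return (result, elapsed_seconds) for a sync or async callable."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result  # async callables are awaited transparently
    return result, time.perf_counter() - start


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


result, elapsed = asyncio.run(profile_sketch(double, 21))
```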
trace_consume
Section titled “trace_consume”
async def trace_consume( channel: str, message_type: str = 'unknown', carrier: dict[str, Any] | None = None, **attributes: Any ) -> AsyncGenerator[Span, None]
Context manager for tracing message consumption.
Usage
```python
async with trace_consume("notifications", carrier=headers) as span:
    await process_message(...)
```
trace_publish
Section titled “trace_publish”
async def trace_publish( channel: str, message_type: str = 'email', recipient: str | None = None, **attributes: Any ) -> AsyncGenerator[Span, None]
Context manager for tracing message publishing.
Usage
```python
async with trace_publish("notifications", "email", "user@example.com") as span:
    await send_email(...)
```
def traced( span_name: str | None = None, *, service: ObservabilityService | None = None ) -> Callable[[F], F]
Decorator that wraps a function in a tracing span.
Automatically captures function name, arguments, and any raised exceptions as span events.
| Parameter | Type | Description |
|---|---|---|
| `span_name` | str | None | Explicit span name. Defaults to the qualified function name. |
| `service` | ObservabilityService | None | Specific service instance. Uses the module default if ``None``. |
Exceptions
Section titled “Exceptions”BackendNotAvailableError
Section titled “BackendNotAvailableError”Raised when a required monitoring backend is not available.
InvalidMetricError
Section titled “InvalidMetricError”Raised when metric parameters are invalid.
MetricNotFoundError
Section titled “MetricNotFoundError”Raised when a requested metric is not found.
MonitorError
Section titled “MonitorError”Base exception for monitoring errors.
PerformanceMonitorError
Section titled “PerformanceMonitorError”Base exception for performance monitoring errors.
SpanError
Section titled “SpanError”Base exception for span-related errors.