Skip to content
GitHub

API Reference

Protocol for dispatching operational alerts.

Implementations route alerts to notification channels (logging, PagerDuty, Slack, etc.). lexigram-monitor ships a built-in LoggingAlertDispatcher that writes alerts to the structured logger.

Example

class SlackAlertDispatcher:
async def send_alert(
self,
title: str,
message: str,
severity: str,
context: dict[str, Any] | None = None,
) -> None:
await self._slack.post(
channel="#ops",
text=f"[{severity}] {title}: {message}",
)
class SlackAlertDispatcher:
async def send_alert(
self,
title: str,
message: str,
severity: str,
context: dict[str, Any] | None = None,
) -> None:
await self._slack.post(
channel="#ops",
text=f"[{severity}] {title}: {message}",
)
send_alert
async def send_alert(
    title: str,
    message: str,
    severity: str,
    context: dict[str, Any] | None = None
) -> None

Dispatch a free-form operational alert.

Parameters
ParameterTypeDescription
`title`strShort, human-readable alert title.
`message`strDetailed alert message.
`severity`strSeverity level string, e.g. ``"low"``, ``"high"``, ``"critical"``.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata.
send_metric_alert
async def send_metric_alert(
    metric_name: str,
    current_value: float,
    threshold: float,
    context: dict[str, Any] | None = None
) -> None

Dispatch an alert triggered by a metric threshold breach.

Parameters
ParameterTypeDescription
`metric_name`strName of the metric that breached its threshold.
`current_value`floatObserved metric value at the time of the alert.
`threshold`floatThe configured threshold that was exceeded.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata.

Structural contract for a single health-check component.

Any object implementing check() satisfies this protocol, regardless of inheritance from any concrete class.

check
async def check() -> HealthCheckResult

Perform the health check and return a result.


Protocol for metric implementations.

Metrics track numeric measurements over time.

name
property name() -> str

MetricProtocol name.

description
property description() -> str

MetricProtocol description.

record
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Record a metric value.

Parameters
ParameterTypeDescription
`value`floatNumeric value to record.
`labels`dict[str, str] | NoneOptional labels/tags.

Protocol for metrics backend implementations (metrics only).

Backends export metrics to external systems.

initialize
async def initialize() -> None

Initialize the metrics backend.

shutdown
async def shutdown() -> None

Shutdown the metrics backend.

record_metric
def record_metric(
    name: str,
    value: Any,
    metric_type: str,
    labels: dict[str, str] | None = None
) -> None

Record a metric value.

Parameters
ParameterTypeDescription
`name`strMetricProtocol name.
`value`AnyMetricProtocol value.
`metric_type`strType of metric (counter, gauge, histogram).
`labels`dict[str, str] | NoneOptional labels.

export
def export(spans: list[Span]) -> None

Payload fired when a monitoring alert is triggered.

Attributes: alert_name: Name of the alert rule that fired. severity: Severity level (e.g. "warning", "critical").


Supported monitoring backend types.

A single buffered metric recording.

Attributes: name: MetricProtocol name. value: Numeric value to record. labels: Optional label key-value pairs. timestamp: Monotonic timestamp when the entry was created.


Buffers metric recordings and flushes them in configurable batches.

Reduces metric backend overhead for high-throughput code paths by accumulating entries and flushing at intervals or when the buffer is full.

Parameters
ParameterTypeDescription
`backend`MetricProtocol backend instance with metric attributes exposing ``record()``.
`flush_interval`Seconds between automatic periodic flushes. Default: 5.0.
`max_buffer_size`Maximum entries before a forced flush. Default: 1000.

Example

recorder = BufferedMetricRecorder(backend=monitor_backend, flush_interval=2.0)
await recorder.start()
recorder.record("request_count", 1.0, {"method": "GET"})
# ... many more records ...
await recorder.stop() # flushes remaining entries
recorder = BufferedMetricRecorder(backend=monitor_backend, flush_interval=2.0)
await recorder.start()
recorder.record("request_count", 1.0, {"method": "GET"})
# ... many more records ...
await recorder.stop() # flushes remaining entries
__init__
def __init__(
    backend: Any,
    flush_interval: float = 5.0,
    max_buffer_size: int = 1000
) -> None
record
def record(
    name: str,
    value: float,
    labels: dict[str, str] | None = None
) -> None

Buffer a metric entry for deferred recording.

If the buffer reaches max_buffer_size, schedules an immediate flush.

Parameters
ParameterTypeDescription
`name`strMetricProtocol name (must match an attribute on the backend).
`value`floatNumeric value to record.
`labels`dict[str, str] | NoneOptional label key-value pairs.
flush
async def flush() -> int

Flush all buffered entries to the backend.

Returns
TypeDescription
intNumber of entries successfully flushed.
start
async def start() -> None

Start the periodic background flush task.

stop
async def stop() -> None

Stop the background flush task and flush remaining entries.


Handler for console metrics exporter.

Creates a ConsoleMetricExporter for outputting metrics to the console.

can_handle
def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "console", False otherwise.
create_exporter
def create_exporter(exp_config: Any) -> Any

Create a console metrics exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration for the exporter (unused for console).
Returns
TypeDescription
AnyA ConsoleMetricExporter instance.

export
def export(spans: list[Span]) -> None

Handler for console tracing exporter.

Creates a ConsoleSpanExporter for outputting traces to the console.

can_handle
def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "console", False otherwise.
create_exporter
def create_exporter(exp_config: Any) -> Any

Create a console tracing exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration for the exporter (unused for console).
Returns
TypeDescription
AnyA ConsoleSpanExporter instance.

Monotonically increasing counter.
__init__
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)
name
property name() -> str
description
property description() -> str
record
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
increment
def increment(
    amount: int = 1,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

get_count
def get_count() -> int
reset
def reset() -> None

Reset the counter to zero and clear the observation history.


Health check implemented as a function.
__init__
def __init__(
    name: str,
    func: Callable,
    critical: bool = True
)
check
async def check() -> HealthCheckResult

Result of profiling a function.

Gauge that can go up and down.
__init__
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)
name
property name() -> str
description
property description() -> str
record
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
set
def set(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

increment
def increment(
    amount: float = 1,
    labels: dict[str, str] | None = None
) -> None

Alias for record — increments gauge by amount via canonical implementation.

decrement
def decrement(
    amount: float = 1,
    labels: dict[str, str] | None = None
) -> None

Alias for record — decrements gauge by amount via canonical implementation.

get_value
def get_value() -> float
reset
def reset() -> None

Reset the gauge to zero and clear the observation history.


Base class for health checks.
__init__
def __init__(
    name: str,
    critical: bool = True
)
check
async def check() -> HealthCheckResult

Categorises health checks by their role in the Kubernetes health model.

Attributes: LIVENESS: Checks that the application is alive (not deadlocked). Used by /health/live endpoints. Failure triggers a restart. READINESS: Checks that the application is ready to accept traffic. Used by /health/ready endpoints. Failure removes the instance from the load-balancer rotation. STARTUP: Checks that the application has finished its startup sequence. Used by /health/startup endpoints. Succeeded once; after that the probe switches to liveness/readiness.


Configuration for health checks.

model_config: ClassVar[ConfigDict] = ConfigDict(extra=“ignore”)

Attributes: enabled: Whether health checks are enabled. path: HTTP path for health endpoint. include_details: Include detailed component health in response. timeout: Timeout for health check operations (seconds). checks: List of health check names to include.


ASGI middleware that exposes a ``GET /health`` liveness/readiness endpoint.

Intercepts requests to path and returns a JSON health-status payload. All other requests are passed through with a 404 response (or forwarded to the next ASGI app when composed in a middleware stack).

The HTTP status code reflects the overall application health:

  • 200 — all checks healthy
  • 207 — at least one check degraded
  • 503 — at least one check unhealthy

To add custom checks, subclass HealthCheckProvider and override check_health.

Parameters
ParameterTypeDescription
`path`URL path for the health endpoint. Defaults to ``"/health"``.

Example

from lexigram.monitor.middleware import HealthCheckProvider
app = HealthCheckProvider(path="/ready")
# Mount *app* in your ASGI server / router.
from lexigram.monitor.middleware import HealthCheckProvider
app = HealthCheckProvider(path="/ready")
# Mount *app* in your ASGI server / router.
__init__
def __init__(path: str = '/health') -> None
check_health
async def check_health() -> dict

Return the current application health as a serialisable mapping.

Override this method in a subclass to perform real dependency checks (database ping, cache ping, etc.).

Returns
TypeDescription
dictA ``dict`` with at minimum a ``"status"`` key (one of ``"healthy"``, ``"degraded"``, or ``"unhealthy"``) and a ``"timestamp"`` float. Additional ``"checks"`` entries can be added by subclasses.

Registry for managing health checks.

Extends Registry for unified introspection. Sidecar lists track which checks participate in liveness vs readiness probes.

__init__
def __init__() -> None
add
def add(
    name: str,
    check: Callable[[], Any],
    *,
    timeout: float | None = None,
    critical: bool = True,
    category: Any = None
) -> None

Add a health check to satisfy HealthCheckRegistryProtocol.

run_all
async def run_all() -> tuple[Any, dict[str, Any]]
run_liveness
async def run_liveness() -> tuple[Any, dict[str, Any]]
run_readiness
async def run_readiness() -> tuple[Any, dict[str, Any]]
run_startup
async def run_startup() -> tuple[Any, dict[str, Any]]
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Satisfy HealthCheckProtocol.


Result of a health check.
to_dict
def to_dict() -> dict[str, Any]

Convert to dictionary format.

is_healthy
def is_healthy() -> bool

Check if status is healthy.

is_degraded
def is_degraded() -> bool

Check if status is degraded.


Payload fired after a health check probe completes.

Attributes: check_name: Name of the health check that ran. healthy: True if the check passed.


Aggregates and runs named health checks with optional per-check timeouts.

Checks can be tagged with a HealthCheckCategory (LIVENESS, READINESS, or STARTUP) and run selectively via run_liveness, run_readiness, or run_startup. Methods that omit a category default to READINESS.

__init__
def __init__() -> None
add
def add(
    name: str,
    check: HealthCheckProtocol | Callable[[], Any],
    *,
    timeout: float | None = None,
    category: HealthCheckCategory = HealthCheckCategory.READINESS
) -> None

Add a named health check with an optional timeout and category.

Parameters
ParameterTypeDescription
`name`strUnique identifier for this check.
`check`HealthCheckProtocol | Callable[[], Any]A HealthCheckProtocol instance or a callable returning a health indicator (bool, dict, or HealthCheckResult).
`timeout`float | NonePer-check timeout in seconds. If the check exceeds this duration it is reported as UNHEALTHY.
`category`HealthCheckCategoryWhether this check is a liveness, readiness, or startup check. Defaults to ``READINESS``.
remove
def remove(name: str) -> None

Remove a named health check.

Parameters
ParameterTypeDescription
`name`strThe check name to remove.
Raises
ExceptionDescription
KeyErrorIf no check with this name is registered.
has
def has(name: str) -> bool

Check whether a health check with the given name is registered.

Parameters
ParameterTypeDescription
`name`strThe check name.
Returns
TypeDescription
boolTrue if the check exists.
check_names
property check_names() -> list[str]

Sorted list of all registered health check names.

Returns
TypeDescription
list[str]A sorted list of check name strings.
register
def register(check: Callable[[], Any]) -> None

Register a callable decorated with health_checker.

The check name is read from the _health_check_name attribute stamped by the health_checker decorator.

Parameters
ParameterTypeDescription
`check`Callable[[], Any]A decorated health check callable.
Raises
ExceptionDescription
ValueErrorIf *check* was not decorated with health_checker.
run_all
async def run_all() -> dict[str, HealthCheckResult]

Run all health checks and return unified results.

Per-check timeouts are enforced when configured. Timed-out checks are reported as UNHEALTHY.

Returns
TypeDescription
dict[str, HealthCheckResult]A dict mapping check names to their ``HealthCheckResult``.
aggregate_status
def aggregate_status(results: dict[str, HealthCheckResult]) -> HealthStatus

Determine the overall health status from individual check results.

Returns HEALTHY only if every check is HEALTHY. Returns DEGRADED if any check is DEGRADED but none are UNHEALTHY. Returns UNHEALTHY otherwise.

Parameters
ParameterTypeDescription
`results`dict[str, HealthCheckResult]The dict returned by run_all.
Returns
TypeDescription
HealthStatusThe aggregate ``HealthStatus``.
run_all_with_summary
async def run_all_with_summary() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run all health checks and return the aggregate status together with details.

Convenience wrapper that calls run_all and aggregate_status in one step.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.
run_liveness
async def run_liveness() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run only LIVENESS checks and return aggregate status and details.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.
run_readiness
async def run_readiness() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run only READINESS checks and return aggregate status and details.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.
run_startup
async def run_startup() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run only STARTUP checks and return aggregate status and details.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.

Singleton-style registry for named CachedHealthChecker instances.

Manages the lifecycle of zero or more named checkers. The registry is registered as a container singleton by MonitorProvider; application code should resolve it via constructor injection rather than instantiating it directly.

Lifecycle

# At application startup — wired by MonitorProvider automatically.
registry = HealthCheckerRegistry()
checker = registry.get_or_create("api", db_provider=db)
# At application shutdown — MonitorProvider calls this.
await registry.cleanup()
# At application startup — wired by MonitorProvider automatically.
registry = HealthCheckerRegistry()
checker = registry.get_or_create("api", db_provider=db)
# At application shutdown — MonitorProvider calls this.
await registry.cleanup()
__init__
def __init__() -> None
get_or_create
def get_or_create(
    key: str = 'default',
    db_provider: Any = None,
    cache_ttl: float = 5.0,
    check_timeout: float = 2.0
) -> CachedHealthChecker

Return the named checker, creating it if it does not yet exist.

Subsequent calls with the same key always return the same instance regardless of db_provider, cache_ttl, or check_timeout — those arguments are ignored after the first call for a given key.

Parameters
ParameterTypeDescription
`key`strLogical name for this checker. Use distinct keys to manage independent groups of health checks (e.g. ``"api"`` vs ``"worker"``).
`db_provider`AnyOptional database provider injected into the checker for database connectivity verification.
`cache_ttl`floatSeconds to cache the last health-check result before re-running the underlying checks. Defaults to ``5.0``.
`check_timeout`floatPer-check execution timeout in seconds. Defaults to ``2.0``.
Returns
TypeDescription
CachedHealthCheckerThe CachedHealthChecker registered under *key*.
get_checker
def get_checker(key: str = 'default') -> CachedHealthChecker | None

Return the checker registered under key, or None if absent.

Parameters
ParameterTypeDescription
`key`strThe logical name used when the checker was created.
Returns
TypeDescription
CachedHealthChecker | NoneThe CachedHealthChecker or ``None``.
list_checkers
def list_checkers() -> dict[str, CachedHealthChecker]

Return a snapshot of all registered checkers keyed by name.

Returns
TypeDescription
dict[str, CachedHealthChecker]A shallow copy of the internal registry mapping.
cleanup
async def cleanup() -> None

Stop background refresh tasks for every checker and clear the registry.

Called automatically by MonitorProvider during application shutdown. It is safe to call multiple times.


Unified health status.

Histogram for measuring distributions.
__init__
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
)
name
property name() -> str
description
property description() -> str
record
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
observe
def observe(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

get_observations
def get_observations() -> list[float]
get_bucket_counts
def get_bucket_counts() -> dict[float, int]
reset
def reset() -> None

Clear all observations and values for metric rotation.


In-memory trace provider using Tracer.
__init__
def __init__(
    service_name: str = 'lexigram-service',
    max_spans: int = DEFAULT_MAX_SPANS,
    exporter: SpanExporter | None = None
) -> None
create_span
def create_span(
    name: str,
    parent: Span | None = None
) -> Span
start_span
def start_span(
    name: str,
    attributes: dict[str, Any] | None = None,
    context: Any | None = None
) -> Span
inject_context
def inject_context(
    carrier: dict[str, str],
    context: Any | None = None
) -> None
extract_context
def extract_context(carrier: dict[str, str]) -> Any | None
get_current_span
def get_current_span() -> Span | None
set_current_span
def set_current_span(span: Span | None) -> None
get_all_spans
def get_all_spans() -> list[Span]

Dispatches alerts to the structured logger.

Implements AlertDispatcherProtocol.

Low and medium-severity alerts are logged at WARNING level; high and critical alerts are logged at ERROR level.

This implementation is always available with no external dependencies and is registered by MonitorProvider as the default AlertDispatcherProtocol binding when no other dispatcher is configured.

send_alert
async def send_alert(
    title: str,
    message: str,
    severity: str,
    context: dict[str, Any] | None = None
) -> None

Dispatch a free-form operational alert to the structured logger.

Parameters
ParameterTypeDescription
`title`strShort, human-readable alert title.
`message`strDetailed alert message.
`severity`strSeverity level string, e.g. ``"low"``, ``"high"``, ``"critical"``.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata included as structured key-value pairs on the log record.
send_metric_alert
async def send_metric_alert(
    metric_name: str,
    current_value: float,
    threshold: float,
    context: dict[str, Any] | None = None
) -> None

Dispatch a metric-threshold alert to the structured logger.

The alert is always logged at WARNING level unless the breach exceeds the threshold by more than 100 %, in which case ERROR is used.

Parameters
ParameterTypeDescription
`metric_name`strName of the metric that breached its threshold.
`current_value`floatObserved metric value at the time of the alert.
`threshold`floatThe configured threshold that was exceeded.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata.

Configuration for structured logging.

model_config: ClassVar[ConfigDict] = ConfigDict(extra=“ignore”)

Attributes: enabled: Whether structured logging is enabled. level: Default log level. format: Log format (json, text). include_trace_context: Include trace context in logs. redact_fields: Fields to redact from logs.

validate_level
def validate_level(
    cls,
    v: str
) -> str

Validate log level.


Proxy for a specific metric that provides a stateful API.

Bridges the gap between the MetricsCollectorProtocol (which is name-based) and consumers who expect a persistent metric object.

__init__
def __init__(
    name: str,
    collector: MetricsCollectorProtocol,
    metrics_type: str
) -> None
name
property name() -> str

MetricProtocol name.

increment
def increment(amount: float = 1.0) -> None

Increment the counter.

record
def record(value: float) -> None

Record a histogram value.

value
property value() -> float

Current counter value (tracked locally).


Payload fired when a metric data point is recorded.

Attributes: metric_name: Name of the metric being recorded. value: Numeric value of the data point.


Represents a metric measurement.

Central metrics collection and reporting.
__init__
def __init__() -> None
register_metric
def register_metric(metric: MetricProtocol) -> None

Register an existing metric instrument.

create_counter
def create_counter(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> Counter
create_gauge
def create_gauge(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> Gauge
create_summary
def create_summary(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> Summary
create_histogram
def create_histogram(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
) -> Histogram
get_metric
def get_metric(name: str) -> MetricProtocol | None
get_all_metrics
def get_all_metrics() -> dict[str, MetricProtocol]
clear
def clear() -> None
increment
def increment(
    name: str,
    value: float = 1.0,
    tags: dict[str, str] | None = None
) -> None

Increment a counter metric.

gauge
def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Set a gauge metric.

histogram
def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Record a histogram value.


Configuration for metrics collection.

model_config: ClassVar[ConfigDict] = ConfigDict(extra=“ignore”)

Attributes: enabled: Whether metrics collection is enabled. prefix: Prefix for all metric names. default_labels: Default labels to add to all metrics. histogram_buckets: Default bucket boundaries for histograms. collection_interval: Interval for periodic metrics collection (seconds).

make_metric_name
def make_metric_name(name: str) -> str

Create a prefixed metric name.

Parameters
ParameterTypeDescription
`name`strThe metric name.
Returns
TypeDescription
strPrefixed metric name.

Central registry for metrics exporters.

Manages a collection of handlers for creating metrics exporters. Handlers are checked in order until one is found that can handle the exporter type.

Example

registry = MetricsExporterRegistry()
registry.register(CustomMetricsExporterHandler())
exporter = registry.create_exporter(config)
__init__
def __init__() -> None

Initialize the registry without any handlers.

with_defaults
def with_defaults(cls) -> MetricsExporterRegistry

Create a registry pre-populated with the built-in default handlers.

Returns
TypeDescription
MetricsExporterRegistryA new registry instance with all default metrics exporter handlers registered.
register
def register(handler: MetricsExporterHandler) -> None

Register a new metrics exporter handler.

Parameters
ParameterTypeDescription
`handler`MetricsExporterHandlerThe handler to register. Added at the beginning of the handler list.
create_exporter
def create_exporter(exp_config: Any) -> Any

Create an exporter for the given config.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration object with a 'type' attribute.
Returns
TypeDescription
AnyAn exporter instance, or None if no handler can handle the type.

Hierarchical root configuration for Lexigram Monitor.

Attributes: enabled: Whether monitoring is enabled name: Configuration name (default: “monitor”) backend_type: Monitoring backend type metrics: Metrics configuration tracing: Tracing configuration health: Health check configuration logging: Logging configuration opentelemetry: OpenTelemetry configuration prometheus: Prometheus configuration environment: Environment name debug: Debug mode

is_production
property is_production() -> bool

Check if running in production environment.

is_development
property is_development() -> bool

Check if running in development environment.

is_test
property is_test() -> bool

Check if running in test environment.

validate_for_environment
def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Validate monitoring configuration for the given environment.

Parameters
ParameterTypeDescription
`env`Environment | NoneTarget environment; resolved from ``LEX_ENV`` when ``None``.
Returns
TypeDescription
list[ConfigIssue]List of ConfigIssue instances.
get_backend_config
def get_backend_config() -> OpenTelemetryConfig | PrometheusConfig | None

Get the configuration for the selected backend.

Returns
TypeDescription
OpenTelemetryConfig | PrometheusConfig | NoneBackend-specific configuration or None for memory backend.
make_exporter
def make_exporter() -> Any | None

Construct an optional metrics exporter appropriate for the backend.

Returns
TypeDescription
Any | NoneMetricsExporter instance or None if no exporter is available.

Metrics collection, distributed tracing, health checks, and profiling.

Call configure to configure the monitoring subsystem.

Usage (no-op / development)

from lexigram.monitor.backends.noop import NoopMetricsBackend
@module(
imports=[MonitorModule.configure(backend=NoopMetricsBackend())]
)
class AppModule(Module):
pass
from lexigram.monitor.backends.noop import NoopMetricsBackend
@module(
imports=[MonitorModule.configure(backend=NoopMetricsBackend())]
)
class AppModule(Module):
pass

Usage (Prometheus)

from lexigram.monitor.backends.exporters.prometheus import PrometheusMetricsExporter
from lexigram.monitor.backends.prometheus import PrometheusBackend
@module(
imports=[MonitorModule.configure(backend=PrometheusBackend())]
)
class AppModule(Module):
pass
from lexigram.monitor.backends.exporters.prometheus import PrometheusMetricsExporter
from lexigram.monitor.backends.prometheus import PrometheusBackend
@module(
imports=[MonitorModule.configure(backend=PrometheusBackend())]
)
class AppModule(Module):
pass
configure
def configure(
    cls,
    backend: Any = None,
    config: Any | None = None
) -> DynamicModule

Create a MonitorModule with explicit configuration.

Parameters
ParameterTypeDescription
`backend`Any``MetricsBackendProtocol`` implementation. Defaults to ``NoOpMetricsBackend`` when omitted.
`config`Any | None``MonitorConfig`` for advanced tracing and profiling settings.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
stub
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return a no-op MonitorModule for unit testing.

Registers a NoOpMetricsBackend that discards all metrics. No external telemetry systems are connected.

Parameters
ParameterTypeDescription
`config`AnyOptional test configuration override.
Returns
TypeDescription
DynamicModuleA DynamicModule with noop metrics and tracing.

Monitoring and observability provider for Lexigram Framework
__init__
def __init__(
    backend: MonitoringBackend,
    exporter: Any | None = None,
    config: Any | None = None
)
from_config
def from_config(
    cls,
    config: MonitorConfig,
    **context: Any
) -> MonitorProvider

Create a MonitorProvider from config.

Delegates to the create_provider_from_config factory.

register
async def register(container: ContainerRegistrarProtocol) -> None

Register monitoring services with the container.

Binds all monitoring singletons into the DI container. DB-backed exporter wiring is deferred to boot() where the full container graph is available.

boot
async def boot(container: BootContainerProtocol) -> None

Start the monitoring provider and wire the observability facade.

After all providers are registered the container graph is complete, so this is the correct place to resolve optional cross-provider dependencies such as the database exporter.

shutdown
async def shutdown() -> None

Shutdown the monitoring provider

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check monitoring provider health

record_request
def record_request(
    method: str,
    path: str,
    duration: float,
    status_code: int
) -> None

Record an HTTP request

record_connection_change
def record_connection_change(delta: int) -> None

Record connection count change

create_counter
def create_counter(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)

Create a counter metric

create_gauge
def create_gauge(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)

Create a gauge metric

create_histogram
def create_histogram(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
)

Create a histogram metric


No-op registry for categorised health checks.
add
def add(
    name: str,
    check: Callable[[], Any],
    *,
    timeout: float | None = None,
    critical: bool = True,
    category: Any = None
) -> None

Ignore health check registration.

run_all
async def run_all() -> tuple[Any, dict[str, Any]]

Return an empty aggregate result.

run_liveness
async def run_liveness() -> tuple[Any, dict[str, Any]]

Return an empty liveness result.

run_readiness
async def run_readiness() -> tuple[Any, dict[str, Any]]

Return an empty readiness result.

run_startup
async def run_startup() -> tuple[Any, dict[str, Any]]

Return an empty startup result.

health_check
async def health_check() -> HealthCheckResult

Report the no-op registry as healthy.


Metrics collector that drops all observations.
__init__
def __init__() -> None
name
property name() -> str

Return a placeholder metric name.

description
property description() -> str

Return a placeholder metric description.

register_metric
def register_metric(metric: MetricProtocol) -> None

Ignore pre-created metrics.

increment
def increment(
    name: str,
    value: float = 1.0,
    tags: dict[str, str] | None = None
) -> None

Ignore counter increments.

gauge
def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Ignore gauge values.

histogram
def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Ignore histogram samples.

create_counter
def create_counter(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> NoOpMetricsCollector

Return the collector itself.

create_gauge
def create_gauge(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> NoOpMetricsCollector

Return the collector itself.

create_histogram
def create_histogram(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
) -> NoOpMetricsCollector

Return the collector itself.

record
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Ignore recorded metric values.


No-op span that silently drops all tracing operations.
__init__
def __init__(name: str = '') -> None
set_attribute
def set_attribute(
    key: str,
    value: Any
) -> None

Store span attributes for introspection in tests.

add_event
def add_event(
    name: str,
    attributes: dict[str, Any] | None = None
) -> None

Ignore span events.

record_exception
def record_exception(exception: Exception) -> None

Ignore recorded exceptions.

set_status
def set_status(status: str) -> None

Ignore span status updates.

end
def end() -> None

Mark the span as ended without side effects.


No-op tracer that returns NoOpSpan instances.
start_span
def start_span(
    name: str,
    attributes: dict[str, Any] | None = None,
    context: Any | None = None
) -> NoOpSpan

Start a no-op span.

get_current_span
def get_current_span() -> SpanProtocol | None

Return no active span.

inject_context
def inject_context(
    carrier: dict[str, str],
    context: Any | None = None
) -> None

Leave the carrier unchanged.

extract_context
def extract_context(carrier: dict[str, str]) -> Any | None

Return no extracted context.


Handler for OTLP metrics exporter.

Creates an OTLPMetricExporter for sending metrics to an OTLP-compatible backend.

can_handle
def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "otlp", False otherwise.
create_exporter
def create_exporter(exp_config: Any) -> Any

Create an OTLP metrics exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration with endpoint and headers.
Returns
TypeDescription
AnyAn OTLPMetricExporter instance.

Handler for OTLP tracing exporter.

Creates an OTLPSpanExporter for sending traces to an OTLP-compatible backend.

can_handle
def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "otlp", False otherwise.
create_exporter
def create_exporter(exp_config: Any) -> Any

Create an OTLP tracing exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration with endpoint and headers.
Returns
TypeDescription
AnyAn OTLPSpanExporter instance.

Middleware for OpenTelemetry tracing and metrics in Lexigram-Web.

Provides automatic tracing and metrics collection for HTTP requests. Gracefully degrades when OpenTelemetry is not available or has import issues.

Features:

  • Automatic span creation for each HTTP request
  • Extracts parent trace context from incoming headers
  • Records request count and duration metrics
  • Captures HTTP status codes and error conditions

Example

from lexigram.monitor import OTelMiddleware
app = OTelMiddleware(my_asgi_app)
__init__
def __init__(app: ASGIApp)

Initialize the OTel middleware.

Parameters
ParameterTypeDescription
`app`ASGIAppThe ASGI application to wrap.

Provides observability: metrics, tracing, and health checks.

Registers lightweight no-op stubs for every observability contract so that application code can always resolve MetricsCollectorProtocol, TracerProtocol, and HealthCheckRegistryProtocol via DI — even when lexigram-monitor is not installed.

When lexigram-monitor is installed, its MonitorProvider runs at a higher priority and overrides these registrations with the full implementations. This provider must not import from lexigram.monitor directly; doing so would violate the core ↔ extension hierarchy.

register
async def register(container: ContainerRegistrarProtocol) -> None

Register no-op observability stubs into the container.

lexigram-monitor’s own provider will override these at its higher priority if the package is present.

boot
async def boot(container: ContainerResolverProtocol) -> None

No boot-time work required for the observability module.

shutdown
async def shutdown() -> None

No resources to release for the observability module.


Unified service for observability (traces, metrics, health).

Automatically detects if the lexigram-monitor extension is available and delegates to it. Otherwise, uses NoOp implementations that still allow application code to instrument without errors.

Parameters
ParameterTypeDescription
`tracer`Optional tracer backend. If ``None``, uses NoOpTracer.
`meter`Optional meter backend. If ``None``, uses NoOpMetricsCollector.
__init__
def __init__(
    tracer: TracerProtocol | None = None,
    meter: MetricsCollectorProtocol | None = None
) -> None
register_metric
def register_metric(metric: MetricProtocol) -> None

Register an existing metric instrument.

Parameters
ParameterTypeDescription
`metric`MetricProtocolPre-defined metric instance.
trace
def trace(
    name: str,
    **attributes: Any
) -> Iterator[SpanProtocol]

Create a tracing span.

If a real tracer is configured, delegates to its start_span method. Otherwise yields a NoOpSpan.

Parameters
ParameterTypeDescription
`name`strSpan name. **attributes: Initial span attributes.
Yields
TypeDescription
Iterator[SpanProtocol]A span object with ``set_attribute()`` and ``add_event()`` methods.
counter
def counter(name: str) -> MetricProxy

Get or create a counter metric.

Parameters
ParameterTypeDescription
`name`strCounter name.
Returns
TypeDescription
MetricProxyA counter with ``increment()`` method.
histogram
def histogram(name: str) -> MetricProxy

Get or create a histogram metric.

Parameters
ParameterTypeDescription
`name`strHistogram name.
Returns
TypeDescription
MetricProxyA histogram with ``record()`` method.
timed
def timed(metric_name: str) -> Iterator[None]

Context manager that records execution duration as a histogram.

Parameters
ParameterTypeDescription
`metric_name`strThe histogram metric name.
Yields
TypeDescription
Iterator[None]None — timing is recorded on exit.

OpenTelemetry monitoring backend.
__init__
def __init__(
    service_name: str = 'lexigram-app',
    endpoint: str | None = None,
    config: Any | None = None,
    tracing_exporter_registry: TracingExporterRegistry | None = None,
    metrics_exporter_registry: MetricsExporterRegistry | None = None
)
initialize
async def initialize() -> None

Initialize OpenTelemetry.

shutdown
async def shutdown() -> None
record_metric
def record_metric(
    name: str,
    value: Any,
    metric_type: str,
    labels: dict[str, str] | None = None
) -> None
create_span
def create_span(
    name: str,
    parent_context: SpanContext | None = None
) -> Span

Configuration for OpenTelemetry backend.

Attributes: endpoint: OTLP endpoint URL. headers: Headers to send with OTLP requests. insecure: Use insecure connection (no TLS). timeout: Export timeout in seconds. compression: Compression type (none, gzip). batch_size: Batch size for exports. export_interval: Export interval in seconds.

validate_compression
def validate_compression(
    cls,
    v: str
) -> str

Validate compression type.


Aggregated performance metrics.
duration
property duration() -> float | None
samples_per_second
property samples_per_second() -> float

Async performance monitor with periodic CPU/memory sampling.

Collects snapshots at config.sampling_interval intervals while active. Use start_monitoring / stop_monitoring or the async monitor_context context manager to bracket monitored regions.

__init__
def __init__(config: PerformanceMonitorConfig | None = None) -> None

Initialise monitor with optional configuration.

Parameters
ParameterTypeDescription
`config`PerformanceMonitorConfig | NonePerformance monitoring configuration (defaults apply).
state
property state() -> PerformanceMonitorState

Return the current monitoring state.

metrics
property metrics() -> PerformanceMetrics

Return the accumulated performance metrics.

start_monitoring
async def start_monitoring() -> None

Start periodic metrics collection.

Raises
ExceptionDescription
PerformanceMonitorErrorIf monitoring is already active.
stop_monitoring
async def stop_monitoring() -> None

Stop metrics collection and finalise metrics.

monitor_context
async def monitor_context() -> AsyncGenerator[PerformanceMonitor, None]

Async context manager that starts and stops monitoring automatically.

Yields
TypeDescription
AsyncGenerator[PerformanceMonitor, None]This monitor instance.
profile_function
async def profile_function(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> FunctionProfileResult

Profile a synchronous or asynchronous function.

Parameters
ParameterTypeDescription
`func`AnyCallable to profile (sync or async). *args: Positional arguments for *func*. **kwargs: Keyword arguments for *func*.
Returns
TypeDescription
FunctionProfileResultFunctionProfileResult with timing and memory data.

Configuration for performance monitoring.

State of the performance monitor.

A single performance metrics snapshot.

Prometheus monitoring backend.
__init__
def __init__(port: int = 8000)

Initialize Prometheus backend.

Parameters
ParameterTypeDescription
`port`intPort for metrics HTTP server
Raises
ExceptionDescription
BackendNotAvailableErrorIf prometheus-client is not installed
initialize
async def initialize() -> None

Initialize Prometheus metrics server.

If the configured port is already in use, log a warning and continue without failing the entire application startup.

shutdown
async def shutdown() -> None

Shutdown Prometheus server.

record_metric
def record_metric(
    name: str,
    value: Any,
    metric_type: str,
    labels: dict[str, str] | None = None
) -> None

Record a metric using Prometheus.

Parameters
ParameterTypeDescription
`name`strMetricProtocol name
`value`AnyMetricProtocol value
`metric_type`strType of metric (counter, gauge, histogram)
`labels`dict[str, str] | NoneOptional metric labels
create_span
def create_span(
    name: str,
    parent_context: SpanContext | None = None
) -> Span

Create a span (Prometheus doesn’t do tracing, so this is a no-op).

Parameters
ParameterTypeDescription
`name`strSpan name
`parent_context`SpanContext | NoneOptional parent span context
Returns
TypeDescription
SpanDummy span instance

Configuration for Prometheus backend.

Attributes: port: Port for metrics HTTP server. path: Path for metrics endpoint. enable_default_metrics: Enable default process metrics. pushgateway_url: Optional Pushgateway URL for push-based metrics. push_interval: Interval for pushing metrics to Pushgateway (seconds). store_in_db: Whether to persist metric observations to the database. metrics_table: Name of the metrics table to write samples to.


Async-compatible adapter for prometheus_client.

Wraps prometheus_client.Counter, Gauge, and Histogram objects with a thread-safe, async-friendly API. Blocking calls are offloaded to a thread pool via asyncio.to_thread so the event loop is never stalled.

Label schemas for each metric name are locked-in on first use; subsequent calls must supply the same label keys. If a call omits a previously-seen label key the value defaults to an empty string so the series is still valid.

A Prometheus-client ASGI application (/metrics endpoint) is available via metrics_app. Mount it in your web application

# Inside a WebProvider or application startup
metrics_route = await container.resolve("prometheus_metrics_app")
app.mount("/metrics", metrics_route)
# Inside a WebProvider or application startup
metrics_route = await container.resolve("prometheus_metrics_app")
app.mount("/metrics", metrics_route)
Parameters
ParameterTypeDescription
`registry`Optional custom ``CollectorRegistry``. Defaults to a fresh isolated registry so multiple exporters in the same process do not collide.
__init__
def __init__(registry: Any = None) -> None
metrics_app
property metrics_app() -> Any

ASGI application that exposes the Prometheus /metrics endpoint.

Returns
TypeDescription
AnyAn ASGI-compatible callable (from ``prometheus_client.make_asgi_app``).
Raises
ExceptionDescription
RuntimeErrorIf ``prometheus-client`` is not installed.
counter
async def counter(
    name: str,
    value: int,
    tags: dict[str, str] | None = None
) -> None

Increment a counter by value.

Parameters
ParameterTypeDescription
`name`strMetricProtocol name.
`value`intAmount to increment.
`tags`dict[str, str] | NoneLabel key/value pairs.
gauge
async def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Set a gauge to value.

Parameters
ParameterTypeDescription
`name`strMetricProtocol name.
`value`floatNew gauge value.
`tags`dict[str, str] | NoneLabel key/value pairs.
histogram
async def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Observe value on a histogram.

Parameters
ParameterTypeDescription
`name`strMetricProtocol name.
`value`floatObservation value.
`tags`dict[str, str] | NoneLabel key/value pairs.
flush
async def flush() -> None

No-op — prometheus_client pushes are pull-based.


ASGI middleware that automatically collects HTTP request metrics.

Wraps any ASGI application and records per-request counters, duration histograms, and an active-request gauge. When prometheus_client is installed the metrics are exposed in the standard Prometheus text format at path (default /metrics); otherwise the framework’s MetricsCollectorProtocol is used so the application degrades gracefully without the optional dependency.

Parameters
ParameterTypeDescription
`path`URL path at which to serve the Prometheus scrape endpoint. Defaults to ``"/metrics"``.
`metrics_collector`Optional pre-configured MetricsCollectorProtocol. A new default collector is created when ``None`` is passed.

Example

from lexigram.monitor.middleware import PrometheusMiddleware
app = PrometheusMiddleware(my_asgi_app, path="/metrics")
# Mount *app* in your ASGI server of choice.
from lexigram.monitor.middleware import PrometheusMiddleware
app = PrometheusMiddleware(my_asgi_app, path="/metrics")
# Mount *app* in your ASGI server of choice.
__init__
def __init__(
    path: str = '/metrics',
    metrics_collector: MetricsCollectorProtocol | None = None
)
set_next_app
def set_next_app(app: Callable) -> None
next_app
async def next_app(
    scope: dict,
    receive: Callable,
    send: Callable
) -> None

Defines an acceptable performance target for a monitored metric.

An SLO specifies the maximum tolerable value for a percentile of a metric (e.g. p99 latency < 200 ms) and the error-budget burn-rate threshold above which an alert should be raised.

Attributes: name: Unique name for this SLO (e.g. "api.p99_latency"). metric: Name of the metric to evaluate (e.g. "http.request.duration"). percentile: Target percentile expressed as a fraction between 0.0 and 1.0 (e.g. 0.99 for p99). threshold_ms: Maximum acceptable measured value in milliseconds. window: Measurement window for rolling evaluation. burn_rate_threshold: Fire an alert when the measured/threshold ratio equals or exceeds this value. A value of 1.0 means “alert the moment the threshold is exceeded”. description: Optional human-readable description of the SLO.


Tracks SLO compliance and fires alerts on error-budget exhaustion.

Records metric samples in memory, then evaluates each registered SLO when evaluate is called. Violations are returned as a list of SLOViolation objects and also emitted as structured log warnings.

Example

from datetime import timedelta
from lexigram.monitor.slo import SLO, SLOMonitor
monitor = SLOMonitor()
monitor.register(SLO(
name="api.p99_latency",
metric="http.request.duration",
percentile=0.99,
threshold_ms=200.0,
window=timedelta(hours=1),
))
monitor.record_sample("http.request.duration", 150.0)
monitor.record_sample("http.request.duration", 350.0)
violations = await monitor.evaluate()
from datetime import timedelta
from lexigram.monitor.slo import SLO, SLOMonitor
monitor = SLOMonitor()
monitor.register(SLO(
name="api.p99_latency",
metric="http.request.duration",
percentile=0.99,
threshold_ms=200.0,
window=timedelta(hours=1),
))
monitor.record_sample("http.request.duration", 150.0)
monitor.record_sample("http.request.duration", 350.0)
violations = await monitor.evaluate()
__init__
def __init__(max_samples_per_metric: int = 10000) -> None
register
def register(slo: SLO) -> None

Register an SLO for monitoring.

Parameters
ParameterTypeDescription
`slo`SLOThe SLO to track.
Raises
ExceptionDescription
ValueErrorIf an SLO with the same name is already registered.
record_sample
def record_sample(
    metric: str,
    value_ms: float
) -> None

Record a measurement sample for a metric.

Parameters
ParameterTypeDescription
`metric`strThe metric name (matches registered SLO ``metric`` fields).
`value_ms`floatThe measurement value in milliseconds.
evaluate
async def evaluate() -> list[SLOViolation]

Evaluate all registered SLOs against recorded samples.

Returns
TypeDescription
list[SLOViolation]A list of SLOViolation instances for every SLO that is in breach. Returns an empty list when all SLOs are within bounds or when no samples have been recorded yet.
registered_slos
property registered_slos() -> dict[str, SLO]

Return registered SLOs keyed by name.

clear_samples
def clear_samples(metric: str | None = None) -> None

Remove recorded samples.

Parameters
ParameterTypeDescription
`metric`str | NoneIf given, only clears samples for that metric. When ``None``, clears all recorded samples.

Represents a detected SLO threshold breach.

Attributes: slo: The SLO that was violated. measured_value: The percentile value that exceeded the threshold (ms). burn_rate: ratio of measured_value to slo.threshold_ms at the time of detection. detected_at: UTC timestamp when the violation was detected. message: Human-readable summary, auto-generated when empty.


Supported tracing sampler types.

Distributed tracing span.
set_attribute
def set_attribute(
    key: str,
    value: Any
) -> None
add_event
def add_event(
    name: str,
    attributes: dict[str, Any] | None = None
) -> None
set_status
def set_status(status: str) -> None
record_exception
def record_exception(exception: Exception) -> None
end
def end() -> None
get_duration
def get_duration() -> float | None
finish
def finish() -> None

Alias for end() for compatibility.


Span context for propagation.
to_traceparent
def to_traceparent() -> str
from_traceparent
def from_traceparent(
    cls,
    traceparent: str
) -> SpanContext | None

Span kind following OpenTelemetry specification.

Span status following OpenTelemetry specification.

Summary metric for calculating quantiles.
__init__
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)
name
property name() -> str
description
property description() -> str
record
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
observe
def observe(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

get_observations
def get_observations() -> list[float]
reset
def reset() -> None

Clear all observations and values for metric rotation.


Distributed tracer logic.
__init__
def __init__(
    service_name: str,
    exporter: SpanExporter,
    max_spans: int | None = None
) -> None
start_span
def start_span(
    name: str,
    attributes: dict[str, Any] | None = None,
    context: Any | None = None,
    kind: SpanKind = SpanKind.INTERNAL
) -> Span
flush
def flush() -> None
get_all_spans
def get_all_spans() -> list[Span]
get_current_span
def get_current_span() -> Span | None
inject_context
def inject_context(
    carrier: dict[str, str],
    context: SpanContext | None = None
) -> None

Inject trace context into carrier dict.

extract_context
def extract_context(carrier: Mapping[str, str]) -> SpanContext | None

Extract trace context from carrier.


Configuration for distributed tracing.

model_config: ClassVar[ConfigDict] = ConfigDict(extra=“ignore”)

Attributes: enabled: Whether tracing is enabled. service_name: Name of the service for traces. sampler_type: Type of sampling strategy. sample_rate: Sampling rate (0.0 to 1.0) for probability sampler. max_traces_per_second: Max traces/sec for rate limiting sampler. propagation_formats: Trace context propagation formats. max_attributes: Maximum attributes per span. max_events: Maximum events per span. max_links: Maximum links per span.

validate_sample_rate
def validate_sample_rate(
    cls,
    v: float
) -> float

Validate sample rate is between 0 and 1.


Central registry for tracing exporters.

Manages a collection of handlers for creating tracing exporters. Handlers are checked in order until one is found that can handle the exporter type.

Example

registry = TracingExporterRegistry()
registry.register(CustomTracingExporterHandler())
exporter = registry.create_exporter(config)
__init__
def __init__() -> None

Initialize the registry without any handlers.

with_defaults
def with_defaults(cls) -> TracingExporterRegistry

Create a registry pre-populated with the built-in default handlers.

Returns
TypeDescription
TracingExporterRegistryA new registry instance with all default tracing exporter handlers registered.
register
def register(handler: TracingExporterHandler) -> None

Register a new tracing exporter handler.

Parameters
ParameterTypeDescription
`handler`TracingExporterHandlerThe handler to register. Added at the beginning of the handler list.
create_exporter
def create_exporter(exp_config: Any) -> Any

Create an exporter for the given config.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration object with a 'type' attribute.
Returns
TypeDescription
AnyAn exporter instance, or None if no handler can handle the type.

get_performance_summary
async def get_performance_summary(metrics: PerformanceMetrics) -> dict[str, Any]
Return a dictionary summary of performance metrics.
Parameters
ParameterTypeDescription
`metrics`PerformanceMetricsAccumulated PerformanceMetrics to summarise.
Returns
TypeDescription
dict[str, Any]Dictionary containing duration, sample counts, CPU/memory averages, task counts, and a ``has_profile_data`` flag.

health_checker
def health_checker(name: str) -> Callable[[Callable[Ellipsis, Any]], Callable[Ellipsis, Any]]
Decorator that marks a callable as a named health check.

The decorated function can later be registered on a HealthChecker via HealthChecker.register.

Parameters
ParameterTypeDescription
`name`strHuman-readable name for this check (e.g. ``"database"``).

Example

@health_checker("database")
async def check_db() -> bool:
return await db.ping()
checker = HealthChecker()
checker.register(check_db)
@health_checker("database")
async def check_db() -> bool:
return await db.ping()
checker = HealthChecker()
checker.register(check_db)

inject_trace_context
def inject_trace_context(carrier: dict[str, Any]) -> None
Inject current trace context into carrier for propagation.

Call this before sending a message to propagate trace context.


instrument_database
def instrument_database(provider: Any) -> None
Instrument a Lexigram DatabaseService with OpenTelemetry tracing.

Wraps the provider’s execute and execute_query methods to automatically create spans and record metrics for each database operation.

Features:

  • Automatic span creation for each query
  • Query duration tracking
  • Error recording and status tracking
  • Metrics for query count and duration
Parameters
ParameterTypeDescription
`provider`AnyThe database provider to instrument. Must have 'execute' and optionally 'execute_query' methods.

Example

from lexigram.monitor import instrument_database
from lexigram.sql import DatabaseService
provider = DatabaseService(config)
instrument_database(provider)
# Queries are now automatically traced

metered
def metered(
    metric_name: str | None = None,
    *,
    service: ObservabilityService | None = None
) -> Callable[[F], F]
Decorator that records function execution time as a histogram metric.
Parameters
ParameterTypeDescription
`metric_name`str | NoneExplicit metric name. Defaults to ``{module}.{qualname}.duration``.
`facade`Specific facade instance. Uses module default if ``None``.
Returns
TypeDescription
Callable[[F], F]Decorated function.

monitor
def monitor(
    name: str | None = None,
    *,
    log_args: bool = False
) -> Callable[[F], F]
Wrap a function with combined structured logging and wall-clock timing.

Convenience alternative to stacking @traced and @metered. Emits operation_start, operation_end, and operation_error log events with the elapsed duration in milliseconds. Works for both async and sync callables.

Parameters
ParameterTypeDescription
`name`str | NoneMetric/span name. Defaults to ``"{module}.{qualname}"``.
`log_args`boolWhen ``True``, the number of positional arguments is included in the log context. Defaults to ``False`` to avoid inadvertently logging sensitive data.
Returns
TypeDescription
Callable[[F], F]Decorator that wraps the target function with monitoring machinery.

Example

@monitor("user_service.create")
async def create_user(email: str) -> User:
return await repo.save(User(email=email))
@monitor()
def compute_score(data: list[float]) -> float:
return sum(data) / len(data)
@monitor("user_service.create")
async def create_user(email: str) -> User:
return await repo.save(User(email=email))
@monitor()
def compute_score(data: list[float]) -> float:
return sum(data) / len(data)

monitor_async_operation
async def monitor_async_operation(config: PerformanceMonitorConfig | None = None) -> AsyncGenerator[PerformanceMonitor, None]
Async context manager that monitors a region of async code.

Creates a PerformanceMonitor, starts it before entering the body, and stops it on exit.

Parameters
ParameterTypeDescription
`config`PerformanceMonitorConfig | NoneOptional PerformanceMonitorConfig. Default config is used when not provided.
Yields
TypeDescription
AsyncGenerator[PerformanceMonitor, None]Running PerformanceMonitor instance.

Example

async with monitor_async_operation() as monitor:
await do_some_work()
print(monitor.metrics.duration)

profile_async_function
async def profile_async_function(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> tuple[Any, FunctionProfileResult]
Profile a single async (or sync) function call.
Parameters
ParameterTypeDescription
`func`AnyCallable to profile. *args: Positional arguments forwarded to *func*. **kwargs: Keyword arguments forwarded to *func*.
Returns
TypeDescription
tuple[Any, FunctionProfileResult]A ``(result, profile)`` tuple where *result* is the return value of *func* and *profile* is a FunctionProfileResult.

Example

result, profile = await profile_async_function(my_async_fn, arg1)
print(profile.execution_time)

trace_consume
async def trace_consume(
    channel: str,
    message_type: str = 'unknown',
    carrier: dict[str, Any] | None = None,
    **attributes: Any
) -> AsyncGenerator[Span, None]
Context manager for tracing message consumption.

Usage

async with trace_consume(“notifications”, carrier=headers) as span: await process_message(…)


trace_publish
async def trace_publish(
    channel: str,
    message_type: str = 'email',
    recipient: str | None = None,
    **attributes: Any
) -> AsyncGenerator[Span, None]
Context manager for tracing message publishing.

Usage

async with trace_publish(“notifications”, “email”, “user@example.com”) as span: await send_email(…)


traced
def traced(
    span_name: str | None = None,
    *,
    service: ObservabilityService | None = None
) -> Callable[[F], F]
Decorator that wraps a function in a tracing span.

Automatically captures function name, arguments, and any raised exceptions as span events.

Parameters
ParameterTypeDescription
`span_name`str | NoneExplicit span name. Defaults to the qualified function name.
`facade`Specific facade instance. Uses module default if ``None``.
Returns
TypeDescription
Callable[[F], F]Decorated function.

Raised when a required monitoring backend is not available.

Raised when metric parameters are invalid.

Raised when a requested metric is not found.

Base exception for monitoring errors.

Base exception for performance monitoring errors.
__init__
def __init__(
    message: str = 'Performance monitoring error',
    **kwargs
) -> None

Base exception for span-related errors.