
API Reference

Protocol for dispatching operational alerts.

Implementations route alerts to notification channels (logging, PagerDuty, Slack, etc.). lexigram-monitor ships a built-in LoggingAlertDispatcher that writes alerts to the structured logger.

Example

class SlackAlertDispatcher:
    async def send_alert(
        self,
        title: str,
        message: str,
        severity: str,
        context: dict[str, Any] | None = None,
    ) -> None:
        await self._slack.post(
            channel="#ops",
            text=f"[{severity}] {title}: {message}",
        )
async def send_alert(
    title: str,
    message: str,
    severity: str,
    context: dict[str, Any] | None = None
) -> None

Dispatch a free-form operational alert.

Parameters
ParameterTypeDescription
`title`strShort, human-readable alert title.
`message`strDetailed alert message.
`severity`strSeverity level string, e.g. ``"low"``, ``"high"``, ``"critical"``.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata.
async def send_metric_alert(
    metric_name: str,
    current_value: float,
    threshold: float,
    context: dict[str, Any] | None = None
) -> None

Dispatch an alert triggered by a metric threshold breach.

Parameters
ParameterTypeDescription
`metric_name`strName of the metric that breached its threshold.
`current_value`floatObserved metric value at the time of the alert.
`threshold`floatThe configured threshold that was exceeded.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata.

Structural contract for a single health-check component.

Any object implementing check() satisfies this protocol, regardless of inheritance from any concrete class.

async def check() -> HealthCheckResult

Perform the health check and return a result.


Protocol for metric implementations.

Metrics track numeric measurements over time.

property name() -> str

Metric name.

property description() -> str

Metric description.

def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Record a metric value.

Parameters
ParameterTypeDescription
`value`floatNumeric value to record.
`labels`dict[str, str] | NoneOptional labels/tags.

Protocol for metrics backend implementations (metrics only).

Backends export metrics to external systems.

async def initialize() -> None

Initialize the metrics backend.

async def shutdown() -> None

Shutdown the metrics backend.

def record_metric(
    name: str,
    value: Any,
    metric_type: str,
    labels: dict[str, str] | None = None
) -> None

Record a metric value.

Parameters
ParameterTypeDescription
`name`strMetric name.
`value`AnyMetric value.
`metric_type`strType of metric (counter, gauge, histogram).
`labels`dict[str, str] | NoneOptional labels.

def export(spans: list[Span]) -> None

Export a batch of completed spans.

Payload fired when a monitoring alert is triggered.

Attributes:

  • alert_name: Name of the alert rule that fired.
  • severity: Severity level (e.g. "warning", "critical").


Supported monitoring backend types.

A single buffered metric recording.

Attributes:

  • name: Metric name.
  • value: Numeric value to record.
  • labels: Optional label key-value pairs.
  • timestamp: Monotonic timestamp when the entry was created.


Buffers metric recordings and flushes them in configurable batches.

Reduces metric backend overhead for high-throughput code paths by accumulating entries and flushing at intervals or when the buffer is full.

Parameters
ParameterTypeDescription
`backend`MetricProtocol backend instance with metric attributes exposing ``record()``.
`flush_interval`Seconds between automatic periodic flushes. Default: 5.0.
`max_buffer_size`Maximum entries before a forced flush. Default: 1000.

Example

recorder = BufferedMetricRecorder(backend=monitor_backend, flush_interval=2.0)
await recorder.start()
recorder.record("request_count", 1.0, {"method": "GET"})
# ... many more records ...
await recorder.stop() # flushes remaining entries
def __init__(
    backend: Any,
    flush_interval: float = 5.0,
    max_buffer_size: int = 1000
) -> None
def record(
    name: str,
    value: float,
    labels: dict[str, str] | None = None
) -> None

Buffer a metric entry for deferred recording.

If the buffer reaches max_buffer_size, schedules an immediate flush.

Parameters
ParameterTypeDescription
`name`strMetric name (must match an attribute on the backend).
`value`floatNumeric value to record.
`labels`dict[str, str] | NoneOptional label key-value pairs.
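The buffer-then-flush behaviour described above can be sketched synchronously (the real recorder flushes asynchronously on a timer as well; `TinyBuffer` and its callable backend are illustrative stand-ins):

```python
# Simplified, synchronous sketch of the BufferedMetricRecorder idea:
# accumulate entries, force a flush when the buffer is full.
class TinyBuffer:
    def __init__(self, backend_record, max_buffer_size: int = 1000) -> None:
        self._record = backend_record          # callable(name, value, labels)
        self._max = max_buffer_size
        self._buffer: list[tuple] = []

    def record(self, name, value, labels=None) -> None:
        self._buffer.append((name, value, labels))
        if len(self._buffer) >= self._max:     # forced flush when full
            self.flush()

    def flush(self) -> int:
        flushed = len(self._buffer)
        for name, value, labels in self._buffer:
            self._record(name, value, labels)  # hand entries to the backend
        self._buffer.clear()
        return flushed
```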
async def flush() -> int

Flush all buffered entries to the backend.

Returns
TypeDescription
intNumber of entries successfully flushed.
async def start() -> None

Start the periodic background flush task.

async def stop() -> None

Stop the background flush task and flush remaining entries.


Handler for console metrics exporter.

Creates a ConsoleMetricExporter for outputting metrics to the console.

def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "console", False otherwise.
def create_exporter(exp_config: Any) -> Any

Create a console metrics exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration for the exporter (unused for console).
Returns
TypeDescription
AnyA ConsoleMetricExporter instance.

def export(spans: list[Span]) -> None

Export a batch of completed spans.

Handler for console tracing exporter.

Creates a ConsoleSpanExporter for outputting traces to the console.

def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "console", False otherwise.
def create_exporter(exp_config: Any) -> Any

Create a console tracing exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration for the exporter (unused for console).
Returns
TypeDescription
AnyA ConsoleSpanExporter instance.

Monotonically increasing counter.
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)
property name() -> str
property description() -> str
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
def increment(
    amount: int = 1,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

def get_count() -> int
def reset() -> None

Reset the counter to zero and clear the observation history.


Health check implemented as a function.
def __init__(
    name: str,
    func: Callable,
    critical: bool = True
)
async def check() -> HealthCheckResult

Result of profiling a function.

Gauge that can go up and down.
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)
property name() -> str
property description() -> str
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
def set(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

def increment(
    amount: float = 1,
    labels: dict[str, str] | None = None
) -> None

Alias for record — increments gauge by amount via canonical implementation.

def decrement(
    amount: float = 1,
    labels: dict[str, str] | None = None
) -> None

Alias for record — decrements gauge by amount via canonical implementation.

def get_value() -> float
def reset() -> None

Reset the gauge to zero and clear the observation history.


Base class for health checks.
def __init__(
    name: str,
    critical: bool = True
)
async def check() -> HealthCheckResult

Categorises health checks by their role in the Kubernetes health model.

Attributes:

  • LIVENESS: Checks that the application is alive (not deadlocked). Used by /health/live endpoints; failure triggers a restart.
  • READINESS: Checks that the application is ready to accept traffic. Used by /health/ready endpoints; failure removes the instance from the load-balancer rotation.
  • STARTUP: Checks that the application has finished its startup sequence. Used by /health/startup endpoints. Succeeds once; after that the probe switches to liveness/readiness.


Configuration for health checks.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")

Attributes:

  • enabled: Whether health checks are enabled.
  • path: HTTP path for the health endpoint.
  • include_details: Include detailed component health in the response.
  • timeout: Timeout for health check operations (seconds).
  • checks: List of health check names to include.


ASGI middleware that exposes a ``GET /health`` liveness/readiness endpoint.

Intercepts requests to path and returns a JSON health-status payload. All other requests are passed through with a 404 response (or forwarded to the next ASGI app when composed in a middleware stack).

The HTTP status code reflects the overall application health:

  • 200 — all checks healthy
  • 207 — at least one check degraded
  • 503 — at least one check unhealthy

To add custom checks, subclass HealthCheckProvider and override check_health.
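The status-code rule above maps directly to a small function; this sketch uses the status strings documented elsewhere on this page:

```python
# Sketch of the documented status-code mapping: 503 if any check is
# unhealthy, 207 if any is degraded, 200 otherwise.
def status_to_http(statuses: list[str]) -> int:
    if any(s == "unhealthy" for s in statuses):
        return 503
    if any(s == "degraded" for s in statuses):
        return 207
    return 200
```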

Parameters
ParameterTypeDescription
`path`URL path for the health endpoint. Defaults to ``"/health"``.

Example

from lexigram.monitor.middleware import HealthCheckProvider
app = HealthCheckProvider(path="/ready")
# Mount *app* in your ASGI server / router.
def __init__(path: str = '/health') -> None
async def check_health() -> dict

Return the current application health as a serialisable mapping.

Override this method in a subclass to perform real dependency checks (database ping, cache ping, etc.).

Returns
TypeDescription
dictA ``dict`` with at minimum a ``"status"`` key (one of ``"healthy"``, ``"degraded"``, or ``"unhealthy"``) and a ``"timestamp"`` float. Additional ``"checks"`` entries can be added by subclasses.
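A hedged sketch of an override building the documented return shape; the dependency result here is a placeholder, not a real ping:

```python
import asyncio
import time

async def check_health() -> dict:
    # Placeholder dependency result; replace with a real database/cache ping.
    db_ok = True
    return {
        "status": "healthy" if db_ok else "unhealthy",
        "timestamp": time.time(),
        "checks": {"database": "healthy" if db_ok else "unhealthy"},
    }

result = asyncio.run(check_health())
```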

Registry for managing health checks.

Extends Registry for unified introspection. Sidecar lists track which checks participate in liveness vs readiness probes.

def __init__() -> None
def add(
    name: str,
    check: Callable[[], Any],
    *,
    timeout: float | None = None,
    critical: bool = True,
    category: Any = None
) -> None

Add a health check to satisfy HealthCheckRegistryProtocol.

async def run_all() -> tuple[Any, dict[str, Any]]
async def run_liveness() -> tuple[Any, dict[str, Any]]
async def run_readiness() -> tuple[Any, dict[str, Any]]
async def run_startup() -> tuple[Any, dict[str, Any]]
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Satisfy HealthCheckProtocol.


Result of a health check.
def to_dict() -> dict[str, Any]

Convert to dictionary format.

def is_healthy() -> bool

Check if status is healthy.

def is_degraded() -> bool

Check if status is degraded.


Payload fired after a health check probe completes.

Attributes: check_name: Name of the health check that ran. healthy: True if the check passed.


Aggregates and runs named health checks with optional per-check timeouts.

Checks can be tagged with a HealthCheckCategory (LIVENESS, READINESS, or STARTUP) and run selectively via run_liveness, run_readiness, or run_startup. Methods that omit a category default to READINESS.

def __init__() -> None
def add(
    name: str,
    check: HealthCheckProtocol | Callable[[], Any],
    *,
    timeout: float | None = None,
    category: HealthCheckCategory = HealthCheckCategory.READINESS
) -> None

Add a named health check with an optional timeout and category.

Parameters
ParameterTypeDescription
`name`strUnique identifier for this check.
`check`HealthCheckProtocol | Callable[[], Any]A HealthCheckProtocol instance or a callable returning a health indicator (bool, dict, or HealthCheckResult).
`timeout`float | NonePer-check timeout in seconds. If the check exceeds this duration it is reported as UNHEALTHY.
`category`HealthCheckCategoryWhether this check is a liveness, readiness, or startup check. Defaults to ``READINESS``.
def remove(name: str) -> None

Remove a named health check.

Parameters
ParameterTypeDescription
`name`strThe check name to remove.
Raises
ExceptionDescription
KeyErrorIf no check with this name is registered.
def has(name: str) -> bool

Check whether a health check with the given name is registered.

Parameters
ParameterTypeDescription
`name`strThe check name.
Returns
TypeDescription
boolTrue if the check exists.
property check_names() -> list[str]

Sorted list of all registered health check names.

Returns
TypeDescription
list[str]A sorted list of check name strings.
def register(check: Callable[[], Any]) -> None

Register a callable decorated with health_checker.

The check name is read from the _health_check_name attribute stamped by the health_checker decorator.

Parameters
ParameterTypeDescription
`check`Callable[[], Any]A decorated health check callable.
Raises
ExceptionDescription
ValueErrorIf *check* was not decorated with health_checker.
async def run_all() -> dict[str, HealthCheckResult]

Run all health checks and return unified results.

Per-check timeouts are enforced when configured. Timed-out checks are reported as UNHEALTHY.

Returns
TypeDescription
dict[str, HealthCheckResult]A dict mapping check names to their ``HealthCheckResult``.
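The per-check timeout behaviour can be sketched with `asyncio.wait_for`; the function and check names here are illustrative, and statuses are plain strings rather than the library's `HealthCheckResult`:

```python
import asyncio

# Sketch: a check that exceeds its timeout is reported as unhealthy.
async def run_with_timeout(check, timeout: float) -> str:
    try:
        ok = await asyncio.wait_for(check(), timeout)
        return "healthy" if ok else "unhealthy"
    except asyncio.TimeoutError:
        return "unhealthy"

async def slow_check() -> bool:
    await asyncio.sleep(1.0)   # slower than the timeout below
    return True

async def fast_check() -> bool:
    return True

slow = asyncio.run(run_with_timeout(slow_check, 0.01))
fast = asyncio.run(run_with_timeout(fast_check, 0.01))
```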
def aggregate_status(results: dict[str, HealthCheckResult]) -> HealthStatus

Determine the overall health status from individual check results.

Returns HEALTHY only if every check is HEALTHY. Returns DEGRADED if any check is DEGRADED but none are UNHEALTHY. Returns UNHEALTHY otherwise.

Parameters
ParameterTypeDescription
`results`dict[str, HealthCheckResult]The dict returned by run_all.
Returns
TypeDescription
HealthStatusThe aggregate ``HealthStatus``.
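The aggregation rule sketched over plain strings instead of the library's `HealthStatus` enum:

```python
# HEALTHY only if every check is healthy; DEGRADED if any is degraded
# but none unhealthy; UNHEALTHY otherwise.
def aggregate(results: dict[str, str]) -> str:
    values = results.values()
    if any(v == "unhealthy" for v in values):
        return "unhealthy"
    if any(v == "degraded" for v in values):
        return "degraded"
    return "healthy"
```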
async def run_all_with_summary() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run all health checks and return the aggregate status together with details.

Convenience wrapper that calls run_all and aggregate_status in one step.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.
async def run_liveness() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run only LIVENESS checks and return aggregate status and details.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.
async def run_readiness() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run only READINESS checks and return aggregate status and details.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.
async def run_startup() -> tuple[HealthStatus, dict[str, HealthCheckResult]]

Run only STARTUP checks and return aggregate status and details.

Returns
TypeDescription
tuple[HealthStatus, dict[str, HealthCheckResult]]A tuple of ``(aggregate_status, per_check_results)``.

Singleton-style registry for named CachedHealthChecker instances.

Manages the lifecycle of zero or more named checkers. The registry is registered as a container singleton by MonitorProvider; application code should resolve it via constructor injection rather than instantiating it directly.

Lifecycle

# At application startup — wired by MonitorProvider automatically.
registry = HealthCheckerRegistry()
checker = registry.get_or_create("api", db_provider=db)
# At application shutdown — MonitorProvider calls this.
await registry.cleanup()
def __init__() -> None
def get_or_create(
    key: str = 'default',
    db_provider: Any = None,
    cache_ttl: float = 5.0,
    check_timeout: float = 2.0
) -> CachedHealthChecker

Return the named checker, creating it if it does not yet exist.

Subsequent calls with the same key always return the same instance regardless of db_provider, cache_ttl, or check_timeout — those arguments are ignored after the first call for a given key.

Parameters
ParameterTypeDescription
`key`strLogical name for this checker. Use distinct keys to manage independent groups of health checks (e.g. ``"api"`` vs ``"worker"``).
`db_provider`AnyOptional database provider injected into the checker for database connectivity verification.
`cache_ttl`floatSeconds to cache the last health-check result before re-running the underlying checks. Defaults to ``5.0``.
`check_timeout`floatPer-check execution timeout in seconds. Defaults to ``2.0``.
Returns
TypeDescription
CachedHealthCheckerThe CachedHealthChecker registered under *key*.
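The first-call-wins contract can be sketched with `dict.setdefault`; `TinyRegistry` and its plain-dict "checker" are stand-ins for the real registry and `CachedHealthChecker`:

```python
# Sketch of get_or_create: arguments are ignored once a key exists.
class TinyRegistry:
    def __init__(self) -> None:
        self._checkers: dict[str, dict] = {}

    def get_or_create(self, key: str = "default", cache_ttl: float = 5.0) -> dict:
        # Only the first call's arguments take effect for a given key.
        return self._checkers.setdefault(key, {"key": key, "cache_ttl": cache_ttl})

reg = TinyRegistry()
a = reg.get_or_create("api", cache_ttl=5.0)
b = reg.get_or_create("api", cache_ttl=30.0)   # ttl ignored: same instance
```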
def get_checker(key: str = 'default') -> CachedHealthChecker | None

Return the checker registered under key, or None if absent.

Parameters
ParameterTypeDescription
`key`strThe logical name used when the checker was created.
Returns
TypeDescription
CachedHealthChecker | NoneThe CachedHealthChecker or ``None``.
def list_checkers() -> dict[str, CachedHealthChecker]

Return a snapshot of all registered checkers keyed by name.

Returns
TypeDescription
dict[str, CachedHealthChecker]A shallow copy of the internal registry mapping.
async def cleanup() -> None

Stop background refresh tasks for every checker and clear the registry.

Called automatically by MonitorProvider during application shutdown. It is safe to call multiple times.


Unified health status.

Histogram for measuring distributions.
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
)
property name() -> str
property description() -> str
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
def observe(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

def get_observations() -> list[float]
def get_bucket_counts() -> dict[float, int]
def reset() -> None

Clear all observations and values for metric rotation.
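A sketch of cumulative upper-bound bucket counting for `get_bucket_counts`, assuming Prometheus-style semantics (the library's exact bucketing may differ):

```python
# Each observation increments every bucket whose upper bound it fits under.
def bucket_counts(buckets: list[float], observations: list[float]) -> dict[float, int]:
    counts = {b: 0 for b in buckets}
    for value in observations:
        for bound in buckets:
            if value <= bound:
                counts[bound] += 1
    return counts
```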


In-memory trace provider using Tracer.
def __init__(
    service_name: str = 'lexigram-service',
    max_spans: int = DEFAULT_MAX_SPANS,
    exporter: SpanExporter | None = None
) -> None
def create_span(
    name: str,
    parent: Span | None = None
) -> Span
def get_current_span() -> Span | None
def set_current_span(span: Span | None) -> None
def get_all_spans() -> list[Span]

Dispatches alerts to the structured logger.

Implements AlertDispatcherProtocol.

Low- and medium-severity alerts are logged at WARNING level; high- and critical-severity alerts are logged at ERROR level.

This implementation is always available with no external dependencies and is registered by MonitorProvider as the default AlertDispatcherProtocol binding when no other dispatcher is configured.
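The severity-to-level mapping described above, sketched with the stdlib `logging` module (the real dispatcher writes to the structured logger):

```python
import logging

# "high" and "critical" escalate to ERROR; everything else is WARNING.
def level_for(severity: str) -> int:
    return logging.ERROR if severity in ("high", "critical") else logging.WARNING
```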

async def send_alert(
    title: str,
    message: str,
    severity: str,
    context: dict[str, Any] | None = None
) -> None

Dispatch a free-form operational alert to the structured logger.

Parameters
ParameterTypeDescription
`title`strShort, human-readable alert title.
`message`strDetailed alert message.
`severity`strSeverity level string, e.g. ``"low"``, ``"high"``, ``"critical"``.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata included as structured key-value pairs on the log record.
async def send_metric_alert(
    metric_name: str,
    current_value: float,
    threshold: float,
    context: dict[str, Any] | None = None
) -> None

Dispatch a metric-threshold alert to the structured logger.

The alert is always logged at WARNING level unless the breach exceeds the threshold by more than 100%, in which case ERROR is used.

Parameters
ParameterTypeDescription
`metric_name`strName of the metric that breached its threshold.
`current_value`floatObserved metric value at the time of the alert.
`threshold`floatThe configured threshold that was exceeded.
`context`dict[str, Any] | NoneOptional free-form mapping of additional metadata.
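The escalation rule above ("more than 100% over the threshold" means more than double it) can be sketched as:

```python
import logging

# WARNING by default; ERROR once the value exceeds twice the threshold.
def metric_alert_level(current_value: float, threshold: float) -> int:
    if current_value > 2 * threshold:
        return logging.ERROR
    return logging.WARNING
```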

Configuration for structured logging.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")

Attributes:

  • enabled: Whether structured logging is enabled.
  • level: Default log level.
  • format: Log format (json, text).
  • include_trace_context: Include trace context in logs.
  • redact_fields: Fields to redact from logs.

def validate_level(
    cls,
    v: str
) -> str

Validate log level.


Proxy for a specific metric that provides a stateful API.

Bridges the gap between the MetricsCollectorProtocol (which is name-based) and consumers who expect a persistent metric object.

def __init__(
    name: str,
    collector: MetricsCollectorProtocol,
    metrics_type: str
) -> None
property name() -> str

Metric name.

def increment(amount: float = 1.0) -> None

Increment the counter.

def record(value: float) -> None

Record a histogram value.

property value() -> float

Current counter value (tracked locally).


Payload fired when a metric data point is recorded.

Attributes: metric_name: Name of the metric being recorded. value: Numeric value of the data point.


Represents a metric measurement.

Central metrics collection and reporting.
def __init__() -> Any
def register_metric(metric: MetricProtocol) -> None

Register an existing metric instrument.

def create_counter(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> Counter
def create_gauge(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> Gauge
def create_summary(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> Summary
def create_histogram(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
) -> Histogram
def get_metric(name: str) -> MetricProtocol | None
def get_all_metrics() -> dict[str, MetricProtocol]
def clear() -> None
def increment(
    name: str,
    value: float = 1.0,
    tags: dict[str, str] | None = None
) -> None

Increment a counter metric.

def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Set a gauge metric.

def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Record a histogram value.


Configuration for metrics collection.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")

Attributes:

  • enabled: Whether metrics collection is enabled.
  • prefix: Prefix for all metric names.
  • default_labels: Default labels to add to all metrics.
  • histogram_buckets: Default bucket boundaries for histograms.
  • collection_interval: Interval for periodic metrics collection (seconds).

def make_metric_name(name: str) -> str

Create a prefixed metric name.

Parameters
ParameterTypeDescription
`name`strThe metric name.
Returns
TypeDescription
strPrefixed metric name.

Central registry for metrics exporters.

Manages a collection of handlers for creating metrics exporters. Handlers are checked in order until one is found that can handle the exporter type.

Example

registry = MetricsExporterRegistry()
registry.register(CustomMetricsExporterHandler())
exporter = registry.create_exporter(config)

def __init__() -> None

Initialize the registry without any handlers.

def with_defaults(cls) -> MetricsExporterRegistry

Create a registry pre-populated with the built-in default handlers.

Returns
TypeDescription
MetricsExporterRegistryA new registry instance with all default metrics exporter handlers registered.
def register(handler: MetricsExporterHandler) -> None

Register a new metrics exporter handler.

Parameters
ParameterTypeDescription
`handler`MetricsExporterHandlerThe handler to register. Added at the beginning of the handler list.
def create_exporter(exp_config: Any) -> Any

Create an exporter for the given config.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration object with a 'type' attribute.
Returns
TypeDescription
AnyAn exporter instance, or None if no handler can handle the type.
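The handler chain can be sketched as a small chain-of-responsibility; `TinyExporterRegistry`, `ConsoleHandler`, and the dict-based config are illustrative stand-ins:

```python
# Handlers are tried in order until one can_handle the exporter type.
class ConsoleHandler:
    def can_handle(self, exporter_type: str) -> bool:
        return exporter_type == "console"

    def create_exporter(self, exp_config) -> str:
        return "console-exporter"          # stand-in for a real exporter object

class TinyExporterRegistry:
    def __init__(self) -> None:
        self._handlers = []

    def register(self, handler) -> None:
        self._handlers.insert(0, handler)  # newest handlers take precedence

    def create_exporter(self, exp_config):
        for handler in self._handlers:
            if handler.can_handle(exp_config["type"]):
                return handler.create_exporter(exp_config)
        return None                        # no handler matched
```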

Hierarchical root configuration for Lexigram Monitor.

Attributes:

  • enabled: Whether monitoring is enabled.
  • name: Configuration name (default: "monitor").
  • backend_type: Monitoring backend type.
  • metrics: Metrics configuration.
  • tracing: Tracing configuration.
  • health: Health check configuration.
  • logging: Logging configuration.
  • opentelemetry: OpenTelemetry configuration.
  • prometheus: Prometheus configuration.
  • environment: Environment name.
  • debug: Debug mode.

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Validate monitoring configuration for the given environment.

Parameters
ParameterTypeDescription
`env`Environment | NoneTarget environment; resolved from ``LEX_ENV`` when ``None``.
Returns
TypeDescription
list[ConfigIssue]List of ConfigIssue instances.
def get_backend_config() -> OpenTelemetryConfig | PrometheusConfig | None

Get the configuration for the selected backend.

Returns
TypeDescription
OpenTelemetryConfig | PrometheusConfig | NoneBackend-specific configuration or None for memory backend.
def make_exporter() -> Any | None

Construct an optional metrics exporter appropriate for the backend.

Returns
TypeDescription
Any | NoneMetricsExporter instance or None if no exporter is available.

Metrics collection, distributed tracing, health checks, and profiling.

Call configure to set up the monitoring subsystem.

Usage (no-op / development)

from lexigram.monitor.backends.noop import NoopMetricsBackend

@module(
    imports=[MonitorModule.configure(backend=NoopMetricsBackend())]
)
class AppModule(Module):
    pass

Usage (Prometheus)

from lexigram.monitor.backends.exporters.prometheus import PrometheusMetricsExporter
from lexigram.monitor.backends.prometheus import PrometheusBackend

@module(
    imports=[MonitorModule.configure(backend=PrometheusBackend())]
)
class AppModule(Module):
    pass
def configure(
    cls,
    backend: Any = None,
    config: Any | None = None
) -> DynamicModule

Create a MonitorModule with explicit configuration.

Parameters
ParameterTypeDescription
`backend`Any``MetricsBackendProtocol`` implementation. Defaults to ``NoOpMetricsBackend`` when omitted.
`config`Any | None``MonitorConfig`` for advanced tracing and profiling settings.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def stub(cls) -> DynamicModule

Return a no-op MonitorModule for unit testing.

Registers a NoOpMetricsBackend that discards all metrics. No external telemetry systems are connected.

Returns
TypeDescription
DynamicModuleA DynamicModule with noop metrics and tracing.

Monitoring and observability provider for the Lexigram Framework.
def __init__(
    backend: MonitoringBackend,
    exporter: Any | None = None,
    config: Any | None = None
)
def from_config(
    cls,
    config: MonitorConfig,
    **context: Any
) -> MonitorProvider

Create a MonitorProvider from config.

Delegates to the create_provider_from_config factory.

async def register(container: ContainerRegistrarProtocol) -> None

Register monitoring services with the container.

Binds all monitoring singletons into the DI container. DB-backed exporter wiring is deferred to boot() where the full container graph is available.

async def boot(container: BootContainerProtocol) -> None

Start the monitoring provider and wire the observability facade.

After all providers are registered the container graph is complete, so this is the correct place to resolve optional cross-provider dependencies such as the database exporter.

async def shutdown() -> None

Shut down the monitoring provider.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check monitoring provider health.

def record_request(
    method: str,
    path: str,
    duration: float,
    status_code: int
) -> None

Record an HTTP request.

def record_connection_change(delta: int) -> None

Record a connection count change.

def create_counter(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)

Create a counter metric.

def create_gauge(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)

Create a gauge metric.

def create_histogram(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
)

Create a histogram metric.


No-op registry for categorised health checks.
def add(
    name: str,
    check: Callable[[], Any],
    *,
    timeout: float | None = None,
    critical: bool = True,
    category: Any = None
) -> None

Ignore health check registration.

async def run_all() -> tuple[Any, dict[str, Any]]

Return an empty aggregate result.

async def run_liveness() -> tuple[Any, dict[str, Any]]

Return an empty liveness result.

async def run_readiness() -> tuple[Any, dict[str, Any]]

Return an empty readiness result.

async def run_startup() -> tuple[Any, dict[str, Any]]

Return an empty startup result.

async def health_check() -> HealthCheckResult

Report the no-op registry as healthy.


Metrics collector that drops all observations.
def __init__() -> None
property name() -> str

Return a placeholder metric name.

property description() -> str

Return a placeholder metric description.

def register_metric(metric: MetricProtocol) -> None

Ignore pre-created metrics.

def increment(
    name: str,
    value: float = 1.0,
    tags: dict[str, str] | None = None
) -> None

Ignore counter increments.

def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Ignore gauge values.

def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Ignore histogram samples.

def create_counter(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> NoOpMetricsCollector

Return the collector itself.

def create_gauge(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
) -> NoOpMetricsCollector

Return the collector itself.

def create_histogram(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None,
    buckets: list[float] | None = None
) -> NoOpMetricsCollector

Return the collector itself.

def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Ignore recorded metric values.


No-op span that silently drops all tracing operations.
def __init__(name: str = '') -> None
def set_attribute(
    key: str,
    value: Any
) -> None

Store span attributes for introspection in tests.

def add_event(
    name: str,
    attributes: dict[str, Any] | None = None
) -> None

Ignore span events.

def record_exception(exception: Exception) -> None

Ignore recorded exceptions.

def set_status(status: str) -> None

Ignore span status updates.

def end() -> None

Mark the span as ended without side effects.


No-op tracer that returns NoOpSpan instances.
def start_span(
    name: str,
    attributes: dict[str, Any] | None = None,
    context: Any | None = None
) -> NoOpSpan

Start a no-op span.

def get_current_span() -> SpanProtocol | None

Return no active span.

def inject_context(
    carrier: dict[str, str],
    context: Any | None = None
) -> None

Leave the carrier unchanged.

def extract_context(carrier: dict[str, str]) -> Any | None

Return no extracted context.


Handler for OTLP metrics exporter.

Creates an OTLPMetricExporter for sending metrics to an OTLP-compatible backend.

def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "otlp", False otherwise.
def create_exporter(exp_config: Any) -> Any

Create an OTLP metrics exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration with endpoint and headers.
Returns
TypeDescription
AnyAn OTLPMetricExporter instance.

Handler for OTLP tracing exporter.

Creates an OTLPSpanExporter for sending traces to an OTLP-compatible backend.

def can_handle(exporter_type: str) -> bool

Check if this handler can handle the exporter type.

Parameters
ParameterTypeDescription
`exporter_type`strThe type of exporter to check.
Returns
TypeDescription
boolTrue if exporter_type is "otlp", False otherwise.
def create_exporter(exp_config: Any) -> Any

Create an OTLP tracing exporter instance.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration with endpoint and headers.
Returns
TypeDescription
AnyAn OTLPSpanExporter instance.

Middleware for OpenTelemetry tracing and metrics in Lexigram-Web.

Provides automatic tracing and metrics collection for HTTP requests. Gracefully degrades when OpenTelemetry is not available or has import issues.

Features:

  • Automatic span creation for each HTTP request
  • Extracts parent trace context from incoming headers
  • Records request count and duration metrics
  • Captures HTTP status codes and error conditions

Example

from lexigram.monitor import OTelMiddleware

app = OTelMiddleware(my_asgi_app)

def __init__(app: ASGIApp)

Initialize the OTel middleware.

Parameters
ParameterTypeDescription
`app`ASGIAppThe ASGI application to wrap.

Provides observability: metrics, tracing, and health checks.

Registers lightweight no-op stubs for every observability contract so that application code can always resolve MetricsCollectorProtocol, TracerProtocol, and HealthCheckRegistryProtocol via DI — even when lexigram-monitor is not installed.

When lexigram-monitor is installed, its MonitorProvider runs at a higher priority and overrides these registrations with the full implementations. This provider must not import from lexigram.monitor directly; doing so would violate the core ↔ extension hierarchy.

async def register(container: ContainerRegistrarProtocol) -> None

Register no-op observability stubs into the container.

lexigram-monitor’s own provider will override these at its higher priority if the package is present.

async def boot(container: ContainerResolverProtocol) -> None

No boot-time work required for the observability module.

async def shutdown() -> None

No resources to release for the observability module.


Unified service for observability (traces, metrics, health).

Automatically detects if the lexigram-monitor extension is available and delegates to it. Otherwise, uses NoOp implementations that still allow application code to instrument without errors.

Parameters
ParameterTypeDescription
`tracer`TracerProtocol | NoneOptional tracer backend. If ``None``, uses NoOpTracer.
`meter`MetricsCollectorProtocol | NoneOptional meter backend. If ``None``, uses NoOpMetricsCollector.
def __init__(
    tracer: TracerProtocol | None = None,
    meter: MetricsCollectorProtocol | None = None
) -> None
def register_metric(metric: MetricProtocol) -> None

Register an existing metric instrument.

Parameters
ParameterTypeDescription
`metric`MetricProtocolPre-defined metric instance.
def trace(
    name: str,
    **attributes: Any
) -> Iterator[SpanProtocol]

Create a tracing span.

If a real tracer is configured, delegates to its start_span method. Otherwise yields a NoOpSpan.

Parameters
ParameterTypeDescription
`name`strSpan name.
`**attributes`AnyInitial span attributes.
Yields
TypeDescription
Iterator[SpanProtocol]A span object with ``set_attribute()`` and ``add_event()`` methods.
def counter(name: str) -> MetricProxy

Get or create a counter metric.

Parameters
ParameterTypeDescription
`name`strCounter name.
Returns
TypeDescription
MetricProxyA counter with ``increment()`` method.
def histogram(name: str) -> MetricProxy

Get or create a histogram metric.

Parameters
ParameterTypeDescription
`name`strHistogram name.
Returns
TypeDescription
MetricProxyA histogram with ``record()`` method.
def timed(metric_name: str) -> Iterator[None]

Context manager that records execution duration as a histogram.

Parameters
ParameterTypeDescription
`metric_name`strThe histogram metric name.
Yields
TypeDescription
Iterator[None]None — timing is recorded on exit.

OpenTelemetry monitoring backend.
def __init__(
    service_name: str = 'lexigram-app',
    endpoint: str | None = None,
    config: Any | None = None,
    tracing_exporter_registry: TracingExporterRegistry | None = None,
    metrics_exporter_registry: MetricsExporterRegistry | None = None
)
async def initialize() -> None

Initialize OpenTelemetry.

async def shutdown() -> None
def record_metric(
    name: str,
    value: Any,
    metric_type: str,
    labels: dict[str, str] | None = None
) -> None
def create_span(
    name: str,
    parent_context: SpanContext | None = None
) -> Span

Configuration for OpenTelemetry backend.

Attributes:

  • endpoint: OTLP endpoint URL.
  • headers: Headers to send with OTLP requests.
  • insecure: Use insecure connection (no TLS).
  • timeout: Export timeout in seconds.
  • compression: Compression type (none, gzip).
  • batch_size: Batch size for exports.
  • export_interval: Export interval in seconds.

def validate_compression(
    cls,
    v: str
) -> str

Validate compression type.


Aggregated performance metrics.
property duration() -> float | None
property samples_per_second() -> float

Async performance monitor with periodic CPU/memory sampling.

Collects snapshots at config.sampling_interval intervals while active. Use start_monitoring / stop_monitoring or the async monitor_context context manager to bracket monitored regions.

def __init__(config: PerformanceMonitorConfig | None = None) -> None

Initialise monitor with optional configuration.

Parameters
ParameterTypeDescription
`config`PerformanceMonitorConfig | NonePerformance monitoring configuration (defaults apply).
property state() -> PerformanceMonitorState

Return the current monitoring state.

property metrics() -> PerformanceMetrics

Return the accumulated performance metrics.

async def start_monitoring() -> None

Start periodic metrics collection.

Raises
ExceptionDescription
PerformanceMonitorErrorIf monitoring is already active.
async def stop_monitoring() -> None

Stop metrics collection and finalise metrics.

async def monitor_context() -> AsyncGenerator[PerformanceMonitor, None]

Async context manager that starts and stops monitoring automatically.

Yields
TypeDescription
AsyncGenerator[PerformanceMonitor, None]This monitor instance.
async def profile_function(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> FunctionProfileResult

Profile a synchronous or asynchronous function.

Parameters
ParameterTypeDescription
`func`AnyCallable to profile (sync or async).
`*args`AnyPositional arguments for *func*.
`**kwargs`AnyKeyword arguments for *func*.
Returns
TypeDescription
FunctionProfileResultFunctionProfileResult with timing and memory data.
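profile_function accepts either a sync or an async callable. The dispatch it needs can be sketched with `inspect.isawaitable` (the result type here is a timing-only stand-in, not the real FunctionProfileResult):

```python
import asyncio
import inspect
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchProfileResult:
    """Illustrative stand-in for FunctionProfileResult (timing only)."""
    execution_time: float
    result: Any


async def sketch_profile(func: Any, *args: Any, **kwargs: Any) -> SketchProfileResult:
    """Time a sync or async callable, awaiting the result when needed."""
    start = time.perf_counter()
    value = func(*args, **kwargs)
    if inspect.isawaitable(value):
        value = await value  # async callables return an awaitable first
    return SketchProfileResult(time.perf_counter() - start, value)


async def main() -> SketchProfileResult:
    async def work(n: int) -> int:
        await asyncio.sleep(0)
        return n * 2

    return await sketch_profile(work, 21)


profile = asyncio.run(main())
print(profile.result)  # 42
```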

Configuration for performance monitoring.

State of the performance monitor.

A single performance metrics snapshot.

Prometheus monitoring backend.
def __init__(port: int = 8000)

Initialize Prometheus backend.

Parameters
ParameterTypeDescription
`port`intPort for metrics HTTP server
Raises
ExceptionDescription
BackendNotAvailableErrorIf prometheus-client is not installed
async def initialize() -> None

Initialize Prometheus metrics server.

If the configured port is already in use, log a warning and continue without failing the entire application startup.

async def shutdown() -> None

Shutdown Prometheus server.

def record_metric(
    name: str,
    value: Any,
    metric_type: str,
    labels: dict[str, str] | None = None
) -> None

Record a metric using Prometheus.

Parameters
ParameterTypeDescription
`name`strMetric name
`value`AnyMetric value
`metric_type`strType of metric (counter, gauge, histogram)
`labels`dict[str, str] | NoneOptional metric labels
def create_span(
    name: str,
    parent_context: SpanContext | None = None
) -> Span

Create a span (Prometheus doesn’t do tracing, so this is a no-op).

Parameters
ParameterTypeDescription
`name`strSpan name
`parent_context`SpanContext | NoneOptional parent span context
Returns
TypeDescription
SpanDummy span instance

Configuration for Prometheus backend.

Attributes:

  • port: Port for metrics HTTP server.
  • path: Path for metrics endpoint.
  • enable_default_metrics: Enable default process metrics.
  • pushgateway_url: Optional Pushgateway URL for push-based metrics.
  • push_interval: Interval for pushing metrics to Pushgateway (seconds).
  • store_in_db: Whether to persist metric observations to the database.
  • metrics_table: Name of the metrics table to write samples to.


Async-compatible adapter for prometheus_client.

Wraps prometheus_client.Counter, Gauge, and Histogram objects with a thread-safe, async-friendly API. Blocking calls are offloaded to a thread pool via asyncio.to_thread so the event loop is never stalled.

Label schemas for each metric name are locked-in on first use; subsequent calls must supply the same label keys. If a call omits a previously-seen label key the value defaults to an empty string so the series is still valid.

A Prometheus-client ASGI application (/metrics endpoint) is available via metrics_app. Mount it in your web application:

# Inside a WebProvider or application startup
metrics_route = await container.resolve("prometheus_metrics_app")
app.mount("/metrics", metrics_route)
Parameters
ParameterTypeDescription
`registry`AnyOptional custom ``CollectorRegistry``. Defaults to a fresh isolated registry so multiple exporters in the same process do not collide.
def __init__(registry: Any = None) -> None
property metrics_app() -> Any

ASGI application that exposes the Prometheus /metrics endpoint.

Returns
TypeDescription
AnyAn ASGI-compatible callable (from ``prometheus_client.make_asgi_app``).
Raises
ExceptionDescription
RuntimeErrorIf ``prometheus-client`` is not installed.
async def counter(
    name: str,
    value: int,
    tags: dict[str, str] | None = None
) -> None

Increment a counter by value.

Parameters
ParameterTypeDescription
`name`strMetric name.
`value`intAmount to increment.
`tags`dict[str, str] | NoneLabel key/value pairs.
async def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Set a gauge to value.

Parameters
ParameterTypeDescription
`name`strMetric name.
`value`floatNew gauge value.
`tags`dict[str, str] | NoneLabel key/value pairs.
async def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Observe value on a histogram.

Parameters
ParameterTypeDescription
`name`strMetric name.
`value`floatObservation value.
`tags`dict[str, str] | NoneLabel key/value pairs.
async def flush() -> None

No-op — prometheus_client pushes are pull-based.


ASGI middleware that automatically collects HTTP request metrics.

Wraps any ASGI application and records per-request counters, duration histograms, and an active-request gauge. When prometheus_client is installed the metrics are exposed in the standard Prometheus text format at path (default /metrics); otherwise the framework’s MetricsCollectorProtocol is used so the application degrades gracefully without the optional dependency.

Parameters
ParameterTypeDescription
`path`strURL path at which to serve the Prometheus scrape endpoint. Defaults to ``"/metrics"``.
`metrics_collector`MetricsCollectorProtocol | NoneOptional pre-configured MetricsCollectorProtocol. A new default collector is created when ``None`` is passed.

Example

from lexigram.monitor.middleware import PrometheusMiddleware
app = PrometheusMiddleware(my_asgi_app, path="/metrics")
# Mount *app* in your ASGI server of choice.
def __init__(
    path: str = '/metrics',
    metrics_collector: MetricsCollectorProtocol | None = None
)
def set_next_app(app: Callable) -> None
async def next_app(
    scope: dict,
    receive: Callable,
    send: Callable
) -> None

Defines an acceptable performance target for a monitored metric.

An SLO specifies the maximum tolerable value for a percentile of a metric (e.g. p99 latency < 200 ms) and the error-budget burn-rate threshold above which an alert should be raised.

Attributes:

  • name: Unique name for this SLO (e.g. "api.p99_latency").
  • metric: Name of the metric to evaluate (e.g. "http.request.duration").
  • percentile: Target percentile expressed as a fraction between 0.0 and 1.0 (e.g. 0.99 for p99).
  • threshold_ms: Maximum acceptable measured value in milliseconds.
  • window: Measurement window for rolling evaluation.
  • burn_rate_threshold: Fire an alert when the measured/threshold ratio equals or exceeds this value. A value of 1.0 means “alert the moment the threshold is exceeded”.
  • description: Optional human-readable description of the SLO.


Tracks SLO compliance and fires alerts on error-budget exhaustion.

Records metric samples in memory, then evaluates each registered SLO when evaluate is called. Violations are returned as a list of SLOViolation objects and also emitted as structured log warnings.

Example

from datetime import timedelta
from lexigram.monitor.slo import SLO, SLOMonitor
monitor = SLOMonitor()
monitor.register(SLO(
    name="api.p99_latency",
    metric="http.request.duration",
    percentile=0.99,
    threshold_ms=200.0,
    window=timedelta(hours=1),
))
monitor.record_sample("http.request.duration", 150.0)
monitor.record_sample("http.request.duration", 350.0)
violations = await monitor.evaluate()
def __init__(max_samples_per_metric: int = 10000) -> None
def register(slo: SLO) -> None

Register an SLO for monitoring.

Parameters
ParameterTypeDescription
`slo`SLOThe SLO to track.
Raises
ExceptionDescription
ValueErrorIf an SLO with the same name is already registered.
def record_sample(
    metric: str,
    value_ms: float
) -> None

Record a measurement sample for a metric.

Parameters
ParameterTypeDescription
`metric`strThe metric name (matches registered SLO ``metric`` fields).
`value_ms`floatThe measurement value in milliseconds.
async def evaluate() -> list[SLOViolation]

Evaluate all registered SLOs against recorded samples.

Returns
TypeDescription
list[SLOViolation]A list of SLOViolation instances for every SLO that is in breach. Returns an empty list when all SLOs are within bounds or when no samples have been recorded yet.
property registered_slos() -> dict[str, SLO]

Return registered SLOs keyed by name.

def clear_samples(metric: str | None = None) -> None

Remove recorded samples.

Parameters
ParameterTypeDescription
`metric`str | NoneIf given, only clears samples for that metric. When ``None``, clears all recorded samples.
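The core of evaluate() reduces to a percentile computation over the recorded samples plus a burn-rate ratio against threshold_ms. A sketch using the nearest-rank percentile definition (one common choice; the library's exact quantile method is not specified here):

```python
import math


def percentile(samples: list[float], fraction: float) -> float:
    """Nearest-rank percentile, e.g. fraction=0.99 for p99."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(fraction * len(ordered)))
    return ordered[rank - 1]


# Samples from the SLOMonitor example above, against threshold_ms=200.0.
samples = [150.0, 350.0]
measured = percentile(samples, 0.99)  # nearest rank over two samples
burn_rate = measured / 200.0          # measured_value / threshold_ms
print(measured, burn_rate >= 1.0)     # 350.0 True
```

A burn_rate at or above burn_rate_threshold (1.0 by default semantics described above) is what turns a sample set into an SLOViolation.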

Represents a detected SLO threshold breach.

Attributes:

  • slo: The SLO that was violated.
  • measured_value: The percentile value that exceeded the threshold (ms).
  • burn_rate: Ratio of measured_value to slo.threshold_ms at the time of detection.
  • detected_at: UTC timestamp when the violation was detected.
  • message: Human-readable summary, auto-generated when empty.


Supported tracing sampler types.

Distributed tracing span.
def set_attribute(
    key: str,
    value: Any
) -> None
def add_event(
    name: str,
    attributes: dict[str, Any] | None = None
) -> None
def set_status(
    status: SpanStatus,
    message: str | None = None
) -> None
def record_exception(exception: Exception) -> None
def end() -> None
def get_duration() -> float | None
def finish() -> None

Alias for end() for compatibility.


Span context for propagation.
def to_traceparent() -> str
def from_traceparent(
    cls,
    traceparent: str
) -> SpanContext | None
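to_traceparent/from_traceparent presumably follow the W3C Trace Context header format, `version-trace_id-span_id-flags`. A minimal stand-in showing the round trip under that assumption (not the shipped SpanContext):

```python
from dataclasses import dataclass


@dataclass
class SketchSpanContext:
    """Illustrative stand-in assuming the W3C trace-context format."""
    trace_id: str  # 32 lowercase hex chars
    span_id: str   # 16 lowercase hex chars

    def to_traceparent(self) -> str:
        # version 00, sampled flag 01
        return f"00-{self.trace_id}-{self.span_id}-01"

    @classmethod
    def from_traceparent(cls, traceparent: str) -> "SketchSpanContext | None":
        parts = traceparent.split("-")
        if len(parts) != 4 or len(parts[1]) != 32 or len(parts[2]) != 16:
            return None  # malformed header: no context extracted
        return cls(trace_id=parts[1], span_id=parts[2])


ctx = SketchSpanContext("0af7651916cd43dd8448eb211c80319c", "b7ad6b7169203331")
header = ctx.to_traceparent()
print(SketchSpanContext.from_traceparent(header) == ctx)  # True
```

Returning None for malformed headers matches the `SpanContext | None` return type above: propagation silently degrades rather than failing the request.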

Span kind following OpenTelemetry specification.

Span status following OpenTelemetry specification.

Summary metric for calculating quantiles.
def __init__(
    name: str,
    description: str = '',
    labels: dict[str, str] | None = None
)
property name() -> str
property description() -> str
def record(
    value: float,
    labels: dict[str, str] | None = None
) -> None
def observe(
    value: float,
    labels: dict[str, str] | None = None
) -> None

Alias for record — delegates to the canonical implementation.

def get_observations() -> list[float]
def reset() -> None

Clear all observations and values for metric rotation.


Distributed tracer logic.
def __init__(
    service_name: str,
    exporter: SpanExporter,
    max_spans: int | None = None
) -> None
def start_span(
    name: str,
    context: SpanContext | None = None,
    attributes: dict[str, Any] | None = None,
    kind: SpanKind = SpanKind.INTERNAL
) -> Span
def flush() -> None
def get_all_spans() -> list[Span]
def get_current_span() -> Span | None
def inject_context(
    carrier: dict[str, str],
    context: SpanContext | None = None
) -> None

Inject trace context into carrier dict.

def extract_context(carrier: Mapping[str, str]) -> SpanContext | None

Extract trace context from carrier.


Configuration for distributed tracing.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")

Attributes:

  • enabled: Whether tracing is enabled.
  • service_name: Name of the service for traces.
  • sampler_type: Type of sampling strategy.
  • sample_rate: Sampling rate (0.0 to 1.0) for probability sampler.
  • max_traces_per_second: Max traces/sec for rate limiting sampler.
  • propagation_formats: Trace context propagation formats.
  • max_attributes: Maximum attributes per span.
  • max_events: Maximum events per span.
  • max_links: Maximum links per span.

def validate_sample_rate(
    cls,
    v: float
) -> float

Validate sample rate is between 0 and 1.


Central registry for tracing exporters.

Manages a collection of handlers for creating tracing exporters. Handlers are checked in order until one is found that can handle the exporter type.

Example

registry = TracingExporterRegistry()
registry.register(CustomTracingExporterHandler())
exporter = registry.create_exporter(config)

def __init__() -> None

Initialize the registry without any handlers.

def with_defaults(cls) -> TracingExporterRegistry

Create a registry pre-populated with the built-in default handlers.

Returns
TypeDescription
TracingExporterRegistryA new registry instance with all default tracing exporter handlers registered.
def register(handler: TracingExporterHandler) -> None

Register a new tracing exporter handler.

Parameters
ParameterTypeDescription
`handler`TracingExporterHandlerThe handler to register. Added at the beginning of the handler list.
def create_exporter(exp_config: Any) -> Any

Create an exporter for the given config.

Parameters
ParameterTypeDescription
`exp_config`AnyConfiguration object with a 'type' attribute.
Returns
TypeDescription
AnyAn exporter instance, or None if no handler can handle the type.
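The registry's first-match handler chain (including the "new handlers are added at the beginning" rule) can be sketched in isolation. Class names and the string-exporter payload below are illustrative, not the library's:

```python
from typing import Any


class SketchConsoleHandler:
    """Hypothetical handler for a 'console' exporter type."""

    def can_handle(self, exporter_type: str) -> bool:
        return exporter_type == "console"

    def create_exporter(self, exp_config: Any) -> Any:
        return f"console-exporter({exp_config!r})"


class SketchRegistry:
    """First-match handler chain, mirroring the registry described above."""

    def __init__(self) -> None:
        self._handlers: list[Any] = []

    def register(self, handler: Any) -> None:
        self._handlers.insert(0, handler)  # newest handlers take precedence

    def create_exporter(self, exp_config: Any) -> Any:
        exporter_type = getattr(exp_config, "type", exp_config)
        for handler in self._handlers:
            if handler.can_handle(exporter_type):
                return handler.create_exporter(exp_config)
        return None  # no handler claimed the type


registry = SketchRegistry()
registry.register(SketchConsoleHandler())
print(registry.create_exporter("console"))  # console-exporter('console')
```

Prepending on register is what lets a custom handler override a built-in default for the same exporter type.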

async def get_performance_summary(metrics: PerformanceMetrics) -> dict[str, Any]

Return a dictionary summary of performance metrics.

Parameters
ParameterTypeDescription
`metrics`PerformanceMetricsAccumulated PerformanceMetrics to summarise.
Returns
TypeDescription
dict[str, Any]Dictionary containing duration, sample counts, CPU/memory averages, task counts, and a ``has_profile_data`` flag.

def health_checker(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator that marks a callable as a named health check.

The decorated function can later be registered on a HealthChecker via HealthChecker.register.

Parameters
ParameterTypeDescription
`name`strHuman-readable name for this check (e.g. ``"database"``).

Example

@health_checker("database")
async def check_db() -> bool:
    return await db.ping()

checker = HealthChecker()
checker.register(check_db)

def inject_trace_context(carrier: dict[str, Any]) -> None

Inject current trace context into carrier for propagation.

Call this before sending a message to propagate trace context.


def instrument_database(provider: Any) -> None

Instrument a Lexigram DatabaseService with OpenTelemetry tracing.

Wraps the provider’s execute and execute_query methods to automatically create spans and record metrics for each database operation.

Features:

  • Automatic span creation for each query
  • Query duration tracking
  • Error recording and status tracking
  • Metrics for query count and duration
Parameters
ParameterTypeDescription
`provider`AnyThe database provider to instrument. Must have 'execute' and optionally 'execute_query' methods.

Example

from lexigram.monitor import instrument_database
from lexigram.sql import DatabaseService

provider = DatabaseService(config)
instrument_database(provider)


def metered(
    metric_name: str | None = None,
    *,
    service: ObservabilityService | None = None
) -> Callable[[F], F]

Decorator that records function execution time as a histogram metric.

Parameters
ParameterTypeDescription
`metric_name`str | NoneExplicit metric name. Defaults to ``{module}.{qualname}.duration``.
`service`ObservabilityService | NoneSpecific service instance. Uses the module default if ``None``.
Returns
TypeDescription
Callable[[F], F]Decorated function.

def monitor(
    name: str | None = None,
    *,
    log_args: bool = False
) -> Callable[[F], F]

Wrap a function with combined structured logging and wall-clock timing.

Convenience alternative to stacking @traced and @metered. Emits operation_start, operation_end, and operation_error log events with the elapsed duration in milliseconds. Works for both async and sync callables.

Parameters
ParameterTypeDescription
`name`str | NoneMetric/span name. Defaults to ``"{module}.{qualname}"``.
`log_args`boolWhen ``True``, the number of positional arguments is included in the log context. Defaults to ``False`` to avoid inadvertently logging sensitive data.
Returns
TypeDescription
Callable[[F], F]Decorator that wraps the target function with monitoring machinery.

Example

@monitor("user_service.create")
async def create_user(email: str) -> User:
    return await repo.save(User(email=email))

@monitor()
def compute_score(data: list[float]) -> float:
    return sum(data) / len(data)

async def monitor_async_operation(config: PerformanceMonitorConfig | None = None) -> AsyncGenerator[PerformanceMonitor, None]

Async context manager that monitors a region of async code.

Creates a PerformanceMonitor, starts it before entering the body, and stops it on exit.

Parameters
ParameterTypeDescription
`config`PerformanceMonitorConfig | NoneOptional PerformanceMonitorConfig. Default config is used when not provided.
Yields
TypeDescription
AsyncGenerator[PerformanceMonitor, None]Running PerformanceMonitor instance.

Example

async with monitor_async_operation() as monitor:
    await do_some_work()
    print(monitor.metrics.duration)


async def profile_async_function(
    func: Any,
    *args: Any,
    **kwargs: Any
) -> tuple[Any, FunctionProfileResult]

Profile a single async (or sync) function call.

Parameters
ParameterTypeDescription
`func`AnyCallable to profile.
`*args`AnyPositional arguments forwarded to *func*.
`**kwargs`AnyKeyword arguments forwarded to *func*.
Returns
TypeDescription
tuple[Any, FunctionProfileResult]A ``(result, profile)`` tuple where *result* is the return value of *func* and *profile* is a FunctionProfileResult.

Example

result, profile = await profile_async_function(my_async_fn, arg1)
print(profile.execution_time)


async def trace_consume(
    channel: str,
    message_type: str = 'unknown',
    carrier: dict[str, Any] | None = None,
    **attributes: Any
) -> AsyncGenerator[Span, None]

Context manager for tracing message consumption.

Usage

async with trace_consume("notifications", carrier=headers) as span:
    await process_message(...)


async def trace_publish(
    channel: str,
    message_type: str = 'email',
    recipient: str | None = None,
    **attributes: Any
) -> AsyncGenerator[Span, None]

Context manager for tracing message publishing.

Usage

async with trace_publish("notifications", "email", "user@example.com") as span:
    await send_email(...)


def traced(
    span_name: str | None = None,
    *,
    service: ObservabilityService | None = None
) -> Callable[[F], F]

Decorator that wraps a function in a tracing span.

Automatically captures function name, arguments, and any raised exceptions as span events.

Parameters
ParameterTypeDescription
`span_name`str | NoneExplicit span name. Defaults to the qualified function name.
`service`ObservabilityService | NoneSpecific service instance. Uses the module default if ``None``.
Returns
TypeDescription
Callable[[F], F]Decorated function.

Raised when a required monitoring backend is not available.

Raised when metric parameters are invalid.

Raised when a requested metric is not found.

Base exception for monitoring errors.

Base exception for performance monitoring errors.
def __init__(
    message: str = 'Performance monitoring error',
    **kwargs
) -> None

Base exception for span-related errors.