Observability
Lexigram treats observability as a first-class concern. Core ships structured logging out of the box, and lexigram-monitor adds metrics, distributed tracing, and health checks behind protocol interfaces so your code stays backend-agnostic. Swap Prometheus for OpenTelemetry — or a no-op stub in tests — without touching application code.
For the full API, see the lexigram-monitor package docs.
1. The Four Pillars
Section titled “1. The Four Pillars”Observability in Lexigram is organised around four concerns: logs, metrics, traces, and health. A single MonitorModule wires them into the DI container, and exporters fan out to whichever backend you scrape or ship to.
flowchart LR
App[Application code] --> Log[get_logger]
App --> Obs[ObservabilityService]
Obs --> Metrics[MetricsCollectorProtocol]
Obs --> Tracer[TracerProtocol]
App --> Health[HealthCheckRegistryProtocol]
Log --> Stdout[stdout / JSON]
Metrics --> Prom[Prometheus /metrics]
Tracer --> OTLP[OTLP exporter]
Health --> Web[/health, /ready/]
Logging is always-on in core; the other three pillars activate when MonitorModule.configure(...) is imported.
2. Structured Logging
Section titled “2. Structured Logging”lexigram.logging is structlog-based — never use print(). Get a logger named after your module and pass context as keyword arguments:
from lexigram.logging import get_logger
logger = get_logger(__name__)
class OrderService: async def place(self, order_id: str, amount: float) -> None: logger.info("order.placed", order_id=order_id, amount=amount) try: await self._charge(amount) except PaymentError: logger.exception("order.payment_failed", order_id=order_id) raiseKeyword arguments become first-class fields in the JSON output. logger.bind(...) returns a new logger with permanent context — useful for request-scoped fields like request_id or tenant_id. Format, level, and redact_fields are governed by the monitor.logging section below.
3. Metrics
Section titled “3. Metrics”Inject MetricsCollectorProtocol for the full counter / gauge / histogram API, or the narrower MetricsRecorderProtocol if your code only records.
from lexigram.contracts.observability.metrics import MetricsCollectorProtocol
class CheckoutService: def __init__(self, metrics: MetricsCollectorProtocol) -> None: self._metrics = metrics self._latency = metrics.create_histogram( "checkout_duration_seconds", description="End-to-end checkout latency", labels={"service": "checkout"}, )
async def checkout(self, cart_id: str) -> None: self._metrics.increment("checkout_requests_total", tags={"cart": "web"}) ...For ergonomic instrumentation, the package ships decorators that time and trace a function in one line:
from lexigram.monitor import metered, traced
@traced("checkout.process")@metered("checkout.process.duration")async def process(cart_id: str) -> None: ...4. Tracing
Section titled “4. Tracing”Inject TracerProtocol and wrap units of work in a span. Spans are context managers and automatically close on exit, capturing exceptions if any are raised.
from lexigram.contracts.observability.tracing import TracerProtocol
class FulfilmentService: def __init__(self, tracer: TracerProtocol) -> None: self._tracer = tracer
async def ship(self, order_id: str) -> None: with self._tracer.start_span( "fulfilment.ship", attributes={"order.id": order_id}, ) as span: warehouse = await self._pick_warehouse(order_id) span.set_attribute("warehouse.id", warehouse.id) await self._dispatch(warehouse, order_id) span.add_event("dispatched", {"carrier": warehouse.carrier})For propagation across services or messaging boundaries, use tracer.inject_context(carrier) to write traceparent into outbound headers and tracer.extract_context(carrier) to continue the trace on the consumer side. The higher-level ObservabilityService exposes the same span lifecycle through a trace() context manager that also covers the no-op case.
5. Health Checks
Section titled “5. Health Checks”Health checks register against HealthCheckRegistryProtocol, tagged with a HealthCheckCategory that maps directly to Kubernetes probe types:
from lexigram.contracts.core.health import HealthCheckCategory, HealthStatus, HealthCheckResultfrom lexigram.contracts.observability.metrics import HealthCheckRegistryProtocol
async def check_payment_gateway() -> HealthCheckResult: ok = await payment_client.ping() return HealthCheckResult( component="payment_gateway", status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY, )
class CheckoutModule: def __init__(self, health: HealthCheckRegistryProtocol) -> None: health.add( "payment_gateway", check_payment_gateway, timeout=2.0, critical=True, category=HealthCheckCategory.READINESS, )When lexigram-monitor is paired with lexigram-web, the module registers HTTP endpoints under the configured base path (defaults to /health). The Application itself also exposes liveness(), readiness(), startup_check(), and health_check() coroutines for non-HTTP probes (see Deployment).
6. Configuration
Section titled “6. Configuration”Wire MonitorModule into your application and configure the monitor: section. Defaults are zero-config — every pillar can be toggled independently.
from lexigram import Applicationfrom lexigram.di.module import Module, modulefrom lexigram.monitor import MonitorModule
@module(imports=[MonitorModule.configure()])class AppModule(Module): pass
app = Application(modules=[AppModule])monitor: metrics: enabled: true prefix: "myapp" histogram_buckets: [0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0]
prometheus: enabled: true port: 9090 path: "/metrics"
tracing: enabled: true service_name: "myapp" sampler_type: "probability" sample_rate: 0.1 # production: sample 10 %
opentelemetry: enabled: true otlp: endpoint: "http://otel-collector:4317" compression: "gzip"
health: enabled: true path: "/health" interval: 30 timeout: 5
logging: level: "INFO" format: "json" include_trace_context: true redact_fields: ["password", "token", "authorization"]Every key has a LEX_MONITOR__* environment-variable equivalent (e.g. LEX_MONITOR__TRACING__SAMPLE_RATE=0.05). See YAML Configuration for override semantics.
For tests, swap the module for a no-op stub that discards everything:
async with Application.boot(modules=[MonitorModule.stub()]) as app: ...7. Backends in Production
Section titled “7. Backends in Production”| Pillar | Typical sink | How it gets there |
|---|---|---|
| Logs | Loki, Elasticsearch, Datadog | stdout JSON, scraped by the container runtime |
| Metrics | Prometheus, Grafana Cloud | /metrics scrape on the configured port |
| Traces | Tempo, Jaeger, Honeycomb | OTLP exporter to a local OpenTelemetry Collector |
| Health | Kubernetes probes, ALB / NLB | HTTP requests against /health (and /ready) |
Shipping OTLP to a sidecar/agent collector lets it handle routing, retries, and backend-specific protocols.
Next Steps
Section titled “Next Steps”- Dependency Injection — injecting metrics, tracer, and health registry protocols
- Providers — how
MonitorProviderregisters the four pillars - Deployment — wiring
/healthto Kubernetes probes lexigram-monitorpackage reference — full API surface