Skip to content
GitHub

Audit Trail

lexigram-audit provides append-only audit logging with HMAC-SHA256 tamper detection, retention policies, and scheduled verification. The audit subsystem is designed to be fire-tolerant — a logging failure never blocks the operation that triggered it.

For the full configuration reference, SQL store schema, and CLI commands, see the lexigram-audit package docs.


Three protocols define the audit stack:

from typing import Protocol, runtime_checkable
from lexigram.contracts.audit import (
AuditLoggerProtocol,
AuditStoreProtocol,
AuditVerifierProtocol,
RetentionPolicyProtocol,
AuditEntry,
AuditQuery,
AuditEventSeverity,
AuditMismatch,
RetentionDecision,
)
@runtime_checkable
class MyAuditLogger(AuditLoggerProtocol, Protocol):
async def log(self, entry: AuditEntry) -> None: ... # never raises
async def query(self, query: AuditQuery) -> list[AuditEntry]: ...
@runtime_checkable
class MyAuditVerifier(AuditVerifierProtocol, Protocol):
async def verify_recent(self, *, limit: int = 100) -> list[AuditMismatch]: ...
async def verify_entry(self, entry_id: str) -> bool: ...

AuditEntry captures the full context of a security-relevant operation:

AuditEntry(
action="user.update",
actor_id="user-123",
resource_type="User",
resource_id="user-456",
outcome="success",
severity=AuditEventSeverity.MEDIUM,
occurred_at=datetime.now(UTC),
metadata={"changed_fields": ["email"]},
old_values={"email": "old@example.com"},
new_values={"email": "new@example.com"},
source="admin-api",
)

Configure the audit subsystem through AuditModule.configure():

from lexigram import Application
from lexigram.audit import AuditModule
app = Application(name="my-app")
app.add_module(AuditModule.configure(
hmac_key=b"your-hmac-secret-key",
store_backend="sql", # "sql" or "memory"
table_name="audit_log",
retention_days=365,
enable_admin=True,
))

In application.yaml:

application.yaml
audit:
store_backend: "sql"
table_name: "audit_log"
hmac_key: "${AUDIT_HMAC_KEY}"
verification_schedule: "0 * * * *" # hourly
verification_batch_size: 100
retention_policy:
name: "default"
default_retention_days: 365
severity_overrides:
critical: 2555
high: 1095

Inject AuditLoggerProtocol and call log():

from lexigram.contracts.audit import (
AuditLoggerProtocol,
AuditEntry,
AuditQuery,
AuditEventSeverity,
)
from lexigram.result import Result, Ok, Err
class UserService:
def __init__(self, audit: AuditLoggerProtocol) -> None:
self._audit = audit
async def update_email(self, user_id: str, new_email: str) -> Result[str, str]:
old_email = await self._repo.get_email(user_id)
if old_email is None:
return Err("User not found")
await self._repo.set_email(user_id, new_email)
await self._audit.log(AuditEntry(
action="user.update_email",
actor_id=user_id,
resource_type="User",
resource_id=user_id,
outcome="success",
severity=AuditEventSeverity.MEDIUM,
old_values={"email": old_email},
new_values={"email": new_email},
source="user-service",
))
return Ok(new_email)

log() is fire-tolerant — it never raises. If the store is unavailable, the entry is discarded and a warning is logged.


For common patterns, the @audited decorator attaches metadata that an interceptor reads to log automatically:

from lexigram.audit.decorators import audited
class ProductService:
@audited("product.delete", resource_type="Product", severity="high")
async def delete_product(self, product_id: str) -> None:
await self._repo.delete(product_id)

The decorator sets __audited__, __audit_action__, __audit_resource_type__, and __audit_severity__ attributes on the function.


Query entries with the composable AuditQuery:

from lexigram.contracts.audit import AuditQuery, AuditEventSeverity
async def find_recent_failures(self, actor_id: str) -> list[AuditEntry]:
query = AuditQuery(
actor_id=actor_id,
outcome="failure",
severity=AuditEventSeverity.HIGH,
since=datetime.now(UTC) - timedelta(hours=24),
limit=50,
)
return await self._audit.query(query)

Every entry appended to the store gets an HMAC-SHA256 checksum over its serialised content. Verification detects tampering:

from lexigram.audit import AuditVerifier, verify_audit_checksum
async def verify_integrity(verifier: AuditVerifier) -> None:
mismatches = await verifier.verify_recent(limit=1000)
for m in mismatches:
print(f"TAMPERED: entry {m.entry_id} — stored {m.expected_checksum}, actual {m.actual_checksum}")
is_valid = await verifier.verify_entry("entry-abc-123")
assert is_valid

Define a retention policy with severity-based overrides:

from lexigram.contracts.audit import RetentionPolicy
policy = RetentionPolicy(
name="default",
default_retention_days=365,
severity_overrides={
"critical": 2555, # 7 years
"high": 1095, # 3 years
},
)

Run the purger manually or via cron:

from lexigram.audit import AuditPurger
purger = AuditPurger(store, policy)
purged = await purger.purge_expired()
print(f"Purged {purged} expired entries")

The purger logs its own activity as audit entries — meta-audit.


For unit tests, configure the module with store_backend="memory":

from lexigram import Application
from lexigram.audit import AuditModule
from lexigram.contracts.audit import AuditLoggerProtocol, AuditEntry
async def test_logs_audit_entry() -> None:
async with Application.boot(
modules=[AuditModule.configure(hmac_key=b"test", store_backend="memory")]
) as app:
logger = await app.container.resolve(AuditLoggerProtocol)
await logger.log(AuditEntry(action="test.event", actor_id="tester"))
results = await logger.query(AuditQuery(limit=10))
assert len(results) == 1
assert results[0].action == "test.event"