
Architecture

Internal design of the lexigram-ai-feedback package.


lexigram-ai-feedback is the feedback collection layer for AI-generated outputs — data types, storage backends, and services for collecting, processing, and querying user feedback on AI responses.

flowchart BT
    Contracts[lexigram-contracts<br/>FeedbackProtocol · FeedbackStoreProtocol<br/>FeedbackType · FeedbackItem · FeedbackSummary]
    Core[lexigram<br/>DI · Config · Result · Logging]
    subgraph Feedback[lexigram-ai-feedback]
        Models[Feedback Model]
        Collect[Collection Pipeline]
        Storage[Storage Backends]
        Process[Processor Registry]
    end
    subgraph Backends[External]
        DB[DatabaseProviderProtocol]
        Cache[CacheBackendProtocol]
    end
    Feedback --> Contracts & Core
    Storage --> DB & Cache

The package depends only on lexigram and lexigram-contracts. Storage backends are wired dynamically in boot() via optional container resolution.


from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from typing import Any


class FeedbackType(StrEnum):
    RATING = "rating"          # Numeric score (1.0–5.0)
    TEXT = "text"              # Free-text comment
    CORRECTION = "correction"  # Original → corrected pair
    LABEL = "label"            # Ground truth label + input


@dataclass(frozen=True)
class FeedbackItem:
    feedback_type: FeedbackType
    value: Any                 # float, str, or dict
    context: dict[str, Any]    # trace_id, session_id, model
    metadata: dict[str, Any]   # comment, user_id, endpoint
    id: str                    # UUID4, auto-generated
    created_at: datetime       # UTC, auto-generated


@dataclass(frozen=True)
class FeedbackSummary:
    total_count: int
    average_rating: float | None
    count_by_type: dict[str, int]
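
As a rough illustration of how these types fit together, the sketch below folds a list of items into a FeedbackSummary. The classes are local stand-ins that mirror the fields listed above, not the real lexigram.contracts definitions. The `summarize` helper and the `default_factory` choices for `id` and `created_at` are assumptions made for the example, and `str, Enum` is used in place of `StrEnum` only so the snippet also runs on Python versions before 3.11.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from uuid import uuid4


class FeedbackType(str, Enum):  # stand-in for the package's StrEnum
    RATING = "rating"
    TEXT = "text"
    CORRECTION = "correction"
    LABEL = "label"


@dataclass(frozen=True)
class FeedbackItem:
    feedback_type: FeedbackType
    value: Any
    context: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)
    # Assumed mechanism for the "auto-generated" fields described above.
    id: str = field(default_factory=lambda: str(uuid4()))
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass(frozen=True)
class FeedbackSummary:
    total_count: int
    average_rating: float | None
    count_by_type: dict[str, int]


def summarize(items: list[FeedbackItem]) -> FeedbackSummary:
    """Hypothetical helper: aggregate items into a summary."""
    ratings = [float(i.value) for i in items if i.feedback_type is FeedbackType.RATING]
    counts: dict[str, int] = {}
    for item in items:
        key = item.feedback_type.value
        counts[key] = counts.get(key, 0) + 1
    # average_rating stays None when no rating items exist.
    average = sum(ratings) / len(ratings) if ratings else None
    return FeedbackSummary(len(items), average, counts)
```

Only RATING items contribute to `average_rating`; every item contributes to `total_count` and `count_by_type`.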

src/lexigram/ai/feedback/
├── config.py # FeedbackConfig (enabled, async_processing)
├── constants.py # ENV_PREFIX, rating min/max, size limits
├── events.py # FeedbackSubmittedEvent (DomainEvent)
├── exceptions.py # FeedbackError hierarchy
├── hooks.py # Lifecycle hook payloads
├── types.py # Re-exports FeedbackType, FeedbackItem
├── protocols.py # Re-exports FeedbackProtocol, FeedbackStoreProtocol
├── module.py # FeedbackModule — configure() + stub()
├── di/provider.py # FeedbackProvider — register, boot, shutdown
├── services/
│   ├── collector.py # FeedbackCollector — collect_rating/text/correction/label
│   └── feedback_service.py # FeedbackService — submit_feedback, get_feedback_stats
├── storage/
│   ├── database.py # DatabaseFeedbackStore — SQL via DatabaseProviderProtocol
│   └── cache.py # CachedFeedbackStore — write-through via CacheBackendProtocol
├── processors/
│   └── processor_registry.py # FeedbackProcessorRegistry + 4 built-in processors
└── middleware/
    └── middleware.py # FeedbackMiddleware, FeedbackContext

sequenceDiagram
    actor User
    participant MW as FeedbackMiddleware
    participant Svc as FeedbackService
    participant Coll as FeedbackCollector
    participant Store as FeedbackStore
    participant Cache as CachedFeedbackStore
    participant Stats as Stats API

    User->>MW: POST /feedback (type, value, context)
    MW->>Coll: collect_*()
    Coll->>Coll: Create FeedbackItem
    Coll->>Store: save(item)
    alt DB + Cache
        Store->>Cache: write-through
    else DB only
        Store-->>Coll: Ok(id)
    else No backend
        Coll->>Coll: Append to _feedback
    end
    Coll-->>MW: feedback_id
    MW-->>User: {"status": "success", "feedback_id": "..."}

    User->>Svc: submit_feedback(trace_id, score, comment)
    Svc->>Coll: Delegates to collector
    User->>Svc: get_feedback_stats()
    Svc->>Store: aggregate(window_hours=24)
    Store-->>Svc: FeedbackSummary
    Svc-->>User: {total_count, average_rating, by_type}

Two entry points: web middleware (auto-captures request context) and programmatic API (trace-correlated submission).
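
The storage branch in the diagram (persist through a store when one is attached, otherwise append to the in-memory `_feedback` list) can be sketched roughly as follows. `SketchCollector`, `StoreLike`, and `ListStore` are hypothetical names, items are plain dicts rather than FeedbackItem instances, and the async signatures and the `ValueError` on out-of-range ratings are assumptions, not the package's confirmed behaviour.

```python
from __future__ import annotations

import asyncio
from typing import Any, Protocol
from uuid import uuid4


class StoreLike(Protocol):
    """Hypothetical subset of FeedbackStoreProtocol used by this sketch."""

    async def save(self, item: dict[str, Any]) -> str: ...


class ListStore:
    """Toy backend that keeps saved items in a list."""

    def __init__(self) -> None:
        self.rows: list[dict[str, Any]] = []

    async def save(self, item: dict[str, Any]) -> str:
        self.rows.append(item)
        return item["id"]


class SketchCollector:
    def __init__(self, storage: StoreLike | None = None) -> None:
        self.storage = storage          # assigned later in the real package's boot()
        self._feedback: list[dict[str, Any]] = []

    async def collect_rating(self, value: float, **context: Any) -> str:
        # The 1.0-5.0 range comes from the FeedbackType comment; the error type is assumed.
        if not 1.0 <= value <= 5.0:
            raise ValueError("rating must be between 1.0 and 5.0")
        item = {"id": str(uuid4()), "type": "rating", "value": value, "context": context}
        if self.storage is not None:
            return await self.storage.save(item)   # DB or DB + cache path
        self._feedback.append(item)                # no-backend fallback
        return item["id"]
```

With no store attached, `asyncio.run(SketchCollector().collect_rating(4.0, trace_id="t1"))` returns the new item's id and leaves the item in `_feedback`.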


sequenceDiagram
    participant App as Application
    participant P as FeedbackProvider
    participant C as Container

    App->>P: register(container)
    P->>C: singleton(FeedbackConfig)
    P->>C: singleton(FeedbackProcessorRegistry)
    P->>C: singleton(FeedbackCollector)
    P->>C: singleton(FeedbackProtocol → FeedbackService)

    App->>P: boot(container)
    P->>C: resolve_optional(DatabaseProviderProtocol)
    alt DB available
        P->>P: DatabaseFeedbackStore(db)
        P->>C: resolve_optional(CacheBackendProtocol)
        alt Cache available
            P->>P: CachedFeedbackStore(db, cache)
        end
        P->>C: resolve(FeedbackCollector)
        P->>C: resolve(FeedbackProtocol)
        P->>Collector: collector.storage = store
        P->>Service: service._store = store
    else No DB
        Note over P: Memory-only mode
    end

register() — Registers config, processor registry, collector, and service. Exits early if enabled=False.

boot() — Wires storage. Resolves DatabaseProviderProtocol (optional). If found, creates DatabaseFeedbackStore, optionally wraps with CachedFeedbackStore. Assigns store to collector and service. Falls back to in-memory when no DB available.

shutdown() — No-op. Database connection pooling managed by the database provider.
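
The store selection performed in boot() can be approximated with a toy container. Everything here is a stand-in: `ToyContainer` models only `singleton` and `resolve_optional`, the protocol classes are empty markers, and `select_store` is a hypothetical helper. The sketch wraps the database store with the cache, following the prose description; the real constructor arguments may differ.

```python
from __future__ import annotations

from typing import Any


class DatabaseProviderProtocol:  # stand-in marker, not the real contract
    pass


class CacheBackendProtocol:  # stand-in marker, not the real contract
    pass


class ToyContainer:
    """Minimal container: stores instances by key, returns None when missing."""

    def __init__(self) -> None:
        self._instances: dict[type, Any] = {}

    def singleton(self, key: type, instance: Any) -> None:
        self._instances[key] = instance

    def resolve_optional(self, key: type) -> Any:
        return self._instances.get(key)


class DatabaseFeedbackStore:
    def __init__(self, db: Any) -> None:
        self.db = db


class CachedFeedbackStore:
    def __init__(self, inner: Any, cache: Any) -> None:
        self.inner = inner
        self.cache = cache


def select_store(container: ToyContainer) -> Any:
    """Return the store boot() would wire in, or None for memory-only mode."""
    db = container.resolve_optional(DatabaseProviderProtocol)
    if db is None:
        return None
    store: Any = DatabaseFeedbackStore(db)
    cache = container.resolve_optional(CacheBackendProtocol)
    if cache is not None:
        store = CachedFeedbackStore(store, cache)
    return store
```

A cache registered without a database has no effect in this sketch, which matches the diagram: the cache is only consulted inside the "DB available" branch.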


| Contract | Import Path | Implemented By |
|---|---|---|
| FeedbackProtocol | lexigram.contracts.ai.feedback | FeedbackService |
| FeedbackStoreProtocol | lexigram.contracts.ai.feedback | DatabaseFeedbackStore, CachedFeedbackStore |
| FeedbackType | lexigram.contracts.ai.feedback | StrEnum |
| FeedbackItem | lexigram.contracts.ai.feedback | Frozen dataclass |
| FeedbackSummary | lexigram.contracts.ai.feedback | Frozen dataclass |
| DatabaseProviderProtocol | lexigram.contracts.data | consumed |
| CacheBackendProtocol | lexigram.contracts.infra.cache | consumed |
| DomainEvent | lexigram.contracts.domain.events | FeedbackSubmittedEvent |

FeedbackSubmittedEvent (feedback_id, item_type) extends DomainEvent for event bus subscribers. Three lifecycle hooks:

| Hook | Fired When |
|---|---|
| FeedbackSubmittedHook | Feedback enters the pipeline |
| FeedbackProcessedHook | Processing completes |
| FeedbackStoredHook | Feedback is persisted |

FeedbackProcessorRegistry dispatches to the correct collector method by FeedbackType:

| Processor | FeedbackType | Collector Action |
|---|---|---|
| RatingFeedbackProcessor | RATING | collect_rating(value) |
| TextFeedbackProcessor | TEXT | collect_text(text) |
| CorrectionFeedbackProcessor | CORRECTION | collect_correction(original, corrected) |
| LabelFeedbackProcessor | LABEL | collect_label(label, input) |

Built-in processors are loaded via with_defaults(). Custom processors implement the FeedbackProcessor protocol.
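
A minimal sketch of type-keyed dispatch, assuming a processor exposes a `feedback_type` attribute and a `process(collector, payload)` method. Both member names, `SketchRegistry.dispatch`, and `RecordingCollector` are hypothetical; only `register()` and `with_defaults()` are named in this document, and their real signatures may differ.

```python
from __future__ import annotations

from typing import Any, Protocol


class FeedbackProcessor(Protocol):
    """Assumed shape; the real protocol's members are not shown here."""

    feedback_type: str

    def process(self, collector: Any, payload: dict[str, Any]) -> Any: ...


class RatingProcessor:
    feedback_type = "rating"

    def process(self, collector: Any, payload: dict[str, Any]) -> Any:
        return collector.collect_rating(payload["value"])


class CorrectionProcessor:
    feedback_type = "correction"

    def process(self, collector: Any, payload: dict[str, Any]) -> Any:
        return collector.collect_correction(payload["original"], payload["corrected"])


class SketchRegistry:
    def __init__(self) -> None:
        self._processors: dict[str, FeedbackProcessor] = {}

    @classmethod
    def with_defaults(cls) -> "SketchRegistry":
        registry = cls()
        for processor in (RatingProcessor(), CorrectionProcessor()):
            registry.register(processor)
        return registry

    def register(self, processor: FeedbackProcessor) -> None:
        # A later registration for the same type replaces the earlier one.
        self._processors[processor.feedback_type] = processor

    def dispatch(self, feedback_type: str, collector: Any, payload: dict[str, Any]) -> Any:
        processor = self._processors.get(feedback_type)
        if processor is None:
            raise LookupError(f"no processor registered for {feedback_type!r}")
        return processor.process(collector, payload)


class RecordingCollector:
    """Test double that records which collector method was called."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def collect_rating(self, value: float) -> None:
        self.calls.append(("rating", value))

    def collect_correction(self, original: str, corrected: str) -> None:
        self.calls.append(("correction", original, corrected))
```

A custom processor in this sketch is any object with the same two members, passed to `register()`.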


FeedbackMiddleware.__call__() extracts request context (body, user_id, session_id), calls next handler, captures output. create_feedback_endpoint() returns a POST /feedback handler. FeedbackContext is an async context manager for manual scoping:

async with FeedbackContext(collector, operation="prediction") as ctx:
    result = await model.predict(input_data)
    ctx.set_result(result)
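
For readers unfamiliar with the pattern, an async context manager with this surface can be sketched as below. `SketchFeedbackContext` is a stand-in that models only the constructor arguments and `set_result()` shown above. What the real FeedbackContext does on exit is not described here, so this version merely records the result and any exception.

```python
from __future__ import annotations

import asyncio
from typing import Any


class SketchFeedbackContext:
    """Stand-in for FeedbackContext; only the calls shown above are modelled."""

    def __init__(self, collector: Any, operation: str) -> None:
        self.collector = collector
        self.operation = operation
        self.result: Any = None
        self.error: BaseException | None = None

    async def __aenter__(self) -> "SketchFeedbackContext":
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool:
        self.error = exc
        return False  # never swallow exceptions raised inside the block

    def set_result(self, result: Any) -> None:
        self.result = result


async def run_prediction(collector: Any, input_data: str) -> SketchFeedbackContext:
    async with SketchFeedbackContext(collector, operation="prediction") as ctx:
        # Placeholder for `await model.predict(input_data)`.
        result = input_data.upper()
        ctx.set_result(result)
    return ctx
```

After the block exits, the context still holds the operation name and the captured result, which is the information a later feedback submission would need to correlate against.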

FeedbackError
├── FeedbackProcessingError # Processor pipeline failure
└── FeedbackValidationError # Schema / data-validation failure

Domain operations use Result[T, E]. Exceptions above signal configuration and processing failures.


| Point | Mechanism | Example |
|---|---|---|
| Custom feedback type | Add FeedbackType member + register FeedbackProcessor | FeedbackType.PREFERENCE |
| Custom storage | Implement FeedbackStoreProtocol, assign to collector.storage | S3-backed store |
| Custom analytics | Subscribe to hook or listen for domain event | ML retraining trigger |
| Custom processors | Implement FeedbackProcessor, call registry.register() | Sentiment analysis |
| Lifecycle hooks | Subscribe to FeedbackSubmittedHook | Audit logging |
| Custom middleware | Subclass or wrap FeedbackMiddleware | AI chat application |
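
To make the custom storage row concrete, here is a sketch of a dictionary-backed store. It assumes FeedbackStoreProtocol needs at least `save(item)` and `aggregate(window_hours=...)`, the two calls that appear in the flow diagram; the full protocol may require more. `StoredItem`, `DictFeedbackStore`, the dict return type, and the `now` parameter (added so the time window can be tested deterministically) are all hypothetical.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any


@dataclass(frozen=True)
class StoredItem:
    """Reduced stand-in for FeedbackItem."""

    id: str
    feedback_type: str
    value: Any
    created_at: datetime


class DictFeedbackStore:
    def __init__(self) -> None:
        self._items: dict[str, StoredItem] = {}

    async def save(self, item: StoredItem) -> str:
        self._items[item.id] = item
        return item.id

    async def aggregate(
        self, window_hours: int = 24, now: datetime | None = None
    ) -> dict[str, Any]:
        now = now or datetime.now(timezone.utc)
        cutoff = now - timedelta(hours=window_hours)
        # Keep only items created inside the window.
        recent = [i for i in self._items.values() if i.created_at >= cutoff]
        ratings = [float(i.value) for i in recent if i.feedback_type == "rating"]
        counts: dict[str, int] = {}
        for item in recent:
            counts[item.feedback_type] = counts.get(item.feedback_type, 0) + 1
        return {
            "total_count": len(recent),
            "average_rating": sum(ratings) / len(ratings) if ratings else None,
            "count_by_type": counts,
        }
```

Per the table above, such a store would then be attached with `collector.storage = DictFeedbackStore()`.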