
Architecture

Internal design of the lexigram-ai-workers package.


lexigram-ai-workers provides a concurrent worker pool for AI task execution — batch embedding generation, document ingestion, scheduled maintenance, and dead-letter-queue recovery. It sits in the AI subsystem, depending only on lexigram and lexigram-contracts.

```mermaid
flowchart LR
    subgraph App[Application]
        A[Client Code]
    end
    subgraph Workers[lexigram-ai-workers]
        WP[WorkersProvider]
        BEW[BatchEmbeddingWorker]
        DIW[DocumentIngestionWorker]
        MW[MaintenanceWorker]
        DLQ[DeadLetterQueueWorker]
    end
    subgraph Infra[Infrastructure]
        Q[TaskQueueProtocol]
        VS[VectorStoreProtocol]
    end

    A --> WP
    WP --> BEW
    WP --> DIW
    WP --> MW
    WP --> DLQ
    BEW --> Q
    BEW --> VS
    DIW --> Q
    DIW --> VS
    DLQ --> Q
```

The package implements a worker pool pattern with four worker types registered by WorkersProvider:

| Component | Role |
| --- | --- |
| WorkersProvider | DI provider; registers and boots all workers as asyncio.Task instances |
| BatchEmbeddingWorker | Consumes batch_embed jobs; delegates to EmbeddingProvider, caches results, stores them in VectorStoreProtocol |
| DocumentIngestionWorker | Consumes ingest_document jobs; parses files via DocumentParser, chunks them via DocumentProcessor, stores chunks in VectorStoreProtocol |
| MaintenanceWorker | Timer-driven loop; runs registered MaintenanceTask instances (index optimization, cache cleanup, metrics rollup) |
| DeadLetterQueueWorker | Failed-job recovery; classifies errors via ErrorClassifier, retries with exponential backoff, notifies on permanent failures |

Each worker registers named handlers in a dict (e.g. {"batch_embed": self._handle_batch_embed}) at start time. When a job arrives from the queue, the TaskWorkerProtocol implementation looks up the handler registered under the job's name, so dispatch needs no if/elif chains.
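The registry-based dispatch described above can be sketched as follows. This is a minimal illustration, assuming handlers are async callables that take the job's data dict. HandlerRegistry, UnknownJobError, and BatchEmbedWorkerSketch are hypothetical names, not the package's API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Assumed handler shape: an async callable receiving the job payload.
Handler = Callable[[Dict[str, Any]], Awaitable[Any]]


class UnknownJobError(Exception):
    """Hypothetical error raised when no handler matches a job name."""


class HandlerRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, name: str, handler: Handler) -> None:
        self._handlers[name] = handler

    async def dispatch(self, name: str, data: Dict[str, Any]) -> Any:
        # One dict lookup replaces an if/elif chain over job names.
        handler = self._handlers.get(name)
        if handler is None:
            raise UnknownJobError(name)
        return await handler(data)


class BatchEmbedWorkerSketch:
    """Hypothetical worker that registers its handler when it starts."""

    def __init__(self, registry: HandlerRegistry) -> None:
        self._registry = registry

    async def start(self) -> None:
        self._registry.register("batch_embed", self._handle_batch_embed)

    async def _handle_batch_embed(self, data: Dict[str, Any]) -> int:
        # Placeholder work: report how many texts the job carried.
        return len(data["texts"])
```

Adding a new job type means registering one more entry. The dispatch code itself does not change.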

BatchEmbeddingWorker and DocumentIngestionWorker maintain their own sub-pools of TaskWorkerProtocol instances, configurable via WorkersConfig.batch_embedding_concurrency and WorkersConfig.document_ingestion_concurrency (both default to 3).
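A sub-pool with a concurrency setting amounts to N consumers sharing one queue. The sketch below uses a plain asyncio.Queue in place of TaskQueueProtocol. run_sub_pool is a hypothetical helper, and its concurrency argument only plays the role of batch_embedding_concurrency.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_sub_pool(
    jobs: List[Any],
    handle: Callable[[Any], Awaitable[Any]],
    concurrency: int = 3,  # same role as batch_embedding_concurrency (default 3)
) -> List[Any]:
    """Drain a shared queue with `concurrency` consumer coroutines."""
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)
    results: List[Any] = []

    async def consumer() -> None:
        # Jobs are pre-loaded here, so a consumer exits once the queue is empty.
        # A long-running worker would instead block on `await queue.get()`.
        while True:
            try:
                job = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results.append(await handle(job))

    # At most `concurrency` jobs are in flight at any moment.
    await asyncio.gather(*(consumer() for _ in range(concurrency)))
    return results
```

Results are collected in completion order, not submission order, because consumers finish their jobs at different times.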


```mermaid
sequenceDiagram
    participant Client as Client Code
    participant Q as TaskQueueProtocol
    participant WP as WorkersProvider
    participant TW as TaskWorker (sub-pool)
    participant EP as EmbeddingProvider / DocumentParser
    participant VS as VectorStoreProtocol

    Client->>WP: worker.embed_batch(chunks)
    WP->>Q: queue.enqueue("batch_embed", data)
    Q-->>WP: job_id

    par Worker loop
        Q->>TW: consume job
        TW->>TW: resolve handler from registry
        TW->>EP: embed_texts(batch)
        EP-->>TW: embeddings
        TW->>VS: add_texts(texts, embeddings)
        VS-->>TW: stored
        TW-->>Q: ack job
    end

    WP-->>Client: job_id

    Client->>WP: worker.get_progress(job_id)
    WP-->>Client: BatchEmbeddingProgress
```
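The handler side of this flow can be sketched roughly as follows: embed a batch, store it, then record progress. The sketch assumes the provider and store are plain async callables. embed_in_batches, BatchProgress, and the batch_size parameter are hypothetical names. BatchProgress only stands in for BatchEmbeddingProgress, whose real fields are not documented here.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Sequence

# Assumed callable shapes, mirroring embed_texts() and add_texts() in the diagram.
Embedder = Callable[[Sequence[str]], Awaitable[List[List[float]]]]
Storer = Callable[[Sequence[str], List[List[float]]], Awaitable[None]]


@dataclass
class BatchProgress:
    """Hypothetical stand-in for BatchEmbeddingProgress; real fields may differ."""

    total: int
    processed: int = 0

    @property
    def done(self) -> bool:
        return self.processed >= self.total


async def embed_in_batches(
    texts: Sequence[str],
    embed: Embedder,
    store: Storer,
    progress: BatchProgress,
    batch_size: int = 32,
) -> None:
    """Embed and store texts batch by batch, updating progress after each one."""
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        vectors = await embed(batch)  # EmbeddingProvider step in the diagram
        await store(batch, vectors)  # VectorStoreProtocol.add_texts step
        progress.processed += len(batch)  # what get_progress(job_id) would report
```

Progress advances only after a batch has been stored. A client polling get_progress therefore never sees vectors counted that are not yet persisted.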

```mermaid
sequenceDiagram
    participant C as Container
    participant WP as WorkersProvider
    participant BEW as BatchEmbeddingWorker
    participant DIW as DocumentIngestionWorker
    participant MW as MaintenanceWorker
    participant DLQ as DeadLetterQueueWorker

    C->>WP: register(registrar)
    WP->>WP: bind WorkersConfig singleton
    WP->>C: register worker types (transient)

    C->>WP: boot(resolver)
    WP->>C: resolve each worker type
    C-->>WP: worker instances
    loop for each worker
        WP->>BEW: asyncio.create_task(worker.start())
        WP->>DIW: asyncio.create_task(worker.start())
        WP->>MW: asyncio.create_task(worker.start())
        WP->>DLQ: asyncio.create_task(worker.start())
        WP->>WP: _tasks.add(task) + add_done_callback
    end

    C->>WP: shutdown()
    loop reverse order
        WP->>DLQ: worker.stop()
        WP->>MW: worker.stop()
        WP->>DIW: worker.stop()
        WP->>BEW: worker.stop()
    end
    WP->>WP: await asyncio.wait(tasks, timeout=30)
    WP->>WP: cancel remaining pending tasks
```

WorkersProvider:

- Priority: ProviderPriority.INFRASTRUCTURE (10)
- register(): Binds the WorkersConfig singleton; registers each worker type as transient if config.enabled is true
- boot(): Skips if disabled; resolves each worker from the container and starts it via asyncio.create_task(), attaching add_done_callback for cleanup (RUF006 compliant)
- shutdown(): Calls stop() on each worker in reverse order, awaits background tasks with a 30-second timeout, then cancels stragglers
- health_check(): Always HEALTHY, because it is an in-process provider with no external backends
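The boot and shutdown steps listed above follow a common asyncio supervision pattern, sketched here. WorkerSupervisor and the Worker protocol are hypothetical stand-ins, not WorkersProvider itself. The sketch assumes each worker exposes an async start() that runs until an async stop() is called, which the docs above do not specify.

```python
import asyncio
import logging
from typing import List, Protocol, Set

logger = logging.getLogger(__name__)


class Worker(Protocol):
    """Assumed worker shape: async start() runs until async stop() is called."""

    async def start(self) -> None: ...

    async def stop(self) -> None: ...


class WorkerSupervisor:
    """Hypothetical stand-in for WorkersProvider's boot/shutdown task handling."""

    def __init__(self, workers: List[Worker], shutdown_timeout: float = 30.0) -> None:
        self._workers = workers
        self._timeout = shutdown_timeout
        self._tasks: Set["asyncio.Task[None]"] = set()

    def boot(self) -> None:
        """Start each worker as a background task (needs a running event loop)."""
        for worker in self._workers:
            task = asyncio.create_task(worker.start())
            # Hold a strong reference (RUF006) and drop it when the task ends.
            self._tasks.add(task)
            task.add_done_callback(self._on_worker_done)

    def _on_worker_done(self, task: "asyncio.Task[None]") -> None:
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.error("worker task failed", exc_info=task.exception())

    async def shutdown(self) -> None:
        # Stop workers in reverse registration order.
        for worker in reversed(self._workers):
            await worker.stop()
        if not self._tasks:
            return
        # Give tasks up to the timeout to exit, then cancel stragglers.
        _, pending = await asyncio.wait(set(self._tasks), timeout=self._timeout)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
```

Passing a copy of the task set to asyncio.wait matters. The done callbacks remove finished tasks from _tasks while the wait is in progress.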

Failed worker tasks are handled by _on_worker_done(), which logs the exception and discards the task reference. DLQ recovery is handled by DeadLetterQueueWorker, which classifies failures via ErrorClassifier and retries with exponential backoff: the delay is base_backoff * 2^retry_count, capped at 3600 s.
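The backoff rule can be computed as follows. backoff_delay is a hypothetical helper, and the base_backoff value of 60 s used below is illustrative, not a documented default. The function doubles the delay step by step and stops once it reaches the cap. This gives the same result as base_backoff * 2^retry_count capped at 3600 s, without risking float overflow for very large retry counts.

```python
def backoff_delay(retry_count: int, base_backoff: float, max_delay: float = 3600.0) -> float:
    """Return min(base_backoff * 2**retry_count, max_delay) in seconds."""
    if retry_count < 0:
        raise ValueError("retry_count must be non-negative")
    delay = float(base_backoff)
    for _ in range(retry_count):
        if delay >= max_delay:
            break  # already at the cap; further doubling cannot lower it
        delay *= 2
    return min(delay, max_delay)


# With a base of 60 s: retries 0..6 wait 60, 120, 240, 480, 960, 1920 s,
# and the seventh (60 * 2**6 = 3840 s) is capped at 3600 s.
schedule = [backoff_delay(n, base_backoff=60) for n in range(7)]
```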


The package consumes protocols from lexigram-contracts and defines one local protocol:

| Contract | Location | Used by |
| --- | --- | --- |
| TaskWorkerProtocol | contracts/infra/tasks/protocols.py | Worker sub-pool management; exported by WorkersModule |
| TaskQueueProtocol | contracts/infra/tasks/protocols.py | Job enqueue/dequeue for embedding and ingestion |
| VectorStoreProtocol | contracts/data/vector/protocols.py | Embedding storage and document chunk persistence |
| HealthCheckResult / HealthStatus | contracts/core/health.py | Health reporting for all workers |
| AIError | contracts/ai/errors.py | Exception hierarchy base (WorkerError < AIError < LexigramError) |
| EmbeddingProvider | batch_embedding/protocols.py (package-local) | Embedding generation abstraction |

| Exception | Extends | Code |
| --- | --- | --- |
| WorkerError | AIError | LEX_ERR_AIWORK_001 |
| DLQError | WorkerError | LEX_ERR_AIWORK_002 |
| MaintenanceError | WorkerError | LEX_ERR_AIWORK_003 |
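The exception table implies that catching WorkerError also catches DLQError and MaintenanceError. The sketch below mirrors that hierarchy with local stand-ins. LexigramError and AIError are defined here only so the example runs, and attaching each code as a `code` class attribute is an assumption about how the package exposes it.

```python
from typing import Optional


class LexigramError(Exception):
    """Hypothetical stand-in for the framework base error."""

    code: Optional[str] = None  # assumed attribute name for error codes


class AIError(LexigramError):
    """Hypothetical stand-in for AIError from contracts/ai/errors.py."""


class WorkerError(AIError):
    code = "LEX_ERR_AIWORK_001"


class DLQError(WorkerError):
    code = "LEX_ERR_AIWORK_002"


class MaintenanceError(WorkerError):
    code = "LEX_ERR_AIWORK_003"


def error_code(exc: Exception) -> Optional[str]:
    # A single `except WorkerError` clause covers every package-specific error.
    return exc.code if isinstance(exc, WorkerError) else None
```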

WorkersModule provides a configure() factory that returns a DynamicModule:

```python
from lexigram.ai.workers.config import WorkersConfig
from lexigram.ai.workers.module import WorkersModule
from lexigram.contracts.infra.tasks.protocols import TaskWorkerProtocol  # exported by WorkersModule

# Register with explicit config
module = WorkersModule.configure(
    config=WorkersConfig(enabled=True, batch_embedding_concurrency=5),
)

# Testing: no background scheduler
test_module = WorkersModule.stub()
```

The module exports TaskWorkerProtocol so downstream consumers can resolve worker pool references through the container.


| Extension point | Mechanism |
| --- | --- |
| Custom embedding provider | Implement the EmbeddingProvider protocol from batch_embedding/protocols.py |
| Custom document parser | Subclass the DocumentParser interface |
| Custom worker type | Add a new class and register it in the _WORKER_TYPES tuple in di/provider.py |
| Custom queue backend | Implement TaskQueueProtocol from lexigram-contracts |
| Custom worker strategy | Implement TaskWorkerProtocol; register "TaskWorkerClass" in the container |
| Maintenance tasks | Call MaintenanceWorker.register_task() with a handler and a schedule |
| DLQ notification handler | Call DeadLetterQueueWorker.set_notification_handler() |
| Custom adapter | Add an adapter under adapters/ (loader, RAG, and task queue adapters already exist) |
| Hook consumer | Register a hook listener for WorkerJobStartedHook, WorkerJobCompletedHook, or WorkerMaintenanceRunHook |
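For the custom embedding provider extension point, a structural protocol means any class with a matching method qualifies, with no inheritance required. The sketch below assumes the protocol exposes an async embed_texts() method, as the data-flow diagram suggests. EmbeddingProviderLike is a hypothetical stand-in, and the real EmbeddingProvider in batch_embedding/protocols.py may have a different signature. CharHistogramEmbedder is a toy provider meant for tests, not a real embedding model.

```python
import asyncio
from typing import List, Protocol, Sequence, runtime_checkable


@runtime_checkable
class EmbeddingProviderLike(Protocol):
    """Stand-in mirroring embed_texts() from the data-flow diagram."""

    async def embed_texts(self, texts: Sequence[str]) -> List[List[float]]: ...


class CharHistogramEmbedder:
    """Toy provider: counts characters into `dim` buckets by code point."""

    def __init__(self, dim: int = 8) -> None:
        self.dim = dim

    async def embed_texts(self, texts: Sequence[str]) -> List[List[float]]:
        vectors: List[List[float]] = []
        for text in texts:
            vec = [0.0] * self.dim
            for ch in text:
                vec[ord(ch) % self.dim] += 1.0
            vectors.append(vec)
        return vectors


embedder = CharHistogramEmbedder(dim=8)
# Structural typing: the isinstance check only verifies that embed_texts exists.
is_provider = isinstance(embedder, EmbeddingProviderLike)
vectors = asyncio.run(embedder.embed_texts(["ab", "lexigram"]))
```

A deterministic toy provider like this is convenient when testing the batching and storage path without calling a real embedding service.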