AI Workers
lexigram-ai-workers provides background processing for batch embedding, document ingestion, and scheduled maintenance tasks. Workers register with the task system and run in a separate process or thread pool, leaving the main application loop free for request handling.
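The offloading model described above can be illustrated with plain asyncio. Blocking work runs in a bounded thread pool while the event loop stays responsive. This is a generic sketch, not lexigram's scheduler. blocking_job and heartbeat are hypothetical stand-ins for a worker job and for request handling, and max_workers=3 plays the role of a concurrency limit.

```python
import asyncio
import contextlib
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(n: int) -> int:
    """Hypothetical stand-in for heavy work such as parsing a document."""
    time.sleep(0.05)
    return n * n


async def main() -> tuple[list[int], int]:
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat() -> None:
        # Simulates request handling: keeps running while jobs execute.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    # At most 3 jobs run at once; the rest wait in the executor's queue.
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [loop.run_in_executor(pool, blocking_job, n) for n in range(6)]
        results = await asyncio.gather(*futures)
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return list(results), ticks


results, ticks = asyncio.run(main())
print(results)  # [0, 1, 4, 9, 16, 25]
print(ticks > 0)  # True: the event loop kept ticking while jobs ran
```

A separate process pool (ProcessPoolExecutor) follows the same pattern and suits CPU-bound work better, at the cost of pickling job arguments and results.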
For the full configuration reference and advanced features (DLQ recovery, custom schedulers, error classification), see the lexigram-ai-workers package docs.
1. Configuration
Add the module in code, or configure the ai_workers: section in YAML. The module wires up the ingestion, embedding, DLQ, and maintenance workers:
```python
from lexigram.ai.workers import WorkersModule, WorkersConfig

# `app` is your existing Application instance
app.add_module(
    WorkersModule.configure(
        WorkersConfig(
            batch_embedding_concurrency=5,
            document_ingestion_concurrency=4,
            enable_maintenance=True,
            dlq_check_interval=120,
        )
    )
)
```

Or in YAML:

```yaml
ai_workers:
  enabled: true
  batch_embedding_concurrency: 3     # concurrent embedding jobs
  document_ingestion_concurrency: 3  # concurrent document pipelines
  enable_maintenance: true           # vector store + cache cleanup
  dlq_check_interval: 60             # seconds between DLQ recovery sweeps
```

2. Worker Types
Three worker families are registered by WorkersModule:
Batch Embedding
BatchEmbeddingWorker processes embedding jobs in parallel. It accepts BatchEmbeddingJob items, calls the configured embedding provider, and stores results:
```python
from lexigram.ai.workers import BatchEmbeddingWorker, BatchEmbeddingJob
from lexigram.result import Result


class EmbeddingOrchestrator:
    def __init__(self, worker: BatchEmbeddingWorker) -> None:
        self._worker = worker

    async def embed_documents(
        self,
        texts: list[str],
    ) -> Result[list[str], str]:
        job = BatchEmbeddingJob(texts=texts)
        return await self._worker.execute(job)
```

Document Ingestion
DocumentIngestionWorker handles parsing, chunking, and storing documents. It tracks progress with IngestionProgress and returns IngestionResult:
```python
from lexigram.ai.workers import DocumentIngestionWorker, DocumentIngestionJob


class DocumentProcessor:
    def __init__(self, worker: DocumentIngestionWorker) -> None:
        self._worker = worker

    async def ingest(self, path: str) -> None:
        job = DocumentIngestionJob(source=path)
        result = await self._worker.execute(job)
        if result.is_ok():
            print(f"Ingested {result.unwrap().chunks_created} chunks")
```

Maintenance Worker
MaintenanceWorker runs periodic tasks such as index optimization, cache cleanup, and health checks:
```python
from lexigram.ai.workers import MaintenanceWorker, MaintenanceTask, MaintenanceTaskType


class HealthMonitor:
    def __init__(self, worker: MaintenanceWorker) -> None:
        self._worker = worker

    async def run_checks(self) -> None:
        task = MaintenanceTask(
            name="vector-optimize",
            task_type=MaintenanceTaskType.INDEX_OPTIMIZATION,
            interval_seconds=3600,
        )
        result = await self._worker.run(task)
        if result.is_ok():
            print(f"Optimized {result.unwrap().items_processed} entries")
```

3. Dead Letter Queue
Failed jobs land in the DLQ, where they can be retried, archived, or trigger a notification. The sweep interval is set by dlq_check_interval in WorkersConfig. To inspect and act on queued items directly, use DeadLetterQueueWorker:
```python
from lexigram.ai.workers import DeadLetterQueueWorker, DLQItem, DLQStats
from lexigram.ai.workers import DLQAction, FailureCategory


class DLQManager:
    def __init__(self, dlq: DeadLetterQueueWorker) -> None:
        self._dlq = dlq

    async def recover(self) -> None:
        stats: DLQStats = await self._dlq.get_stats()
        if stats.total_items > 0:
            for category in stats.by_category:
                action = await self._dlq.process(
                    DLQItem(
                        job_id="...",  # placeholder: the failed job's ID
                        failure_category=FailureCategory.TRANSIENT,
                    )
                )
                if action == DLQAction.RETRY:
                    print(f"Retrying job ({category})")
```

Each DLQItem tracks failure count, category, backoff, and next retry time. The calculate_backoff() method uses exponential backoff capped at 3600 seconds.
4. Testing
Use WorkersModule.stub() for unit tests. It disables the background scheduler and uses no-op worker implementations:
```python
from lexigram import Application
from lexigram.ai.workers import WorkersModule
from lexigram.contracts.infra.tasks.protocols import TaskWorkerProtocol


async def test_worker_registration() -> None:
    async with Application.boot(modules=[WorkersModule.stub()]) as app:
        worker = await app.container.resolve(TaskWorkerProtocol)
        assert worker is not None
```

You can also bind hand-rolled fakes to TaskWorkerProtocol in any container, because the rest of your code depends only on the protocol.
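Because callers depend only on the protocol, a fake needs nothing more than matching method signatures. A minimal sketch using typing.Protocol follows. WorkerLike and its single execute method are hypothetical stand-ins for TaskWorkerProtocol, whose actual members may differ, and RecordingFakeWorker is an illustrative fake.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class WorkerLike(Protocol):
    """Hypothetical protocol; TaskWorkerProtocol's real methods may differ."""

    async def execute(self, job: Any) -> Any: ...


class RecordingFakeWorker:
    """Hand-rolled fake: records jobs instead of doing real work."""

    def __init__(self) -> None:
        self.jobs: list[Any] = []

    async def execute(self, job: Any) -> Any:
        self.jobs.append(job)
        return "ok"


async def code_under_test(worker: WorkerLike) -> Any:
    # Depends only on the protocol, so any conforming object can be injected.
    return await worker.execute({"texts": ["hello"]})


fake = RecordingFakeWorker()
assert isinstance(fake, WorkerLike)  # structural check; no inheritance needed
result = asyncio.run(code_under_test(fake))
print(result, fake.jobs)  # ok [{'texts': ['hello']}]
```

Note that runtime_checkable isinstance checks only confirm the method exists, not its signature; a type checker such as mypy catches signature mismatches statically.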
Next Steps
- Dependency Injection: binding workers to protocols
- Tasks & Scheduling: registering workers with the task system
- Testing: substituting stubs for infrastructure
- lexigram-ai-workers package: full config reference, DLQ recovery, error classifier