# How-To Guides

## Submit a Batch Embedding Job

```python
from lexigram.ai.workers.batch_embedding import BatchEmbeddingWorker
from lexigram.contracts.ai.rag import ChunkProtocol

worker = await container.resolve(BatchEmbeddingWorker)

# NOTE: if ChunkProtocol is a typing.Protocol it cannot be instantiated
# directly — build the chunks with your concrete chunk type instead.
chunks: list[ChunkProtocol] = [
    ChunkProtocol(text="Document one", metadata={"source": "file1.pdf"}),
    ChunkProtocol(text="Document two", metadata={"source": "file2.pdf"}),
]

job_id = await worker.embed_batch(
    chunks=chunks,
    collection_name="my-docs",
    model_name="text-embedding-ada-002",
    batch_size=50,
)
```

## Track Embedding Progress
```python
progress = await worker.get_progress(job_id)
if progress:
    print(f"Status: {progress.status.value}")
    print(f"Progress: {progress.progress_percent:.1f}%")
    print(f"Cache hit rate: {progress.cache_hit_rate:.1f}%")
```

## Ingest a Document
```python
from pathlib import Path

from lexigram.ai.workers.document_ingestion import DocumentIngestionWorker

worker = await container.resolve(DocumentIngestionWorker)

job_id = await worker.ingest_document(
    document_id="doc-001",
    file_path=Path("/data/reports/Q1-2026.pdf"),
    collection_name="reports",
    metadata={"department": "finance"},
)

progress = await worker.get_progress(job_id)
```

## Register a Maintenance Task
```python
from lexigram.ai.workers.maintenance.worker import MaintenanceWorker
from lexigram.ai.workers.types import MaintenanceTaskType

maint = await container.resolve(MaintenanceWorker)

maint.register_task(
    name="optimize_vectors",
    task_type=MaintenanceTaskType.INDEX_OPTIMIZATION,
    handler=lambda: vector_store.optimize_indexes(),
    interval_seconds=3600,
    timeout=300.0,
)

# Run a task immediately
result = await maint.run_task_now("optimize_vectors")
print(f"{result.status.value}: {result.duration_seconds:.2f}s")
```

## Handle Failed Jobs with DLQ
```python
from lexigram.ai.workers.dlq.worker import DeadLetterQueueWorker
from lexigram.ai.workers.types import DLQItem

dlq = await container.resolve(DeadLetterQueueWorker)

# Add a failed job
await dlq.add_failed_job(job, error="Connection timeout")

# Check stats
stats = await dlq.get_stats()
print(f"Total: {stats.total_items}, Permanent: {stats.permanent_failures}")

# Manually retry
await dlq.retry_item("job-42")


# Set notification handler for permanent failures
async def alert(item: DLQItem) -> None:
    await send_pagerduty(f"Permanent failure: {item.job_id}")


dlq.set_notification_handler(alert)
```

## Clear the Embedding Cache
```python
worker = await container.resolve(BatchEmbeddingWorker)
await worker.clear_cache()
```

## Use WorkersModule.stub() in Tests
```python
import pytest

from lexigram.ai.workers import WorkersModule


@pytest.fixture
def workers_module() -> WorkersModule:
    # No background scheduler — safe for tests
    return WorkersModule.stub()
```

## Observe Worker Lifecycle with Hooks
```python
from lexigram.ai.workers.hooks import (
    WorkerJobCompletedHook,
    WorkerJobStartedHook,
    WorkerMaintenanceRunHook,
)
from lexigram.hooks import HookRegistry


async def on_job_started(hook: WorkerJobStartedHook) -> None:
    print(f"Job started: {hook.job_type}")


async def on_job_completed(hook: WorkerJobCompletedHook) -> None:
    print(f"Job completed: {hook.job_type}")


async def on_maintenance_run(hook: WorkerMaintenanceRunHook) -> None:
    print(f"Maintenance run: {hook.task_type}")


# Register hooks after the app boots
registry = await app.container.resolve(HookRegistry)
registry.register(WorkerJobStartedHook, on_job_started)
registry.register(WorkerJobCompletedHook, on_job_completed)
registry.register(WorkerMaintenanceRunHook, on_maintenance_run)
```

## Bridge Document Ingestion to the RAG Pipeline
```python
from lexigram.ai.workers.adapters import (
    IngestionReport,
    LoaderWorkerBridge,
    RAGIngestionAdapter,
)
from lexigram.ai.workers.document_ingestion.types import Document
from lexigram.logging import get_logger

logger = get_logger(__name__)

# Option A — Bridge the ingestion worker as a document loader
bridge = LoaderWorkerBridge(worker=doc_ingestion_worker, timeout=120.0)
registry = loader_registry  # RAG pipeline's LoaderRegistry
registry.register([".pdf", ".docx"], bridge)

# Option B — Full pipeline: ingest → chunk → embed → store
adapter = RAGIngestionAdapter(
    chunker=fixed_size_chunker,
    embedding_worker=batch_worker,
    vector_store=pg_vector_store,
)

documents: list[Document] = [
    Document(content="...", metadata={"document_id": "doc-1"}),
]
result = await adapter.ingest_to_rag(documents, collection="support-docs")
if result.is_ok():
    report: IngestionReport = result.unwrap()
    logger.info("rag_ingestion_complete", chunk_count=report.chunk_count)
```

## Customize Batch Processing with WorkersConfig
```python
from lexigram.ai.workers import WorkersModule
from lexigram.ai.workers.config import WorkersConfig
from lexigram.ai.workers.constants import (
    DEFAULT_BASE_BACKOFF,
    DEFAULT_MAX_RETRIES,
)

config = WorkersConfig(
    enabled=True,
    batch_embedding_concurrency=5,  # Default: 3
    document_ingestion_concurrency=10,  # Default: 3
    enable_maintenance=True,
    dlq_check_interval=120,  # Check DLQ every 2 minutes
)

# Pass to the module
module = WorkersModule.configure(
    config=config,
    enable_scheduler=True,
)

# Or set via environment variables:
# LEX_AI_WORKERS__BATCH_EMBEDDING_CONCURRENCY=5
# LEX_AI_WORKERS__ENABLE_MAINTENANCE=true

# Use constants for retry configuration
max_retries = DEFAULT_MAX_RETRIES  # 5
backoff = DEFAULT_BASE_BACKOFF  # 60 seconds
```

## Disable Workers for a Subprocess
When running a process that doesn't need background workers (for example, a web server), disable the module in configuration:

```yaml
ai_workers:
  enabled: false
```

Alternatively, pass `enable_scheduler=False` to `WorkersModule.configure()` to start the workers without the scheduler loop.