API Reference
Classes
BatchEmbeddingJob
Job configuration for batch embedding.
BatchEmbeddingProgress
Track progress of batch embedding job.
Calculate progress percentage.
Calculate cache hit rate.
def update( status: EmbeddingStatus | None = None, texts_processed: int | None = None, cache_hits: int | None = None, cache_misses: int | None = None, error: str | None = None ) -> None
Update progress tracking.
Convert to dictionary for serialization.
BatchEmbeddingResult
Result of batch embedding job.
def success_result( cls, job_id: str, embeddings_generated: int, cache_hits: int, duration: float, metadata: dict[str, Any] | None = None ) -> BatchEmbeddingResult
Create successful embedding result.
def failure_result( cls, job_id: str, error: str, duration: float, metadata: dict[str, Any] | None = None ) -> BatchEmbeddingResult
Create failed embedding result.
Convert to dictionary for serialization.
BatchEmbeddingWorker
Background worker for batch embedding generation.
Optimizes embedding generation through:
- Batch processing (minimize API calls)
- Cache integration (avoid redundant computations)
- Progress tracking (resumable on failure)
- Concurrent processing (parallel batches)
Example
```python
from lexigram.ai.workers.batch_embedding import BatchEmbeddingWorker
from lexigram.contracts import VectorStoreProtocol
from lexigram.contracts.infra.tasks import TaskQueueProtocol

# Setup — resolve implementations via the DI container
vector_store = container.resolve(VectorStoreProtocol)
embedding_provider = container.resolve(EmbeddingClientProtocol)
queue = container.resolve(TaskQueueProtocol)

worker = BatchEmbeddingWorker(
    vector_store=vector_store,
    embedding_provider=embedding_provider,
    queue=queue,
    concurrency=3,
)

# Start worker
await worker.start()

# Submit embedding job
job_id = await worker.embed_batch(
    chunks=chunks,
    collection_name="my-docs",
    model_name="text-embedding-ada-002",
    batch_size=100,
)

# Check progress
progress = await worker.get_progress(job_id)
logger.info(f"Progress: {progress.progress_percent:.1f}%")
logger.info(f"Cache hit rate: {progress.cache_hit_rate:.1f}%")

# Stop worker
await worker.stop()
```

def __init__( vector_store: VectorStoreProtocol, embedding_provider: EmbeddingProvider, queue: TaskQueueProtocol, worker_id: str = 'batch-embedding', concurrency: int = 3, default_batch_size: int = 100, enable_cache: bool = True )
Initialize batch embedding worker.
| Parameter | Type | Description |
|---|---|---|
| `vector_store` | VectorStoreProtocol | Vector store for storing embeddings |
| `embedding_provider` | EmbeddingProvider | Provider for generating embeddings |
| `queue` | TaskQueueProtocol | Task queue for job management |
| `worker_id` | str | Unique worker identifier |
| `concurrency` | int | Number of concurrent batch processing tasks |
| `default_batch_size` | int | Default texts per batch |
| `enable_cache` | bool | Enable embedding cache |
Start the embedding worker pool.
Stop the embedding worker pool.
async def embed_batch( chunks: list[ChunkProtocol], collection_name: str, model_name: str = 'text-embedding-ada-002', batch_size: int | None = None, use_cache: bool = True, priority: int = 0 ) -> str
Submit batch embedding job.
| Parameter | Type | Description |
|---|---|---|
| `chunks` | list[ChunkProtocol] | List of text chunks to embed |
| `collection_name` | str | Vector store collection name |
| `model_name` | str | Embedding model name |
| `batch_size` | int \| None | Texts per batch (default: worker default) |
| `use_cache` | bool | Use embedding cache |
| `priority` | int | Job priority (higher = sooner) |
| Type | Description |
|---|---|
| str | Job ID for tracking |
async def get_progress(job_id: str) -> BatchEmbeddingProgress | None
Get embedding progress for job.
Get worker statistics.
Report the health of this worker.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Unused; present for protocol conformance. |
| Type | Description |
|---|---|
| HealthCheckResult | HEALTHY when the worker is running, UNHEALTHY otherwise. |
Clear the embedding cache.
DLQAction
Actions for DLQ items.
DLQItem
Item in the dead letter queue.
Check if item can be retried.
Calculate exponential backoff delay in seconds.
Convert to dictionary for serialization.
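The backoff calculation above is not spelled out in this reference. A minimal sketch, assuming the conventional doubling scheme implied by the `base_backoff` parameter documented below:

```python
# Hedged sketch only: assumes delay = base_backoff * 2**retry_count.
# The worker's actual formula may differ (e.g. capping or jitter).
def backoff_delay(retry_count: int, base_backoff: int = 60) -> float:
    return float(base_backoff * (2 ** retry_count))
```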
DLQStats
Statistics for DLQ.
DeadLetterQueueWorker
Worker for handling failed tasks and retry logic.
Monitors failed jobs and implements intelligent retry strategies based on failure categorization.
Example
```python
from lexigram.ai.workers import DeadLetterQueueWorker
from lexigram.contracts.infra.tasks import TaskQueueProtocol

# Setup (queues resolved via the DI container, as in the other examples;
# protocols cannot be instantiated directly)
main_queue = container.resolve(TaskQueueProtocol)
dlq_queue = container.resolve(TaskQueueProtocol)  # Separate queue for DLQ items

worker = DeadLetterQueueWorker(
    main_queue=main_queue,
    dlq_queue=dlq_queue,
    worker_id="dlq",
    check_interval=60,  # Check every minute
)

# Register error notification handler
async def notify_on_permanent_failure(item: DLQItem):
    await send_alert(f"Permanent failure: {item.job_id}")

worker.set_notification_handler(notify_on_permanent_failure)

# Start worker
await worker.start()

# Manually retry a failed job
await worker.retry_item(job_id="abc123")

# Get DLQ statistics
stats = await worker.get_stats()
logger.info(f"Total failed items: {stats.total_items}")

# Stop worker
await worker.stop()
```

def __init__( main_queue: TaskQueueProtocol, dlq_queue: TaskQueueProtocol | None = None, worker_id: str = 'dlq', check_interval: int = 60, max_retries: int = 5, base_backoff: int = 60 )
Initialize DLQ worker.
| Parameter | Type | Description |
|---|---|---|
| `main_queue` | TaskQueueProtocol | Main task queue to monitor |
| `dlq_queue` | TaskQueueProtocol \| None | Optional separate queue for DLQ items |
| `worker_id` | str | Unique worker identifier |
| `check_interval` | int | Interval in seconds to check for failed jobs |
| `max_retries` | int | Maximum retry attempts per job |
| `base_backoff` | int | Base delay in seconds for exponential backoff |
Start the DLQ worker.
Stop the DLQ worker.
def set_notification_handler(handler: Callable[[DLQItem], Any]) -> None
Set notification handler for permanent failures.
| Parameter | Type | Description |
|---|---|---|
| `handler` | Callable[[DLQItem], Any] | Async callable that receives DLQItem |
Add a failed job to the DLQ.
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | Failed job |
| `error` | str | Error message |
Manually retry a DLQ item.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Job ID to retry |
| Type | Description |
|---|---|
| bool | True if retry was initiated, False otherwise |
Remove item from DLQ (permanent deletion).
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Job ID to remove |
| Type | Description |
|---|---|
| bool | True if removed, False if not found |
Archive item (mark as permanently failed).
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Job ID to archive |
| Type | Description |
|---|---|
| bool | True if archived, False if not found |
async def get_stats() -> DLQStats
Get DLQ statistics.
Report the health of this worker.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Unused; present for protocol conformance. |
| Type | Description |
|---|---|
| HealthCheckResult | HEALTHY when the worker is running, UNHEALTHY otherwise. |
async def get_items( category: FailureCategory | None = None, limit: int = 100 ) -> list[dict[str, Any]]
Get DLQ items.
| Parameter | Type | Description |
|---|---|---|
| `category` | FailureCategory \| None | Optional filter by failure category |
| `limit` | int | Maximum items to return |
| Type | Description |
|---|---|
| list[dict[str, Any]] | List of DLQ items as dictionaries |
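A minimal usage sketch for inspecting DLQ contents. The dictionary keys are assumed to follow DLQItem.to_dict(), and the FailureCategory member shown is hypothetical:

```python
# Fetch up to 20 items in one failure category.
# FailureCategory.TRANSIENT is an illustrative member name; use a
# member actually defined on your FailureCategory enum.
items = await worker.get_items(category=FailureCategory.TRANSIENT, limit=20)
for item in items:
    # Keys assumed from DLQItem.to_dict(); adjust to the actual schema.
    logger.info(f"{item.get('job_id')}: {item.get('error')}")
```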
DocumentIngestionJob
Job configuration for document ingestion.
DocumentIngestionWorker
Document ingestion worker.
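Example
A minimal sketch, by analogy with the other worker examples; the import path, container setup, and progress attribute are assumptions, not confirmed by this reference:

```python
from pathlib import Path

from lexigram.ai.workers import DocumentIngestionWorker  # import path assumed
from lexigram.contracts import VectorStoreProtocol
from lexigram.contracts.infra.tasks import TaskQueueProtocol

# Setup: resolve implementations via the DI container
vector_store = container.resolve(VectorStoreProtocol)
queue = container.resolve(TaskQueueProtocol)

worker = DocumentIngestionWorker(
    vector_store=vector_store,
    queue=queue,
    concurrency=3,
)
await worker.start()

# Submit a document for ingestion
job_id = await worker.ingest_document(
    document_id="doc-001",
    file_path=Path("docs/report.pdf"),
    collection_name="my-docs",
)

# Check progress (progress_percent assumed by analogy with
# BatchEmbeddingProgress)
progress = await worker.get_progress(job_id)
logger.info(f"Ingestion: {progress.progress_percent:.1f}%")

await worker.stop()
```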
def __init__( vector_store: VectorStoreProtocol, queue: Any, worker_id: str = 'document-ingestion', concurrency: int = 3, default_chunking_config: ChunkingConfigDict | None = None, document_parser: DocumentParser | None = None )
Initialize document ingestion worker.
| Parameter | Type | Description |
|---|---|---|
| `vector_store` | VectorStoreProtocol | Vector store for storing document chunks |
| `queue` | Any | Task queue for job management |
| `worker_id` | str | Unique worker identifier |
| `concurrency` | int | Number of concurrent document processing tasks |
| `default_chunking_config` | ChunkingConfigDict \| None | Default chunking configuration |
| `document_parser` | DocumentParser \| None | Document parser to use (defaults to UniversalDocumentParser) |
Start the ingestion worker pool.
Stop the ingestion worker pool.
async def ingest_document( document_id: str, file_path: Path, collection_name: str, parser: DocumentParser | None = None, chunking_config: ChunkingConfigDict | None = None, metadata: dict[str, Any] | None = None, priority: int = 0, batch_size: int = 50 ) -> str
Submit document for ingestion.
| Parameter | Type | Description |
|---|---|---|
| `document_id` | str | Unique document identifier |
| `file_path` | Path | Path to document file |
| `collection_name` | str | Vector store collection name |
| `parser` | DocumentParser \| None | Optional document parser (uses worker default if None) |
| `chunking_config` | ChunkingConfigDict \| None | Optional chunking configuration |
| `metadata` | dict[str, Any] \| None | Optional document metadata |
| `priority` | int | Job priority (higher = sooner) |
| `batch_size` | int | Chunks per batch |
| Type | Description |
|---|---|
| str | Job ID for tracking |
async def get_progress(job_id: str) -> IngestionProgress | None
Get ingestion progress for job.
Get worker statistics.
Report the health of this worker.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Unused; present for protocol conformance. |
| Type | Description |
|---|---|
| HealthCheckResult | HEALTHY when the worker is running, UNHEALTHY otherwise. |
ErrorClassifier
Classify errors into failure categories.
def classify( error: str, _job: JobProtocol ) -> FailureCategory
Classify error into a failure category.
| Parameter | Type | Description |
|---|---|---|
| `error` | str | Error message |
| `_job` | JobProtocol | Original job |
| Type | Description |
|---|---|
| FailureCategory | Failure category |
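A hedged usage sketch; whether classify is exposed as a static, class, or instance method is not shown here, so instance usage is assumed:

```python
# Classify a failed job's error message into a FailureCategory.
# `job` is a failed JobProtocol instance taken from your queue.
classifier = ErrorClassifier()
category = classifier.classify("Connection timed out", job)
logger.info(f"Failure category: {category}")
```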
FailureCategory
Categories of failures.
IngestionProgress
Track progress of document ingestion.
Calculate overall progress percentage.
def update( status: IngestionStatus | None = None, pages_processed: int | None = None, chunks_processed: int | None = None, error: str | None = None ) -> None
Update progress tracking.
Convert to dictionary for serialization.
IngestionResult
Result of document ingestion.
def success_result( cls, document_id: str, chunks_created: int, duration: float, metadata: dict[str, Any] | None = None ) -> IngestionResult
Create successful ingestion result.
def failure_result( cls, document_id: str, error: str, duration: float, metadata: dict[str, Any] | None = None ) -> IngestionResult
Create failed ingestion result.
Convert to dictionary for serialization.
MaintenanceResult
Result of a maintenance task execution.
def success( cls, task_name: str, task_type: MaintenanceTaskType, started_at: datetime, items_processed: int = 0, items_deleted: int = 0, metadata: dict[str, Any] | None = None ) -> MaintenanceResult
Create successful result.
def failure( cls, task_name: str, task_type: MaintenanceTaskType, started_at: datetime, error: str ) -> MaintenanceResult
Create failed result.
Convert to dictionary for serialization.
MaintenanceStatus
Maintenance task status.
MaintenanceTask
Configuration for a maintenance task.
Check if task should run based on schedule.
Convert to dictionary for serialization.
MaintenanceTaskType
Types of maintenance tasks.
MaintenanceWorker
Background worker for scheduled maintenance tasks.
Executes periodic maintenance operations including:
- Vector store index optimization
- Cache cleanup and TTL enforcement
- Old document cleanup
- Metrics aggregation
Example
```python
from lexigram.ai.workers import MaintenanceWorker
from lexigram.contracts import VectorStoreProtocol

# Setup — resolve via the DI container
vector_store = container.resolve(VectorStoreProtocol)

worker = MaintenanceWorker(
    vector_store=vector_store,
    worker_id="maintenance",
)

# Register maintenance tasks
worker.register_task(
    name="optimize_indexes",
    task_type=MaintenanceTaskType.INDEX_OPTIMIZATION,
    handler=lambda: vector_store.optimize_indexes(),
    interval_seconds=3600,  # Every hour
)

worker.register_task(
    name="cleanup_old_docs",
    task_type=MaintenanceTaskType.DOCUMENT_CLEANUP,
    handler=lambda: cleanup_documents(days=30),
    schedule_cron="0 2 * * *",  # 2 AM daily
)

# Start worker
await worker.start()

# Get statistics
stats = worker.get_stats()
logger.info(f"Tasks run: {stats['total_runs']}")

# Stop worker
await worker.stop()
```

def __init__( vector_store: VectorStoreProtocol | None = None, worker_id: str = 'maintenance', check_interval: int = 60 )
Initialize maintenance worker.
| Parameter | Type | Description |
|---|---|---|
| `vector_store` | VectorStoreProtocol \| None | Optional vector store for index optimization |
| `worker_id` | str | Unique worker identifier |
| `check_interval` | int | Interval in seconds to check for tasks to run |
Start the maintenance worker.
Stop the maintenance worker.
def register_task( name: str, task_type: MaintenanceTaskType, handler: Callable[[], Any], schedule_cron: str | None = None, interval_seconds: int | None = None, enabled: bool = True, timeout: float = 300.0 ) -> None
Register a maintenance task.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique task name |
| `task_type` | MaintenanceTaskType | Type of maintenance task |
| `handler` | Callable[[], Any] | Async or sync callable to execute |
| `schedule_cron` | str \| None | Cron expression for scheduling |
| `interval_seconds` | int \| None | Simple interval in seconds |
| `enabled` | bool | Whether task is enabled |
| `timeout` | float | Task timeout in seconds |
Unregister a maintenance task.
Enable a maintenance task.
Disable a maintenance task.
async def run_task_now(name: str) -> MaintenanceResult
Run a specific maintenance task immediately.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Task name to run |
| Type | Description |
|---|---|
| MaintenanceResult | Maintenance result |
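A minimal sketch of running a registered task on demand, reusing the "optimize_indexes" task from the MaintenanceWorker example above; to_dict() is documented on MaintenanceResult:

```python
# Trigger the task immediately instead of waiting for its schedule.
result = await worker.run_task_now("optimize_indexes")
logger.info(f"Maintenance result: {result.to_dict()}")
```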
Get worker statistics.
Get status of a specific task.
Get status of all registered tasks.
Get recent maintenance results.
Report the health of this worker.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Unused; present for protocol conformance. |
| Type | Description |
|---|---|
| HealthCheckResult | HEALTHY when the worker is running, UNHEALTHY otherwise. |
Optimize vector store indexes.
This is a built-in handler for index optimization.
Clean up old embeddings from cache.
This is a built-in handler for cache cleanup.
| Parameter | Type | Description |
|---|---|---|
| `max_age_days` | int | Maximum age in days for cached embeddings |
Aggregate and rollup metrics.
This is a built-in handler for metrics aggregation.
WorkerJobCompletedHook
Payload fired when a worker finishes a job.
WorkerJobStartedHook
Payload fired when a worker starts a job.
WorkerMaintenanceRunHook
Payload fired when a maintenance worker runs a maintenance task.
WorkersConfig
Configuration for AI background workers.
Loaded from the `ai_workers:` key in application.yaml, with environment variable overrides via the `LEX_AI_WORKERS__*` prefix.
Check that the config is safe for the target environment.
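A sketch of the configuration surface described above. Only the `ai_workers:` key and the `LEX_AI_WORKERS__*` prefix are documented; the nested key names shown here are hypothetical, chosen to mirror the worker constructor parameters:

```yaml
# application.yaml (illustrative keys only)
ai_workers:
  concurrency: 3
  default_batch_size: 100

# Environment override (key name assumed):
#   LEX_AI_WORKERS__CONCURRENCY=5
```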
WorkersModule
Module for AI background workers.
Registers the WorkersProvider which wires the ingestion, embedding, DLQ, and maintenance workers.
Usage
```python
from lexigram.ai.workers.config import WorkersConfig

@module(
    imports=[WorkersModule.configure(WorkersConfig(...))],
)
class AppModule(Module):
    pass
```

def configure( cls, config: WorkersConfig | None = None, enable_scheduler: bool = True, **kwargs: Any ) -> DynamicModule
Create a WorkersModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | WorkersConfig \| None | Optional WorkersConfig. |
| `enable_scheduler` | bool | Start the background maintenance scheduler that handles DLQ retries and worker health checks. Defaults to `True`; set to `False` to disable all scheduled tasks (e.g. when running in a worker-only process). |
| `**kwargs` | Any | Additional keyword arguments forwarded to the provider. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def stub( cls, config: WorkersConfig | None = None ) -> DynamicModule
Create a WorkersModule suitable for unit and integration testing.
Uses in-memory or no-op worker implementations with minimal side effects. The background scheduler is disabled by default to prevent timer interference between tests.
| Parameter | Type | Description |
|---|---|---|
| `config` | WorkersConfig \| None | Optional WorkersConfig override. Uses safe test defaults when `None`. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
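A minimal test-module sketch using stub(); the @module decorator and Module base follow the configure() usage example above:

```python
# Scheduler is disabled by default in stub(), so no background timers
# leak between tests.
@module(imports=[WorkersModule.stub()])
class TestAppModule(Module):
    pass
```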
WorkersProvider
Provider for AI background workers.
Reads WorkersConfig and registers workers for batch embedding, ingestion, maintenance, and DLQ handling.
Lifecycle:
- register — binds worker singletons into the container.
- boot — resolves workers and starts each as a background async task.
- shutdown — calls stop() on every worker, then awaits each background task to ensure graceful queue draining.
def __init__( config: WorkersConfig | None = None, enable_scheduler: bool = True, **kwargs: Any ) -> None
def from_config( cls, config: WorkersConfig, **context ) -> WorkersProvider
Factory method for DI container setup.
async def register(container: ContainerRegistrarProtocol) -> None
Register workers with the DI container.
async def boot(container: ContainerResolverProtocol) -> None
Resolve workers from the container and start each as a background task.
Each worker’s start() coroutine is wrapped in an asyncio.Task so it runs concurrently. Task references are stored in _tasks to prevent garbage collection (Ruff RUF006).
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerResolverProtocol | Container resolver for resolving worker instances. |
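A generic sketch of the task-reference pattern described above (not the provider's actual code):

```python
import asyncio

async def start_all(workers) -> list[asyncio.Task]:
    # Keep a reference to every task so it is not garbage-collected
    # mid-flight (the concern behind Ruff rule RUF006).
    return [asyncio.create_task(worker.start()) for worker in workers]
```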
Stop all workers gracefully and await their background tasks.
Calls stop() on every worker in reverse start order, then waits for each background task to finish (with a 30 s timeout per task).
Health check — always healthy (in-process domain provider).
No external backend to ping.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Ignored for in-process providers. |
| Type | Description |
|---|---|
| HealthCheckResult | Always HEALTHY — no external backend to ping. |
Exceptions
DLQError
Raised when a Dead Letter Queue operation fails.
MaintenanceError
Raised when a maintenance task operation fails.
WorkerError
Base exception for all worker-related errors.