Skip to content
GitHub · Discord

API Reference

JobProtocol configuration for batch embedding.
def to_job_kwargs() -> dict[str, Any]

Convert to JobProtocol kwargs.


Track progress of batch embedding job.
property progress_percent() -> float

Calculate progress percentage.

property cache_hit_rate() -> float

Calculate cache hit rate.

def update(
    status: EmbeddingStatus | None = None,
    texts_processed: int | None = None,
    cache_hits: int | None = None,
    cache_misses: int | None = None,
    error: str | None = None
) -> None

Update progress tracking.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Result of batch embedding job.
def success_result(
    cls,
    job_id: str,
    embeddings_generated: int,
    cache_hits: int,
    duration: float,
    metadata: dict[str, Any] | None = None
) -> BatchEmbeddingResult

Create successful embedding result.

def failure_result(
    cls,
    job_id: str,
    error: str,
    duration: float,
    metadata: dict[str, Any] | None = None
) -> BatchEmbeddingResult

Create failed embedding result.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Background worker for batch embedding generation.

Optimizes embedding generation through:

  • Batch processing (minimize API calls)
  • Cache integration (avoid redundant computations)
  • Progress tracking (resumable on failure)
  • Concurrent processing (parallel batches)

Example

from lexigram.ai.workers.batch_embedding import BatchEmbeddingWorker
from lexigram.contracts import VectorStoreProtocol
from lexigram.contracts.infra.tasks import TaskQueueProtocol
# Setup — resolve implementations via the DI container
vector_store = container.resolve(VectorStoreProtocol)
embedding_provider = container.resolve(EmbeddingProvider)
queue = container.resolve(TaskQueueProtocol)
worker = BatchEmbeddingWorker(
vector_store=vector_store,
embedding_provider=embedding_provider,
queue=queue,
concurrency=3,
)
# Start worker
await worker.start()
# Submit embedding job
job_id = await worker.embed_batch(
chunks=chunks,
collection_name="my-docs",
model_name="text-embedding-ada-002",
batch_size=100,
)
# Check progress
progress = await worker.get_progress(job_id)
logger.info(f"Progress: {progress.progress_percent:.1f}%")
logger.info(f"Cache hit rate: {progress.cache_hit_rate:.1f}%")
# Stop worker
await worker.stop()
def __init__(
    vector_store: VectorStoreProtocol,
    embedding_provider: EmbeddingProvider,
    queue: TaskQueueProtocol,
    worker_id: str = 'batch-embedding',
    concurrency: int = 3,
    default_batch_size: int = 100,
    enable_cache: bool = True
)

Initialize batch embedding worker.

Parameters
ParameterTypeDescription
`vector_store`VectorStoreProtocolVector store for storing embeddings
`embedding_provider`EmbeddingProviderProvider for generating embeddings
`queue`TaskQueueProtocolTask queue for job management
`worker_id`strUnique worker identifier
`concurrency`intNumber of concurrent batch processing tasks
`default_batch_size`intDefault texts per batch
`enable_cache`boolEnable embedding cache
async def start() -> None

Start the embedding worker pool.

async def stop() -> None

Stop the embedding worker pool.

async def embed_batch(
    chunks: list[ChunkProtocol],
    collection_name: str,
    model_name: str = 'text-embedding-ada-002',
    batch_size: int | None = None,
    use_cache: bool = True,
    priority: int = 0
) -> str

Submit batch embedding job.

Parameters
ParameterTypeDescription
`chunks`list[ChunkProtocol]List of text chunks to embed
`collection_name`strVector store collection name
`model_name`strEmbedding model name
`batch_size`int | NoneTexts per batch (default: worker default)
`use_cache`boolUse embedding cache
`priority`intJob priority (higher = sooner)
Returns
TypeDescription
strJob ID for tracking
async def get_progress(job_id: str) -> BatchEmbeddingProgress | None

Get embedding progress for job.

def get_stats() -> dict[str, Any]

Get worker statistics.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report the health of this worker.

Parameters
ParameterTypeDescription
`timeout`floatUnused; present for protocol conformance.
Returns
TypeDescription
HealthCheckResultHEALTHY when the worker is running, UNHEALTHY otherwise.
async def clear_cache() -> None

Clear the embedding cache.


Actions for DLQ items.

Item in the dead letter queue.
def can_retry() -> bool

Check if item can be retried.

def calculate_backoff(base_delay: int = 60) -> int

Calculate exponential backoff delay in seconds.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Statistics for DLQ.
def to_dict() -> dict[str, Any]

Convert to dictionary.


Worker for handling failed tasks and retry logic.

Monitors failed jobs and implements intelligent retry strategies based on failure categorization.

Example

from lexigram.ai.workers import DeadLetterQueueWorker
from lexigram.contracts.infra.tasks import TaskQueueProtocol
# Setup
main_queue = container.resolve(TaskQueueProtocol)
dlq_queue = container.resolve(TaskQueueProtocol)  # Separate queue implementation for DLQ items
worker = DeadLetterQueueWorker(
main_queue=main_queue,
dlq_queue=dlq_queue,
worker_id="dlq",
check_interval=60, # Check every minute
)
# Register error notification handler
async def notify_on_permanent_failure(item: DLQItem):
await send_alert(f"Permanent failure: {item.job_id}")
worker.set_notification_handler(notify_on_permanent_failure)
# Start worker
await worker.start()
# Manually retry a failed job
await worker.retry_item(job_id="abc123")
# Get DLQ statistics
stats = await worker.get_stats()
logger.info(f"Total failed items: {stats.total_items}")
# Stop worker
await worker.stop()
def __init__(
    main_queue: TaskQueueProtocol,
    dlq_queue: TaskQueueProtocol | None = None,
    worker_id: str = 'dlq',
    check_interval: int = 60,
    max_retries: int = 5,
    base_backoff: int = 60
)

Initialize DLQ worker.

Parameters
ParameterTypeDescription
`main_queue`TaskQueueProtocolMain task queue to monitor
`dlq_queue`TaskQueueProtocol | NoneOptional separate queue for DLQ items
`worker_id`strUnique worker identifier
`check_interval`intInterval in seconds to check for failed jobs
`max_retries`intMaximum retry attempts per job
`base_backoff`intBase delay in seconds for exponential backoff
async def start() -> None

Start the DLQ worker.

async def stop() -> None

Stop the DLQ worker.

def set_notification_handler(handler: Callable[[DLQItem], Any]) -> None

Set notification handler for permanent failures.

Parameters
ParameterTypeDescription
`handler`Callable[[DLQItem], Any]Async callable that receives DLQItem
async def add_failed_job(
    job: JobProtocol,
    error: str
) -> None

Add a failed job to the DLQ.

Parameters
ParameterTypeDescription
`job`JobProtocolFailed job
`error`strError message
async def retry_item(job_id: str) -> bool

Manually retry a DLQ item.

Parameters
ParameterTypeDescription
`job_id`strJob ID to retry
Returns
TypeDescription
boolTrue if retry was initiated, False otherwise
async def remove_item(job_id: str) -> bool

Remove item from DLQ (permanent deletion).

Parameters
ParameterTypeDescription
`job_id`strJob ID to remove
Returns
TypeDescription
boolTrue if removed, False if not found
async def archive_item(job_id: str) -> bool

Archive item (mark as permanently failed).

Parameters
ParameterTypeDescription
`job_id`strJob ID to archive
Returns
TypeDescription
boolTrue if archived, False if not found
async def get_stats() -> DLQStats

Get DLQ statistics.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report the health of this worker.

Parameters
ParameterTypeDescription
`timeout`floatUnused; present for protocol conformance.
Returns
TypeDescription
HealthCheckResultHEALTHY when the worker is running, UNHEALTHY otherwise.
async def get_items(
    category: FailureCategory | None = None,
    limit: int = 100
) -> list[dict[str, Any]]

Get DLQ items.

Parameters
ParameterTypeDescription
`category`FailureCategory | NoneOptional filter by failure category
`limit`intMaximum items to return
Returns
TypeDescription
list[dict[str, Any]]List of DLQ items as dictionaries

JobProtocol configuration for document ingestion.
def to_job_kwargs() -> dict[str, Any]

Convert to JobProtocol kwargs.


Document ingestion worker.
def __init__(
    vector_store: VectorStoreProtocol,
    queue: Any,
    worker_id: str = 'document-ingestion',
    concurrency: int = 3,
    default_chunking_config: ChunkingConfigDict | None = None,
    document_parser: DocumentParser | None = None
)

Initialize document ingestion worker.

Parameters
ParameterTypeDescription
`vector_store`VectorStoreProtocolVector store for storing document chunks
`queue`AnyTask queue for job management
`worker_id`strUnique worker identifier
`concurrency`intNumber of concurrent document processing tasks
`default_chunking_config`ChunkingConfigDict | NoneDefault chunking configuration
`document_parser`DocumentParser | NoneDocument parser to use (defaults to UniversalDocumentParser)
async def start() -> None

Start the ingestion worker pool.

async def stop() -> None

Stop the ingestion worker pool.

async def ingest_document(
    document_id: str,
    file_path: Path,
    collection_name: str,
    parser: DocumentParser | None = None,
    chunking_config: ChunkingConfigDict | None = None,
    metadata: dict[str, Any] | None = None,
    priority: int = 0,
    batch_size: int = 50
) -> str

Submit document for ingestion.

Parameters
ParameterTypeDescription
`document_id`strUnique document identifier
`file_path`PathPath to document file
`collection_name`strVector store collection name
`parser`DocumentParser | NoneOptional document parser (uses worker default if None)
`chunking_config`ChunkingConfigDict | NoneOptional chunking configuration
`metadata`dict[str, Any] | NoneOptional document metadata
`priority`intJob priority (higher = sooner)
`batch_size`intChunks per batch
Returns
TypeDescription
strJob ID for tracking
async def get_progress(job_id: str) -> IngestionProgress | None

Get ingestion progress for job.

def get_stats() -> dict[str, Any]

Get worker statistics.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report the health of this worker.

Parameters
ParameterTypeDescription
`timeout`floatUnused; present for protocol conformance.
Returns
TypeDescription
HealthCheckResultHEALTHY when the worker is running, UNHEALTHY otherwise.

Classify errors into failure categories.
def classify(
    error: str,
    _job: JobProtocol
) -> FailureCategory

Classify error into a failure category.

Parameters
ParameterTypeDescription
`error`strError message
`_job`JobProtocolOriginal job (leading underscore indicates it is currently unused by the classifier)
Returns
TypeDescription
FailureCategoryFailure category

Categories of failures.

Track progress of document ingestion.
property progress_percent() -> float

Calculate overall progress percentage.

def update(
    status: IngestionStatus | None = None,
    pages_processed: int | None = None,
    chunks_processed: int | None = None,
    error: str | None = None
) -> None

Update progress tracking.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Result of document ingestion.
def success_result(
    cls,
    document_id: str,
    chunks_created: int,
    duration: float,
    metadata: dict[str, Any] | None = None
) -> IngestionResult

Create successful ingestion result.

def failure_result(
    cls,
    document_id: str,
    error: str,
    duration: float,
    metadata: dict[str, Any] | None = None
) -> IngestionResult

Create failed ingestion result.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Result of a maintenance task execution.
def success(
    cls,
    task_name: str,
    task_type: MaintenanceTaskType,
    started_at: datetime,
    items_processed: int = 0,
    items_deleted: int = 0,
    metadata: dict[str, Any] | None = None
) -> MaintenanceResult

Create successful result.

def failure(
    cls,
    task_name: str,
    task_type: MaintenanceTaskType,
    started_at: datetime,
    error: str
) -> MaintenanceResult

Create failed result.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Maintenance task status.

Configuration for a maintenance task.
def should_run() -> bool

Check if task should run based on schedule.

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization.


Types of maintenance tasks.

Background worker for scheduled maintenance tasks.

Executes periodic maintenance operations including:

  • Vector store index optimization
  • Cache cleanup and TTL enforcement
  • Old document cleanup
  • Metrics aggregation

Example

from lexigram.ai.workers import MaintenanceWorker
from lexigram.contracts import VectorStoreProtocol
# Setup — resolve via the DI container
vector_store = container.resolve(VectorStoreProtocol)
worker = MaintenanceWorker(
vector_store=vector_store,
worker_id="maintenance",
)
# Register maintenance tasks
worker.register_task(
name="optimize_indexes",
task_type=MaintenanceTaskType.INDEX_OPTIMIZATION,
handler=lambda: vector_store.optimize_indexes(),
interval_seconds=3600, # Every hour
)
worker.register_task(
name="cleanup_old_docs",
task_type=MaintenanceTaskType.DOCUMENT_CLEANUP,
handler=lambda: cleanup_documents(days=30),
schedule_cron="0 2 * * *", # 2 AM daily
)
# Start worker
await worker.start()
# Get statistics
stats = worker.get_stats()
logger.info(f"Tasks run: {stats['total_runs']}")
# Stop worker
await worker.stop()
def __init__(
    vector_store: VectorStoreProtocol | None = None,
    worker_id: str = 'maintenance',
    check_interval: int = 60
)

Initialize maintenance worker.

Parameters
ParameterTypeDescription
`vector_store`VectorStoreProtocol | NoneOptional vector store for index optimization
`worker_id`strUnique worker identifier
`check_interval`intInterval in seconds to check for tasks to run
async def start() -> None

Start the maintenance worker.

async def stop() -> None

Stop the maintenance worker.

def register_task(
    name: str,
    task_type: MaintenanceTaskType,
    handler: Callable[[], Any],
    schedule_cron: str | None = None,
    interval_seconds: int | None = None,
    enabled: bool = True,
    timeout: float = 300.0
) -> None

Register a maintenance task.

Parameters
ParameterTypeDescription
`name`strUnique task name
`task_type`MaintenanceTaskTypeType of maintenance task
`handler`Callable[[], Any]Async or sync callable to execute
`schedule_cron`str | NoneCron expression for scheduling
`interval_seconds`int | NoneSimple interval in seconds
`enabled`boolWhether task is enabled
`timeout`floatTask timeout in seconds
def unregister_task(name: str) -> None

Unregister a maintenance task.

def enable_task(name: str) -> None

Enable a maintenance task.

def disable_task(name: str) -> None

Disable a maintenance task.

async def run_task_now(name: str) -> MaintenanceResult

Run a specific maintenance task immediately.

Parameters
ParameterTypeDescription
`name`strTask name to run
Returns
TypeDescription
MaintenanceResultMaintenance result
def get_stats() -> dict[str, Any]

Get worker statistics.

def get_task_status(name: str) -> dict[str, Any] | None

Get status of a specific task.

def get_all_tasks() -> list[dict[str, Any]]

Get status of all registered tasks.

def get_recent_results(limit: int = 10) -> list[dict[str, Any]]

Get recent maintenance results.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Report the health of this worker.

Parameters
ParameterTypeDescription
`timeout`floatUnused; present for protocol conformance.
Returns
TypeDescription
HealthCheckResultHEALTHY when the worker is running, UNHEALTHY otherwise.
async def optimize_vector_indexes() -> dict[str, Any]

Optimize vector store indexes.

This is a built-in handler for index optimization.

async def cleanup_old_embeddings_cache(max_age_days: int = 30) -> dict[str, Any]

Clean up old embeddings from cache.

This is a built-in handler for cache cleanup.

Parameters
ParameterTypeDescription
`max_age_days`intMaximum age in days for cached embeddings
async def aggregate_metrics() -> dict[str, Any]

Aggregate and rollup metrics.

This is a built-in handler for metrics aggregation.


Payload fired when a worker finishes a job.

Payload fired when a worker starts a job.

Payload fired when a maintenance worker runs a maintenance task.

Configuration for AI background workers.

Loaded from the ai_workers: key in application.yaml, with environment variable overrides via LEX_AI_WORKERS__* prefix.

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check config is safe for the target environment.


Module for AI background workers.

Registers the WorkersProvider which wires the ingestion, embedding, DLQ, and maintenance workers.

Usage

from lexigram.ai.workers.config import WorkersConfig
@module(
imports=[WorkersModule.configure(WorkersConfig(...))]
)
class AppModule(Module):
pass
def configure(
    cls,
    config: WorkersConfig | None = None,
    enable_scheduler: bool = True,
    **kwargs: Any
) -> DynamicModule

Create a WorkersModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`WorkersConfig | NoneOptional WorkersConfig.
`enable_scheduler`boolStart the background maintenance scheduler that handles DLQ retries and worker health checks. Defaults to ``True``; set to ``False`` to disable all scheduled tasks (e.g. when running in a worker-only process).
`**kwargs`AnyAdditional keyword arguments forwarded to the provider.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def stub(
    cls,
    config: WorkersConfig | None = None
) -> DynamicModule

Create a WorkersModule suitable for unit and integration testing.

Uses in-memory or no-op worker implementations with minimal side effects. The background scheduler is disabled by default to prevent timer interference between tests.

Parameters
ParameterTypeDescription
`config`WorkersConfig | NoneOptional WorkersConfig override. Uses safe test defaults when ``None``.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Provider for AI background workers.

Reads WorkersConfig and registers workers for batch embedding, ingestion, maintenance, and DLQ handling.

Lifecycle: - register — binds worker singletons into the container. - boot — resolves workers and starts each as a background async task. - shutdown — calls stop() on every worker, then awaits each background task to ensure graceful queue draining.

def __init__(
    config: WorkersConfig | None = None,
    enable_scheduler: bool = True,
    **kwargs: Any
) -> None
def from_config(
    cls,
    config: WorkersConfig,
    **context
) -> WorkersProvider

Factory method for DI container setup.

async def register(container: ContainerRegistrarProtocol) -> None

Register workers with the DI container.

async def boot(container: ContainerResolverProtocol) -> None

Resolve workers from the container and start each as a background task.

Each worker’s start() coroutine is wrapped in an asyncio.Task so it runs concurrently. Task references are stored in _tasks to prevent garbage collection (Ruff RUF006).

Parameters
ParameterTypeDescription
`container`ContainerResolverProtocolContainer resolver for resolving worker instances.
async def shutdown() -> None

Stop all workers gracefully and await their background tasks.

Calls stop() on every worker in reverse start order, then waits for each background task to finish (with a 30 s timeout per task).

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check — always healthy (in-process domain provider).

No external backend to ping.

Parameters
ParameterTypeDescription
`timeout`floatIgnored for in-process providers.
Returns
TypeDescription
HealthCheckResultAlways HEALTHY — no external backend to ping.

Raised when a Dead Letter Queue operation fails.

Raised when a maintenance task operation fails.

Base exception for all worker-related errors.