API Reference
Protocols
ProgressTrackerProtocol
Protocol for tracking and broadcasting task progress.
Implementations must be safe for concurrent use — multiple coroutines
may call update on the same task_id simultaneously, and multiple
consumers may subscribe to the same task.
The subscribe method returns a live AsyncIterator that yields a
new ProgressSnapshot every time update, complete, or
fail is called for the given task. The iterator terminates
automatically when the task reaches a terminal state (COMPLETE or
FAILED).
Example
```python
async def export_data(tracker: ProgressTrackerProtocol) -> None:
    records = await fetch_all()
    total = len(records)
    for i, record in enumerate(records, start=1):
        await process(record)
        await tracker.update("export-1", i, total, f"Row {i}/{total}")
    await tracker.complete("export-1", "Export finished")
```

Record incremental progress for a task.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `current` | int | Units completed so far. |
| `total` | int | Total units to process (0 = unknown). |
| `message` | str | Optional human-readable status message. |
Mark a task as successfully completed.
Closes all active subscriptions for task_id after broadcasting the
terminal ProgressSnapshot.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `result` | str | Optional human-readable completion message. |
Mark a task as failed.
Closes all active subscriptions for task_id after broadcasting the
terminal ProgressSnapshot.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `error` | str | Description of the failure. |
Return the current progress state for a task.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| Type | Description |
|---|---|
| ProgressSnapshot \| None | The most recent ProgressSnapshot, or ``None`` if the task is not known to this tracker. |
Subscribe to live progress updates for a task.
Returns an async iterator that yields a new ProgressSnapshot
each time the task’s state changes. The iterator stops automatically
when the task reaches a terminal state (COMPLETE or FAILED).
If the task is already in a terminal state when subscribe is
called, the iterator yields the final snapshot once and then stops.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task to observe. |
| Type | Description |
|---|---|
| AsyncIterator[ProgressSnapshot] | An AsyncIterator of ProgressSnapshot objects. |
Example
```python
async for snap in tracker.subscribe("job-42"):
    print(f"{snap.percent:.1f}% — {snap.message}")
```

TaskExecutorProtocol
Protocol for task executor implementations.
Executors process tasks from a queue by dispatching them to registered handlers.
Example
```python
class TaskExecutorProtocol:
    async def execute(self, task: Task) -> Any:
        handler = self._handlers.get(task.name)
        if handler:
            return await handler(task.payload)
        raise UnknownTaskError(task.name)
```

Execute a task using the registered handler.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | Task to execute. |
| Type | Description |
|---|---|
| Any | Task execution result. |
| Exception | Description |
|---|---|
| UnknownTaskError | If no handler is registered for the task. |
Register a handler for a task type.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| `handler` | Any | Async callable to handle the task. |
Get the handler for a task type.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| Type | Description |
|---|---|
| Any \| None | Handler if registered, None otherwise. |
TaskQueueProtocol
Protocol for task queue implementations.
This defines the contract for background task queues (Redis-based, PostgreSQL-based, in-memory, etc.).
Example
```python
class RedisTaskQueue:
    async def enqueue(self, task: Task) -> str:
        task_id = str(uuid4())
        await self._redis.rpush("tasks", task.to_json())
        return task_id

    async def dequeue(self) -> Task | None:
        data = await self._redis.lpop("tasks")
        return Task.from_json(data) if data else None
```

Add a task to the queue.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | Task instance to enqueue. |
| Type | Description |
|---|---|
| Result[str, TaskQueueError] | Ok(task_id) on success; Err(TaskQueueError) if the queue reports a recoverable failure (e.g. full queue, invalid payload). Only infrastructure failures (lost connection) are raised as exceptions. |
Remove and return the next task.
| Type | Description |
|---|---|
| Any \| None | Next Task or None if queue is empty. |
Get the number of tasks in the queue.
| Type | Description |
|---|---|
| int | Number of pending tasks. |
Clear all tasks from the queue.
Get a job by ID.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Unique job identifier. |
| Type | Description |
|---|---|
| Any \| None | Job data or None if not found. |
Delete a job from the queue.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Unique job identifier. |
| Type | Description |
|---|---|
| bool | True if the job was deleted. |
Count queued jobs.
| Parameter | Type | Description |
|---|---|---|
| `queue_name` | str | Queue to inspect. |
| `status` | Any \| None | Optional status filter. |
| Type | Description |
|---|---|
| int | Number of matching jobs. |
Acknowledge successful processing of a dequeued task.
Signals that the task has been processed successfully and can be permanently discarded from any in-flight tracking.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to acknowledge. |
Negative-acknowledge a dequeued task.
Signals that the task could not be processed. The task is optionally requeued for another processing attempt.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to negative-acknowledge. |
| `requeue` | bool | If True, return the task to the queue for retry. If False, discard the task permanently. |
Close the queue connection and cleanup resources.
Classes
BackgroundTaskManager
Tracks and shuts down background asyncio tasks.
Inject as a singleton from any DI provider. Call shutdown()
during application stop to cancel all in-flight tasks.
Example
```python
manager = BackgroundTaskManager()
task = manager.track(some_coroutine())
# … later …
await manager.shutdown(timeout=30.0)
```

Schedule coro as an asyncio task and track the handle.
The task is removed from the tracked set automatically via a
done_callback so completed tasks don’t accumulate.
| Parameter | Type | Description |
|---|---|---|
| `coro` | Awaitable[T] | Any awaitable to wrap in a task. |
| Type | Description |
|---|---|
| asyncio.Task[T] | The created asyncio.Task. |
Like track but attach a human-readable name for logs.
Named tasks appear in shutdown() timeout warnings so operators
can identify which work is still running at stop time.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Label surfaced in observability/log output. |
| `coro` | Awaitable[T] | Any awaitable to wrap in a task. |
| Type | Description |
|---|---|
| asyncio.Task[T] | The created asyncio.Task. |
Number of tasks that have not yet completed.
Cancel all pending tracked tasks and await them.
After cancellation, the tasks collectively get up to timeout seconds
to finish (honour CancelledError and run any finally blocks).
Tasks that do not stop within the timeout are logged by name;
no exception is raised.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Seconds to wait for all tasks to finish after cancellation. Default 30 seconds. |
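The tracking and shutdown behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: the class name `SketchTaskManager`, the `pending_count` property name, and the log wording are assumptions made for the example.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class SketchTaskManager:
    """Hypothetical stand-in for BackgroundTaskManager (illustration only)."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def track(self, coro) -> asyncio.Task:
        task = asyncio.ensure_future(coro)
        self._tasks.add(task)
        # Drop the handle once the task finishes so the set does not grow.
        task.add_done_callback(self._tasks.discard)
        return task

    @property
    def pending_count(self) -> int:
        # Assumed name for "number of tasks that have not yet completed".
        return sum(1 for t in self._tasks if not t.done())

    async def shutdown(self, timeout: float = 30.0) -> None:
        pending = [t for t in self._tasks if not t.done()]
        if not pending:
            return
        for task in pending:
            task.cancel()
        # One collective deadline for all tasks, not one deadline per task.
        _, still_running = await asyncio.wait(pending, timeout=timeout)
        for task in still_running:
            # Log instead of raising, as the description above states.
            logger.warning(
                "task %s did not stop within %.1fs", task.get_name(), timeout
            )
```

Using `asyncio.wait` with a single timeout gives the collective deadline; a per-task `wait_for` loop would instead multiply the worst-case shutdown time by the number of tasks.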
BranchStep
Branch based on a condition evaluated against the input.
Example
```python
workflow = BranchStep(
    condition=lambda data: data["amount"] > 1000,
    if_true=TaskStep("manual_review", manual_review),
    if_false=TaskStep("auto_approve", auto_approve),
)
```
CacheBackendProgressStore
Distributed progress store backed by the platform ``CacheBackendProtocol``.
Stores serialised ProgressInfo objects in the configured cache backend so that progress is visible to all worker processes. TTL-based expiry is delegated to the backend.
| Parameter | Description |
|---|---|
| `cache` | Platform cache backend (Redis, Memcached, …). |
| `ttl` | Seconds before a progress entry expires. Defaults to 3600 (1 hour). |
Persist a ProgressInfo entry to the cache backend.
| Parameter | Type | Description |
|---|---|---|
| `info` | ProgressInfo | Progress state to save. |
Return the ProgressInfo for the given job ID, or None if not found.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Unique identifier for the job. |
Remove the progress entry for the given job ID.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Unique identifier for the job. |
Return all active (incomplete) progress entries.
Note: This implementation scans the key prefix via get, which may be slow on large datasets. Consider using a Redis-specific implementation with SCAN for production at scale.
| Type | Description |
|---|---|
| list[ProgressInfo] | List of incomplete ProgressInfo. |
CacheBackendResultStore
Distributed result store backed by the platform ``CacheBackendProtocol``.
Stores serialised JobResult objects in the configured cache backend so that results are visible to all worker processes. TTL-based expiry is delegated to the backend.
| Parameter | Description |
|---|---|
| `cache` | Platform cache backend (Redis, Memcached, …). |
| `ttl` | Seconds before a result entry expires. Defaults to 3600 (1 hour). |
| `poll_interval` | Seconds between polls in wait. Defaults to 0.5. |
Persist a job result in the cache backend with TTL.
Return the stored result, or None if absent or expired.
Remove the result from the cache and return True if it existed.
Poll the cache until the result appears or the timeout elapses.
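A polling wait of this kind might look like the sketch below. It assumes a hypothetical async `get(job_id)` callable that returns `None` while no result is stored; the function name `wait_for_result` and its signature are illustrative, not the store's actual API.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Awaitable, Callable


async def wait_for_result(
    get: Callable[[str], Awaitable[Any]],
    job_id: str,
    timeout: float,
    poll_interval: float = 0.5,
) -> Any | None:
    """Poll ``get(job_id)`` until it returns a value or ``timeout`` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = await get(job_id)
        if result is not None:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None  # timed out without a result
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))
```

A monotonic clock keeps the deadline stable if the wall clock is adjusted while waiting.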
No-op: the cache backend handles TTL expiry automatically.
CronExpression
Wrapper for cron expression parsing.
Provides a unified interface for cron expression handling with optional croniter dependency.
Note: This class requires the croniter optional dependency. See lexigram.tasks.scheduling.cron for installation instructions. Instantiation raises ImportError when croniter is absent.
Example
```python
cron = CronExpression("0 9 * * 1-5")  # Weekdays at 9am
next_run = cron.get_next()
```

Initialize cron expression
| Parameter | Type | Description |
|---|---|---|
| `expression` | str | Cron expression string |
| Exception | Description |
|---|---|
| ImportError | If croniter is not installed |
| ValueError | If expression is invalid |
Get next execution time
| Parameter | Type | Description |
|---|---|---|
| `base_time` | float \| None | Base time (defaults to current time) |
| Type | Description |
|---|---|
| float | Unix timestamp of next execution |
Get previous execution time
| Parameter | Type | Description |
|---|---|---|
| `base_time` | float \| None | Base time (defaults to current time) |
| Type | Description |
|---|---|
| float | Unix timestamp of previous execution |
DeadLetterQueue
Manages failed jobs for inspection, retry, and cleanup.
Add a failed job to the DLQ.
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | The failed job. |
| `error` | str | Error message. |
| `traceback` | str \| None | Full traceback string. |
| `**metadata` | | Additional metadata. |
Get a specific failure record.
List failed jobs, optionally filtered by job name.
| Parameter | Type | Description |
|---|---|---|
| `limit` | int | Maximum records to return. |
| `job_name` | str \| None | Filter by job type name. |
| Type | Description |
|---|---|
| list[FailureRecord] | List of failure records (newest first). |
Remove a job from DLQ and prepare it for retry.
Returns the job with status reset to PENDING, or None if not found.
Retry all failed jobs. Returns list of jobs to re-enqueue.
Permanently remove a failure from the DLQ.
Purge old failures.
| Parameter | Type | Description |
|---|---|---|
| `older_than_hours` | float \| None | Remove records older than this. Defaults to retention_hours. |
| Type | Description |
|---|---|
| int | Number of records purged. |
Clear all DLQ entries. Returns count cleared.
Return the number of currently tracked failure records.
Get DLQ statistics.
DistributedLockProtocol
In-memory lock for preventing duplicate task execution within a single process.
Note: This is NOT a distributed lock. For true distributed locking across multiple processes/servers, use a Redis-based or database-based lock implementation.
Example
```python
lock_manager = LockManager()
async with lock_manager.acquire("user:123:sync", timeout=60):
    await sync_user_data(user_id=123)
```

Initialize in-memory lock.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Unique lock key |
| `timeout` | float | Lock timeout in seconds (auto-release) |
| `locks_dict` | dict[str, tuple[float, asyncio.Lock]] | Shared locks dictionary (from LockManager) |
| `locks_lock` | asyncio.Lock | Shared lock for accessing locks_dict (from LockManager) |
Acquire the lock.
| Type | Description |
|---|---|
| bool | True if lock was acquired, False if already held |
Try to acquire lock without waiting.
| Type | Description |
|---|---|
| bool | True if acquired, False if already held |
Release the lock.
ExecutionRecord
Record of a single task execution.
Serialize the execution record to a plain dictionary.
FailureRecord
Record of a failed job in the DLQ.
Serialize the failure record to a plain dictionary.
Deserialize a failure record from a plain dictionary.
| Parameter | Type | Description |
|---|---|---|
| `data` | dict[str, Any] | Dictionary as produced by to_dict. |
| Type | Description |
|---|---|
| FailureRecord | Reconstructed FailureRecord. |
GlobalRateLimiter
System-wide throughput cap applied across all queues.
Delegates to a single TokenBucket for consistent rate enforcement regardless of which queue a task originates from.
Example
```python
limiter = GlobalRateLimiter(rate=100, per=1.0)  # 100 tasks / sec
await limiter.acquire()
```

Initialise the global rate limiter.
| Parameter | Type | Description |
|---|---|---|
| `rate` | int | Maximum allowed tasks per time period. |
| `per` | float | Length of the time period in seconds. |
| `burst` | int \| None | Maximum burst capacity (defaults to *rate*). |
Block until a global token is available.
Non-blocking global token acquisition.
| Type | Description |
|---|---|
| bool | ``True`` if a token was obtained; ``False`` if globally rate-limited. |
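To make the `rate`, `per`, and `burst` parameters concrete, here is a minimal token-bucket sketch with a non-blocking acquisition. It is an assumption-laden illustration: `SketchTokenBucket`, `try_acquire`, and the injectable `clock` argument are hypothetical names and are not taken from the library's TokenBucket.

```python
from __future__ import annotations

import time
from typing import Callable


class SketchTokenBucket:
    """Hypothetical token bucket (illustration only)."""

    def __init__(
        self,
        rate: int,
        per: float,
        burst: int | None = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        # Burst capacity defaults to the rate, as in the parameter table.
        self._capacity = float(burst if burst is not None else rate)
        self._refill_per_second = rate / per
        self._tokens = self._capacity
        self._clock = clock
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(
            self._capacity, self._tokens + elapsed * self._refill_per_second
        )
        self._last = now

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

A blocking variant could loop on `try_acquire` and sleep for the time needed to refill one token; that part is omitted here to keep the sketch short.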
HandlerRegistry
Registry for task handlers that also satisfies the TaskExecutorProtocol contract.
Maps task names to their handler functions. Handlers can be
registered and retrieved by name. The execute method dispatches
a task to its registered handler, making this class usable wherever a
TaskExecutorProtocol is expected.
Extends Registry for unified introspection and lifecycle hooks.
Example
```python
handler = registry.get("send_email")
```

Initialize handler registry.
Register a task handler.
Can be used as a decorator or called directly.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Task name. |
| `handler` | Callable[..., Awaitable[Any]] \| None | Handler function (if not using as decorator). |
| Type | Description |
|---|---|
| Callable[..., Any] | Handler function or decorator. |
Example
```python
# As decorator
@registry.register("task_name")
async def my_handler(*args, **kwargs):
    pass

# Direct call
registry.register("task_name", my_handler)
```

Register a handler for a task type (TaskExecutorProtocol method).
This method satisfies the register_handler requirement defined in
lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a
protocol-mandated entry point that delegates directly to register
so that HandlerRegistry can be used wherever a TaskExecutorProtocol
is expected without any additional adapter. Do not remove — the
protocol contract requires this exact signature.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| `handler` | Any | Async callable to handle the task. |
Get the handler for a task type (TaskExecutorProtocol method).
This method satisfies the get_handler requirement defined in
lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a
protocol-mandated entry point that delegates directly to get
so that HandlerRegistry can be used wherever a TaskExecutorProtocol
is expected without any additional adapter. Do not remove — the
protocol contract requires this exact signature.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| Type | Description |
|---|---|
| Any \| None | Handler if registered, ``None`` otherwise. |
Execute a task using the registered handler (TaskExecutorProtocol method).
Resolves the handler for task.name, then invokes it with
task.args and task.kwargs. Supports both sync and async
handlers.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | Task object with ``name``, ``args``, and ``kwargs`` attributes. |
| Type | Description |
|---|---|
| Any | Result returned by the handler. |
| Exception | Description |
|---|---|
| TaskNotFoundError | If no handler is registered for the task name. |
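The dispatch described above (resolve by `task.name`, call with `task.args` and `task.kwargs`, accept sync or async handlers) can be sketched as follows. `SketchRegistry`, `SketchTask`, and the local `TaskNotFoundError` class are hypothetical stand-ins written for this example; the real classes may differ.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable


class TaskNotFoundError(LookupError):
    """Stand-in for the library's TaskNotFoundError (assumed base class)."""


@dataclass
class SketchTask:
    name: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


class SketchRegistry:
    def __init__(self) -> None:
        self._handlers: dict[str, Callable[..., Any]] = {}

    def register(self, name: str, handler=None):
        if handler is None:
            # Decorator form: @registry.register("name")
            def decorator(fn):
                self._handlers[name] = fn
                return fn
            return decorator
        # Direct form: registry.register("name", fn)
        self._handlers[name] = handler
        return handler

    async def execute(self, task: SketchTask) -> Any:
        handler = self._handlers.get(task.name)
        if handler is None:
            raise TaskNotFoundError(task.name)
        result = handler(*task.args, **task.kwargs)
        # Async handlers return an awaitable; sync handlers return the value.
        if inspect.isawaitable(result):
            result = await result
        return result
```

Checking the return value with `inspect.isawaitable` handles plain functions, coroutine functions, and callables that return futures with one code path.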
Remove handler from registry.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Task name. |
| Type | Description |
|---|---|
| bool | True if handler was removed. |
List all registered handler names.
| Type | Description |
|---|---|
| list[str] | List of task names. |
Get handlers as dictionary.
| Type | Description |
|---|---|
| dict[str, Callable[..., Awaitable[Any]]] | Dictionary of all handlers. |
InMemoryProgressStore
In-memory progress store for development/testing.
Persist a ProgressInfo entry in the in-memory store.
Return the ProgressInfo for the given job ID, or None if not found.
Remove the progress entry for the given job ID if it exists.
Return all progress entries whose current count has not yet reached total.
InMemoryProgressTracker
Thread-safe, in-memory ProgressTrackerProtocol implementation.
Maintains the latest ProgressSnapshot for each tracked task and
fans out every state change to all active subscribers via
asyncio.Queue.
Subscriptions are kept alive until the task reaches a terminal state
(COMPLETE or FAILED), at which point the iterator closes
automatically. Subscribers that call subscribe after the task has
already finished receive the terminal snapshot once and then stop.
All public methods are safe to call concurrently from multiple coroutines.
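The fan-out behaviour described above can be illustrated with one `asyncio.Queue` per subscriber. This is a simplified sketch under stated assumptions: `SketchTracker`, `Snap`, and the lowercase status strings are invented for the example, and `fail` is omitted because it would mirror `complete`.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Snap:
    task_id: str
    current: int
    total: int
    status: str  # "running", "complete" or "failed" (assumed values)
    message: str = ""


class SketchTracker:
    TERMINAL = {"complete", "failed"}

    def __init__(self) -> None:
        self._latest: dict[str, Snap] = {}
        self._subscribers: dict[str, list[asyncio.Queue]] = {}

    def _publish(self, snap: Snap) -> None:
        self._latest[snap.task_id] = snap
        # Fan out: every active subscriber gets its own copy of the event.
        for queue in self._subscribers.get(snap.task_id, []):
            queue.put_nowait(snap)

    async def update(self, task_id, current, total, message=""):
        self._publish(Snap(task_id, current, total, "running", message))

    async def complete(self, task_id, result=""):
        last = self._latest.get(task_id)
        total = last.total if last else 0
        self._publish(Snap(task_id, total, total, "complete", result))

    async def subscribe(self, task_id):
        latest = self._latest.get(task_id)
        if latest is not None and latest.status in self.TERMINAL:
            yield latest  # late subscriber: terminal snapshot once, then stop
            return
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(task_id, []).append(queue)
        try:
            while True:
                snap = await queue.get()
                yield snap
                if snap.status in self.TERMINAL:
                    return
        finally:
            self._subscribers[task_id].remove(queue)
```

Because `subscribe` is an async generator, the queue is only registered once iteration starts, so updates published before the first `async for` step are not replayed in this sketch.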
Record incremental progress and broadcast to all subscribers.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `current` | int | Units completed so far. |
| `total` | int | Total units to process (0 means unknown). |
| `message` | str | Optional human-readable status line. |
Mark a task as successfully completed and close all subscriptions.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `result` | str | Optional human-readable completion message. |
Mark a task as failed and close all subscriptions.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `error` | str | Description of the failure. |
Return the current progress state for a task.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| Type | Description |
|---|---|
| ProgressSnapshot \| None | The most recent ProgressSnapshot, or ``None`` if the task has not been seen by this tracker. |
Subscribe to live progress updates for a task.
Returns an async generator that yields one ProgressSnapshot per state change. The generator stops automatically when the task reaches a terminal state. If the task is already finished, the terminal snapshot is yielded once and the generator stops immediately.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task to observe. |
| Type | Description |
|---|---|
| AsyncGenerator[ProgressSnapshot, None] | An async generator of ProgressSnapshot objects. |
InMemoryResultStore
In-memory result store with TTL-based expiry.
Suitable for development and single-process deployments. For production, implement a Redis-backed store.
Persist a job result, evicting the oldest entry when at capacity.
Return the result for the given job ID, or None if absent or expired.
Remove the result for the given job ID and return True if it existed.
Wait for a result to appear, with timeout.
Remove all entries whose TTL has elapsed and return the count removed.
Return store statistics including size, capacity, and waiter count.
JobProtocol
Represents a background job with comprehensive status tracking
Jobs are executable work units that can be queued, scheduled, tracked, and retried. They support priority ordering, dependency management, and timeout control.
Attributes:
- id: Unique job identifier
- name: Human-readable job name/type
- args: Positional arguments for job handler
- kwargs: Keyword arguments for job handler
- priority: Execution priority (higher = more important)
- max_retries: Maximum retry attempts on failure
- timeout: Execution timeout in seconds
- depends_on: List of job IDs this job depends on
- status: Current job status
- created_at: Timestamp when job was created
- started_at: Timestamp when job started executing
- completed_at: Timestamp when job finished
- retry_count: Current number of retry attempts
- last_error: Most recent error message
- result: JobProtocol execution result
- scheduled_at: Timestamp for scheduled execution
- cron_expression: Cron expression for recurring jobs
Check if job is pending execution
Check if job is currently running
Check if job completed successfully
Check if job failed
Check if job can be retried (hasn’t exceeded max_retries)
Get job execution duration in milliseconds
Mark job as running and record start time
Mark job as completed with result
| Parameter | Type | Description |
|---|---|---|
| `result` | JobResult | JobProtocol execution result |
Mark job as failed with error message
| Parameter | Type | Description |
|---|---|---|
| `error` | str | Error message describing the failure |
Mark job as retrying and increment retry count
Mark job as cancelled
Convert job to dictionary for serialization
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of job |
Create job from dictionary
| Parameter | Type | Description |
|---|---|---|
| `data` | dict[str, Any] | Dictionary representation of job |
| Type | Description |
|---|---|
| JobProtocol | JobProtocol instance |
JobResult
Result of a job execution
Captures the outcome of job execution including success status, return data, error information, and performance metrics.
Create a successful result
| Parameter | Type | Description |
|---|---|---|
| `data` | Any | Return value from the job |
| `duration` | float | Execution duration in seconds |
| Type | Description |
|---|---|
| JobResult | JobResult with success=True |
Create a failure result
| Parameter | Type | Description |
|---|---|---|
| `error` | str | Error message describing the failure |
| `retry_count` | int | Number of retry attempts made |
| `duration` | float | Execution duration in seconds |
| Type | Description |
|---|---|
| JobResult | JobResult with success=False |
Convert result to dictionary for serialization
Create result from dictionary
JobStatus
JobProtocol execution status.
JobTemplateProtocol
Template for creating scheduled jobs
JobTemplateProtocol defines a reusable job configuration that can be instantiated multiple times with different IDs.
Attributes:
- name: JobProtocol name/type
- args: Positional arguments
- kwargs: Keyword arguments
- priority: Execution priority
- max_retries: Maximum retry attempts
- timeout: Execution timeout
- depends_on: JobProtocol dependencies template
Create a JobProtocol instance from this template
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID (generated if empty) |
| Type | Description |
|---|---|
| JobProtocol | New JobProtocol instance |
Create template from existing job
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | JobProtocol to convert to template |
| Type | Description |
|---|---|
| JobTemplateProtocol | JobTemplateProtocol instance |
LoggingMiddleware
Logs task start, completion, and failures.
Log task start before execution.
Log task completion status and duration after execution.
Log the error when a task raises an exception.
MemoryTaskQueue
In-memory task queue implementation
Uses Python’s heapq for priority-based ordering. Tasks are stored in memory and will be lost on process restart. Concurrent coroutine access is guarded by an asyncio.Lock; note that asyncio.Lock is not thread-safe, so the queue should be used from a single event loop.
Characteristics:
- Storage: In-memory heap-based priority queue
- Concurrency: asyncio.Lock guards concurrent coroutine access (not OS threads)
- Persistence: No persistence (volatile)
- Use Case: Development, testing, single-process applications
- Limitations: No persistence, single-process only
Example
```python
queue = MemoryTaskQueue()
await queue.enqueue(task)
task = await queue.dequeue()
```

Initialize memory task queue
Attach an optional hook registry after provider boot wiring.
Add a task to the queue, returning Ok(task_id) or Err(TaskError) on failure.
Tasks are stored in a min-heap with negative priority to achieve max-heap behavior (highest priority first).
| Parameter | Type | Description |
|---|---|---|
| `task` | JobProtocol | Task to enqueue |
| Type | Description |
|---|---|
| Result[str, TaskError] | Ok containing the enqueued task id, or Err containing a TaskError. |
Remove and return the next task from the queue
Returns the highest priority task. If multiple tasks have the same priority, returns the oldest (FIFO). Respects task delay - tasks with future available_at are skipped.
| Type | Description |
|---|---|
| JobProtocol \| None | Next task or None if queue is empty or no tasks are available |
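The ordering rules above (negated priority on a min-heap, FIFO among equal priorities, delayed tasks skipped) can be sketched with `heapq` as shown below. `SketchPriorityQueue` and its `push`/`pop` methods are hypothetical and synchronous for brevity; the real queue is async and returns a Result from enqueue.

```python
import heapq
import itertools
import time


class SketchPriorityQueue:
    """Hypothetical heap-based queue (illustration only)."""

    def __init__(self, clock=time.monotonic):
        self._heap = []
        self._seq = itertools.count()  # insertion order, used as tie-breaker
        self._clock = clock

    def push(self, item, priority=5, available_at=0.0):
        # Negating the priority turns heapq's min-heap into a max-heap;
        # the sequence number keeps equal priorities in FIFO order.
        entry = (-priority, next(self._seq), available_at, item)
        heapq.heappush(self._heap, entry)

    def pop(self):
        now = self._clock()
        skipped = []
        found = None
        while self._heap:
            entry = heapq.heappop(self._heap)
            if entry[2] <= now:
                found = entry[3]
                break
            skipped.append(entry)  # not yet available, try the next one
        # Put delayed entries back so they can be dequeued later.
        for entry in skipped:
            heapq.heappush(self._heap, entry)
        return found
```

Because the sequence number is unique, tuple comparison never reaches the stored item, so items do not need to be orderable.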
Acknowledge successful processing of a task
Removes the task from the in-flight tracking set.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to acknowledge |
Negative-acknowledge a task, optionally requeuing it
Removes the task from in-flight tracking. If requeue is True the task is pushed back onto the priority heap for retry.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to negative-acknowledge |
| `requeue` | bool | If True, return the task to the queue for retry |
Get the number of tasks in the queue
| Type | Description |
|---|---|
| int | Number of pending tasks |
Clear all tasks from the queue
Close the queue connection
For memory queue, there are no external resources to clean up.
MetricsMiddleware
Collects per-task-type execution metrics.
Record execution metrics for the completed task.
Increment failure counter for the task type that raised an error.
Return aggregated execution statistics keyed by task name.
NamedTaskConfig
Configuration for a single named task queue backend.
Used in TaskConfig.backends to declare multiple task queues that the framework registers as Named DI bindings.
| Parameter | Description |
|---|---|
| `name` | Unique backend identifier. Used as the Named() DI key. |
| `primary` | Whether this is the primary backend. Primary backends also receive the unnamed TaskQueueProtocol binding. |
| `type` | Backend type ('memory', 'redis', 'rabbitmq', 'postgres'). |
| `redis_url` | Redis connection URL (when type='redis'). |
| `amqp_url` | AMQP connection URL (when type='rabbitmq'). |
| `postgres_dsn` | Postgres DSN (when type='postgres'). |
| `queue_name` | Queue name for this backend instance. |
Example
```yaml
backends:
  - name: primary
    primary: true
    type: redis
    redis_url: "${REDIS_URL}"
  - name: notifications
    type: rabbitmq
    amqp_url: "${AMQP_URL}"
```

Inject by name
```python
queue: Annotated[TaskQueueProtocol, Named("notifications")]
```

OnErrorPolicy
Controls what happens when ScheduledWorker.run_cycle raises.
Attributes:
- LOG_AND_CONTINUE: Log the error, then resume the normal schedule.
- BACKOFF: Log the error and double the sleep before the next cycle (up to 10 * interval_seconds).
- STOP: Log the error and stop the worker permanently.
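As a rough illustration of the BACKOFF policy, the helper below computes the sleep before the next cycle. It assumes the doubling compounds across consecutive failures and resets after a success; the source only states "double" and the 10x cap, so treat both the compounding and the function name `next_sleep` as assumptions.

```python
def next_sleep(interval_seconds: float, consecutive_failures: int) -> float:
    """Sleep before the next cycle under an assumed BACKOFF policy.

    Doubles once per consecutive failure and is capped at
    10 * interval_seconds.
    """
    if consecutive_failures <= 0:
        return interval_seconds  # normal schedule
    doubled = interval_seconds * (2 ** consecutive_failures)
    return min(doubled, 10 * interval_seconds)
```

With a 60 second interval this gives 120, 240, and 480 seconds for the first three failures, then stays at the 600 second cap.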
Priority
Task priority levels for queue ordering (higher number = higher priority).
The scale is 0–20 and is separate from the kernel’s
ProviderPriority (0–100). Workers dequeue higher-priority
jobs before lower-priority ones.
Values:
- LOW (0): Background tasks that can be deferred indefinitely.
- NORMAL (5): Default priority for routine task processing.
- HIGH (10): Time-sensitive tasks processed before NORMAL queue.
- CRITICAL (20): Highest priority; reserved for urgent system tasks.
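The documented values map naturally onto an integer enum. The sketch below assumes an `IntEnum` base, which the source does not confirm, and uses the hypothetical name `SketchPriority` to avoid implying it is the library's class.

```python
from enum import IntEnum


class SketchPriority(IntEnum):
    """Hypothetical mirror of the documented 0-20 priority scale."""

    LOW = 0
    NORMAL = 5
    HIGH = 10
    CRITICAL = 20


# Integer members compare directly, so sorting puts urgent work first.
queue_order = sorted(SketchPriority, reverse=True)
```

An integer-valued enum also leaves gaps between levels, so intermediate priorities could be passed as plain integers if the queue accepts them.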
ProgressInfo
Current progress state of a task.
Return True when the current count has reached the total.
Return seconds elapsed since the task started, or 0 if not started.
ProgressSnapshot
Immutable point-in-time view of a task's progress.
| Parameter | Description |
|---|---|
| `task_id` | Unique identifier for the tracked task. |
| `current` | Number of units completed so far. |
| `total` | Total number of units to complete (0 means unknown). |
| `status` | Current lifecycle state. |
| `message` | Human-readable status message. |
| `error` | Error description when ``status`` is ``FAILED``; empty otherwise. |
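An immutable snapshot with these fields could be modelled as a frozen dataclass. In the sketch below, `SketchSnapshot` is a hypothetical name, the field types are guesses, and the `percent` property (referenced as `snap.percent` in the subscribe example) returning 0.0 when `total` is unknown is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchSnapshot:
    """Hypothetical immutable progress snapshot (illustration only)."""

    task_id: str
    current: int
    total: int
    status: str
    message: str = ""
    error: str = ""

    @property
    def percent(self) -> float:
        # total == 0 means "unknown", so no percentage can be derived.
        if self.total <= 0:
            return 0.0
        return min(100.0, 100.0 * self.current / self.total)
```

Freezing the dataclass means a snapshot handed to one subscriber cannot be mutated before another subscriber reads it.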
ProgressStatus
Lifecycle states for a tracked task.
Values are lowercase strings so they round-trip cleanly through JSON.
ProgressStore
Abstract storage for progress state.
Save progress state.
Get progress for a job.
Remove progress entry.
List all active (incomplete) progress entries.
ProgressTracker
Track and report task progress.
Injected into task handlers to enable progress reporting.
Update progress.
| Parameter | Type | Description |
|---|---|---|
| `current` | int | Current progress count. |
| `total` | int | Total items to process. |
| `message` | str | Human-readable status message. |
| `**metadata` | | Additional metadata. |
Mark progress as complete.
Mark progress as failed.
Return the current ProgressInfo snapshot.
QueueRateLimiter
Per-queue rate limiter.
Manages independent rate limits for named task queues, delegating each limit to a TokenBucket.
Example
```python
limiter = QueueRateLimiter()
limiter.add_limit("emails", rate=5, per=1.0)  # 5 / sec
limiter.add_limit("reports", rate=1, per=60.0)  # 1 / min

await limiter.acquire("emails")
```

Register a rate limit for a named queue.
| Parameter | Type | Description |
|---|---|---|
| `queue` | str | Queue name to apply the limit to. |
| `rate` | int | Allowed requests per time period. |
| `per` | float | Length of the time period in seconds. |
| `burst` | int | None | Maximum burst size (defaults to *rate*). |
Block until a token is available for queue.
| Parameter | Type | Description |
|---|---|---|
| `queue` | str | Queue name; a no-op for unlisted queues. |
Attempt a non-blocking token acquisition for queue.
| Parameter | Type | Description |
|---|---|---|
| `queue` | str | Queue name. |
| Type | Description |
|---|---|
| bool | ``True`` if a token was obtained or the queue has no limit; ``False`` if the queue is currently rate-limited. |
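The semantics above (independent buckets per queue, no limit for unlisted queues, `burst` defaulting to `rate`) can be sketched with a simple token bucket. This is an illustrative stand-in under those assumptions, not the library's `TokenBucket` or `QueueRateLimiter`; the class names ending in `Sketch` are hypothetical.

```python
from __future__ import annotations

import asyncio
import time


class TokenBucketSketch:
    """Refills `rate` tokens every `per` seconds, holding at most `burst`."""

    def __init__(self, rate: int, per: float, burst: int | None = None) -> None:
        self.capacity = float(burst if burst is not None else rate)
        self.refill_per_second = rate / per
        self.tokens = self.capacity
        self.updated = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        gained = (now - self.updated) * self.refill_per_second
        self.tokens = min(self.capacity, self.tokens + gained)
        self.updated = now

    def try_acquire(self) -> bool:
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    async def acquire(self) -> None:
        while not self.try_acquire():
            # Sleep roughly until one whole token has accumulated.
            await asyncio.sleep((1.0 - self.tokens) / self.refill_per_second)


class QueueLimiterSketch:
    """One bucket per named queue; queues without a limit are never throttled."""

    def __init__(self) -> None:
        self._buckets: dict[str, TokenBucketSketch] = {}

    def add_limit(self, queue: str, rate: int, per: float, burst: int | None = None) -> None:
        self._buckets[queue] = TokenBucketSketch(rate, per, burst)

    def try_acquire(self, queue: str) -> bool:
        bucket = self._buckets.get(queue)
        return True if bucket is None else bucket.try_acquire()

    async def acquire(self, queue: str) -> None:
        bucket = self._buckets.get(queue)
        if bucket is not None:
            await bucket.acquire()
```

With `rate=2, per=60.0`, two immediate non-blocking acquisitions succeed and a third is refused until enough time has passed to refill a token.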
ResultStore
Abstract result store backend.
Store a job result.
Get a result by job ID. Returns None if not found or expired.
Delete a result. Returns True if deleted.
Wait for a result with timeout.
Remove expired results. Returns count removed.
ScheduledJob
Represents a scheduled job with a cron expression.
Attributes:
job_template: Template for creating job instances.
cron_expression: Cron expression string.
next_run: Unix timestamp of next execution.
enabled: Whether the job is enabled for execution.
Calculate the next run time based on the cron expression.
| Type | Description |
|---|---|
| float | Unix timestamp of next execution |
ScheduledWorker
Periodic worker base class.
Subclass and implement run_cycle. The base class handles the scheduling loop, error policy, graceful shutdown, and task tracking via BackgroundTaskManager.
Class-level defaults (override per subclass or via constructor):

    class MyWorker(ScheduledWorker):
        interval_seconds = 60.0
        initial_delay_seconds = 0.0
        max_jitter_seconds = 0.0
        on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE

Constructor injection is the preferred way to customise at runtime:

    worker = MyWorker(
        task_manager=task_manager,
        interval_seconds=30.0,
        on_error_policy=OnErrorPolicy.BACKOFF,
    )

Initialise the worker.
| Parameter | Type | Description |
|---|---|---|
| `task_manager` | BackgroundTaskManager | Shared BackgroundTaskManager that owns the task handle and participates in framework shutdown. |
| `interval_seconds` | float | None | Override interval_seconds. |
| `initial_delay_seconds` | float | None | Override initial_delay_seconds. |
| `max_jitter_seconds` | float | None | Override max_jitter_seconds. |
| `on_error_policy` | OnErrorPolicy | None | Override on_error_policy. |
Start the worker loop as a tracked background task.
Call start() once. Calling it again while the worker is already running
is a no-op that logs a warning.
Signal the worker to stop and wait for it to finish.
Graceful: the stop event is set so the sleeping phase wakes up early, and the current cycle (if any) is allowed to finish. If the task has not completed within grace_seconds, it is forcibly cancelled.
| Parameter | Type | Description |
|---|---|---|
| `grace_seconds` | float | Seconds to wait for a clean stop before cancelling. Default is 2 seconds. |
Override to implement one unit of periodic work.
Called once per interval. Must not swallow asyncio.CancelledError.
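The start/stop behaviour described above (a sleep that wakes early on stop, then a forced cancel after the grace period) could be sketched as follows. `PeriodicLoopSketch` is a hypothetical stand-in built only on `asyncio`; it leaves out the error policy, jitter, initial delay, and `BackgroundTaskManager` integration that the real base class handles.

```python
from __future__ import annotations

import asyncio


class PeriodicLoopSketch:
    """Hypothetical stand-in for a periodic worker; not the library class."""

    def __init__(self, interval_seconds: float) -> None:
        self.interval_seconds = interval_seconds
        self.cycles = 0
        self._stop_event: asyncio.Event | None = None
        self._task: asyncio.Task | None = None

    async def run_cycle(self) -> None:
        # One unit of periodic work; a subclass would override this.
        self.cycles += 1

    async def _loop(self) -> None:
        assert self._stop_event is not None
        while not self._stop_event.is_set():
            await self.run_cycle()
            try:
                # Sleep for one interval, waking early if stop is requested.
                await asyncio.wait_for(self._stop_event.wait(), self.interval_seconds)
            except asyncio.TimeoutError:
                pass  # Interval elapsed normally; run the next cycle.

    def start(self) -> None:
        if self._task is not None and not self._task.done():
            return  # Already running: treat as a no-op.
        self._stop_event = asyncio.Event()
        self._task = asyncio.create_task(self._loop())

    async def stop(self, grace_seconds: float = 2.0) -> None:
        if self._task is None or self._stop_event is None:
            return
        self._stop_event.set()
        try:
            # wait_for cancels the task itself if the grace period expires.
            await asyncio.wait_for(self._task, grace_seconds)
        except asyncio.TimeoutError:
            pass
```

Waiting on the stop event with a timeout, rather than calling `asyncio.sleep`, is what lets a worker with a 60-second interval shut down almost immediately.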
StepResult
Result from a single workflow step.
TaskBackendConfig
Configuration for task queue backends.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskChain
Sequential task execution: the output of each step feeds the next.
Example

    chain = TaskChain([
        TaskStep("parse", parse_data),
        TaskStep("validate", validate_data),
        TaskStep("save", save_data),
    ])
    result = await chain.execute(raw_input)

Execute all steps in sequence, threading output of each step into the next.
If a workflow_id and state_store were provided at construction,
this method will attempt to resume from the last saved checkpoint — any
step whose name appears in the persisted state’s completed steps will be
skipped, and the last completed step’s output will seed the next step.
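The resume behaviour described above might look roughly like this. It is a sketch under assumptions: `run_chain` is a hypothetical helper, steps are plain `(name, handler)` pairs, and a dict with the keys `completed` and `last_output` stands in for whatever the real state store persists.

```python
import asyncio


async def run_chain(steps, initial_input, checkpoint):
    """Run steps in order, skipping names already recorded in `checkpoint`.

    `checkpoint` is a plain dict standing in for a persisted state store:
    {"completed": [step names], "last_output": value}.
    """
    completed = checkpoint.setdefault("completed", [])
    # Seed from the last saved output only if something was completed before.
    value = checkpoint.get("last_output", initial_input) if completed else initial_input
    for name, handler in steps:
        if name in completed:
            continue  # Finished in an earlier run; do not execute again.
        value = await handler(value)
        completed.append(name)
        checkpoint["last_output"] = value
    return value


async def parse(raw):
    return raw.split(",")


async def validate(items):
    return [item for item in items if item]


async def save(items):
    return len(items)


STEPS = [("parse", parse), ("validate", validate), ("save", save)]
```

Calling `run_chain(STEPS, "a,,b", {})` executes all three steps, while passing a checkpoint that already lists `"parse"` starts at `validate` with the saved output.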
TaskChord
Fan-out then collect: parallel steps followed by a callback that receives all results.
Example

    chord = TaskChord(
        steps=[TaskStep("a", fetch_a), TaskStep("b", fetch_b)],
        callback=TaskStep("merge", merge_results),
    )
    result = await chord.execute()

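As a rough illustration of the fan-out-then-collect pattern, the sketch below runs the parallel steps with `asyncio.gather` and hands the ordered results to a callback. `run_chord` and the three demo coroutines are hypothetical; how the real `TaskChord` passes results to its callback is not specified here.

```python
import asyncio


async def run_chord(steps, callback):
    """Run all steps concurrently, then pass their results to the callback."""
    # gather returns results in the order the steps were given,
    # regardless of which one finishes first.
    results = await asyncio.gather(*(step() for step in steps))
    return await callback(list(results))


async def fetch_a():
    await asyncio.sleep(0.02)
    return {"a": 1}


async def fetch_b():
    await asyncio.sleep(0.01)
    return {"b": 2}


async def merge_results(results):
    merged = {}
    for part in results:
        merged.update(part)
    return merged
```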
TaskCompletedEvent
Emitted when a task finishes successfully.
Attributes:
task_id: Unique identifier of the completed task.
worker_id: Identifier of the worker that executed the task.
TaskCompletedHook
Payload fired when a task finishes successfully.
Attributes:
task_name: Name or type label of the completed task.
task_id: Unique identifier of the task instance.
TaskConfig
Hierarchical root configuration for Lexigram Tasks.
Attributes:
name: Configuration name (default: "tasks").
enabled: Whether the tasks module is enabled.
backend: Task queue backend configuration.
worker: Task worker settings.
scheduler: Task scheduler settings.
retry: Task retry configuration.
rate_limit: Task rate limiting.
timeout: Task timeout settings.
extra: Extra configuration.
Check if running in production environment.
Check if running in development environment.
Check if running in test environment.
Validate task configuration for the given environment.
| Parameter | Type | Description |
|---|---|---|
| `env` | Environment | None | Target environment; resolved from ``LEX_ENV`` when ``None``. |
| Type | Description |
|---|---|
| list[ConfigIssue] | List of ConfigIssue instances. |
Build a single-backend TaskConfig from a NamedTaskConfig entry.
Used internally by TasksProvider to create per-backend configs.
| Parameter | Type | Description |
|---|---|---|
| `entry` | NamedTaskConfig | The named backend entry to materialise. |
| Type | Description |
|---|---|
| TaskConfig | A TaskConfig configured for the single named backend. |
TaskDashboard
Aggregated task observability dashboard.
Record a task execution.
Get dashboard summary with throughput and error rates.
Get per-task-type statistics with latency percentiles.
Get recent execution history.
Get recent errors grouped by task type.
Reset all counters and history.
TaskEnqueuedHook
Payload fired when a task is placed on the queue.
Attributes:
task_name: Name or type label of the enqueued task.
queue_name: Name of the queue the task was added to.
TaskExecutionContext
Context passed through the middleware pipeline.
TaskFailedEvent
Emitted when a task fails after all retry attempts are exhausted.
Attributes:
task_id: Unique identifier of the failed task.
error: Human-readable description of the failure reason.
TaskFailedHook
Payload fired when a task raises an unhandled exception.
Attributes:
task_name: Name or type label of the failed task.
task_id: Unique identifier of the task instance.
reason: Short description or exception message.
TaskGroup
Parallel task execution: all steps run concurrently.
Example

    group = TaskGroup([
        TaskStep("email", send_email),
        TaskStep("sms", send_sms),
    ])
    result = await group.execute(user_data)

TaskHealth
Health status for the task processing system.
Aggregates health information from queue, workers, and scheduler.
Check if the system is healthy.
Calculate the job success rate.
Convert to a dictionary for serialization.
TaskMiddleware
Abstract base for task middleware.
Called before task execution. Override to modify or log.
Called after task execution. Override to observe or log.
Called when task execution fails.
TaskMiddlewarePipeline
Chains middleware around task execution.
Append a middleware instance to the pipeline.
Execute a job through the middleware pipeline.
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | The job to execute. |
| `handler` | Any | The async handler function. |
| Type | Description |
|---|---|
| JobResult | JobResult from execution. |
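One way to picture how middleware wraps a handler is the sketch below. The hook names (`before`, `after`, `on_error`), the dict used as context, and the reverse ordering of the post-execution hooks are all assumptions made for illustration; this reference does not list the real method names or ordering.

```python
import asyncio


class MiddlewareSketch:
    """Hypothetical base with three optional hooks."""

    async def before(self, ctx):
        pass

    async def after(self, ctx, result):
        pass

    async def on_error(self, ctx, error):
        pass


class RecordingMiddleware(MiddlewareSketch):
    """Appends each hook call to a shared log so ordering is visible."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    async def before(self, ctx):
        self.log.append(f"{self.name}:before")

    async def after(self, ctx, result):
        self.log.append(f"{self.name}:after")

    async def on_error(self, ctx, error):
        self.log.append(f"{self.name}:error")


class PipelineSketch:
    def __init__(self):
        self._middleware = []

    def add(self, middleware):
        self._middleware.append(middleware)

    async def execute(self, ctx, handler):
        for middleware in self._middleware:
            await middleware.before(ctx)
        try:
            result = await handler(ctx)
        except Exception as exc:
            # Assumed ordering: unwind in reverse, then re-raise.
            for middleware in reversed(self._middleware):
                await middleware.on_error(ctx, exc)
            raise
        for middleware in reversed(self._middleware):
            await middleware.after(ctx, result)
        return result


async def shout(ctx):
    return ctx["job"].upper()
```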
TaskProvider
Task processing provider for Lexigram Framework.
TaskProvider integrates task processing with the Lexigram Framework, providing dependency injection, lifecycle management, and health monitoring.
Example

    from lexigram.app import Application
    from lexigram.tasks import TaskProvider, MemoryTaskQueue

    app = Application()
    queue = MemoryTaskQueue()
    provider = TaskProvider(queue, worker_count=4)
    app.use(provider)

    await app.start()

Initialize task provider.
| Parameter | Type | Description |
|---|---|---|
| `queue` | TaskQueueProtocol | TaskQueueProtocol implementation to use |
| `worker_count` | int | Number of workers to create |
| `enable_scheduler` | bool | Whether to enable job scheduling |
Create a TaskProvider from a TaskConfig.
Context kwargs may include a pre-built ‘queue’. If not provided, a MemoryTaskQueue is created as default.
Register task services with the DI container.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | DI registrar to bind task services into. |
Start the task provider.
Called by the framework on application startup after all registrations.
Shutdown the task provider gracefully
Check task provider health.
In multi-backend mode the overall status is the worst individual status
across all named queues. TaskQueueProtocol does not expose a
health_check() method, so liveness is probed via get_task_count().
| Type | Description |
|---|---|
| HealthCheckResult | HealthCheckResult with current status and metrics. |
Register a task handler
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type |
| `handler` | Callable[Ellipsis, Any] | Async handler function |
Register a decorated task function for scheduling.
Validates that the task has required metadata (_task_name and _cron). Registration is atomic: if scheduling fails, the handler is rolled back.
| Parameter | Type | Description |
|---|---|---|
| `task_func` | Any | Task function decorated with @scheduled |
| Exception | Description |
|---|---|
| TaskRegistrationError | If _task_name or _cron is missing |
Enqueue a job for processing
Schedule a job with cron expression
Remove a scheduled job
Get worker pool statistics
Get scheduled jobs information
Refresh handler mappings in all workers.
Call this after registering new handlers to ensure workers can execute jobs registered after pool creation.
TaskQueuedEvent
Emitted when a task is accepted into a processing queue.
Attributes:
task_id: Unique identifier of the queued task.
queue_name: Name of the queue that accepted the task.
TaskRateLimitConfig
Configuration for rate limiting.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskScheduler
Task scheduler for cron-like job scheduling.
TaskScheduler manages periodic job execution using cron expressions. Jobs are checked at regular intervals and enqueued when due.
Example
scheduler = TaskScheduler()

    # Schedule job (schedule_job is the async variant, so it is awaited)
    template = JobTemplateProtocol(name="cleanup", args=())
    await scheduler.schedule_job(template, "0 0 * * *")  # Daily at midnight

    # Start scheduler
    await scheduler.start_scheduler(queue.enqueue)

Initialize task scheduler.
| Parameter | Type | Description |
|---|---|---|
| `check_interval` | float | How often to check for due jobs (seconds). |
| `store` | SchedulerStore | None | Optional SchedulerStore for durable persistence of job definitions and next-run times. When provided, all schedule/unschedule operations are persisted and the scheduler can be restored after restart via restore_from_store. |
Schedule a job with cron expression (sync, no persistence).
For the primary async version with store persistence, use schedule_job.
| Parameter | Type | Description |
|---|---|---|
| `job_template` | JobTemplateProtocol | JobProtocol | Job template or job instance to schedule |
| `cron_expression` | str | Cron expression (e.g., "0 9 * * 1-5") |
| `job_id` | str | None | Optional job ID, generated if not provided |
| Type | Description |
|---|---|
| str | ID of the scheduled job |
Schedule a job and persist to store if one is configured.
This is the primary async method. For a synchronous variant without persistence, use schedule_job_sync.
| Parameter | Type | Description |
|---|---|---|
| `job_template` | JobTemplateProtocol | JobProtocol | Job template or job instance to schedule. |
| `cron_expression` | str | Cron expression (e.g., ``"0 9 * * 1-5"``). |
| `job_id` | str | None | Optional job ID, generated if not provided. |
| Type | Description |
|---|---|
| str | ID of the scheduled job. |
Restore all persisted job definitions from the configured store.
Loads every job persisted by previous scheduler instances and re-registers them in scheduled_jobs. Call this once during startup before invoking start_scheduler to resume the full scheduling state.
| Type | Description |
|---|---|
| int | Number of jobs restored. |
| Exception | Description |
|---|---|
| RuntimeError | If no SchedulerStore was provided at construction. |
Remove a scheduled job (sync, no store deletion).
For the primary async version with store deletion, use unschedule_job.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | ID of the job to remove |
| Type | Description |
|---|---|
| bool | True if job was removed, False if not found |
Remove a scheduled job and delete it from the store.
This is the primary async method. For a synchronous variant without store deletion, use unschedule_job_sync.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | ID of the job to remove. |
| Type | Description |
|---|---|
| bool | True if job was removed, False if not found. |
Get all scheduled jobs
| Type | Description |
|---|---|
| list[ScheduledJob] | List of all scheduled jobs |
Get a specific scheduled job
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | ID of the scheduled job |
| Type | Description |
|---|---|
| ScheduledJob | None | ScheduledJob or None if not found |
Start the scheduler loop
| Parameter | Type | Description |
|---|---|---|
| `enqueue_callback` | Callable[[JobProtocol], Any] | Async function to enqueue jobs when they are due |
Stop the scheduler gracefully
Enable a scheduled job
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | ID of the scheduled job |
| Type | Description |
|---|---|
| bool | True if job was enabled |
Disable a scheduled job
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | ID of the scheduled job |
| Type | Description |
|---|---|
| bool | True if job was disabled |
Get next run times for all enabled scheduled jobs
| Type | Description |
|---|---|
| dict[str, float] | Dictionary mapping job ID to next run timestamp |
TaskSchedulerConfig
Configuration for task scheduler.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskStartedHook
Payload fired when a worker picks up and begins executing a task.
Attributes:
task_name: Name or type label of the task being executed.
task_id: Unique identifier of the task instance.
TaskStep
A single step in a workflow.
| Parameter | Description |
|---|---|
| `name` | Human-readable step name. |
| `handler` | Async callable that takes input and returns output. |
| `on_error` | Optional error handler. |
| `timeout` | Optional timeout in seconds. |
TaskTimeoutConfig
Configuration for task timeout behavior.
TaskWorker
Individual task worker for job execution.
TaskWorker processes jobs from a queue using registered handlers. Each worker runs independently and tracks its own statistics.
Example

    worker = TaskWorker("worker-1", queue, handlers)
    await worker.start()
    # ... worker processes jobs ...
    await worker.stop()

Initialize task worker.
| Parameter | Type | Description |
|---|---|---|
| `worker_id` | str | Unique worker identifier. |
| `queue` | TaskQueueProtocol | TaskQueueProtocol to pull jobs from. |
| `handler_registry` | dict[str, Callable[Ellipsis, Any]] | Dict mapping job names to handler functions. |
| `config` | TaskWorkerConfig | None | Optional worker configuration controlling timeouts and execution behaviour. Defaults to TaskWorkerConfig() when not provided. |
| `services` | TaskWorkerServices | None | Optional bundle of infrastructure services (DLQ, result store, progress store, dashboard, retry policy, task manager, idempotency manager, logger). Use TaskWorkerServices to compose only the services you need; absent services degrade gracefully. Pass a ``logger`` via ``services.logger`` to inject a custom bound logger; otherwise a module logger is created and bound to ``worker_id`` automatically. |
| `middleware_pipeline` | TaskMiddlewarePipeline | None | Optional middleware pipeline for cross-cutting concerns. |
Get worker statistics
| Type | Description |
|---|---|
| WorkerJobStats | Current worker statistics |
Check if worker is currently processing a job
| Type | Description |
|---|---|
| bool | True if worker has a job in progress |
Get a ProgressTracker for a job (opt-in for handlers).
Handlers can use this to report progress. The worker must have a progress_store configured.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | The job ID to track progress for |
| Type | Description |
|---|---|
| ProgressTracker | None | ProgressTracker instance if progress_store is configured, None otherwise |
TaskWorkerConfig
Configuration for task workers.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskWorkerServices
Optional infrastructure services injected into a TaskWorker.
Groups all optional infrastructure dependencies so that the
TaskWorker constructor stays focused on the mandatory parameters.
All fields default to None — omitted services degrade gracefully.
Attributes:
container: DI container for resolving task dependencies.
dead_letter_queue: DLQ for routing permanently-failed jobs.
result_store: Persistent store for job execution results.
progress_store: Store for in-progress task progress updates.
dashboard: Observability dashboard for worker and job metrics.
retry_policy: Retry policy implementation (from lexigram-resilience).
task_manager: Kernel task manager for critical shutdown registration.
idempotency_manager: Manager for at-most-once job execution.
logger: Optional logger instance.
TasksModule
Background task workers, scheduling, and async job queues.
Call configure to configure the task-processing subsystem.
Usage (in-memory queue for development):

    from lexigram.tasks.backends.memory import MemoryTaskQueue

    @module(
        imports=[TasksModule.configure(queue=MemoryTaskQueue())]
    )
    class AppModule(Module):
        pass

Usage (Redis queue for production):

    from lexigram.tasks.backends.redis import RedisTaskQueue

    @module(
        imports=[
            TasksModule.configure(
                queue=RedisTaskQueue(url="redis://localhost"),
                worker_count=4,
            )
        ]
    )
    class AppModule(Module):
        pass

Create a TasksModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `queue` | Any | None | TaskQueueProtocol implementation. Defaults to MemoryTaskQueue. |
| `worker_count` | int | Number of concurrent task workers. |
| `enable_scheduler` | bool | Whether to enable the job scheduler. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
Scope task handler classes into a feature module.
Registers the given task handler classes as providers so they are discovered and wired by the task executor. The parent module graph must already include TasksModule.configure — this does not create a new queue or worker pool.
Uses the anonymous token pattern so both configure() and
scope() can coexist in the same compiled graph without a
ModuleDuplicateError.
Example

    @module(
        imports=[
            TasksModule.configure(queue=RedisTaskQueue(url=...)),
            TasksModule.scope(SendEmailTask, GenerateReportTask),
        ]
    )
    class NotificationFeatureModule(Module):
        pass

| Type | Description |
|---|---|
| DynamicModule | A DynamicModule scoped to this feature. |
Return an in-memory TasksModule for unit testing.
Uses a MemoryTaskQueue with a single worker and no job scheduler.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule backed by an in-memory task queue. |
TimeoutMiddleware
Enforces execution timeout on tasks.
Uses asyncio.wait_for to enforce timeout on task execution.
Compute and store the effective timeout in the execution context metadata.
Execute handler with timeout enforcement.
| Parameter | Type | Description |
|---|---|---|
| `handler` | Any | The handler to execute |
| `job` | JobProtocol | The job being executed |
| `ctx` | TaskExecutionContext | Execution context |
| `resolved_args` | list[Any] | None | Pre-resolved positional arguments (from DI) |
| `resolved_kwargs` | dict[str, Any] | None | Pre-resolved keyword arguments (from DI) |
| Type | Description |
|---|---|
| JobResult | JobResult from execution |
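Timeout enforcement with `asyncio.wait_for` can be sketched in a few lines. `run_with_timeout` and its `("ok", value)` / `("timeout", None)` return shape are hypothetical simplifications; the real middleware returns a `JobResult` and reads the timeout from the execution context.

```python
import asyncio


async def run_with_timeout(handler, timeout_seconds, *args, **kwargs):
    """Await the handler, cancelling it if it exceeds the timeout."""
    try:
        value = await asyncio.wait_for(handler(*args, **kwargs), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the handler coroutine at this point.
        return ("timeout", None)
    return ("ok", value)


async def quick(x):
    return x + 1


async def slow(x):
    await asyncio.sleep(1.0)
    return x
```

Note that `wait_for` cancels the wrapped coroutine on timeout, so a handler that suppresses `asyncio.CancelledError` would defeat the timeout.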
UniqueTask
Decorator for unique task execution.
Prevents duplicate execution of the same task using in-memory locks.
Example

    lock_manager = LockManager()

    @unique_task(
        lock_manager=lock_manager,
        key_func=lambda user_id: f"sync_user:{user_id}",
        timeout=3600,
    )
    async def sync_user(user_id: str):
        await sync_user_data(user_id)

Initialize unique task decorator.
| Parameter | Type | Description |
|---|---|---|
| `lock_manager` | LockManager | LockManager instance for creating locks |
| `key_func` | Callable[[Any], str] | Function to generate lock key from task args |
| `timeout` | float | Lock timeout in seconds |
| `skip_if_locked` | bool | If True, skip execution if lock held; if False, wait |
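The difference between skipping and waiting when a lock is held can be shown with a small stand-in built on `asyncio.Lock`. `unique_task_sketch` is hypothetical: it keeps one in-memory lock per key, ignores the lock `timeout`, and does not use `LockManager`.

```python
import asyncio
import functools


def unique_task_sketch(key_func, skip_if_locked=True):
    """Allow only one concurrent run per key within this process."""
    locks = {}  # key -> asyncio.Lock, kept for the life of the decorator

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            lock = locks.setdefault(key_func(*args, **kwargs), asyncio.Lock())
            if skip_if_locked and lock.locked():
                return None  # Another run with the same key is in progress.
            async with lock:  # With skip_if_locked=False this waits its turn.
                return await fn(*args, **kwargs)

        return wrapper

    return decorator


@unique_task_sketch(key_func=lambda user_id: f"sync_user:{user_id}")
async def sync_user(user_id):
    await asyncio.sleep(0.01)  # Placeholder for the real work.
    return user_id
```

Running `sync_user("u1")` twice concurrently executes the body once and returns `None` for the duplicate, while a call with a different key proceeds independently.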
WorkerJobStats
Worker statistics for monitoring performance.
Tracks execution metrics for an individual worker including job counts, timing, and activity tracking.
Get worker uptime in seconds.
Record job execution statistics.
| Parameter | Type | Description |
|---|---|---|
| `result` | JobResult | Job execution result. |
WorkerPool
Pool of task workers with monitoring and scaling.
WorkerPool manages multiple TaskWorker instances for concurrent job processing. Supports dynamic scaling and aggregate statistics.
Example

    pool = WorkerPool(queue, handlers, size=4)
    await pool.start()
    stats = pool.get_pool_stats()
    await pool.scale_to(8)  # Scale up
    await pool.stop()

Initialize worker pool.
| Parameter | Type | Description |
|---|---|---|
| `queue` | TaskQueueProtocol | TaskQueueProtocol for workers to pull from |
| `handler_registry` | dict[str, Callable[Ellipsis, Any]] | HandlerRegistry | Dict mapping job names to handlers, or HandlerRegistry for dynamic lookups (allows handlers registered after pool creation). |
| `size` | int | Number of workers to create |
| `task_manager` | TaskManagerProtocol | None | Optional kernel task manager for critical task registration during worker lifecycle. |
| `dead_letter_queue` | DeadLetterQueue | None | Optional shared DLQ for permanently failed jobs. |
| `container` | Any | Optional DI container for resolving task dependencies at runtime. |
Refresh handlers from registry if using dynamic registry.
Start the worker pool
Creates and starts all workers in the pool.
Stop the worker pool gracefully.
| Parameter | Type | Description |
|---|---|---|
| `drain` | bool | If ``True``, wait for each worker to finish its current job before stopping. Defaults to ``False`` for a fast cancel-and-stop. |
| `timeout` | float | None | Per-worker seconds to wait when *drain* is ``True``. ``None`` means wait indefinitely. Defaults to 30 seconds. |
Get statistics for all workers
| Type | Description |
|---|---|
| list[WorkerJobStats] | List of WorkerJobStats for each worker |
Get aggregate pool statistics
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with pool-wide statistics |
Scale the worker pool to a new size
Dynamically add or remove workers.
| Parameter | Type | Description |
|---|---|---|
| `new_size` | int | Target number of workers |
WorkflowResult
Result from a complete workflow execution.
WorkflowStatus
Enumeration of possible workflow execution states.
Functions
Create a sequential TaskChain from the given steps.
Each step receives the output of the previous step as its input.
| Parameter | Type | Description |
|---|---|---|
| `stop_on_error` | bool | Whether to abort the pipeline on the first failure. Defaults to ``True``. |
Example

    result = await chain(
        TaskStep("validate", validate_input),
        TaskStep("process", process_data),
        TaskStep("persist", save_to_db),
    ).execute(raw_data)

Decorate an async function to add a ``.delay()`` background-dispatch method.
The decorated function is unchanged when called directly — it still returns
a coroutine that must be await-ed. The .delay() method is the only
addition; it schedules the coroutine as a background asyncio.Task
and stores the reference to prevent premature garbage collection.
| Parameter | Type | Description |
|---|---|---|
| `fn` | Callable[_P, Coroutine[Any, Any, _R]] | An async function (coroutine function) to wrap. |
| Type | Description |
|---|---|
| _DelayedCallable[_P, _R] | A _DelayedCallable that is callable exactly like ``fn`` but also exposes ``.delay(*args, **kwargs) -> asyncio.Task``. |
Example

    @delay
    async def flush_audit_log(batch: list[AuditEntry]) -> None:
        await audit_store.save_many(batch)

    # Direct call (awaited in the current task):
    await flush_audit_log(batch=entries)

    # Background dispatch (does not block):
    flush_audit_log.delay(batch=entries)

distributed_lock
Context manager for in-memory locks.
Deprecated: Use LockManager.acquire() instead for better DI integration.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Unique lock key |
| `timeout` | float | Lock timeout in seconds |
| `lock_manager` | LockManager | None | Optional LockManager instance (creates new if None) |
| Type | Description |
|---|---|
| AsyncGenerator[InMemoryLock, None] | InMemoryLock instance |
Example

    manager = LockManager()  # Should be from DI container
    async with distributed_lock("resource:123", lock_manager=manager):
        await process_resource(123)

scheduled
Decorator to define a scheduled task.
| Parameter | Type | Description |
|---|---|---|
| `cron` | str | Cron expression (e.g., "0 9 * * *") |
| `name` | str | None | Task name |
| `priority` | Priority | int | Task priority |
| `max_retries` | int | Maximum retry attempts |
| `timeout` | float | None | Execution timeout |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Any] | Decorated task function with scheduling metadata |
Decorator to define a task.
Converts a regular function into a task that can be enqueued and executed by workers.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | None | Task name (defaults to function name) |
| `priority` | Priority | int | Task priority (HIGH, NORMAL, LOW) |
| `max_retries` | int | Maximum retry attempts on failure |
| `timeout` | float | None | Execution timeout in seconds |
| `queue` | str | Queue name for this task |
| `idempotency_key` | str | None | Optional key for idempotent execution |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Any] | Decorated function with .delay() and .apply_async() methods |
Exceptions
QueueFullError
The queue has reached its maximum capacity and cannot accept new tasks.
The caller should back off and retry after a delay, or apply backpressure.
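A caller-side backoff loop might look like the sketch below. `QueueFullErrorSketch` is a local stand-in for the real exception, and `enqueue_with_backoff` with its parameters is a hypothetical helper; the delays and attempt count are illustrative defaults, not library settings.

```python
import asyncio
import random


class QueueFullErrorSketch(Exception):
    """Local stand-in for the library's QueueFullError."""


async def enqueue_with_backoff(enqueue, job, attempts=5, base_delay=0.1, max_delay=5.0):
    """Retry `enqueue(job)` with jittered exponential backoff on a full queue."""
    for attempt in range(attempts):
        try:
            return await enqueue(job)
        except QueueFullErrorSketch:
            if attempt == attempts - 1:
                raise  # Out of attempts: let the caller apply backpressure.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads retries out so producers do not retry in lockstep.
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```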
TaskCancelledError
Raised when a task is cancelled.
TaskError
Base exception for all task-related errors.
TaskExecutionError
Raised when a task execution fails.
TaskNotFoundError
Raised when a task cannot be found.
TaskTimeoutError
Raised when a task execution exceeds its timeout.
TaskValidationError
Raised when task parameters are invalid.