API Reference
Protocols
ProgressTrackerProtocol
Protocol for tracking and broadcasting task progress.
Implementations must be safe for concurrent use — multiple coroutines
may call update on the same task_id simultaneously, and multiple
consumers may subscribe to the same task.
The subscribe method returns a live AsyncIterator that yields a
new ProgressSnapshot every time update, complete, or
fail is called for the given task. The iterator terminates
automatically when the task reaches a terminal state (COMPLETE or
FAILED).
Example
```python
async def export_data(tracker: ProgressTrackerProtocol) -> None:
    records = await fetch_all()
    total = len(records)
    for i, record in enumerate(records, start=1):
        await process(record)
        await tracker.update("export-1", i, total, f"Row {i}/{total}")
    await tracker.complete("export-1", "Export finished")
```
Record incremental progress for a task.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `current` | int | Units completed so far. |
| `total` | int | Total units to process (0 = unknown). |
| `message` | str | Optional human-readable status message. |
Mark a task as successfully completed.
Closes all active subscriptions for task_id after broadcasting the
terminal ProgressSnapshot.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `result` | str | Optional human-readable completion message. |
Mark a task as failed.
Closes all active subscriptions for task_id after broadcasting the
terminal ProgressSnapshot.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `error` | str | Description of the failure. |
async def get(task_id: str) -> ProgressSnapshot | None
Return the current progress state for a task.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| Type | Description |
|---|---|
| ProgressSnapshot | None | The most recent ProgressSnapshot, or ``None`` if the task is not known to this tracker. |
def subscribe(task_id: str) -> AsyncIterator[ProgressSnapshot]
Subscribe to live progress updates for a task.
Returns an async iterator that yields a new ProgressSnapshot
each time the task’s state changes. The iterator stops automatically
when the task reaches a terminal state (COMPLETE or FAILED).
If the task is already in a terminal state when subscribe is
called, the iterator yields the final snapshot once and then stops.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task to observe. |
| Type | Description |
|---|---|
| AsyncIterator[ProgressSnapshot] | An AsyncIterator of ProgressSnapshot objects. |
Example
```python
async for snap in tracker.subscribe("job-42"):
    print(f"{snap.percent:.1f}% — {snap.message}")
```
TaskExecutorProtocol
Protocol for task executor implementations.
Executors process tasks from a queue by dispatching them to registered handlers.
Example
```python
class TaskExecutorProtocol:
    async def execute(self, task: Task) -> Any:
        handler = self._handlers.get(task.name)
        if handler:
            return await handler(task.payload)
        raise UnknownTaskError(task.name)
```
Execute a task using the registered handler.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | Task to execute. |
| Type | Description |
|---|---|
| Any | Task execution result. |
| Exception | Description |
|---|---|
| UnknownTaskError | If no handler registered for task. |
Register a handler for a task type.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| `handler` | Any | Async callable to handle the task. |
Get the handler for a task type.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| Type | Description |
|---|---|
| Any | None | Handler if registered, None otherwise. |
TaskQueueProtocol
Protocol for task queue implementations.
This defines the contract for background task queues (Redis-based, PostgreSQL-based, in-memory, etc.).
Example
```python
class RedisTaskQueue:
    async def enqueue(self, task: Task) -> str:
        task_id = str(uuid4())
        await self._redis.rpush("tasks", task.to_json())
        return task_id

    async def dequeue(self) -> Task | None:
        data = await self._redis.lpop("tasks")
        return Task.from_json(data) if data else None
```
Add a task to the queue.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | Task instance to enqueue. |
| Type | Description |
|---|---|
| Result[str, TaskQueueError] | Ok(task_id) on success; Err(TaskQueueError) if the queue reports a recoverable failure (e.g. full queue, invalid payload). Only infrastructure failures (lost connection) are raised as exceptions. |
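Because recoverable failures come back as Err values rather than exceptions, callers typically branch on the returned Result. A minimal sketch, assuming the Result type exposes `is_ok()`, `unwrap()`, and `unwrap_err()` accessors (those names are not documented in this reference; adapt to the actual Result API):
```python
# Hypothetical sketch: the Result accessor names are assumptions.
result = await queue.enqueue(task)
if result.is_ok():
    task_id = result.unwrap()
    logger.info("enqueued %s", task_id)
else:
    error = result.unwrap_err()          # e.g. full queue, invalid payload
    logger.warning("enqueue rejected: %s", error)
```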
Remove and return the next task.
| Type | Description |
|---|---|
| Any | None | Next Task or None if queue is empty. |
Get the number of tasks in the queue.
| Type | Description |
|---|---|
| int | Number of pending tasks. |
Clear all tasks from the queue.
Acknowledge successful processing of a dequeued task.
Signals that the task has been processed successfully and can be permanently discarded from any in-flight tracking.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to acknowledge. |
Negative-acknowledge a dequeued task.
Signals that the task could not be processed. The task is optionally requeued for another processing attempt.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to negative-acknowledge. |
| `requeue` | bool | If True, return the task to the queue for retry. If False, discard the task permanently. |
Close the queue connection and cleanup resources.
Classes
BranchStep
Branch based on a condition evaluated against the input.
Example
```python
workflow = BranchStep(
    condition=lambda data: data["amount"] > 1000,
    if_true=TaskStep("manual_review", manual_review),
    if_false=TaskStep("auto_approve", auto_approve),
)
```
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]
Evaluate the condition and execute either the true or false branch.
CacheBackendProgressStore
Distributed progress store backed by the platform ``CacheBackendProtocol``.
Stores serialised ProgressInfo objects in the configured cache backend so that progress is visible to all worker processes. TTL-based expiry is delegated to the backend.
| Parameter | Type | Description |
|---|---|---|
| `cache` | | Platform cache backend (Redis, Memcached, …). |
| `ttl` | | Seconds before a progress entry expires. Defaults to 3600 (1 hour). |
async def save(info: ProgressInfo) -> None
Persist a ProgressInfo entry to the cache backend.
| Parameter | Type | Description |
|---|---|---|
| `info` | ProgressInfo | Progress state to save. |
async def get(job_id: str) -> ProgressInfo | None
Return the ProgressInfo for the given job ID, or None if not found.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Unique identifier for the job. |
Remove the progress entry for the given job ID.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | Unique identifier for the job. |
async def list_active() -> list[ProgressInfo]
Return all active (incomplete) progress entries.
Note This implementation scans the key prefix via
get, which may be slow on large datasets. Consider using a Redis-specific implementation with SCAN for production at scale.
| Type | Description |
|---|---|
| list[ProgressInfo] | List of incomplete ProgressInfo. |
CacheBackendResultStore
Distributed result store backed by the platform ``CacheBackendProtocol``.
Stores serialised JobResult objects in the configured cache backend so that results are visible to all worker processes. TTL-based expiry is delegated to the backend.
| Parameter | Type | Description |
|---|---|---|
| `cache` | | Platform cache backend (Redis, Memcached, …). |
| `ttl` | | Seconds before a result entry expires. Defaults to 3600 (1 hour). |
| `poll_interval` | | Seconds between polls in wait. Defaults to 0.5. |
async def store( job_id: str, result: JobResult ) -> None
Persist a job result in the cache backend with TTL.
async def get(job_id: str) -> JobResult | None
Return the stored result, or None if absent or expired.
Remove the result from the cache and return True if it existed.
async def wait( job_id: str, *, timeout: float = 30.0, poll_interval: float | None = None ) -> JobResult | None
Poll the cache until the result appears or the timeout elapses.
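A short usage sketch of the polling wait; it assumes a worker was dispatched the job elsewhere and that JobResult exposes success/data/error fields, as suggested by JobResult.ok and JobResult.fail below:
```python
# Wait up to 10 s for the worker to publish the result; None means timeout or expiry.
result = await store.wait("job-42", timeout=10.0, poll_interval=0.2)
if result is None:
    print("still running (or expired)")
elif result.success:
    print("done:", result.data)
else:
    print("failed:", result.error)
```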
No-op: the cache backend handles TTL expiry automatically.
CronExpression
Wrapper for cron expression parsing.
Provides a unified interface for cron expression handling with optional croniter dependency.
Note: This class requires the `croniter` optional dependency. See lexigram.tasks.scheduling.cron for installation instructions. Instantiation raises ImportError when croniter is absent.
Example
```python
cron = CronExpression("0 9 * * 1-5")  # Weekdays at 9am
next_run = cron.get_next()
```
Initialize cron expression
| Parameter | Type | Description |
|---|---|---|
| `expression` | str | Cron expression string |
| Exception | Description |
|---|---|
| ImportError | If croniter is not installed |
| ValueError | If expression is invalid |
Get next execution time
| Parameter | Type | Description |
|---|---|---|
| `base_time` | float | None | Base time (defaults to current time) |
| Type | Description |
|---|---|
| float | Unix timestamp of next execution |
Get previous execution time
| Parameter | Type | Description |
|---|---|---|
| `base_time` | float | None | Base time (defaults to current time) |
| Type | Description |
|---|---|
| float | Unix timestamp of previous execution |
DeadLetterQueue
Manages failed jobs for inspection, retry, and cleanup.
Add a failed job to the DLQ.
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | The failed job. |
| `error` | str | Error message. |
| `traceback` | str | None | Full traceback string. |
| `**metadata` | | Additional metadata. |
def get(job_id: str) -> FailureRecord | None
Get a specific failure record.
def list_failed( *, limit: int = 50, job_name: str | None = None ) -> list[FailureRecord]
List failed jobs, optionally filtered by job name.
| Parameter | Type | Description |
|---|---|---|
| `limit` | int | Maximum records to return. |
| `job_name` | str | None | Filter by job type name. |
| Type | Description |
|---|---|
| list[FailureRecord] | List of failure records (newest first). |
Remove a job from DLQ and prepare it for retry.
Returns the job with status reset to PENDING, or None if not found.
Retry all failed jobs. Returns list of jobs to re-enqueue.
Permanently remove a failure from the DLQ.
Purge old failures.
| Parameter | Type | Description |
|---|---|---|
| `older_than_hours` | float | None | Remove records older than this. Defaults to retention_hours. |
| Type | Description |
|---|---|
| int | Number of records purged. |
Clear all DLQ entries. Returns count cleared.
Return the number of currently tracked failure records.
Get DLQ statistics.
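A hedged sketch tying the operations above together. Only `get` and `list_failed` have signatures shown in this reference; the `add` and `retry` method names and the `tb_text` variable are assumptions based on the descriptions:
```python
# Route a permanently failed job to the DLQ (method name assumed).
dlq.add(job, error="SMTP timeout", traceback=tb_text)

# Inspect recent failures for one task type.
for record in dlq.list_failed(limit=10, job_name="send_email"):
    print(record.to_dict())

# Reset a failed job to PENDING and re-enqueue it (method name assumed).
job = dlq.retry(failed_job_id)
if job is not None:
    await queue.enqueue(job)
```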
DistributedLockProtocol
In-memory lock for preventing duplicate task execution within a single process.
Note: This is NOT a distributed lock. For true distributed locking across multiple processes/servers, use a Redis-based or database-based lock implementation.
Example
```python
lock_manager = LockManager()
async with lock_manager.acquire("user:123:sync", timeout=60):
    await sync_user_data(user_id=123)
```
def __init__( key: str, timeout: float, locks_dict: dict[str, tuple[float, asyncio.Lock]], locks_lock: asyncio.Lock )
Initialize in-memory lock.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Unique lock key |
| `timeout` | float | Lock timeout in seconds (auto-release) |
| `locks_dict` | dict[str, tuple[float, asyncio.Lock]] | Shared locks dictionary (from LockManager) |
| `locks_lock` | asyncio.Lock | Shared lock for accessing locks_dict (from LockManager) |
Acquire the lock.
| Type | Description |
|---|---|
| bool | True if lock was acquired, False if already held |
Try to acquire lock without waiting.
| Type | Description |
|---|---|
| bool | True if acquired, False if already held |
Release the lock.
ExecutionRecord
Record of a single task execution.
FailureRecord
Record of a failed job in the DLQ.
Serialize the failure record to a plain dictionary.
def from_dict( cls, data: dict[str, Any] ) -> FailureRecord
Deserialize a failure record from a plain dictionary.
| Parameter | Type | Description |
|---|---|---|
| `data` | dict[str, Any] | Dictionary as produced by to_dict. |
| Type | Description |
|---|---|
| FailureRecord | Reconstructed FailureRecord. |
GlobalRateLimiter
System-wide throughput cap applied across all queues.
Delegates to a single TokenBucket for consistent rate enforcement regardless of which queue a task originates from.
Example
```python
limiter = GlobalRateLimiter(rate=100, per=1.0)  # 100 tasks / sec
await limiter.acquire()
```
Initialise the global rate limiter.
| Parameter | Type | Description |
|---|---|---|
| `rate` | int | Maximum allowed tasks per time period. |
| `per` | float | Length of the time period in seconds. |
| `burst` | int | None | Maximum burst capacity (defaults to *rate*). |
Block until a global token is available.
Non-blocking global token acquisition.
| Type | Description |
|---|---|
| bool | ``True`` if a token was obtained; ``False`` if globally rate-limited. |
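A small sketch contrasting the blocking and non-blocking paths; `limiter` is the GlobalRateLimiter from the example above, and the `try_acquire` method name (and it being synchronous) is an assumption, since only the behaviour is documented here:
```python
# Blocking: waits until a token is free.
await limiter.acquire()

# Non-blocking: shed load instead of waiting.
if limiter.try_acquire():            # method name assumed; see note above
    await process(task)
else:
    logger.debug("globally rate-limited; deferring task")
```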
HandlerRegistry
Registry for task handlers that also satisfies the TaskExecutorProtocol contract.
Maps task names to their handler functions. Handlers can be
registered and retrieved by name. The execute method dispatches
a task to its registered handler, making this class usable wherever a
TaskExecutorProtocol is expected.
Extends Registry for unified introspection and lifecycle hooks.
Example
```python
registry = HandlerRegistry()

@registry.register("send_email")
async def send_email(to: str, subject: str): ...

handler = registry.get("send_email")
```
Initialize handler registry.
def register( name: str, handler: Callable[Ellipsis, Awaitable[Any]] | None = None ) -> Callable[Ellipsis, Any]
Register a task handler.
Can be used as a decorator or called directly.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Task name. |
| `handler` | Callable[Ellipsis, Awaitable[Any]] | None | Handler function (if not using as decorator). |
| Type | Description |
|---|---|
| Callable[Ellipsis, Any] | Handler function or decorator. |
Example
```python
# As decorator
@registry.register("task_name")
async def my_handler(*args, **kwargs): ...

# Direct call
registry.register("task_name", my_handler)
```
Register a handler for a task type (TaskExecutorProtocol method).
This method satisfies the register_handler requirement defined in
lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a
protocol-mandated entry point that delegates directly to register
so that HandlerRegistry can be used wherever a TaskExecutorProtocol
is expected without any additional adapter. Do not remove — the
protocol contract requires this exact signature.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| `handler` | Any | Async callable to handle the task. |
Get the handler for a task type (TaskExecutorProtocol method).
This method satisfies the get_handler requirement defined in
lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a
protocol-mandated entry point that delegates directly to get
so that HandlerRegistry can be used wherever a TaskExecutorProtocol
is expected without any additional adapter. Do not remove — the
protocol contract requires this exact signature.
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type. |
| Type | Description |
|---|---|
| Any | None | Handler if registered, ``None`` otherwise. |
Execute a task using the registered handler (TaskExecutorProtocol protocol).
Resolves the handler for task.name, then invokes it with
task.args and task.kwargs. Supports both sync and async
handlers.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | Task object with ``name``, ``args``, and ``kwargs`` attributes. |
| Type | Description |
|---|---|
| Any | Result returned by the handler. |
| Exception | Description |
|---|---|
| TaskNotFoundError | If no handler is registered for the task name. |
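A minimal dispatch sketch through execute. The SimpleNamespace stand-in for a task object is hypothetical; per the table above, any object with ``name``, ``args``, and ``kwargs`` attributes works:
```python
from types import SimpleNamespace

registry = HandlerRegistry()

@registry.register("add")
async def add(x: int, y: int) -> int:
    return x + y

task = SimpleNamespace(name="add", args=(2, 3), kwargs={})
result = await registry.execute(task)   # -> 5; raises TaskNotFoundError if unregistered
```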
Remove handler from registry.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Task name. |
| Type | Description |
|---|---|
| bool | True if handler was removed. |
List all registered handler names.
| Type | Description |
|---|---|
| list[str] | List of task names. |
Get handlers as dictionary.
| Type | Description |
|---|---|
| dict[str, Callable[Ellipsis, Awaitable[Any]]] | Dictionary of all handlers. |
InMemoryProgressStore
In-memory progress store for development/testing.
async def save(info: ProgressInfo) -> None
Persist a ProgressInfo entry in the in-memory store.
async def get(job_id: str) -> ProgressInfo | None
Return the ProgressInfo for the given job ID, or None if not found.
Remove the progress entry for the given job ID if it exists.
async def list_active() -> list[ProgressInfo]
Return all progress entries whose current count has not yet reached total.
InMemoryProgressTracker
Thread-safe, in-memory ProgressTrackerProtocol implementation.
Maintains the latest ProgressSnapshot for each tracked task and
fans out every state change to all active subscribers via
asyncio.Queue.
Subscriptions are kept alive until the task reaches a terminal state
(COMPLETE or FAILED), at which point the iterator closes
automatically. Subscribers that call subscribe after the task has
already finished receive the terminal snapshot once and then stop.
All public methods are safe to call concurrently from multiple coroutines.
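A short sketch of running an updater and a subscriber concurrently, relying on the terminal-state behaviour described above (a no-argument constructor is assumed):
```python
import asyncio

tracker = InMemoryProgressTracker()

async def produce() -> None:
    for i in range(1, 4):
        await tracker.update("import-7", i, 3, f"step {i}")
    await tracker.complete("import-7", "done")

async def consume() -> None:
    async for snap in tracker.subscribe("import-7"):
        print(snap.percent, snap.message)   # closes after the terminal snapshot

await asyncio.gather(produce(), consume())
```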
Record incremental progress and broadcast to all subscribers.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `current` | int | Units completed so far. |
| `total` | int | Total units to process (0 means unknown). |
| `message` | str | Optional human-readable status line. |
Mark a task as successfully completed and close all subscriptions.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `result` | str | Optional human-readable completion message. |
Mark a task as failed and close all subscriptions.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| `error` | str | Description of the failure. |
async def get(task_id: str) -> ProgressSnapshot | None
Return the current progress state for a task.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task. |
| Type | Description |
|---|---|
| ProgressSnapshot | None | The most recent ProgressSnapshot, or ``None`` if the task has not been seen by this tracker. |
def subscribe(task_id: str) -> AsyncGenerator[ProgressSnapshot, None]
Subscribe to live progress updates for a task.
Returns an async generator that yields one ProgressSnapshot per state change. The generator stops automatically when the task reaches a terminal state. If the task is already finished, the terminal snapshot is yielded once and the generator stops immediately.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | Unique identifier for the task to observe. |
| Type | Description |
|---|---|
| AsyncGenerator[ProgressSnapshot, None] | An async generator of ProgressSnapshot objects. |
InMemoryResultStore
In-memory result store with TTL-based expiry.
Suitable for development and single-process deployments. For production, implement a Redis-backed store.
async def store( job_id: str, result: JobResult ) -> None
Persist a job result, evicting the oldest entry when at capacity.
async def get(job_id: str) -> JobResult | None
Return the result for the given job ID, or None if absent or expired.
Remove the result for the given job ID and return True if it existed.
async def wait( job_id: str, *, timeout: float = 30.0, poll_interval: float = 0.5 ) -> JobResult | None
Wait for a result to appear, with timeout.
Remove all entries whose TTL has elapsed and return the count removed.
Return store statistics including size, capacity, and waiter count.
JobProtocol
Represents a background job with comprehensive status tracking
Jobs are executable work units that can be queued, scheduled, tracked, and retried. They support priority ordering, dependency management, and timeout control.
Attributes:
- id: Unique job identifier
- name: Human-readable job name/type
- args: Positional arguments for job handler
- kwargs: Keyword arguments for job handler
- priority: Execution priority (higher = more important)
- max_retries: Maximum retry attempts on failure
- timeout: Execution timeout in seconds
- depends_on: List of job IDs this job depends on
- status: Current job status
- created_at: Timestamp when job was created
- started_at: Timestamp when job started executing
- completed_at: Timestamp when job finished
- retry_count: Current number of retry attempts
- last_error: Most recent error message
- result: JobProtocol execution result
- scheduled_at: Timestamp for scheduled execution
- cron_expression: Cron expression for recurring jobs
Check if job is pending execution
Check if job is currently running
Check if job completed successfully
Check if job failed
Check if job can be retried (hasn’t exceeded max_retries)
Get job execution duration in milliseconds
Mark job as running and record start time
def mark_completed(result: JobResult) -> None
Mark job as completed with result
| Parameter | Type | Description |
|---|---|---|
| `result` | JobResult | JobProtocol execution result |
Mark job as failed with error message
| Parameter | Type | Description |
|---|---|---|
| `error` | str | Error message describing the failure |
Mark job as retrying and increment retry count
Mark job as cancelled
Convert job to dictionary for serialization
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of job |
Create job from dictionary
| Parameter | Type | Description |
|---|---|---|
| `data` | dict[str, Any] | Dictionary representation of job |
| Type | Description |
|---|---|
| JobProtocol | JobProtocol instance |
JobResult
Result of a job execution
Captures the outcome of job execution including success status, return data, error information, and performance metrics.
def ok( cls, data: Any = None, duration: float = 0.0 ) -> JobResult
Create a successful result
| Parameter | Type | Description |
|---|---|---|
| `data` | Any | Return value from the job |
| `duration` | float | Execution duration in seconds |
| Type | Description |
|---|---|
| JobResult | JobResult with success=True |
def fail( cls, error: str, retry_count: int = 0, duration: float = 0.0 ) -> JobResult
Create a failure result
| Parameter | Type | Description |
|---|---|---|
| `error` | str | Error message describing the failure |
| `retry_count` | int | Number of retry attempts made |
| `duration` | float | Execution duration in seconds |
| Type | Description |
|---|---|
| JobResult | JobResult with success=False |
Convert result to dictionary for serialization
def from_dict( cls, data: dict[str, Any] ) -> JobResult
Create result from dictionary
JobStatus
JobProtocol execution status.
JobTemplateProtocol
Template for creating scheduled jobs
JobTemplateProtocol defines a reusable job configuration that can be instantiated multiple times with different IDs.
Attributes:
- name: JobProtocol name/type
- args: Positional arguments
- kwargs: Keyword arguments
- priority: Execution priority
- max_retries: Maximum retry attempts
- timeout: Execution timeout
- depends_on: JobProtocol dependencies
- template
Create a JobProtocol instance from this template
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID (generated if empty) |
| Type | Description |
|---|---|
| JobProtocol | New JobProtocol instance |
def from_job( cls, job: JobProtocol ) -> JobTemplateProtocol
Create template from existing job
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | JobProtocol to convert to template |
| Type | Description |
|---|---|
| JobTemplateProtocol | JobTemplateProtocol instance |
LoggingMiddleware
Logs task start, completion, and failures.
async def before_execute(ctx: TaskExecutionContext) -> None
Log task start before execution.
async def after_execute(ctx: TaskExecutionContext) -> None
Log task completion status and duration after execution.
async def on_error( ctx: TaskExecutionContext, error: Exception ) -> None
Log the error when a task raises an exception.
MemoryTaskQueue
In-memory task queue implementation
Uses Python’s heapq for priority-based ordering. Tasks are stored in memory and will be lost on process restart. Thread-safe using asyncio.Lock for concurrent access.
Characteristics:
- Storage: In-memory heap-based priority queue
- Concurrency: asyncio.Lock for thread safety
- Persistence: No persistence (volatile)
- Use Case: Development, testing, single-process applications
- Limitations: No persistence, single-process only
Example
```python
queue = MemoryTaskQueue()
await queue.enqueue(task)
task = await queue.dequeue()
```
Initialize memory task queue
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None
Attach an optional hook registry after provider boot wiring.
async def enqueue(task: JobProtocol) -> Result[str, TaskError]
Add a task to the queue, returning Ok(task_id) or Err(TaskError) on failure.
Tasks are stored in a min-heap with negative priority to achieve max-heap behavior (highest priority first).
| Parameter | Type | Description |
|---|---|---|
| `task` | JobProtocol | Task to enqueue |
| Type | Description |
|---|---|
| Result[str, TaskError] | Ok containing the enqueued task id, or Err containing a TaskError. |
Remove and return the next task from the queue
Returns the highest priority task. If multiple tasks have the same priority, returns the oldest (FIFO). Respects task delay - tasks with future available_at are skipped.
| Type | Description |
|---|---|
| JobProtocol | None | Next task or None if queue is empty or no tasks are available |
Acknowledge successful processing of a task
Removes the task from the in-flight tracking set.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to acknowledge |
Negative-acknowledge a task, optionally requeuing it
Removes the task from in-flight tracking. If requeue is True the task is pushed back onto the priority heap for retry.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | str | ID of the task to negative-acknowledge |
| `requeue` | bool | If True, return the task to the queue for retry |
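Putting dequeue, acknowledge, and negative-acknowledge together, a hedged worker-loop sketch. The `ack`/`nack` method names follow the descriptions above but are not spelled out in this reference, they are written as coroutines here, and `handle` is a hypothetical dispatch helper:
```python
import asyncio

while True:
    job = await queue.dequeue()
    if job is None:
        await asyncio.sleep(0.1)    # queue empty; back off briefly
        continue
    try:
        await handle(job)                       # hypothetical handler dispatch
        await queue.ack(job.id)                 # drop from in-flight tracking
    except Exception:
        await queue.nack(job.id, requeue=True)  # push back for another attempt
```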
Get the number of tasks in the queue
| Type | Description |
|---|---|
| int | Number of pending tasks |
Clear all tasks from the queue
Close the queue connection
For memory queue, there are no external resources to clean up.
MetricsMiddleware
Collects per-task-type execution metrics.
async def after_execute(ctx: TaskExecutionContext) -> None
Record execution metrics for the completed task.
async def on_error( ctx: TaskExecutionContext, error: Exception ) -> None
Increment failure counter for the task type that raised an error.
Return aggregated execution statistics keyed by task name.
NamedTaskConfig
Configuration for a single named task queue backend.
Used in TaskConfig.backends to declare multiple task queues that the framework registers as Named DI bindings.
| Parameter | Type | Description |
|---|---|---|
| `name` | | Unique backend identifier. Used as the Named() DI key. |
| `primary` | | Whether this is the primary backend. Primary backends also receive the unnamed TaskQueueProtocol binding. |
| `type` | | Backend type ('memory', 'redis', 'rabbitmq', 'postgres'). |
| `redis_url` | | Redis connection URL (when type='redis'). |
| `amqp_url` | | AMQP connection URL (when type='rabbitmq'). |
| `postgres_dsn` | | Postgres DSN (when type='postgres'). |
| `queue_name` | | Queue name for this backend instance. |
Example
```yaml
backends:
  - name: primary
    primary: true
    type: redis
    redis_url: "${REDIS_URL}"
  - name: notifications
    type: rabbitmq
    amqp_url: "${AMQP_URL}"
```
Inject by name
```python
queue: Annotated[TaskQueueProtocol, Named("notifications")]
```
Priority
Task priority levels for queue ordering (higher number = higher priority).
The scale is 0–20 and is separate from the kernel’s
ProviderPriority (0–100). Workers dequeue higher-priority
jobs before lower-priority ones.
Values:
- LOW (0): Background tasks that can be deferred indefinitely.
- NORMAL (5): Default priority for routine task processing.
- HIGH (10): Time-sensitive tasks processed before the NORMAL queue.
- CRITICAL (20): Highest priority; reserved for urgent system tasks.
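For illustration, a priority is typically set when declaring a task; a brief sketch using the `task` decorator documented later in this reference:
```python
@task(name="reindex_search", priority=Priority.HIGH, max_retries=5)
async def reindex_search(tenant_id: str) -> None:
    ...
```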
ProgressInfo
Current progress state of a task.
Return True when the current count has reached the total.
Return seconds elapsed since the task started, or 0 if not started.
ProgressSnapshot
Immutable point-in-time view of a task's progress.
| Parameter | Type | Description |
|---|---|---|
| `task_id` | | Unique identifier for the tracked task. |
| `current` | | Number of units completed so far. |
| `total` | | Total number of units to complete (0 means unknown). |
| `status` | | Current lifecycle state. |
| `message` | | Human-readable status message. |
| `error` | | Error description when ``status`` is ``FAILED``; empty otherwise. |
Completion percentage in the range [0.0, 100.0].
Returns 0.0 when total is unknown (zero).
ProgressStatus
Lifecycle states for a tracked task.
Values are lowercase strings so they round-trip cleanly through JSON.
ProgressStore
Abstract storage for progress state.
async def save(info: ProgressInfo) -> None
Save progress state.
async def get(job_id: str) -> ProgressInfo | None
Get progress for a job.
Remove progress entry.
async def list_active() -> list[ProgressInfo]
List all active (incomplete) progress entries.
ProgressTracker
Track and report task progress.
Injected into task handlers to enable progress reporting.
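A hedged sketch of a handler reporting progress. How the tracker reaches the handler depends on your wiring (for example via TaskWorker.get_progress_tracker, shown later); the update/complete calls are written as awaited here, which is an assumption, and `insert` is a hypothetical helper:
```python
async def import_rows(rows: list[dict], tracker: ProgressTracker) -> None:
    total = len(rows)
    for i, row in enumerate(rows, start=1):
        await insert(row)                                   # hypothetical helper
        await tracker.update(i, total, message=f"{i}/{total} rows")
    await tracker.complete()
```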
def __init__( job_id: str, store: ProgressStore ) -> None
Update progress.
| Parameter | Type | Description |
|---|---|---|
| `current` | int | Current progress count. |
| `total` | int | Total items to process. |
| `message` | str | Human-readable status message. |
| `**metadata` | | Additional metadata. |
Mark progress as complete.
Mark progress as failed.
property info() -> ProgressInfo
Return the current ProgressInfo snapshot.
QueueRateLimiter
Per-queue rate limiter.
Manages independent rate limits for named task queues, delegating each limit to a TokenBucket.
Example
```python
limiter = QueueRateLimiter()
limiter.add_limit("emails", rate=5, per=1.0)    # 5 / sec
limiter.add_limit("reports", rate=1, per=60.0)  # 1 / min

await limiter.acquire("emails")
```
Register a rate limit for a named queue.
| Parameter | Type | Description |
|---|---|---|
| `queue` | str | Queue name to apply the limit to. |
| `rate` | int | Allowed requests per time period. |
| `per` | float | Length of the time period in seconds. |
| `burst` | int | None | Maximum burst size (defaults to *rate*). |
Block until a token is available for queue.
| Parameter | Type | Description |
|---|---|---|
| `queue` | str | Queue name; a no-op for unlisted queues. |
Attempt a non-blocking token acquisition for queue.
| Parameter | Type | Description |
|---|---|---|
| `queue` | str | Queue name. |
| Type | Description |
|---|---|
| bool | ``True`` if a token was obtained or the queue has no limit; ``False`` if the queue is currently rate-limited. |
ResultStore
Abstract result store backend.
async def store( job_id: str, result: JobResult ) -> None
Store a job result.
async def get(job_id: str) -> JobResult | None
Get a result by job ID. Returns None if not found or expired.
Delete a result. Returns True if deleted.
async def wait( job_id: str, *, timeout: float = 30.0, poll_interval: float = 0.5 ) -> JobResult | None
Wait for a result with timeout.
Remove expired results. Returns count removed.
ScheduledJob
Represents a scheduled job with cron expression
Attributes:
- job_template: Template for creating job instances
- cron_expression: Cron expression string
- next_run: Unix timestamp of next execution
- enabled: Whether job is enabled for execution
Calculate the next run time based on cron expression
| Type | Description |
|---|---|
| float | Unix timestamp of next execution |
StepResult
Result from a single workflow step.
TaskBackendConfig
Configuration for task queue backends.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskChain
Sequential task execution — output of each step feeds the next.
Example
```python
chain = TaskChain([
    TaskStep("parse", parse_data),
    TaskStep("validate", validate_data),
    TaskStep("save", save_data),
])
result = await chain.execute(raw_input)
```
def __init__( steps: list[TaskStep], *, stop_on_error: bool = True, state_store: WorkflowStateStore | None = None, workflow_id: str | None = None ) -> None
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]
Execute all steps in sequence, threading output of each step into the next.
If a workflow_id and state_store were provided at construction,
this method will attempt to resume from the last saved checkpoint — any
step whose name appears in the persisted state’s completed steps will be
skipped, and the last completed step’s output will seed the next step.
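A sketch of a resumable chain using the constructor parameters shown above; the `state_store` instance and the step handlers (extract/transform/load) are assumed to exist in your wiring:
```python
chain = TaskChain(
    [TaskStep("extract", extract), TaskStep("transform", transform), TaskStep("load", load)],
    state_store=state_store,              # any WorkflowStateStore implementation
    workflow_id="nightly-etl-2024-06-01",
)
result = await chain.execute(raw_input)
# Re-running with the same workflow_id skips steps already checkpointed as completed.
```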
TaskChord
Fan-out then collect — parallel steps + callback with all results.
Example
```python
chord = TaskChord(
    steps=[TaskStep("a", fetch_a), TaskStep("b", fetch_b)],
    callback=TaskStep("merge", merge_results),
)
result = await chord.execute()
```
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]
Fan out to all group steps then call the callback with their collected results.
TaskCompletedEvent
Emitted when a task finishes successfully.
Attributes: task_id: Unique identifier of the completed task. worker_id: Identifier of the worker that executed the task.
TaskCompletedHook
Payload fired when a task finishes successfully.
Attributes: task_name: Name or type label of the completed task. task_id: Unique identifier of the task instance.
TaskConfig
Hierarchical root configuration for Lexigram Tasks.
Attributes:
- name: Configuration name (default: "tasks")
- enabled: Whether tasks module is enabled
- backend: Task queue backend configuration
- worker: Task worker settings
- scheduler: Task scheduler settings
- retry: Task retry configuration
- rate_limit: Task rate limiting
- timeout: Task timeout settings
- extra: Extra configuration
Validate task configuration for the given environment.
| Parameter | Type | Description |
|---|---|---|
| `env` | Environment | None | Target environment; resolved from ``LEX_ENV`` when ``None``. |
| Type | Description |
|---|---|
| list[ConfigIssue] | List of ConfigIssue instances. |
def from_named( cls, entry: NamedTaskConfig ) -> TaskConfig
Build a single-backend TaskConfig from a NamedTaskConfig entry.
Used internally by TasksProvider to create per-backend configs.
| Parameter | Type | Description |
|---|---|---|
| `entry` | NamedTaskConfig | The named backend entry to materialise. |
| Type | Description |
|---|---|
| TaskConfig | A TaskConfig configured for the single named backend. |
TaskDashboard
Aggregated task observability dashboard.
def record_execution( task_name: str, *, job_id: str = '', duration_ms: float = 0.0, success: bool = True, error: str | None = None, worker_id: str | None = None ) -> None
Record a task execution.
Get dashboard summary with throughput and error rates.
Get per-task-type statistics with latency percentiles.
def get_recent_executions( *, limit: int = 50, task_name: str | None = None ) -> list[dict[str, Any]]
Get recent execution history.
Get recent errors grouped by task type.
Reset all counters and history.
TaskEnqueuedHook
Payload fired when a task is placed on the queue.
Attributes: task_name: Name or type label of the enqueued task. queue_name: Name of the queue the task was added to.
TaskExecutionContext
Context passed through the middleware pipeline.
TaskFailedEvent
Emitted when a task fails after all retry attempts are exhausted.
Attributes: task_id: Unique identifier of the failed task. error: Human-readable description of the failure reason.
TaskFailedHook
Payload fired when a task raises an unhandled exception.
Attributes: task_name: Name or type label of the failed task. task_id: Unique identifier of the task instance. reason: Short description or exception message.
TaskGroup
Parallel task execution — all steps run concurrently.
Example
```python
group = TaskGroup([
    TaskStep("email", send_email),
    TaskStep("sms", send_sms),
])
result = await group.execute(user_data)
```
def __init__( steps: list[TaskStep], *, max_concurrency: int | None = None, state_store: WorkflowStateStore | None = None, workflow_id: str | None = None ) -> None
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]
Run all steps concurrently, respecting max_concurrency if set.
TaskHealth
Health status for task processing system
Aggregates health information from queue, workers, and scheduler.
Check if system is healthy
Calculate job success rate
Convert to dictionary for serialization
TaskMiddleware
Abstract base for task middleware.
async def before_execute(ctx: TaskExecutionContext) -> None
Called before task execution. Override to modify or log.
async def after_execute(ctx: TaskExecutionContext) -> None
Called after task execution. Override to observe or log.
async def on_error( ctx: TaskExecutionContext, error: Exception ) -> None
Called when task execution fails.
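A minimal sketch of a custom middleware plus pipeline wiring (TaskMiddlewarePipeline is documented next). That the context exposes the job being executed as `ctx.job` is an assumption; adapt the attribute access to the actual TaskExecutionContext fields:
```python
class AuditMiddleware(TaskMiddleware):
    async def before_execute(self, ctx: TaskExecutionContext) -> None:
        print(f"starting {ctx.job.name}")       # ctx.job is assumed; see note above

    async def after_execute(self, ctx: TaskExecutionContext) -> None:
        print(f"finished {ctx.job.name}")

    async def on_error(self, ctx: TaskExecutionContext, error: Exception) -> None:
        print(f"failed {ctx.job.name}: {error}")

pipeline = TaskMiddlewarePipeline()
pipeline.add(AuditMiddleware())
result = await pipeline.execute(job, handler)
```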
TaskMiddlewarePipeline
Chains middleware around task execution.
def add(middleware: TaskMiddleware) -> None
Append a middleware instance to the pipeline.
async def execute( job: JobProtocol, handler: Any ) -> JobResult
Execute a job through the middleware pipeline.
| Parameter | Type | Description |
|---|---|---|
| `job` | JobProtocol | The job to execute. |
| `handler` | Any | The async handler function. |
| Type | Description |
|---|---|
| JobResult | JobResult from execution. |
TaskProvider
Task processing provider for Lexigram Framework
TaskProvider integrates task processing with the Lexigram Framework, providing dependency injection, lifecycle management, and health monitoring.
Example
```python
from lexigram.app import Application
from lexigram.tasks import TaskProvider, MemoryTaskQueue

app = Application()
queue = MemoryTaskQueue()
provider = TaskProvider(queue, worker_count=4)
app.use(provider)

await app.start()
```
Initialize task provider
| Parameter | Type | Description |
|---|---|---|
| `queue` | TaskQueueProtocol | TaskQueueProtocol implementation to use |
| `worker_count` | int | Number of workers to create |
| `enable_scheduler` | bool | Whether to enable job scheduling |
def from_config( cls, config: TaskConfig, **context: Any ) -> TaskProvider
Create a TaskProvider from a TaskConfig.
Context kwargs may include a pre-built ‘queue’. If not provided, a MemoryTaskQueue is created as default.
async def register(container: ContainerRegistrarProtocol) -> None
Register task services with the DI container.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | DI registrar to bind task services into. |
async def boot(container: ContainerResolverProtocol) -> None
Start the task provider.
Called by the framework on application startup after all registrations.
Shutdown the task provider gracefully
Check task provider health.
In multi-backend mode the overall status is the worst individual status
across all named queues. TaskQueueProtocol does not expose a
health_check() method, so liveness is probed via get_task_count().
| Type | Description |
|---|---|
| HealthCheckResult | HealthCheckResult with current status and metrics. |
Register a task handler
| Parameter | Type | Description |
|---|---|---|
| `task_name` | str | Name of the task type |
| `handler` | Callable[Ellipsis, Any] | Async handler function |
Register a decorated task function for scheduling.
| Parameter | Type | Description |
|---|---|---|
| `task_func` | Any | Task function decorated with @scheduled |
Enqueue a job for processing
def schedule_job_sync( job_template: JobProtocol | JobTemplateProtocol, cron_expression: str, job_id: str | None = None ) -> str | None
Schedule a job with cron expression
Remove a scheduled job
Get worker pool statistics
Get scheduled jobs information
TaskQueuedEvent
Emitted when a task is accepted into a processing queue.
Attributes: task_id: Unique identifier of the queued task. queue_name: Name of the queue that accepted the task.
TaskRateLimitConfig
Configuration for rate limiting.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskScheduler
Task scheduler for cron-like job scheduling
TaskScheduler manages periodic job execution using cron expressions. Jobs are checked at regular intervals and enqueued when due.
Example
```python
scheduler = TaskScheduler()

# Schedule job
template = JobTemplateProtocol(name="cleanup", args=())
await scheduler.schedule_job(template, "0 0 * * *")  # Daily at midnight

# Start scheduler
await scheduler.start_scheduler(queue.enqueue)
```
Initialize task scheduler.
| Parameter | Type | Description |
|---|---|---|
| `check_interval` | float | How often to check for due jobs (seconds). |
| `store` | SchedulerStore | None | Optional SchedulerStore for durable persistence of job definitions and next-run times. When provided, all schedule/unschedule operations are persisted and the scheduler can be restored after restart via restore_from_store. |
def schedule_job_sync( job_template: JobTemplateProtocol | JobProtocol, cron_expression: str, job_id: str | None = None ) -> str
Schedule a job with cron expression (sync, no persistence).
For the primary async version with store persistence, use schedule_job.
| Parameter | Type | Description |
|---|---|---|
| `job_template` | JobTemplateProtocol | JobProtocol | JobProtocol template or JobProtocol to schedule |
| `cron_expression` | str | Cron expression (e.g., "0 9 * * 1-5") |
| `job_id` | str | None | Optional job ID, generated if not provided |
| Type | Description |
|---|---|
| str | JobProtocol ID for the scheduled job |
async def schedule_job( job_template: JobTemplateProtocol | JobProtocol, cron_expression: str, job_id: str | None = None ) -> str
Schedule a job and persist to store if one is configured.
This is the primary async method. For a synchronous variant without persistence, use schedule_job_sync.
| Parameter | Type | Description |
|---|---|---|
| `job_template` | JobTemplateProtocol | JobProtocol | JobProtocol template or JobProtocol to schedule. |
| `cron_expression` | str | Cron expression (e.g., ``"0 9 * * 1-5"``). |
| `job_id` | str | None | Optional job ID, generated if not provided. |
| Type | Description |
|---|---|
| str | JobProtocol ID for the scheduled job. |
Restore all persisted job definitions from the configured store.
Loads every job persisted by previous scheduler instances and re-registers them in scheduled_jobs. Call this once during startup before invoking start_scheduler to resume the full scheduling state.
| Type | Description |
|---|---|
| int | Number of jobs restored. |
| Exception | Description |
|---|---|
| RuntimeError | If no SchedulerStore was provided at construction. |
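A brief sketch of the restore-then-start sequence described above, assuming a SchedulerStore implementation was passed to the constructor and that restore_from_store is awaited:
```python
scheduler = TaskScheduler(check_interval=5.0, store=scheduler_store)

restored = await scheduler.restore_from_store()   # re-register persisted jobs
print(f"restored {restored} scheduled jobs")

await scheduler.start_scheduler(queue.enqueue)
```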
Remove a scheduled job (sync, no store deletion).
For the primary async version with store deletion, use unschedule_job.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID to remove |
| Type | Description |
|---|---|
| bool | True if job was removed, False if not found |
Remove a scheduled job and delete it from the store.
This is the primary async method. For a synchronous variant without store deletion, use unschedule_job_sync.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID to remove. |
| Type | Description |
|---|---|
| bool | True if job was removed, False if not found. |
def get_scheduled_jobs() -> list[ScheduledJob]
Get all scheduled jobs
| Type | Description |
|---|---|
| list[ScheduledJob] | List of all scheduled jobs |
def get_scheduled_job(job_id: str) -> ScheduledJob | None
Get a specific scheduled job
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID |
| Type | Description |
|---|---|
| ScheduledJob | None | ScheduledJob or None if not found |
Start the scheduler loop
| Parameter | Type | Description |
|---|---|---|
| `enqueue_callback` | Callable[[JobProtocol], Any] | Async function to enqueue jobs when they are due |
Stop the scheduler gracefully
Enable a scheduled job
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID |
| Type | Description |
|---|---|
| bool | True if job was enabled |
Disable a scheduled job
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | JobProtocol ID |
| Type | Description |
|---|---|
| bool | True if job was disabled |
Get next run times for all enabled scheduled jobs
| Type | Description |
|---|---|
| dict[str, float] | Dictionary mapping job ID to next run timestamp |
TaskSchedulerConfig
Configuration for task scheduler.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskStartedHook
Payload fired when a worker picks up and begins executing a task.
Attributes: task_name: Name or type label of the task being executed. task_id: Unique identifier of the task instance.
TaskStep
A single step in a workflow.
| Parameter | Type | Description |
|---|---|---|
| `name` | | Human-readable step name. |
| `handler` | | Async callable that takes input and returns output. |
| `on_error` | | Optional error handler. |
| `timeout` | | Optional timeout in seconds. |
TaskTimeoutConfig
Configuration for task timeout behavior.
TaskWorker
Individual task worker for job execution
TaskWorker processes jobs from a queue using registered handlers. Each worker runs independently and tracks its own statistics.
Example
```python
worker = TaskWorker("worker-1", queue, handlers)
await worker.start()
# ... worker processes jobs ...
await worker.stop()
```
def __init__( worker_id: str, queue: TaskQueueProtocol, handler_registry: dict[str, Callable[Ellipsis, Any]], config: TaskWorkerConfig | None = None, services: TaskWorkerServices | None = None, *, middleware_pipeline: TaskMiddlewarePipeline | None = None )
Initialize task worker.
| Parameter | Type | Description |
|---|---|---|
| `worker_id` | str | Unique worker identifier. |
| `queue` | TaskQueueProtocol | TaskQueueProtocol to pull jobs from. |
| `handler_registry` | dict[str, Callable[Ellipsis, Any]] | Dict mapping job names to handler functions. |
| `config` | TaskWorkerConfig | None | Optional worker configuration controlling timeouts and execution behaviour. Defaults to TaskWorkerConfig() when not provided. |
| `services` | TaskWorkerServices | None | Optional bundle of infrastructure services (DLQ, result store, progress store, dashboard, retry policy, task manager, idempotency manager, logger). Use TaskWorkerServices to compose only the services you need; absent services degrade gracefully. Pass a ``logger`` via ``services.logger`` to inject a custom bound logger; otherwise a module logger is created and bound to ``worker_id`` automatically. |
| `middleware_pipeline` | TaskMiddlewarePipeline | None | Optional middleware pipeline for cross-cutting concerns. |
def get_stats() -> WorkerJobStats
Get worker statistics
| Type | Description |
|---|---|
| WorkerJobStats | Current worker statistics |
Check if worker is currently processing a job
| Type | Description |
|---|---|
| bool | True if worker has a job in progress |
def get_progress_tracker(job_id: str) -> ProgressTracker | None
Get a ProgressTracker for a job (opt-in for handlers).
Handlers can use this to report progress. The worker must have a progress_store configured.
| Parameter | Type | Description |
|---|---|---|
| `job_id` | str | The job ID to track progress for |
| Type | Description |
|---|---|
| ProgressTracker | None | ProgressTracker instance if progress_store is configured, None otherwise |
TaskWorkerConfig
Configuration for task workers.
model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")
TaskWorkerServices
Optional infrastructure services injected into a TaskWorker.
Groups all optional infrastructure dependencies so that the
TaskWorker constructor stays focused on the mandatory parameters.
All fields default to None — omitted services degrade gracefully.
Attributes:
dead_letter_queue: DLQ for routing permanently-failed jobs.
result_store: Persistent store for job execution results.
progress_store: Store for in-progress task progress updates.
dashboard: Observability dashboard for worker and job metrics.
retry_policy: Retry policy implementation (from lexigram-resilience).
task_manager: Kernel task manager for critical shutdown registration.
idempotency_manager: Manager for at-most-once job execution.
logger: Optional logger instance.
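A brief sketch composing only the services a worker needs; omitted fields stay None and degrade gracefully as noted above. Field names follow the attribute list, while keyword construction and the no-argument constructors of the bundled services are assumptions:
```python
services = TaskWorkerServices(
    dead_letter_queue=DeadLetterQueue(),   # no-arg constructors assumed
    result_store=InMemoryResultStore(),
    dashboard=TaskDashboard(),
)
worker = TaskWorker("worker-1", queue, handlers, services=services)
```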
TasksModule
Background task workers, scheduling, and async job queues.
Call configure to configure the task-processing subsystem.
Usage (in-memory queue for development)
```python
from lexigram.tasks.backends.memory import MemoryTaskQueue

@module(
    imports=[TasksModule.configure(queue=MemoryTaskQueue())]
)
class AppModule(Module): pass
```
Usage (Redis queue for production)
```python
from lexigram.tasks.backends.redis import RedisTaskQueue

@module(
    imports=[
        TasksModule.configure(
            queue=RedisTaskQueue(url="redis://localhost"),
            worker_count=4,
        )
    ]
)
class AppModule(Module): pass
```
def configure( cls, queue: Any | None = None, worker_count: int = 1, enable_scheduler: bool = True ) -> DynamicModule
Create a TasksModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `queue` | Any | None | TaskQueueProtocol implementation. Defaults to MemoryTaskQueue. |
| `worker_count` | int | Number of concurrent task workers. |
| `enable_scheduler` | bool | Whether to enable the job scheduler. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def scope( cls, *handlers: type ) -> DynamicModule
Scope task handler classes into a feature module.
Registers the given task handler classes as providers so they are discovered and wired by the task executor. The parent module graph must already include TasksModule.configure — this does not create a new queue or worker pool.
Uses the anonymous token pattern so both configure() and
scope() can coexist in the same compiled graph without a
ModuleDuplicateError.
Example
```python
@module(
    imports=[
        TasksModule.configure(queue=RedisTaskQueue(url=...)),
        TasksModule.scope(SendEmailTask, GenerateReportTask),
    ]
)
class NotificationFeatureModule(Module): pass
```
| Parameter | Type | Description |
|---|---|---|
| `*handlers` | type | Task handler classes to register as providers. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule scoped to this feature. |
def stub(cls) -> DynamicModule
Return an in-memory TasksModule for unit testing.
Uses a MemoryTaskQueue with a single worker and no job scheduler.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule backed by an in-memory task queue. |
TimeoutMiddleware
Enforces execution timeout on tasks.
Uses asyncio.wait_for to enforce timeout on task execution.
async def before_execute(ctx: TaskExecutionContext) -> None
Compute and store the effective timeout in the execution context metadata.
async def execute_with_timeout( handler: Any, job: JobProtocol, ctx: TaskExecutionContext ) -> JobResult
Execute handler with timeout enforcement.
| Parameter | Type | Description |
|---|---|---|
| `handler` | Any | The handler to execute |
| `job` | JobProtocol | The job being executed |
| `ctx` | TaskExecutionContext | Execution context |
| Type | Description |
|---|---|
| JobResult | JobResult from execution |
UniqueTask
Decorator for unique task execution.
Prevents duplicate execution of the same task using in-memory locks.
Example
```python
lock_manager = LockManager()

@unique_task(
    lock_manager=lock_manager,
    key_func=lambda user_id: f"sync_user:{user_id}",
    timeout=3600,
)
async def sync_user(user_id: str):
    await sync_user_data(user_id)
```
def __init__( lock_manager: LockManager, key_func: Callable[[Any], str], timeout: float = 60.0, skip_if_locked: bool = True )
Initialize unique task decorator.
| Parameter | Type | Description |
|---|---|---|
| `lock_manager` | LockManager | LockManager instance for creating locks |
| `key_func` | Callable[[Any], str] | Function to generate lock key from task args |
| `timeout` | float | Lock timeout in seconds |
| `skip_if_locked` | bool | If True, skip execution if lock held; if False, wait |
WorkerJobStats
Worker statistics for monitoring performance.
Tracks execution metrics for an individual worker including job counts, timing, and activity tracking.
Get worker uptime in seconds.
def record_job(result: JobResult) -> None
Record job execution statistics.
| Parameter | Type | Description |
|---|---|---|
| `result` | JobResult | JobProtocol execution result. |
WorkerPool
Pool of task workers with monitoring and scaling
WorkerPool manages multiple TaskWorker instances for concurrent job processing. Supports dynamic scaling and aggregate statistics.
Example
```python
pool = WorkerPool(queue, handlers, size=4)
await pool.start()
stats = pool.get_pool_stats()
await pool.scale_to(8)  # Scale up
await pool.stop()
```
def __init__( queue: TaskQueueProtocol, handler_registry: dict[str, Callable[Ellipsis, Any]], size: int = 1, logger: Logger | None = None, task_manager: TaskManagerProtocol | None = None, dead_letter_queue: DeadLetterQueue | None = None, hooks: HookRegistryProtocol | None = None )
Initialize worker pool
| Parameter | Type | Description |
|---|---|---|
| `queue` | TaskQueueProtocol | TaskQueueProtocol for workers to pull from |
| `handler_registry` | dict[str, Callable[Ellipsis, Any]] | Dict mapping job names to handlers |
| `size` | int | Number of workers to create |
| `task_manager` | TaskManagerProtocol | None | Optional kernel task manager for critical task registration during worker lifecycle. |
| `dead_letter_queue` | DeadLetterQueue | None | Optional shared DLQ for permanently failed jobs. |
Start the worker pool
Creates and starts all workers in the pool.
Stop the worker pool gracefully.
| Parameter | Type | Description |
|---|---|---|
| `drain` | bool | If ``True``, wait for each worker to finish its current job before stopping. Defaults to ``False`` for a fast cancel-and-stop. |
| `timeout` | float | None | Per-worker seconds to wait when *drain* is ``True``. ``None`` means wait indefinitely. Defaults to 30 seconds. |
def get_worker_stats() -> list[WorkerJobStats]
Get statistics for all workers
| Type | Description |
|---|---|
| list[WorkerJobStats] | List of WorkerJobStats for each worker |
Get aggregate pool statistics
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with pool-wide statistics |
Scale the worker pool to a new size
Dynamically add or remove workers.
| Parameter | Type | Description |
|---|---|---|
| `new_size` | int | Target number of workers |
WorkflowResult
Result from a complete workflow execution.
WorkflowStatus
Enumeration of possible workflow execution states.
Functions
chain
Create a sequential TaskChain from the given steps.
Each step receives the output of the previous step as its input.
| Parameter | Type | Description |
|---|---|---|
| `stop_on_error` | bool | Whether to abort the pipeline on the first failure. Defaults to ``True``. |
Example
```python
result = await chain(
    TaskStep("validate", validate_input),
    TaskStep("process", process_data),
    TaskStep("persist", save_to_db),
).execute(raw_data)
```
delay
Decorate an async function to add a .delay() background-dispatch method.
The decorated function is unchanged when called directly — it still returns
a coroutine that must be await-ed. The .delay() method is the only
addition; it schedules the coroutine as a background asyncio.Task
and stores the reference to prevent premature garbage collection.
| Parameter | Type | Description |
|---|---|---|
| `fn` | Callable[_P, Coroutine[Any, Any, _R]] | An async function (coroutine function) to wrap. |
| Type | Description |
|---|---|
| _DelayedCallable[_P, _R] | A _DelayedCallable that is callable exactly like ``fn`` but also exposes ``.delay(*args, **kwargs) -> asyncio.Task``. |
Example
```python
@delay
async def flush_audit_log(batch: list[AuditEntry]) -> None:
    await audit_store.save_many(batch)

# Direct call (awaited in the current task):
await flush_audit_log(batch=entries)

# Background dispatch (does not block):
flush_audit_log.delay(batch=entries)
```
distributed_lock
async def distributed_lock( key: str, timeout: float = 60.0, lock_manager: LockManager | None = None ) -> AsyncGenerator[InMemoryLock, None]
Context manager for in-memory locks.
Deprecated: Use LockManager.acquire() instead for better DI integration.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Unique lock key |
| `timeout` | float | Lock timeout in seconds |
| `lock_manager` | LockManager | None | Optional LockManager instance (creates new if None) |
| Type | Description |
|---|---|
| AsyncGenerator[InMemoryLock, None] | InMemoryLock instance |
Example
```python
manager = LockManager()  # Should be from DI container
async with distributed_lock("resource:123", lock_manager=manager):
    await process_resource(123)
```
scheduled
def scheduled( cron: str, name: str | None = None, priority: Priority | int = Priority.NORMAL, max_retries: int = 3, timeout: float | None = None ) -> Callable[[Callable[Ellipsis, Any]], Any]
Decorator to define a scheduled task.
| Parameter | Type | Description |
|---|---|---|
| `cron` | str | Cron expression (e.g., "0 9 * * *") |
| `name` | str | None | Task name |
| `priority` | Priority | int | Task priority |
| `max_retries` | int | Maximum retry attempts |
| `timeout` | float | None | Execution timeout |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Any] | Decorated task function with scheduling metadata |
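For illustration, a sketch of declaring a scheduled task with this decorator; registering it for execution (see TaskProvider above) is assumed to happen during wiring, and `mailer` is a hypothetical dependency:
```python
@scheduled("0 9 * * 1-5", name="send_digest", max_retries=1, timeout=120.0)
async def send_digest() -> None:
    await mailer.send_daily_digest()   # mailer is hypothetical
```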
def task( name: str | None = None, priority: Priority | int = Priority.NORMAL, max_retries: int = 3, timeout: float | None = None, queue: str = 'default', idempotency_key: str | None = None ) -> Callable[[Callable[Ellipsis, Any]], Any]
Decorator to define a task.
Converts a regular function into a task that can be enqueued and executed by workers.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | None | Task name (defaults to function name) |
| `priority` | Priority | int | Task priority (HIGH, NORMAL, LOW) |
| `max_retries` | int | Maximum retry attempts on failure |
| `timeout` | float | None | Execution timeout in seconds |
| `queue` | str | Queue name for this task |
| `idempotency_key` | str | None | Optional key for idempotent execution |
| Type | Description |
|---|---|
| Callable[[Callable[Ellipsis, Any]], Any] | Decorated function with .delay() and .apply_async() methods |
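A short sketch of defining and dispatching a task with this decorator. Whether .delay() must be awaited, and which extra keyword arguments it accepts, is not specified in this reference; `do_resize` is a hypothetical helper:
```python
@task(name="resize_image", priority=Priority.NORMAL, max_retries=3, queue="media")
async def resize_image(image_id: str, width: int) -> None:
    await do_resize(image_id, width)   # hypothetical helper

# Fire-and-forget dispatch to the "media" queue:
resize_image.delay("img-123", width=800)
```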
Exceptions
QueueFullError
The queue has reached its maximum capacity and cannot accept new tasks.
The caller should back off and retry after a delay, or apply backpressure.
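A minimal back-off sketch for the behaviour suggested above; the retry parameters are arbitrary, and whether a given backend raises QueueFullError or returns an Err from enqueue depends on the implementation:
```python
import asyncio

for attempt in range(5):
    try:
        await queue.enqueue(task)
        break
    except QueueFullError:
        await asyncio.sleep(2 ** attempt)   # exponential back-off before retrying
else:
    raise RuntimeError("queue stayed full; giving up")
```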
TaskCancelledError
Raised when a task is cancelled
TaskError
Base exception for all task-related errors
TaskExecutionError
Raised when a task execution fails
TaskNotFoundError
Raised when a task cannot be found
TaskTimeoutError
Raised when a task execution exceeds timeout
TaskValidationError
Raised when task parameters are invalid