
API Reference

Protocol for tracking and broadcasting task progress.

Implementations must be safe for concurrent use — multiple coroutines may call update on the same task_id simultaneously, and multiple consumers may subscribe to the same task.

The subscribe method returns a live AsyncIterator that yields a new ProgressSnapshot every time update, complete, or fail is called for the given task. The iterator terminates automatically when the task reaches a terminal state (COMPLETE or FAILED).

Example

async def export_data(tracker: ProgressTrackerProtocol) -> None:
    records = await fetch_all()
    total = len(records)
    for i, record in enumerate(records, start=1):
        await process(record)
        await tracker.update("export-1", i, total, f"Row {i}/{total}")
    await tracker.complete("export-1", "Export finished")
async def update(
    task_id: str,
    current: int,
    total: int,
    message: str = ''
) -> None

Record incremental progress for a task.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
`current`intUnits completed so far.
`total`intTotal units to process (0 = unknown).
`message`strOptional human-readable status message.
async def complete(
    task_id: str,
    result: str = ''
) -> None

Mark a task as successfully completed.

Closes all active subscriptions for task_id after broadcasting the terminal ProgressSnapshot.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
`result`strOptional human-readable completion message.
async def fail(
    task_id: str,
    error: str
) -> None

Mark a task as failed.

Closes all active subscriptions for task_id after broadcasting the terminal ProgressSnapshot.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
`error`strDescription of the failure.
async def get(task_id: str) -> ProgressSnapshot | None

Return the current progress state for a task.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
Returns
TypeDescription
ProgressSnapshot | NoneThe most recent ProgressSnapshot, or ``None`` if the task is not known to this tracker.
def subscribe(task_id: str) -> AsyncIterator[ProgressSnapshot]

Subscribe to live progress updates for a task.

Returns an async iterator that yields a new ProgressSnapshot each time the task’s state changes. The iterator stops automatically when the task reaches a terminal state (COMPLETE or FAILED).

If the task is already in a terminal state when subscribe is called, the iterator yields the final snapshot once and then stops.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task to observe.
Returns
TypeDescription
AsyncIterator[ProgressSnapshot]An AsyncIterator of ProgressSnapshot objects.

Example

async for snap in tracker.subscribe("job-42"):
    print(f"{snap.percent:.1f}% — {snap.message}")

Protocol for task executor implementations.

Executors process tasks from a queue by dispatching them to registered handlers.

Example

class TaskExecutorProtocol:
    async def execute(self, task: Task) -> Any:
        handler = self._handlers.get(task.name)
        if handler:
            return await handler(task.payload)
        raise UnknownTaskError(task.name)
async def execute(task: Any) -> Any

Execute a task using the registered handler.

Parameters
ParameterTypeDescription
`task`AnyTask to execute.
Returns
TypeDescription
AnyTask execution result.
Raises
ExceptionDescription
UnknownTaskErrorIf no handler registered for task.
def register_handler(
    task_name: str,
    handler: Any
) -> None

Register a handler for a task type.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
`handler`AnyAsync callable to handle the task.
def get_handler(task_name: str) -> Any | None

Get the handler for a task type.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
Returns
TypeDescription
Any | NoneHandler if registered, None otherwise.

Protocol for task queue implementations.

This defines the contract for background task queues (Redis-based, PostgreSQL-based, in-memory, etc.).

Example

class RedisTaskQueue:
    async def enqueue(self, task: Task) -> str:
        task_id = str(uuid4())
        await self._redis.rpush("tasks", task.to_json())
        return task_id

    async def dequeue(self) -> Task | None:
        data = await self._redis.lpop("tasks")
        return Task.from_json(data) if data else None
async def enqueue(task: Any) -> Result[str, TaskQueueError]

Add a task to the queue.

Parameters
ParameterTypeDescription
`task`AnyTask instance to enqueue.
Returns
TypeDescription
Result[str, TaskQueueError]Ok(task_id) on success; Err(TaskQueueError) if the queue reports a recoverable failure (e.g. full queue, invalid payload). Only infrastructure failures (lost connection) are raised as exceptions.
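
A sketch of handling the returned Result value; the accessor names used here (is_ok, unwrap, unwrap_err) are assumptions and may differ from the actual Result API:

import asyncio

result = await queue.enqueue(task)
if result.is_ok():
    task_id = result.unwrap()            # queue accepted the task
else:
    error = result.unwrap_err()          # recoverable failure, e.g. queue full
    await asyncio.sleep(1.0)             # back off before retrying
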
async def dequeue() -> Any | None

Remove and return the next task.

Returns
TypeDescription
Any | NoneNext Task or None if queue is empty.
async def get_task_count() -> int

Get the number of tasks in the queue.

Returns
TypeDescription
intNumber of pending tasks.
async def clear() -> None

Clear all tasks from the queue.

async def ack(task_id: str) -> None

Acknowledge successful processing of a dequeued task.

Signals that the task has been processed successfully and can be permanently discarded from any in-flight tracking.

Parameters
ParameterTypeDescription
`task_id`strID of the task to acknowledge.
async def nack(
    task_id: str,
    requeue: bool = True
) -> None

Negative-acknowledge a dequeued task.

Signals that the task could not be processed. The task is optionally requeued for another processing attempt.

Parameters
ParameterTypeDescription
`task_id`strID of the task to negative-acknowledge.
`requeue`boolIf True, return the task to the queue for retry. If False, discard the task permanently.
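
A hedged sketch of a consumer loop using dequeue, ack, and nack; `queue` is any TaskQueueProtocol implementation, `handle` is a placeholder coroutine, and the `task.id` attribute is an assumption:

import asyncio

while True:
    task = await queue.dequeue()
    if task is None:
        await asyncio.sleep(0.1)                 # queue empty, poll again shortly
        continue
    try:
        await handle(task)
        await queue.ack(task.id)                 # processed; drop from in-flight tracking
    except Exception:
        await queue.nack(task.id, requeue=True)  # return to the queue for retry
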
async def close() -> None

Close the queue connection and cleanup resources.


Branch based on a condition evaluated against the input.

Example

workflow = BranchStep( condition=lambda data: data[“amount”] > 1000, if_true=TaskStep(“manual_review”, manual_review), if_false=TaskStep(“auto_approve”, auto_approve), )

def __init__(
    condition: Callable[[Any], bool],
    if_true: TaskStep,
    if_false: TaskStep
) -> None
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Evaluate the condition and execute either the true or false branch.


Distributed progress store backed by the platform ``CacheBackendProtocol``.

Stores serialised ProgressInfo objects in the configured cache backend so that progress is visible to all worker processes. TTL-based expiry is delegated to the backend.

Parameters
ParameterTypeDescription
`cache`Platform cache backend (Redis, Memcached, …).
`ttl`Seconds before a progress entry expires. Defaults to 3600 (1 hour).
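
A brief usage sketch, assuming a CacheBackendProtocol-compatible `cache` instance (for example from the DI container) and a ProgressInfo value `info` with a `job_id` field:

store = CacheProgressStore(cache, ttl=1800)   # entries expire after 30 minutes
await store.save(info)
snapshot = await store.get(info.job_id)
active = await store.list_active()
await store.delete(info.job_id)
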
def __init__(
    cache: CacheBackendProtocol,
    *,
    ttl: int = 3600
) -> None
async def save(info: ProgressInfo) -> None

Persist a ProgressInfo entry to the cache backend.

Parameters
ParameterTypeDescription
`info`ProgressInfoProgress state to save.
async def get(job_id: str) -> ProgressInfo | None

Return the ProgressInfo for the given job ID, or None if not found.

Parameters
ParameterTypeDescription
`job_id`strUnique identifier for the job.
async def delete(job_id: str) -> None

Remove the progress entry for the given job ID.

Parameters
ParameterTypeDescription
`job_id`strUnique identifier for the job.
async def list_active() -> list[ProgressInfo]

Return all active (incomplete) progress entries.

Note This implementation scans the key prefix via get, which may be slow on large datasets. Consider using a Redis-specific implementation with SCAN for production at scale.

Returns
TypeDescription
list[ProgressInfo]List of incomplete ProgressInfo.

Distributed result store backed by the platform ``CacheBackendProtocol``.

Stores serialised JobResult objects in the configured cache backend so that results are visible to all worker processes. TTL-based expiry is delegated to the backend.

Parameters
ParameterTypeDescription
`cache`Platform cache backend (Redis, Memcached, …).
`ttl`Seconds before a result entry expires. Defaults to 3600 (1 hour).
`poll_interval`Seconds between polls in wait. Defaults to 0.5.
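
A brief usage sketch, assuming a CacheBackendProtocol-compatible `cache` instance; the `success` and `data` attribute names follow the JobResult descriptions later in this reference:

store = CacheResultStore(cache, ttl=600, poll_interval=0.25)
await store.store("job-7", JobResult.ok(data={"rows": 42}))
result = await store.wait("job-7", timeout=10.0)   # poll until present or timeout
if result is not None and result.success:
    print(result.data)
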
def __init__(
    cache: CacheBackendProtocol,
    *,
    ttl: int = 3600,
    poll_interval: float = 0.5
) -> None
async def store(
    job_id: str,
    result: JobResult
) -> None

Persist a job result in the cache backend with TTL.

async def get(job_id: str) -> JobResult | None

Return the stored result, or None if absent or expired.

async def delete(job_id: str) -> bool

Remove the result from the cache and return True if it existed.

async def wait(
    job_id: str,
    *,
    timeout: float = 30.0,
    poll_interval: float | None = None
) -> JobResult | None

Poll the cache until the result appears or the timeout elapses.

async def cleanup_expired() -> int

No-op: the cache backend handles TTL expiry automatically.


Wrapper for cron expression parsing.

Provides a unified interface for cron expression handling with optional croniter dependency.

Note This class requires the croniter optional dependency. See lexigram.tasks.scheduling.cron for installation instructions. Instantiation raises ImportError when croniter is absent.

Example

cron = CronExpression("0 9 * * 1-5") # Weekdays at 9am
next_run = cron.get_next()
def __init__(expression: str)

Initialize cron expression

Parameters
ParameterTypeDescription
`expression`strCron expression string
Raises
ExceptionDescription
ImportErrorIf croniter is not installed
ValueErrorIf expression is invalid
def get_next(base_time: float | None = None) -> float

Get next execution time

Parameters
ParameterTypeDescription
`base_time`float | NoneBase time (defaults to current time)
Returns
TypeDescription
floatUnix timestamp of next execution
def get_prev(base_time: float | None = None) -> float

Get previous execution time

Parameters
ParameterTypeDescription
`base_time`float | NoneBase time (defaults to current time)
Returns
TypeDescription
floatUnix timestamp of previous execution

Manages failed jobs for inspection, retry, and cleanup.
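
A hedged usage sketch; `job` is any JobProtocol instance, `queue` and `tb_text` are placeholders, and the FailureRecord field names used below (job_id, error) are assumptions:

dlq = DeadLetterQueue(max_size=1000, retention_hours=24)
dlq.add(job, error="database timeout", traceback=tb_text)

for record in dlq.list_failed(limit=10, job_name="send_email"):
    print(record.job_id, record.error)

for retried in dlq.retry_all():        # jobs come back with status reset to PENDING
    await queue.enqueue(retried)       # re-enqueue on a placeholder task queue
dlq.purge(older_than_hours=48)
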
def __init__(
    *,
    max_size: int = 10000,
    retention_hours: float = 168
) -> None
def add(
    job: JobProtocol,
    error: str,
    traceback: str | None = None,
    **metadata: Any
) -> None

Add a failed job to the DLQ.

Parameters
ParameterTypeDescription
`job`JobProtocolThe failed job.
`error`strError message.
`traceback`str | NoneFull traceback string.
`**metadata`AnyAdditional metadata.
def get(job_id: str) -> FailureRecord | None

Get a specific failure record.

def list_failed(
    *,
    limit: int = 50,
    job_name: str | None = None
) -> list[FailureRecord]

List failed jobs, optionally filtered by job name.

Parameters
ParameterTypeDescription
`limit`intMaximum records to return.
`job_name`str | NoneFilter by job type name.
Returns
TypeDescription
list[FailureRecord]List of failure records (newest first).
def retry(job_id: str) -> JobProtocol | None

Remove a job from DLQ and prepare it for retry.

Returns the job with status reset to PENDING, or None if not found.

def retry_all() -> list[JobProtocol]

Retry all failed jobs. Returns list of jobs to re-enqueue.

def remove(job_id: str) -> bool

Permanently remove a failure from the DLQ.

def purge(
    *,
    older_than_hours: float | None = None
) -> int

Purge old failures.

Parameters
ParameterTypeDescription
`older_than_hours`float | NoneRemove records older than this. Defaults to retention_hours.
Returns
TypeDescription
intNumber of records purged.
def clear() -> int

Clear all DLQ entries. Returns count cleared.

property size() -> int

Return the number of currently tracked failure records.

def get_stats() -> dict[str, Any]

Get DLQ statistics.


In-memory lock for preventing duplicate task execution within a single process.

Note: This is NOT a distributed lock. For true distributed locking across multiple processes/servers, use a Redis-based or database-based lock implementation.

Example

lock_manager = LockManager()
async with lock_manager.acquire("user:123:sync", timeout=60):
    await sync_user_data(user_id=123)
def __init__(
    key: str,
    timeout: float,
    locks_dict: dict[str, tuple[float, asyncio.Lock]],
    locks_lock: asyncio.Lock
)

Initialize in-memory lock.

Parameters
ParameterTypeDescription
`key`strUnique lock key
`timeout`floatLock timeout in seconds (auto-release)
`locks_dict`dict[str, tuple[float, asyncio.Lock]]Shared locks dictionary (from LockManager)
`locks_lock`asyncio.LockShared lock for accessing locks_dict (from LockManager)
async def acquire() -> bool

Acquire the lock.

Returns
TypeDescription
boolTrue if lock was acquired, False if already held
async def try_acquire() -> bool

Try to acquire lock without waiting.

Returns
TypeDescription
boolTrue if acquired, False if already held
async def release() -> None

Release the lock.


Record of a single task execution.
def to_dict() -> dict[str, Any]

Serialize the execution record to a plain dictionary.


Record of a failed job in the DLQ.
def to_dict() -> dict[str, Any]

Serialize the failure record to a plain dictionary.

def from_dict(
    cls,
    data: dict[str, Any]
) -> FailureRecord

Deserialize a failure record from a plain dictionary.

Parameters
ParameterTypeDescription
`data`dict[str, Any]Dictionary as produced by to_dict.
Returns
TypeDescription
FailureRecordReconstructed FailureRecord.

System-wide throughput cap applied across all queues.

Delegates to a single TokenBucket for consistent rate enforcement regardless of which queue a task originates from.

Example

limiter = GlobalRateLimiter(rate=100, per=1.0) # 100 tasks / sec
await limiter.acquire()
def __init__(
    rate: int,
    per: float = 1.0,
    burst: int | None = None
) -> None

Initialise the global rate limiter.

Parameters
ParameterTypeDescription
`rate`intMaximum allowed tasks per time period.
`per`floatLength of the time period in seconds.
`burst`int | NoneMaximum burst capacity (defaults to *rate*).
async def acquire() -> None

Block until a global token is available.

async def try_acquire() -> bool

Non-blocking global token acquisition.

Returns
TypeDescription
bool``True`` if a token was obtained; ``False`` if globally rate-limited.

Registry for task handlers that also satisfies the TaskExecutorProtocol contract.

Maps task names to their handler functions. Handlers can be registered and retrieved by name. The execute method dispatches a task to its registered handler, making this class usable wherever a TaskExecutorProtocol is expected.

Extends Registry for unified introspection and lifecycle hooks.

Example

registry = HandlerRegistry()

@registry.register("send_email")
async def send_email(to: str, subject: str):
    ...

handler = registry.get("send_email")
def __init__() -> None

Initialize handler registry.

def register(
    name: str,
    handler: Callable[Ellipsis, Awaitable[Any]] | None = None
) -> Callable[Ellipsis, Any]

Register a task handler.

Can be used as a decorator or called directly.

Parameters
ParameterTypeDescription
`name`strTask name.
`handler`Callable[Ellipsis, Awaitable[Any]] | NoneHandler function (if not using as decorator).
Returns
TypeDescription
Callable[Ellipsis, Any]Handler function or decorator.

Example

# As decorator
@registry.register("task_name")
async def my_handler(...):
    pass

# Direct call
registry.register("task_name", my_handler)
def register_handler(
    task_name: str,
    handler: Any
) -> None

Register a handler for a task type (TaskExecutorProtocol method).

This method satisfies the register_handler requirement defined in lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a protocol-mandated entry point that delegates directly to register so that HandlerRegistry can be used wherever a TaskExecutorProtocol is expected without any additional adapter. Do not remove — the protocol contract requires this exact signature.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
`handler`AnyAsync callable to handle the task.
def get_handler(task_name: str) -> Any | None

Get the handler for a task type (TaskExecutorProtocol method).

This method satisfies the get_handler requirement defined in lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a protocol-mandated entry point that delegates directly to get so that HandlerRegistry can be used wherever a TaskExecutorProtocol is expected without any additional adapter. Do not remove — the protocol contract requires this exact signature.

Parameters
ParameterTypeDescription
`task_name`strName of the task type.
Returns
TypeDescription
Any | NoneHandler if registered, ``None`` otherwise.
async def execute(task: Any) -> Any

Execute a task using the registered handler (TaskExecutorProtocol protocol).

Resolves the handler for task.name, then invokes it with task.args and task.kwargs. Supports both sync and async handlers.

Parameters
ParameterTypeDescription
`task`AnyTask object with ``name``, ``args``, and ``kwargs`` attributes.
Returns
TypeDescription
AnyResult returned by the handler.
Raises
ExceptionDescription
TaskNotFoundErrorIf no handler is registered for the task name.
def remove(name: str) -> bool

Remove handler from registry.

Parameters
ParameterTypeDescription
`name`strTask name.
Returns
TypeDescription
boolTrue if handler was removed.
def list_handlers() -> list[str]

List all registered handler names.

Returns
TypeDescription
list[str]List of task names.
def to_dict() -> dict[str, Callable[Ellipsis, Awaitable[Any]]]

Get handlers as dictionary.

Returns
TypeDescription
dict[str, Callable[Ellipsis, Awaitable[Any]]]Dictionary of all handlers.

In-memory progress store for development/testing.
def __init__() -> None
async def save(info: ProgressInfo) -> None

Persist a ProgressInfo entry in the in-memory store.

async def get(job_id: str) -> ProgressInfo | None

Return the ProgressInfo for the given job ID, or None if not found.

async def delete(job_id: str) -> None

Remove the progress entry for the given job ID if it exists.

async def list_active() -> list[ProgressInfo]

Return all progress entries whose current count has not yet reached total.


Coroutine-safe, in-memory ProgressTrackerProtocol implementation.

Maintains the latest ProgressSnapshot for each tracked task and fans out every state change to all active subscribers via asyncio.Queue.

Subscriptions are kept alive until the task reaches a terminal state (COMPLETE or FAILED), at which point the iterator closes automatically. Subscribers that call subscribe after the task has already finished receive the terminal snapshot once and then stop.

All public methods are safe to call concurrently from multiple coroutines.
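
A hedged sketch pairing a subscriber with updates from another coroutine; `tracker` is an instance of this in-memory implementation:

import asyncio

async def watch() -> None:
    async for snap in tracker.subscribe("import-9"):
        print(f"{snap.percent:.0f}% {snap.message}")

watcher = asyncio.create_task(watch())
await tracker.update("import-9", 1, 2, "halfway there")
await tracker.complete("import-9", "done")     # terminal state closes the subscription
await watcher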

def __init__() -> None
async def update(
    task_id: str,
    current: int,
    total: int,
    message: str = ''
) -> None

Record incremental progress and broadcast to all subscribers.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
`current`intUnits completed so far.
`total`intTotal units to process (0 means unknown).
`message`strOptional human-readable status line.
async def complete(
    task_id: str,
    result: str = ''
) -> None

Mark a task as successfully completed and close all subscriptions.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
`result`strOptional human-readable completion message.
async def fail(
    task_id: str,
    error: str
) -> None

Mark a task as failed and close all subscriptions.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
`error`strDescription of the failure.
async def get(task_id: str) -> ProgressSnapshot | None

Return the current progress state for a task.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task.
Returns
TypeDescription
ProgressSnapshot | NoneThe most recent ProgressSnapshot, or ``None`` if the task has not been seen by this tracker.
def subscribe(task_id: str) -> AsyncGenerator[ProgressSnapshot, None]

Subscribe to live progress updates for a task.

Returns an async generator that yields one ProgressSnapshot per state change. The generator stops automatically when the task reaches a terminal state. If the task is already finished, the terminal snapshot is yielded once and the generator stops immediately.

Parameters
ParameterTypeDescription
`task_id`strUnique identifier for the task to observe.
Returns
TypeDescription
AsyncGenerator[ProgressSnapshot, None]An async generator of ProgressSnapshot objects.

In-memory result store with TTL-based expiry.

Suitable for development and single-process deployments. For production, implement a Redis-backed store.

def __init__(
    *,
    ttl: int = 3600,
    max_size: int = 10000
) -> None
async def store(
    job_id: str,
    result: JobResult
) -> None

Persist a job result, evicting the oldest entry when at capacity.

async def get(job_id: str) -> JobResult | None

Return the result for the given job ID, or None if absent or expired.

async def delete(job_id: str) -> bool

Remove the result for the given job ID and return True if it existed.

async def wait(
    job_id: str,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.5
) -> JobResult | None

Wait for a result to appear, with timeout.

async def cleanup_expired() -> int

Remove all entries whose TTL has elapsed and return the count removed.

def get_stats() -> dict[str, Any]

Return store statistics including size, capacity, and waiter count.


Represents a background job with comprehensive status tracking

Jobs are executable work units that can be queued, scheduled, tracked, and retried. They support priority ordering, dependency management, and timeout control.

Attributes:

  • id: Unique job identifier
  • name: Human-readable job name/type
  • args: Positional arguments for job handler
  • kwargs: Keyword arguments for job handler
  • priority: Execution priority (higher = more important)
  • max_retries: Maximum retry attempts on failure
  • timeout: Execution timeout in seconds
  • depends_on: List of job IDs this job depends on
  • status: Current job status
  • created_at: Timestamp when job was created
  • started_at: Timestamp when job started executing
  • completed_at: Timestamp when job finished
  • retry_count: Current number of retry attempts
  • last_error: Most recent error message
  • result: JobProtocol execution result
  • scheduled_at: Timestamp for scheduled execution
  • cron_expression: Cron expression for recurring jobs
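
A hedged sketch of the status lifecycle; `handler` is a placeholder coroutine, and the JobResult usage follows the section below:

job.mark_running()                              # records started_at, status -> running
try:
    outcome = await handler(*job.args, **job.kwargs)
    job.mark_completed(JobResult.ok(data=outcome, duration=1.2))
except Exception as exc:
    job.mark_failed(str(exc))
    if job.can_retry:
        job.mark_retrying()                     # increments retry_count
payload = job.to_dict()                         # serialisable snapshot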

property is_pending() -> bool

Check if job is pending execution

property is_running() -> bool

Check if job is currently running

property is_completed() -> bool

Check if job completed successfully

property is_failed() -> bool

Check if job failed

property can_retry() -> bool

Check if job can be retried (hasn’t exceeded max_retries)

property duration_ms() -> float | None

Get job execution duration in milliseconds

def mark_running() -> None

Mark job as running and record start time

def mark_completed(result: JobResult) -> None

Mark job as completed with result

Parameters
ParameterTypeDescription
`result`JobResultJobProtocol execution result
def mark_failed(error: str) -> None

Mark job as failed with error message

Parameters
ParameterTypeDescription
`error`strError message describing the failure
def mark_retrying() -> None

Mark job as retrying and increment retry count

def mark_cancelled() -> None

Mark job as cancelled

def to_dict() -> dict[str, Any]

Convert job to dictionary for serialization

Returns
TypeDescription
dict[str, Any]Dictionary representation of job
def from_dict(
    cls,
    data: dict[str, Any]
) -> JobProtocol

Create job from dictionary

Parameters
ParameterTypeDescription
`data`dict[str, Any]Dictionary representation of job
Returns
TypeDescription
JobProtocolJobProtocol instance

Result of a job execution

Captures the outcome of job execution including success status, return data, error information, and performance metrics.
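
A brief sketch of the two constructors and round-tripping through a dictionary; the `success`, `data`, and `error` attribute names are assumed from the descriptions below:

result = JobResult.ok(data={"rows": 120}, duration=0.8)
failure = JobResult.fail("upstream timeout", retry_count=2, duration=5.0)

if not failure.success:
    print(f"job failed: {failure.error}")

payload = result.to_dict()
restored = JobResult.from_dict(payload)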

def ok(
    cls,
    data: Any = None,
    duration: float = 0.0
) -> JobResult

Create a successful result

Parameters
ParameterTypeDescription
`data`AnyReturn value from the job
`duration`floatExecution duration in seconds
Returns
TypeDescription
JobResultJobResult with success=True
def fail(
    cls,
    error: str,
    retry_count: int = 0,
    duration: float = 0.0
) -> JobResult

Create a failure result

Parameters
ParameterTypeDescription
`error`strError message describing the failure
`retry_count`intNumber of retry attempts made
`duration`floatExecution duration in seconds
Returns
TypeDescription
JobResultJobResult with success=False
def to_dict() -> dict[str, Any]

Convert result to dictionary for serialization

def from_dict(
    cls,
    data: dict[str, Any]
) -> JobResult

Create result from dictionary


JobProtocol execution status.

Template for creating scheduled jobs

JobTemplateProtocol defines a reusable job configuration that can be instantiated multiple times with different IDs.

Attributes:

  • name: JobProtocol name/type
  • args: Positional arguments
  • kwargs: Keyword arguments
  • priority: Execution priority
  • max_retries: Maximum retry attempts
  • timeout: Execution timeout
  • depends_on: JobProtocol dependencies

def create_job(job_id: str = '') -> JobProtocol

Create a JobProtocol instance from this template

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID (generated if empty)
Returns
TypeDescription
JobProtocolNew JobProtocol instance
def from_job(
    cls,
    job: JobProtocol
) -> JobTemplateProtocol

Create template from existing job

Parameters
ParameterTypeDescription
`job`JobProtocolJobProtocol to convert to template
Returns
TypeDescription
JobTemplateProtocolJobTemplateProtocol instance

Logs task start, completion, and failures.
async def before_execute(ctx: TaskExecutionContext) -> None

Log task start before execution.

async def after_execute(ctx: TaskExecutionContext) -> None

Log task completion status and duration after execution.

async def on_error(
    ctx: TaskExecutionContext,
    error: Exception
) -> None

Log the error when a task raises an exception.


In-memory task queue implementation

Uses Python’s heapq for priority-based ordering. Tasks are stored in memory and will be lost on process restart. Concurrent coroutine access is serialised with an asyncio.Lock.

Characteristics:

  • Storage: In-memory heap-based priority queue
  • Concurrency: asyncio.Lock for coroutine safety
  • Persistence: No persistence (volatile)
  • Use Case: Development, testing, single-process applications
  • Limitations: No persistence, single-process only

Example

queue = MemoryTaskQueue()
await queue.enqueue(task)
task = await queue.dequeue()
def __init__() -> None

Initialize memory task queue

def set_hook_registry(hooks: HookRegistryProtocol | None) -> None

Attach an optional hook registry after provider boot wiring.

async def enqueue(task: JobProtocol) -> Result[str, TaskError]

Add a task to the queue, returning Ok(task_id) or Err(TaskError) on failure.

Tasks are stored in a min-heap with negative priority to achieve max-heap behavior (highest priority first).

Parameters
ParameterTypeDescription
`task`JobProtocolTask to enqueue
Returns
TypeDescription
Result[str, TaskError]Ok containing the enqueued task id, or Err containing a TaskError.
async def dequeue() -> JobProtocol | None

Remove and return the next task from the queue

Returns the highest priority task. If multiple tasks have the same priority, returns the oldest (FIFO). Respects task delay - tasks with future available_at are skipped.

Returns
TypeDescription
JobProtocol | NoneNext task or None if queue is empty or no tasks are available
async def ack(task_id: str) -> None

Acknowledge successful processing of a task

Removes the task from the in-flight tracking set.

Parameters
ParameterTypeDescription
`task_id`strID of the task to acknowledge
async def nack(
    task_id: str,
    requeue: bool = True
) -> None

Negative-acknowledge a task, optionally requeuing it

Removes the task from in-flight tracking. If requeue is True the task is pushed back onto the priority heap for retry.

Parameters
ParameterTypeDescription
`task_id`strID of the task to negative-acknowledge
`requeue`boolIf True, return the task to the queue for retry
async def get_task_count() -> int

Get the number of tasks in the queue

Returns
TypeDescription
intNumber of pending tasks
async def clear() -> None

Clear all tasks from the queue

async def close() -> None

Close the queue connection

For memory queue, there are no external resources to clean up.


Collects per-task-type execution metrics.
def __init__() -> None
async def after_execute(ctx: TaskExecutionContext) -> None

Record execution metrics for the completed task.

async def on_error(
    ctx: TaskExecutionContext,
    error: Exception
) -> None

Increment failure counter for the task type that raised an error.

def get_stats() -> dict[str, Any]

Return aggregated execution statistics keyed by task name.


Configuration for a single named task queue backend.

Used in TaskConfig.backends to declare multiple task queues that the framework registers as Named DI bindings.

Parameters
ParameterTypeDescription
`name`Unique backend identifier. Used as the Named() DI key.
`primary`Whether this is the primary backend. Primary backends also receive the unnamed TaskQueueProtocol binding.
`type`Backend type ('memory', 'redis', 'rabbitmq', 'postgres').
`redis_url`Redis connection URL (when type='redis').
`amqp_url`AMQP connection URL (when type='rabbitmq').
`postgres_dsn`Postgres DSN (when type='postgres').
`queue_name`Queue name for this backend instance.

Example

backends:
  - name: primary
    primary: true
    type: redis
    redis_url: "${REDIS_URL}"
  - name: notifications
    type: rabbitmq
    amqp_url: "${AMQP_URL}"

Inject by name

queue: Annotated[TaskQueueProtocol, Named("notifications")]

Task priority levels for queue ordering (higher number = higher priority).

The scale is 0-20 and is separate from the kernel’s ProviderPriority (0-100). Workers dequeue higher-priority jobs before lower-priority ones.

Values:

  • LOW (0): Background tasks that can be deferred indefinitely.
  • NORMAL (5): Default priority for routine task processing.
  • HIGH (10): Time-sensitive tasks processed before NORMAL queue.
  • CRITICAL (20): Highest priority; reserved for urgent system tasks.


Current progress state of a task.
property is_complete() -> bool

Return True when the current count has reached the total.

property elapsed_seconds() -> float

Return seconds elapsed since the task started, or 0 if not started.


Immutable point-in-time view of a task's progress.
Parameters
ParameterTypeDescription
`task_id`Unique identifier for the tracked task.
`current`Number of units completed so far.
`total`Total number of units to complete (0 means unknown).
`status`Current lifecycle state.
`message`Human-readable status message.
`error`Error description when ``status`` is ``FAILED``; empty otherwise.
property percent() -> float

Completion percentage in the range [0.0, 100.0].

Returns 0.0 when total is unknown (zero).


Lifecycle states for a tracked task.

Values are lowercase strings so they round-trip cleanly through JSON.


Abstract storage for progress state.
async def save(info: ProgressInfo) -> None

Save progress state.

async def get(job_id: str) -> ProgressInfo | None

Get progress for a job.

async def delete(job_id: str) -> None

Remove progress entry.

async def list_active() -> list[ProgressInfo]

List all active (incomplete) progress entries.


Track and report task progress.

Injected into task handlers to enable progress reporting.
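
A hedged sketch of a handler reporting progress; `save_row` is a placeholder coroutine, and the tracker is assumed to be supplied to the handler by the worker (see TaskWorker.get_progress_tracker):

async def import_rows(rows: list[dict], tracker: ProgressTracker) -> None:
    total = len(rows)
    for i, row in enumerate(rows, start=1):
        await save_row(row)
        await tracker.update(i, total, f"{i}/{total} rows imported")
    await tracker.complete("import finished")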

def __init__(
    job_id: str,
    store: ProgressStore
) -> None
async def update(
    current: int,
    total: int,
    message: str = '',
    **metadata: Any
) -> None

Update progress.

Parameters
ParameterTypeDescription
`current`intCurrent progress count.
`total`intTotal items to process.
`message`strHuman-readable status message.
`**metadata`AnyAdditional metadata.
async def complete(message: str = 'Complete') -> None

Mark progress as complete.

async def fail(message: str) -> None

Mark progress as failed.

property info() -> ProgressInfo

Return the current ProgressInfo snapshot.


Per-queue rate limiter.

Manages independent rate limits for named task queues, delegating each limit to a TokenBucket.

Example

limiter = QueueRateLimiter()
limiter.add_limit("emails", rate=5, per=1.0) # 5 / sec
limiter.add_limit("reports", rate=1, per=60.0) # 1 / min
await limiter.acquire("emails")
def add_limit(
    queue: str,
    rate: int,
    per: float = 1.0,
    burst: int | None = None
) -> None

Register a rate limit for a named queue.

Parameters
ParameterTypeDescription
`queue`strQueue name to apply the limit to.
`rate`intAllowed requests per time period.
`per`floatLength of the time period in seconds.
`burst`int | NoneMaximum burst size (defaults to *rate*).
async def acquire(queue: str) -> None

Block until a token is available for queue.

Parameters
ParameterTypeDescription
`queue`strQueue name; a no-op for unlisted queues.
async def try_acquire(queue: str) -> bool

Attempt a non-blocking token acquisition for queue.

Parameters
ParameterTypeDescription
`queue`strQueue name.
Returns
TypeDescription
bool``True`` if a token was obtained or the queue has no limit; ``False`` if the queue is currently rate-limited.

Abstract result store backend.
async def store(
    job_id: str,
    result: JobResult
) -> None

Store a job result.

async def get(job_id: str) -> JobResult | None

Get a result by job ID. Returns None if not found or expired.

async def delete(job_id: str) -> bool

Delete a result. Returns True if deleted.

async def wait(
    job_id: str,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.5
) -> JobResult | None

Wait for a result with timeout.

async def cleanup_expired() -> int

Remove expired results. Returns count removed.


Represents a scheduled job with cron expression

Attributes:

  • job_template: Template for creating job instances
  • cron_expression: Cron expression string
  • next_run: Unix timestamp of next execution
  • enabled: Whether job is enabled for execution

def calculate_next_run() -> float

Calculate the next run time based on cron expression

Returns
TypeDescription
floatUnix timestamp of next execution

Result from a single workflow step.

Configuration for task queue backends.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Sequential task execution — output of each step feeds the next.

Example

chain = TaskChain([
    TaskStep("parse", parse_data),
    TaskStep("validate", validate_data),
    TaskStep("save", save_data),
])
result = await chain.execute(raw_input)

def __init__(
    steps: list[TaskStep],
    *,
    stop_on_error: bool = True,
    state_store: WorkflowStateStore | None = None,
    workflow_id: str | None = None
) -> None
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Execute all steps in sequence, threading output of each step into the next.

If a workflow_id and state_store were provided at construction, this method will attempt to resume from the last saved checkpoint — any step whose name appears in the persisted state’s completed steps will be skipped, and the last completed step’s output will seed the next step.


Fan-out then collect — parallel steps + callback with all results.

Example

chord = TaskChord(
    steps=[TaskStep("a", fetch_a), TaskStep("b", fetch_b)],
    callback=TaskStep("merge", merge_results),
)
result = await chord.execute()

def __init__(
    steps: list[TaskStep],
    callback: TaskStep
) -> None
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Fan out to all group steps then call the callback with their collected results.


Emitted when a task finishes successfully.

Attributes: task_id: Unique identifier of the completed task. worker_id: Identifier of the worker that executed the task.


Payload fired when a task finishes successfully.

Attributes: task_name: Name or type label of the completed task. task_id: Unique identifier of the task instance.


Hierarchical root configuration for Lexigram Tasks.

Attributes:

  • name: Configuration name (default: "tasks")
  • enabled: Whether tasks module is enabled
  • backend: Task queue backend configuration
  • worker: Task worker settings
  • scheduler: Task scheduler settings
  • retry: Task retry configuration
  • rate_limit: Task rate limiting
  • timeout: Task timeout settings
  • extra: Extra configuration

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Validate task configuration for the given environment.

Parameters
ParameterTypeDescription
`env`Environment | NoneTarget environment; resolved from ``LEX_ENV`` when ``None``.
Returns
TypeDescription
list[ConfigIssue]List of ConfigIssue instances.
def from_named(
    cls,
    entry: NamedTaskConfig
) -> TaskConfig

Build a single-backend TaskConfig from a NamedTaskConfig entry.

Used internally by TasksProvider to create per-backend configs.

Parameters
ParameterTypeDescription
`entry`NamedTaskConfigThe named backend entry to materialise.
Returns
TypeDescription
TaskConfigA TaskConfig configured for the single named backend.

Aggregated task observability dashboard.
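
A brief usage sketch; the class name TaskDashboard is an assumption, while the calls themselves follow the methods documented below:

dashboard = TaskDashboard(history_size=500, window_seconds=120)   # class name assumed
dashboard.record_execution(
    "send_email",
    job_id="job-42",
    duration_ms=35.0,
    success=True,
    worker_id="worker-1",
)
summary = dashboard.get_summary()              # throughput and error rates
per_task = dashboard.get_per_task_stats()      # latency percentiles per task type
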
def __init__(
    *,
    history_size: int = 1000,
    window_seconds: float = 300
) -> None
def record_execution(
    task_name: str,
    *,
    job_id: str = '',
    duration_ms: float = 0.0,
    success: bool = True,
    error: str | None = None,
    worker_id: str | None = None
) -> None

Record a task execution.

def get_summary() -> dict[str, Any]

Get dashboard summary with throughput and error rates.

def get_per_task_stats() -> dict[str, dict[str, Any]]

Get per-task-type statistics with latency percentiles.

def get_recent_executions(
    *,
    limit: int = 50,
    task_name: str | None = None
) -> list[dict[str, Any]]

Get recent execution history.

def get_error_breakdown() -> dict[str, list[dict[str, Any]]]

Get recent errors grouped by task type.

def reset() -> None

Reset all counters and history.


Payload fired when a task is placed on the queue.

Attributes: task_name: Name or type label of the enqueued task. queue_name: Name of the queue the task was added to.


Context passed through the middleware pipeline.

Emitted when a task fails after all retry attempts are exhausted.

Attributes: task_id: Unique identifier of the failed task. error: Human-readable description of the failure reason.


Payload fired when a task raises an unhandled exception.

Attributes: task_name: Name or type label of the failed task. task_id: Unique identifier of the task instance. reason: Short description or exception message.


Parallel task execution — all steps run concurrently.

Example

group = TaskGroup([
    TaskStep("email", send_email),
    TaskStep("sms", send_sms),
])
result = await group.execute(user_data)

def __init__(
    steps: list[TaskStep],
    *,
    max_concurrency: int | None = None,
    state_store: WorkflowStateStore | None = None,
    workflow_id: str | None = None
) -> None
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Run all steps concurrently, respecting max_concurrency if set.


Health status for task processing system

Aggregates health information from queue, workers, and scheduler.

property is_healthy() -> bool

Check if system is healthy

property success_rate() -> float

Calculate job success rate

def to_dict() -> dict[str, Any]

Convert to dictionary for serialization


Abstract base for task middleware.
async def before_execute(ctx: TaskExecutionContext) -> None

Called before task execution. Override to modify or log.

async def after_execute(ctx: TaskExecutionContext) -> None

Called after task execution. Override to observe or log.

async def on_error(
    ctx: TaskExecutionContext,
    error: Exception
) -> None

Called when task execution fails.
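
A minimal sketch of a custom middleware subclass; access to `ctx.job` is an assumption about TaskExecutionContext:

class AuditMiddleware(TaskMiddleware):
    async def before_execute(self, ctx: TaskExecutionContext) -> None:
        print(f"starting {ctx.job.name}")      # ctx.job access is assumed

    async def on_error(self, ctx: TaskExecutionContext, error: Exception) -> None:
        print(f"{ctx.job.name} failed: {error}")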


Chains middleware around task execution.
def __init__() -> None
def add(middleware: TaskMiddleware) -> None

Append a middleware instance to the pipeline.

async def execute(
    job: JobProtocol,
    handler: Any
) -> JobResult

Execute a job through the middleware pipeline.

Parameters
ParameterTypeDescription
`job`JobProtocolThe job to execute.
`handler`AnyThe async handler function.
Returns
TypeDescription
JobResultJobResult from execution.

Task processing provider for Lexigram Framework

TaskProvider integrates task processing with the Lexigram Framework, providing dependency injection, lifecycle management, and health monitoring.

Example

from lexigram.app import Application
from lexigram.tasks import TaskProvider, MemoryTaskQueue
app = Application()
queue = MemoryTaskQueue()
provider = TaskProvider(queue, worker_count=4)
app.use(provider)
await app.start()
def __init__(
    queue: TaskQueueProtocol,
    worker_count: int = 1,
    enable_scheduler: bool = True
)

Initialize task provider

Parameters
ParameterTypeDescription
`queue`TaskQueueProtocolTaskQueueProtocol implementation to use
`worker_count`intNumber of workers to create
`enable_scheduler`boolWhether to enable job scheduling
def from_config(
    cls,
    config: TaskConfig,
    **context: Any
) -> TaskProvider

Create a TaskProvider from a TaskConfig.

Context kwargs may include a pre-built ‘queue’. If not provided, a MemoryTaskQueue is created by default.

async def register(container: ContainerRegistrarProtocol) -> None

Register task services with the DI container.

Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolDI registrar to bind task services into.
async def boot(container: ContainerResolverProtocol) -> None

Start the task provider.

Called by the framework on application startup after all registrations.

async def shutdown() -> None

Shutdown the task provider gracefully

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check task provider health.

In multi-backend mode the overall status is the worst individual status across all named queues. TaskQueueProtocol does not expose a health_check() method, so liveness is probed via get_task_count().

Returns
TypeDescription
HealthCheckResultHealthCheckResult with current status and metrics.
def register_handler(
    task_name: str,
    handler: Callable[Ellipsis, Any]
) -> None

Register a task handler

Parameters
ParameterTypeDescription
`task_name`strName of the task type
`handler`Callable[Ellipsis, Any]Async handler function
def register_scheduled_task(task_func: Any) -> None

Register a decorated task function for scheduling.

Parameters
ParameterTypeDescription
`task_func`AnyTask function decorated with @scheduled
async def enqueue_job(job: JobProtocol) -> str

Enqueue a job for processing

def schedule_job_sync(
    job_template: JobProtocol | JobTemplateProtocol,
    cron_expression: str,
    job_id: str | None = None
) -> str | None

Schedule a job with cron expression

def unschedule_job_sync(job_id: str) -> bool

Remove a scheduled job

def get_worker_stats() -> dict[str, Any] | None

Get worker pool statistics

def get_scheduled_jobs() -> dict[str, Any] | None

Get scheduled jobs information


Emitted when a task is accepted into a processing queue.

Attributes: task_id: Unique identifier of the queued task. queue_name: Name of the queue that accepted the task.


Configuration for rate limiting.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Task scheduler for cron-like job scheduling

TaskScheduler manages periodic job execution using cron expressions. Jobs are checked at regular intervals and enqueued when due.

Example

scheduler = TaskScheduler()
# Schedule job
template = JobTemplateProtocol(name="cleanup", args=())
await scheduler.schedule_job(template, "0 0 * * *")  # Daily at midnight
# Start scheduler
await scheduler.start_scheduler(queue.enqueue)
def __init__(
    check_interval: float = 60.0,
    store: SchedulerStore | None = None
)

Initialize task scheduler.

Parameters
ParameterTypeDescription
`check_interval`floatHow often to check for due jobs (seconds).
`store`SchedulerStore | NoneOptional SchedulerStore for durable persistence of job definitions and next-run times. When provided, all schedule/unschedule operations are persisted and the scheduler can be restored after restart via restore_from_store.
def schedule_job_sync(
    job_template: JobTemplateProtocol | JobProtocol,
    cron_expression: str,
    job_id: str | None = None
) -> str

Schedule a job with cron expression (sync, no persistence).

For the primary async version with store persistence, use schedule_job.

Parameters
ParameterTypeDescription
`job_template`JobTemplateProtocol | JobProtocolJobProtocol template or JobProtocol to schedule
`cron_expression`strCron expression (e.g., "0 9 * * 1-5")
`job_id`str | NoneOptional job ID, generated if not provided
Returns
TypeDescription
strJobProtocol ID for the scheduled job
async def schedule_job(
    job_template: JobTemplateProtocol | JobProtocol,
    cron_expression: str,
    job_id: str | None = None
) -> str

Schedule a job and persist to store if one is configured.

This is the primary async method. For a synchronous variant without persistence, use schedule_job_sync.

Parameters
ParameterTypeDescription
`job_template`JobTemplateProtocol | JobProtocolJobProtocol template or JobProtocol to schedule.
`cron_expression`strCron expression (e.g., ``"0 9 * * 1-5"``).
`job_id`str | NoneOptional job ID, generated if not provided.
Returns
TypeDescription
strJobProtocol ID for the scheduled job.
async def restore_from_store() -> int

Restore all persisted job definitions from the configured store.

Loads every job persisted by previous scheduler instances and re-registers them in scheduled_jobs. Call this once during startup before invoking start_scheduler to resume the full scheduling state.

Returns
TypeDescription
intNumber of jobs restored.
Raises
ExceptionDescription
RuntimeErrorIf no SchedulerStore was provided at construction.
def unschedule_job_sync(job_id: str) -> bool

Remove a scheduled job (sync, no store deletion).

For the primary async version with store deletion, use unschedule_job.

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID to remove
Returns
TypeDescription
boolTrue if job was removed, False if not found
async def unschedule_job(job_id: str) -> bool

Remove a scheduled job and delete it from the store.

This is the primary async method. For a synchronous variant without store deletion, use unschedule_job_sync.

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID to remove.
Returns
TypeDescription
boolTrue if job was removed, False if not found.
def get_scheduled_jobs() -> list[ScheduledJob]

Get all scheduled jobs

Returns
TypeDescription
list[ScheduledJob]List of all scheduled jobs
def get_scheduled_job(job_id: str) -> ScheduledJob | None

Get a specific scheduled job

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID
Returns
TypeDescription
ScheduledJob | NoneScheduledJob or None if not found
async def start_scheduler(enqueue_callback: Callable[[JobProtocol], Any]) -> None

Start the scheduler loop

Parameters
ParameterTypeDescription
`enqueue_callback`Callable[[JobProtocol], Any]Async function to enqueue jobs when they are due
async def stop_scheduler() -> None

Stop the scheduler gracefully

def enable_job(job_id: str) -> bool

Enable a scheduled job

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID
Returns
TypeDescription
boolTrue if job was enabled
def disable_job(job_id: str) -> bool

Disable a scheduled job

Parameters
ParameterTypeDescription
`job_id`strJobProtocol ID
Returns
TypeDescription
boolTrue if job was disabled
def get_next_run_times() -> dict[str, float]

Get next run times for all enabled scheduled jobs

Returns
TypeDescription
dict[str, float]Dictionary mapping job ID to next run timestamp

Configuration for task scheduler.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Payload fired when a worker picks up and begins executing a task.

Attributes: task_name: Name or type label of the task being executed. task_id: Unique identifier of the task instance.


A single step in a workflow.
Parameters
ParameterTypeDescription
`name`Human-readable step name.
`handler`Async callable that takes input and returns output.
`on_error`Optional error handler.
`timeout`Optional timeout in seconds.

Configuration for task timeout behavior.

Individual task worker for job execution

TaskWorker processes jobs from a queue using registered handlers. Each worker runs independently and tracks its own statistics.

Example

worker = TaskWorker("worker-1", queue, handlers)
await worker.start()
# ... worker processes jobs ...
await worker.stop()
def __init__(
    worker_id: str,
    queue: TaskQueueProtocol,
    handler_registry: dict[str, Callable[Ellipsis, Any]],
    config: TaskWorkerConfig | None = None,
    services: TaskWorkerServices | None = None,
    *,
    middleware_pipeline: TaskMiddlewarePipeline | None = None
)

Initialize task worker.

Parameters
ParameterTypeDescription
`worker_id`strUnique worker identifier.
`queue`TaskQueueProtocolTaskQueueProtocol to pull jobs from.
`handler_registry`dict[str, Callable[Ellipsis, Any]]Dict mapping job names to handler functions.
`config`TaskWorkerConfig | NoneOptional worker configuration controlling timeouts and execution behaviour. Defaults to TaskWorkerConfig() when not provided.
`services`TaskWorkerServices | NoneOptional bundle of infrastructure services (DLQ, result store, progress store, dashboard, retry policy, task manager, idempotency manager, logger). Use TaskWorkerServices to compose only the services you need; absent services degrade gracefully. Pass a ``logger`` via ``services.logger`` to inject a custom bound logger; otherwise a module logger is created and bound to ``worker_id`` automatically.
`middleware_pipeline`TaskMiddlewarePipeline | NoneOptional middleware pipeline for cross-cutting concerns.
def get_stats() -> WorkerJobStats

Get worker statistics

Returns
TypeDescription
WorkerJobStatsCurrent worker statistics
def is_busy() -> bool

Check if worker is currently processing a job

Returns
TypeDescription
boolTrue if worker has a job in progress
def get_progress_tracker(job_id: str) -> ProgressTracker | None

Get a ProgressTracker for a job (opt-in for handlers).

Handlers can use this to report progress. The worker must have a progress_store configured.

Parameters
ParameterTypeDescription
`job_id`strThe job ID to track progress for
Returns
TypeDescription
ProgressTracker | NoneProgressTracker instance if progress_store is configured, None otherwise

Configuration for task workers.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Optional infrastructure services injected into a TaskWorker.

Groups all optional infrastructure dependencies so that the TaskWorker constructor stays focused on the mandatory parameters. All fields default to None — omitted services degrade gracefully.

Attributes:

  • dead_letter_queue: DLQ for routing permanently-failed jobs.
  • result_store: Persistent store for job execution results.
  • progress_store: Store for in-progress task progress updates.
  • dashboard: Observability dashboard for worker and job metrics.
  • retry_policy: Retry policy implementation (from lexigram-resilience).
  • task_manager: Kernel task manager for critical shutdown registration.
  • idempotency_manager: Manager for at-most-once job execution.
  • logger: Optional logger instance.


Background task workers, scheduling, and async job queues.

Call configure to configure the task-processing subsystem.

Usage (in-memory queue for development)

from lexigram.tasks.backends.memory import MemoryTaskQueue

@module(
    imports=[TasksModule.configure(queue=MemoryTaskQueue())]
)
class AppModule(Module):
    pass

Usage (Redis queue for production)

from lexigram.tasks.backends.redis import RedisTaskQueue

@module(
    imports=[
        TasksModule.configure(
            queue=RedisTaskQueue(url="redis://localhost"),
            worker_count=4,
        )
    ]
)
class AppModule(Module):
    pass
def configure(
    cls,
    queue: Any | None = None,
    worker_count: int = 1,
    enable_scheduler: bool = True
) -> DynamicModule

Create a TasksModule with explicit configuration.

Parameters
ParameterTypeDescription
`queue`Any | NoneTaskQueueProtocol implementation. Defaults to MemoryTaskQueue.
`worker_count`intNumber of concurrent task workers.
`enable_scheduler`boolWhether to enable the job scheduler.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def scope(
    cls,
    *handlers: type
) -> DynamicModule

Scope task handler classes into a feature module.

Registers the given task handler classes as providers so they are discovered and wired by the task executor. The parent module graph must already include TasksModule.configure — this does not create a new queue or worker pool.

Uses the anonymous token pattern so both configure() and scope() can coexist in the same compiled graph without a ModuleDuplicateError.

Example

@module(
    imports=[
        TasksModule.configure(queue=RedisTaskQueue(url=...)),
        TasksModule.scope(SendEmailTask, GenerateReportTask),
    ]
)
class NotificationFeatureModule(Module):
    pass
Parameters
ParameterTypeDescription
`*handlers`typeTask handler classes to register as providers in this scope.
Returns
TypeDescription
DynamicModuleA DynamicModule scoped to this feature.
def stub(cls) -> DynamicModule

Return an in-memory TasksModule for unit testing.

Uses a MemoryTaskQueue with a single worker and no job scheduler.

Returns
TypeDescription
DynamicModuleA DynamicModule backed by an in-memory task queue.

Enforces execution timeout on tasks.

Uses asyncio.wait_for to enforce timeout on task execution.

def __init__(
    default_timeout: float = 300.0,
    max_timeout: float | None = None
) -> None
async def before_execute(ctx: TaskExecutionContext) -> None

Compute and store the effective timeout in the execution context metadata.

async def execute_with_timeout(
    handler: Any,
    job: JobProtocol,
    ctx: TaskExecutionContext
) -> JobResult

Execute handler with timeout enforcement.

Parameters
ParameterTypeDescription
`handler`AnyThe handler to execute
`job`JobProtocolThe job being executed
`ctx`TaskExecutionContextExecution context
Returns
TypeDescription
JobResultJobResult from execution

Decorator for unique task execution.

Prevents duplicate execution of the same task using in-memory locks.

Example

lock_manager = LockManager()

@unique_task(
    lock_manager=lock_manager,
    key_func=lambda user_id: f"sync_user:{user_id}",
    timeout=3600
)
async def sync_user(user_id: str):
    await sync_user_data(user_id)
def __init__(
    lock_manager: LockManager,
    key_func: Callable[[Any], str],
    timeout: float = 60.0,
    skip_if_locked: bool = True
)

Initialize unique task decorator.

Parameters
ParameterTypeDescription
`lock_manager`LockManagerLockManager instance for creating locks
`key_func`Callable[[Any], str]Function to generate lock key from task args
`timeout`floatLock timeout in seconds
`skip_if_locked`boolIf True, skip execution if lock held; if False, wait

Worker statistics for monitoring performance.

Tracks execution metrics for an individual worker including job counts, timing, and activity tracking.

property uptime() -> float

Get worker uptime in seconds.

def record_job(result: JobResult) -> None

Record job execution statistics.

Parameters
ParameterTypeDescription
`result`JobResultJobProtocol execution result.

Pool of task workers with monitoring and scaling

WorkerPool manages multiple TaskWorker instances for concurrent job processing. Supports dynamic scaling and aggregate statistics.

Example

pool = WorkerPool(queue, handlers, size=4)
await pool.start()
stats = pool.get_pool_stats()
await pool.scale_to(8) # Scale up
await pool.stop()
def __init__(
    queue: TaskQueueProtocol,
    handler_registry: dict[str, Callable[Ellipsis, Any]],
    size: int = 1,
    logger: Logger | None = None,
    task_manager: TaskManagerProtocol | None = None,
    dead_letter_queue: DeadLetterQueue | None = None,
    hooks: HookRegistryProtocol | None = None
)

Initialize worker pool

Parameters
ParameterTypeDescription
`queue`TaskQueueProtocolTaskQueueProtocol for workers to pull from
`handler_registry`dict[str, Callable[Ellipsis, Any]]Dict mapping job names to handlers
`size`intNumber of workers to create
`task_manager`TaskManagerProtocol | NoneOptional kernel task manager for critical task registration during worker lifecycle.
`dead_letter_queue`DeadLetterQueue | NoneOptional shared DLQ for permanently failed jobs.
async def start() -> None

Start the worker pool

Creates and starts all workers in the pool.

async def stop(
    drain: bool = False,
    timeout: float | None = 30.0
) -> None

Stop the worker pool gracefully.

Parameters
ParameterTypeDescription
`drain`boolIf ``True``, wait for each worker to finish its current job before stopping. Defaults to ``False`` for a fast cancel-and-stop.
`timeout`float | NonePer-worker seconds to wait when *drain* is ``True``. ``None`` means wait indefinitely. Defaults to 30 seconds.
def get_worker_stats() -> list[WorkerJobStats]

Get statistics for all workers

Returns
TypeDescription
list[WorkerJobStats]List of WorkerJobStats for each worker
def get_pool_stats() -> dict[str, Any]

Get aggregate pool statistics

Returns
TypeDescription
dict[str, Any]Dictionary with pool-wide statistics
async def scale_to(new_size: int) -> None

Scale the worker pool to a new size

Dynamically add or remove workers.

Parameters
ParameterTypeDescription
`new_size`intTarget number of workers

Result from a complete workflow execution.

Enumeration of possible workflow execution states.

def chain(
    *steps: TaskStep,
    stop_on_error: bool = True
) -> TaskChain

Create a sequential TaskChain from the given steps.

Each step receives the output of the previous step as its input.

Parameters
ParameterTypeDescription
`*steps`TaskStepWorkflow steps to execute in order.
`stop_on_error`boolWhether to abort the pipeline on the first failure. Defaults to ``True``.
Returns
TypeDescription
TaskChainA configured TaskChain ready to ``await .execute()``.

Example

result = await chain(
    TaskStep("validate", validate_input),
    TaskStep("process", process_data),
    TaskStep("persist", save_to_db),
).execute(raw_data)

def delay(fn: Callable[_P, Coroutine[Any, Any, _R]]) -> _DelayedCallable[_P, _R]

Decorate an async function to add a .delay() background-dispatch method.

The decorated function is unchanged when called directly — it still returns a coroutine that must be await-ed. The .delay() method is the only addition; it schedules the coroutine as a background asyncio.Task and stores the reference to prevent premature garbage collection.

Parameters
ParameterTypeDescription
`fn`Callable[_P, Coroutine[Any, Any, _R]]An async function (coroutine function) to wrap.
Returns
TypeDescription
_DelayedCallable[_P, _R]A _DelayedCallable that is callable exactly like ``fn`` but also exposes ``.delay(*args, **kwargs) -> asyncio.Task``.

Example

@delay
async def flush_audit_log(batch: list[AuditEntry]) -> None:
    await audit_store.save_many(batch)

# Direct call (awaited in the current task):
await flush_audit_log(batch=entries)

# Background dispatch (does not block):
flush_audit_log.delay(batch=entries)

async def distributed_lock(
    key: str,
    timeout: float = 60.0,
    lock_manager: LockManager | None = None
) -> AsyncGenerator[InMemoryLock, None]

Context manager for in-memory locks.

Deprecated: Use LockManager.acquire() instead for better DI integration.

Parameters
ParameterTypeDescription
`key`strUnique lock key
`timeout`floatLock timeout in seconds
`lock_manager`LockManager | NoneOptional LockManager instance (creates new if None)
Yields
TypeDescription
AsyncGenerator[InMemoryLock, None]InMemoryLock instance

Example

manager = LockManager()  # Should be from DI container
async with distributed_lock("resource:123", lock_manager=manager):
    await process_resource(123)

def scheduled(
    cron: str,
    name: str | None = None,
    priority: Priority | int = Priority.NORMAL,
    max_retries: int = 3,
    timeout: float | None = None
) -> Callable[[Callable[Ellipsis, Any]], Any]

Decorator to define a scheduled task.

Parameters
ParameterTypeDescription
`cron`strCron expression (e.g., "0 9 * * *")
`name`str | NoneTask name
`priority`Priority | intTask priority
`max_retries`intMaximum retry attempts
`timeout`float | NoneExecution timeout
Returns
TypeDescription
Callable[[Callable[Ellipsis, Any]], Any]Decorated task function with scheduling metadata
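
A hedged sketch; `provider` is a TaskProvider instance and `build_and_send_digest` is a placeholder coroutine:

@scheduled("0 9 * * 1-5", name="daily_digest", priority=Priority.HIGH)
async def send_daily_digest() -> None:
    await build_and_send_digest()

provider.register_scheduled_task(send_daily_digest)   # picked up by the scheduler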

def task(
    name: str | None = None,
    priority: Priority | int = Priority.NORMAL,
    max_retries: int = 3,
    timeout: float | None = None,
    queue: str = 'default',
    idempotency_key: str | None = None
) -> Callable[[Callable[Ellipsis, Any]], Any]

Decorator to define a task.

Converts a regular function into a task that can be enqueued and executed by workers.

Parameters
ParameterTypeDescription
`name`str | NoneTask name (defaults to function name)
`priority`Priority | intTask priority (HIGH, NORMAL, LOW)
`max_retries`intMaximum retry attempts on failure
`timeout`float | NoneExecution timeout in seconds
`queue`strQueue name for this task
`idempotency_key`str | NoneOptional key for idempotent execution
Returns
TypeDescription
Callable[[Callable[Ellipsis, Any]], Any]Decorated function with .delay() and .apply_async() methods
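
A hedged sketch; `smtp_client` is a placeholder, and the direct-call line assumes the decorated function remains awaitable as usual:

@task(name="send_email", priority=Priority.HIGH, max_retries=5, queue="emails")
async def send_email(to: str, subject: str) -> None:
    await smtp_client.send(to, subject)

await send_email("user@example.com", "Welcome")     # direct call, runs inline
send_email.delay("user@example.com", "Welcome")     # background dispatch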

The queue has reached its maximum capacity and cannot accept new tasks.

The caller should back off and retry after a delay, or apply backpressure.

def __init__(
    message: str = 'Task queue is full',
    *,
    queue_name: str | None = None,
    capacity: int | None = None,
    **kwargs: Any
) -> None

Raised when a task is cancelled

Base exception for all task-related errors

Raised when a task execution fails
def __init__(
    message: str,
    original_error: Exception | None = None,
    details: dict[str, Any] | None = None,
    hint: str | None = None
) -> None

Raised when a task cannot be found

Raised when a task execution exceeds timeout

Raised when task parameters are invalid