API Reference

Protocol for tracking and broadcasting task progress.

Implementations must be safe for concurrent use — multiple coroutines may call update on the same task_id simultaneously, and multiple consumers may subscribe to the same task.

The subscribe method returns a live AsyncIterator that yields a new ProgressSnapshot every time update, complete, or fail is called for the given task. The iterator terminates automatically when the task reaches a terminal state (COMPLETE or FAILED).

Example

async def export_data(tracker: ProgressTrackerProtocol) -> None:
    records = await fetch_all()
    total = len(records)
    for i, record in enumerate(records, start=1):
        await process(record)
        await tracker.update("export-1", i, total, f"Row {i}/{total}")
    await tracker.complete("export-1", "Export finished")
update
async def update(
    task_id: str,
    current: int,
    total: int,
    message: str = ''
) -> None

Record incremental progress for a task.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |
| `current` | `int` | Units completed so far. |
| `total` | `int` | Total units to process (0 = unknown). |
| `message` | `str` | Optional human-readable status message. |
complete
async def complete(
    task_id: str,
    result: str = ''
) -> None

Mark a task as successfully completed.

Closes all active subscriptions for task_id after broadcasting the terminal ProgressSnapshot.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |
| `result` | `str` | Optional human-readable completion message. |
fail
async def fail(
    task_id: str,
    error: str
) -> None

Mark a task as failed.

Closes all active subscriptions for task_id after broadcasting the terminal ProgressSnapshot.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |
| `error` | `str` | Description of the failure. |
get
async def get(task_id: str) -> ProgressSnapshot | None

Return the current progress state for a task.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |

Returns

| Type | Description |
| --- | --- |
| `ProgressSnapshot \| None` | The most recent ProgressSnapshot, or `None` if the task is not known to this tracker. |
subscribe
def subscribe(task_id: str) -> AsyncIterator[ProgressSnapshot]

Subscribe to live progress updates for a task.

Returns an async iterator that yields a new ProgressSnapshot each time the task’s state changes. The iterator stops automatically when the task reaches a terminal state (COMPLETE or FAILED).

If the task is already in a terminal state when subscribe is called, the iterator yields the final snapshot once and then stops.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task to observe. |

Returns

| Type | Description |
| --- | --- |
| `AsyncIterator[ProgressSnapshot]` | An AsyncIterator of ProgressSnapshot objects. |

Example

async for snap in tracker.subscribe("job-42"):
    print(f"{snap.percent:.1f}% - {snap.message}")

Protocol for task executor implementations.

Executors process tasks from a queue by dispatching them to registered handlers.

Example

class TaskExecutorProtocol:
    async def execute(self, task: Task) -> Any:
        handler = self._handlers.get(task.name)
        if handler:
            return await handler(task.payload)
        raise UnknownTaskError(task.name)
execute
async def execute(task: Any) -> Any

Execute a task using the registered handler.

Parameters
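| Parameter | Type | Description |
| --- | --- | --- |
| `task` | `Any` | Task to execute. |

Returns

| Type | Description |
| --- | --- |
| `Any` | Task execution result. |

Raises

| Exception | Description |
| --- | --- |
| `UnknownTaskError` | If no handler is registered for the task. |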
register_handler
def register_handler(
    task_name: str,
    handler: Any
) -> None

Register a handler for a task type.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_name` | `str` | Name of the task type. |
| `handler` | `Any` | Async callable to handle the task. |
get_handler
def get_handler(task_name: str) -> Any | None

Get the handler for a task type.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_name` | `str` | Name of the task type. |

Returns

| Type | Description |
| --- | --- |
| `Any \| None` | Handler if registered, None otherwise. |

Protocol for task queue implementations.

This defines the contract for background task queues (Redis-based, PostgreSQL-based, in-memory, etc.).

Example

class RedisTaskQueue:
    async def enqueue(self, task: Task) -> str:
        task_id = str(uuid4())
        await self._redis.rpush("tasks", task.to_json())
        return task_id

    async def dequeue(self) -> Task | None:
        data = await self._redis.lpop("tasks")
        return Task.from_json(data) if data else None
enqueue
async def enqueue(task: Any) -> Result[str, TaskQueueError]

Add a task to the queue.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task` | `Any` | Task instance to enqueue. |

Returns

| Type | Description |
| --- | --- |
| `Result[str, TaskQueueError]` | Ok(task_id) on success; Err(TaskQueueError) if the queue reports a recoverable failure (e.g. full queue, invalid payload). Only infrastructure failures (lost connection) are raised as exceptions. |
dequeue
async def dequeue() -> Any | None

Remove and return the next task.

Returns
| Type | Description |
| --- | --- |
| `Any \| None` | Next Task or None if queue is empty. |
get_task_count
async def get_task_count() -> int

Get the number of tasks in the queue.

Returns
| Type | Description |
| --- | --- |
| `int` | Number of pending tasks. |
clear
async def clear() -> None

Clear all tasks from the queue.

get_job
async def get_job(job_id: str) -> Any | None

Get a job by ID.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | Unique job identifier. |

Returns

| Type | Description |
| --- | --- |
| `Any \| None` | Job data or None if not found. |
delete_job
async def delete_job(job_id: str) -> bool

Delete a job from the queue.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | Unique job identifier. |

Returns

| Type | Description |
| --- | --- |
| `bool` | True if the job was deleted. |
count
async def count(
    queue_name: str,
    status: Any | None = None
) -> int

Count queued jobs.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `queue_name` | `str` | Queue to inspect. |
| `status` | `Any \| None` | Optional status filter. |

Returns

| Type | Description |
| --- | --- |
| `int` | Number of matching jobs. |
ack
async def ack(task_id: str) -> None

Acknowledge successful processing of a dequeued task.

Signals that the task has been processed successfully and can be permanently discarded from any in-flight tracking.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | ID of the task to acknowledge. |
nack
async def nack(
    task_id: str,
    requeue: bool = True
) -> None

Negative-acknowledge a dequeued task.

Signals that the task could not be processed. The task is optionally requeued for another processing attempt.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | ID of the task to negative-acknowledge. |
| `requeue` | `bool` | If True, return the task to the queue for retry. If False, discard the task permanently. |
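The dequeue, ack, and nack calls described above are typically combined in a worker loop. The following is a minimal sketch under stated assumptions: `ToyTask`, `ToyQueue`, `drain`, and the `max_attempts` budget are hypothetical names invented for illustration, and only the method names and the `requeue` semantics are taken from the contract.

```python
from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass


@dataclass
class ToyTask:
    id: str
    payload: int


class ToyQueue:
    """Hypothetical in-memory stand-in that mimics dequeue/ack/nack."""

    def __init__(self) -> None:
        self._pending: deque[ToyTask] = deque()
        self._in_flight: dict[str, ToyTask] = {}

    async def enqueue(self, task: ToyTask) -> str:
        self._pending.append(task)
        return task.id

    async def dequeue(self) -> ToyTask | None:
        if not self._pending:
            return None
        task = self._pending.popleft()
        self._in_flight[task.id] = task  # tracked until ack or nack
        return task

    async def ack(self, task_id: str) -> None:
        self._in_flight.pop(task_id, None)

    async def nack(self, task_id: str, requeue: bool = True) -> None:
        task = self._in_flight.pop(task_id, None)
        if task is not None and requeue:
            self._pending.append(task)


async def drain(queue: ToyQueue, handle, max_attempts: int = 3) -> list[str]:
    """Process tasks until the queue is empty; return the IDs that were acked."""
    attempts: dict[str, int] = {}
    acked: list[str] = []
    while (task := await queue.dequeue()) is not None:
        attempts[task.id] = attempts.get(task.id, 0) + 1
        try:
            await handle(task)
        except Exception:
            # Requeue until the attempt budget is spent, then discard.
            await queue.nack(task.id, requeue=attempts[task.id] < max_attempts)
        else:
            await queue.ack(task.id)
            acked.append(task.id)
    return acked
```

Capping attempts in the caller is one possible design; a real backend may track retries itself.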
close
async def close() -> None

Close the queue connection and cleanup resources.


Tracks and shuts down background asyncio tasks.

Inject as a singleton from any DI provider. Call shutdown() during application stop to cancel all in-flight tasks.

Example

manager = BackgroundTaskManager()
task = manager.track(some_coroutine())
# … later …
await manager.shutdown(timeout=30.0)
__init__
def __init__() -> None
track
def track(coro: Awaitable[T]) -> asyncio.Task[T]

Schedule coro as an asyncio task and track the handle.

The task is removed from the tracked set automatically via a done_callback so completed tasks don’t accumulate.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `coro` | `Awaitable[T]` | Any awaitable to wrap in a task. |

Returns

| Type | Description |
| --- | --- |
| `asyncio.Task[T]` | The created asyncio.Task. |
track_named
def track_named(
    name: str,
    coro: Awaitable[T]
) -> asyncio.Task[T]

Like track but attach a human-readable name for logs.

Named tasks appear in shutdown() timeout warnings so operators can identify which work is still running at stop time.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `str` | Label surfaced in observability/log output. |
| `coro` | `Awaitable[T]` | Any awaitable to wrap in a task. |

Returns

| Type | Description |
| --- | --- |
| `asyncio.Task[T]` | The created asyncio.Task. |
pending_count
property pending_count() -> int

Number of tasks that have not yet completed.

shutdown
async def shutdown(timeout: float = 30.0) -> None

Cancel all pending tracked tasks and await them.

After cancellation, the tracked tasks are collectively given up to timeout seconds to finish (that is, to handle CancelledError and run any finally blocks). Tasks that have not stopped when the timeout expires are logged by name; no exception is raised.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | `float` | Seconds to wait for all tasks to finish after cancellation. Default 30 seconds. |
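The track and shutdown behaviour described above (auto-removal through a done callback, then cancel and wait with a shared timeout) could be sketched as follows. `ToyTaskManager` is a hypothetical stand-in, not the library's implementation, and it returns the names of stragglers instead of logging them to keep the sketch testable.

```python
import asyncio


class ToyTaskManager:
    """Hypothetical sketch of track() plus shutdown(timeout)."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def track(self, coro) -> asyncio.Task:
        task = asyncio.ensure_future(coro)
        self._tasks.add(task)
        # Drop the handle once the task finishes so the set does not grow.
        task.add_done_callback(self._tasks.discard)
        return task

    @property
    def pending_count(self) -> int:
        return sum(1 for t in self._tasks if not t.done())

    async def shutdown(self, timeout: float = 30.0) -> list:
        """Cancel everything, wait up to `timeout`, return names still running."""
        tasks = list(self._tasks)
        if not tasks:
            return []
        for task in tasks:
            task.cancel()
        # asyncio.wait does not raise on timeout; it reports stragglers instead.
        _done, still_running = await asyncio.wait(tasks, timeout=timeout)
        return [t.get_name() for t in still_running]
```

Because `asyncio.wait` applies one timeout to the whole set, the budget is shared by all tasks rather than granted to each one separately.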

Branch based on a condition evaluated against the input.

Example

workflow = BranchStep(
    condition=lambda data: data["amount"] > 1000,
    if_true=TaskStep("manual_review", manual_review),
    if_false=TaskStep("auto_approve", auto_approve),
)

__init__
def __init__(
    condition: Callable[[Any], bool],
    if_true: TaskStep,
    if_false: TaskStep
) -> None
execute
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Evaluate the condition and execute either the true or false branch.
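As a rough illustration of the branching behaviour, the sketch below evaluates the condition once and awaits exactly one branch. `ToyBranch` and the `Step` alias are hypothetical; plain async callables stand in for TaskStep, and the return value is not wrapped in a Result as the real method's signature indicates.

```python
import asyncio
from typing import Any, Awaitable, Callable

Step = Callable[[Any], Awaitable[Any]]  # hypothetical stand-in for TaskStep


class ToyBranch:
    def __init__(self, condition: Callable[[Any], bool], if_true: Step, if_false: Step) -> None:
        self.condition = condition
        self.if_true = if_true
        self.if_false = if_false

    async def execute(self, input_data: Any = None) -> Any:
        # Evaluate once, then run exactly one branch with the same input.
        branch = self.if_true if self.condition(input_data) else self.if_false
        return await branch(input_data)


async def manual_review(data: Any) -> str:
    return "manual_review"


async def auto_approve(data: Any) -> str:
    return "auto_approve"
```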


Distributed progress store backed by the platform `CacheBackendProtocol`.

Stores serialised ProgressInfo objects in the configured cache backend so that progress is visible to all worker processes. TTL-based expiry is delegated to the backend.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `cache` | `CacheBackendProtocol` | Platform cache backend (Redis, Memcached, …). |
| `ttl` | `int` | Seconds before a progress entry expires. Defaults to 3600 (1 hour). |
__init__
def __init__(
    cache: CacheBackendProtocol,
    *,
    ttl: int = 3600
) -> None
save
async def save(info: ProgressInfo) -> None

Persist a ProgressInfo entry to the cache backend.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `info` | `ProgressInfo` | Progress state to save. |
get
async def get(job_id: str) -> ProgressInfo | None

Return the ProgressInfo for the given job ID, or None if not found.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | Unique identifier for the job. |
delete
async def delete(job_id: str) -> None

Remove the progress entry for the given job ID.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | Unique identifier for the job. |
list_active
async def list_active() -> list[ProgressInfo]

Return all active (incomplete) progress entries.

Note: This implementation scans the key prefix via get, which may be slow on large datasets. For production at scale, consider a Redis-specific implementation that uses SCAN.

Returns
| Type | Description |
| --- | --- |
| `list[ProgressInfo]` | List of incomplete ProgressInfo. |

Distributed result store backed by the platform `CacheBackendProtocol`.

Stores serialised JobResult objects in the configured cache backend so that results are visible to all worker processes. TTL-based expiry is delegated to the backend.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `cache` | `CacheBackendProtocol` | Platform cache backend (Redis, Memcached, …). |
| `ttl` | `int` | Seconds before a result entry expires. Defaults to 3600 (1 hour). |
| `poll_interval` | `float` | Seconds between polls in wait. Defaults to 0.5. |
__init__
def __init__(
    cache: CacheBackendProtocol,
    *,
    ttl: int = 3600,
    poll_interval: float = 0.5
) -> None
store
async def store(
    job_id: str,
    result: JobResult
) -> None

Persist a job result in the cache backend with TTL.

get
async def get(job_id: str) -> JobResult | None

Return the stored result, or None if absent or expired.

delete
async def delete(job_id: str) -> bool

Remove the result from the cache and return True if it existed.

wait
async def wait(
    job_id: str,
    *,
    timeout: float = 30.0,
    poll_interval: float | None = None
) -> JobResult | None

Poll the cache until the result appears or the timeout elapses.
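A polling wait of this kind might look like the sketch below. `poll_until` and its `fetch` callback are hypothetical names, and returning `None` on timeout is an assumption inferred from the `JobResult | None` return type rather than documented behaviour.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


async def poll_until(
    fetch: Callable[[], Awaitable[Optional[Any]]],
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> Optional[Any]:
    """Call `fetch` repeatedly until it returns non-None or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        value = await fetch()
        if value is not None:
            return value
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None  # assumed timeout behaviour
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))
```

A monotonic clock is used so that wall-clock adjustments cannot stretch or shorten the timeout.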

cleanup_expired
async def cleanup_expired() -> int

No-op: the cache backend handles TTL expiry automatically.


Wrapper for cron expression parsing.

Provides a unified interface for cron expression handling with optional croniter dependency.

Note: This class requires the optional croniter dependency. See lexigram.tasks.scheduling.cron for installation instructions. Instantiation raises ImportError when croniter is absent.

Example

cron = CronExpression("0 9 * * 1-5") # Weekdays at 9am
next_run = cron.get_next()
__init__
def __init__(expression: str)

Initialize cron expression

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `expression` | `str` | Cron expression string |

Raises

| Exception | Description |
| --- | --- |
| `ImportError` | If croniter is not installed |
| `ValueError` | If expression is invalid |
get_next
def get_next(base_time: float | None = None) -> float

Get next execution time

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `base_time` | `float \| None` | Base time (defaults to current time) |

Returns

| Type | Description |
| --- | --- |
| `float` | Unix timestamp of next execution |
get_prev
def get_prev(base_time: float | None = None) -> float

Get previous execution time

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `base_time` | `float \| None` | Base time (defaults to current time) |

Returns

| Type | Description |
| --- | --- |
| `float` | Unix timestamp of previous execution |

Manages failed jobs for inspection, retry, and cleanup.
__init__
def __init__(
    *,
    max_size: int = 10000,
    retention_hours: float = 168
) -> None
add
def add(
    job: JobProtocol,
    error: str,
    traceback: str | None = None,
    **metadata: Any
) -> None

Add a failed job to the DLQ.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job` | `JobProtocol` | The failed job. |
| `error` | `str` | Error message. |
| `traceback` | `str \| None` | Full traceback string. |
| `**metadata` | `Any` | Additional metadata. |
get
def get(job_id: str) -> FailureRecord | None

Get a specific failure record.

list_failed
def list_failed(
    *,
    limit: int = 50,
    job_name: str | None = None
) -> list[FailureRecord]

List failed jobs, optionally filtered by job name.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `limit` | `int` | Maximum records to return. |
| `job_name` | `str \| None` | Filter by job type name. |

Returns

| Type | Description |
| --- | --- |
| `list[FailureRecord]` | List of failure records (newest first). |
retry
def retry(job_id: str) -> JobProtocol | None

Remove a job from DLQ and prepare it for retry.

Returns the job with status reset to PENDING, or None if not found.

retry_all
def retry_all() -> list[JobProtocol]

Retry all failed jobs. Returns list of jobs to re-enqueue.

remove
def remove(job_id: str) -> bool

Permanently remove a failure from the DLQ.

purge
def purge(
    *,
    older_than_hours: float | None = None
) -> int

Purge old failures.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `older_than_hours` | `float \| None` | Remove records older than this. Defaults to retention_hours. |

Returns

| Type | Description |
| --- | --- |
| `int` | Number of records purged. |
clear
def clear() -> int

Clear all DLQ entries. Returns count cleared.

size
property size() -> int

Return the number of currently tracked failure records.

get_stats
def get_stats() -> dict[str, Any]

Get DLQ statistics.


In-memory lock for preventing duplicate task execution within a single process.

Note: This is NOT a distributed lock. For true distributed locking across multiple processes/servers, use a Redis-based or database-based lock implementation.

Example

lock_manager = LockManager()
async with lock_manager.acquire("user:123:sync", timeout=60):
    await sync_user_data(user_id=123)
__init__
def __init__(
    key: str,
    timeout: float,
    locks_dict: dict[str, tuple[float, asyncio.Lock]],
    locks_lock: asyncio.Lock
)

Initialize in-memory lock.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `key` | `str` | Unique lock key |
| `timeout` | `float` | Lock timeout in seconds (auto-release) |
| `locks_dict` | `dict[str, tuple[float, asyncio.Lock]]` | Shared locks dictionary (from LockManager) |
| `locks_lock` | `asyncio.Lock` | Shared lock for accessing locks_dict (from LockManager) |
acquire
async def acquire() -> bool

Acquire the lock.

Returns
| Type | Description |
| --- | --- |
| `bool` | True if lock was acquired, False if already held |
try_acquire
async def try_acquire() -> bool

Try to acquire lock without waiting.

Returns
| Type | Description |
| --- | --- |
| `bool` | True if acquired, False if already held |
release
async def release() -> None

Release the lock.


Record of a single task execution.
to_dict
def to_dict() -> dict[str, Any]

Serialize the execution record to a plain dictionary.


Record of a failed job in the DLQ.
to_dict
def to_dict() -> dict[str, Any]

Serialize the failure record to a plain dictionary.

from_dict
def from_dict(
    cls,
    data: dict[str, Any]
) -> FailureRecord

Deserialize a failure record from a plain dictionary.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `data` | `dict[str, Any]` | Dictionary as produced by to_dict. |

Returns

| Type | Description |
| --- | --- |
| `FailureRecord` | Reconstructed FailureRecord. |

System-wide throughput cap applied across all queues.

Delegates to a single TokenBucket for consistent rate enforcement regardless of which queue a task originates from.

Example

limiter = GlobalRateLimiter(rate=100, per=1.0) # 100 tasks / sec
await limiter.acquire()
__init__
def __init__(
    rate: int,
    per: float = 1.0,
    burst: int | None = None
) -> None

Initialise the global rate limiter.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `rate` | `int` | Maximum allowed tasks per time period. |
| `per` | `float` | Length of the time period in seconds. |
| `burst` | `int \| None` | Maximum burst capacity (defaults to `rate`). |
acquire
async def acquire() -> None

Block until a global token is available.

try_acquire
async def try_acquire() -> bool

Non-blocking global token acquisition.

Returns
| Type | Description |
| --- | --- |
| `bool` | `True` if a token was obtained; `False` if globally rate-limited. |
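For readers unfamiliar with the token-bucket technique that this limiter delegates to, here is a minimal synchronous sketch. `ToyTokenBucket`, its injectable `clock`, and `seconds_until_token` are hypothetical; the library's TokenBucket is async and its internals are not shown in this reference.

```python
from __future__ import annotations

import time


class ToyTokenBucket:
    """Hypothetical token bucket: `rate` tokens are refilled every `per` seconds."""

    def __init__(self, rate: int, per: float = 1.0, burst: int | None = None,
                 clock=time.monotonic) -> None:
        self.capacity = float(burst if burst is not None else rate)
        self.refill_per_sec = rate / per
        self.tokens = self.capacity  # start full so an initial burst is allowed
        self._clock = clock
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        earned = (now - self._last) * self.refill_per_sec
        self.tokens = min(self.capacity, self.tokens + earned)
        self._last = now

    def try_acquire(self) -> bool:
        """Take one token if available; never waits."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def seconds_until_token(self) -> float:
        """How long a blocking acquire would need to sleep before retrying."""
        self._refill()
        return max(0.0, (1.0 - self.tokens) / self.refill_per_sec)
```

A blocking `acquire` can be built on top by sleeping for `seconds_until_token()` and retrying `try_acquire()`.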

Registry for task handlers that also satisfies the TaskExecutorProtocol contract.

Maps task names to their handler functions. Handlers can be registered and retrieved by name. The execute method dispatches a task to its registered handler, making this class usable wherever a TaskExecutorProtocol is expected.

Extends Registry for unified introspection and lifecycle hooks.

Example

handler = registry.get("send_email")
__init__
def __init__() -> None

Initialize handler registry.

register
def register(
    name: str,
    handler: Callable[..., Awaitable[Any]] | None = None
) -> Callable[..., Any]

Register a task handler.

Can be used as a decorator or called directly.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `str` | Task name. |
| `handler` | `Callable[..., Awaitable[Any]] \| None` | Handler function (if not using as decorator). |

Returns

| Type | Description |
| --- | --- |
| `Callable[..., Any]` | Handler function or decorator. |

Example

# As decorator
@registry.register("task_name")
async def my_handler(*args, **kwargs):
    pass

# Direct call
registry.register("task_name", my_handler)
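One common way to support both the decorator form and the direct-call form is to return a closure when no handler is passed. The sketch below assumes that approach; `ToyRegistry` and the two example handlers are hypothetical and do not reflect how HandlerRegistry is actually implemented.

```python
from __future__ import annotations

from typing import Any, Awaitable, Callable

Handler = Callable[..., Awaitable[Any]]


class ToyRegistry:
    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, name: str, handler: Handler | None = None) -> Callable[..., Any]:
        if handler is not None:
            # Direct call: store immediately and hand the function back.
            self._handlers[name] = handler
            return handler

        def decorator(func: Handler) -> Handler:
            # Decorator form: store, then return the function unchanged.
            self._handlers[name] = func
            return func

        return decorator

    def get(self, name: str) -> Handler | None:
        return self._handlers.get(name)


registry = ToyRegistry()


@registry.register("send_email")
async def send_email(to: str) -> str:
    return f"sent to {to}"


async def resize_image(path: str) -> str:
    return f"resized {path}"


registry.register("resize_image", resize_image)
```

Returning the original function from the decorator keeps the decorated name usable as a normal coroutine function.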
register_handler
def register_handler(
    task_name: str,
    handler: Any
) -> None

Register a handler for a task type (TaskExecutorProtocol method).

This method satisfies the register_handler requirement defined in lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a protocol-mandated entry point that delegates directly to register so that HandlerRegistry can be used wherever a TaskExecutorProtocol is expected without any additional adapter. Do not remove — the protocol contract requires this exact signature.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_name` | `str` | Name of the task type. |
| `handler` | `Any` | Async callable to handle the task. |
get_handler
def get_handler(task_name: str) -> Any | None

Get the handler for a task type (TaskExecutorProtocol method).

This method satisfies the get_handler requirement defined in lexigram.contracts.infra.tasks.protocols.TaskExecutorProtocol. It is a protocol-mandated entry point that delegates directly to get so that HandlerRegistry can be used wherever a TaskExecutorProtocol is expected without any additional adapter. Do not remove — the protocol contract requires this exact signature.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_name` | `str` | Name of the task type. |

Returns

| Type | Description |
| --- | --- |
| `Any \| None` | Handler if registered, `None` otherwise. |
execute
async def execute(task: Any) -> Any

Execute a task using the registered handler (TaskExecutorProtocol method).

Resolves the handler for task.name, then invokes it with task.args and task.kwargs. Supports both sync and async handlers.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task` | `Any` | Task object with `name`, `args`, and `kwargs` attributes. |

Returns

| Type | Description |
| --- | --- |
| `Any` | Result returned by the handler. |

Raises

| Exception | Description |
| --- | --- |
| `TaskNotFoundError` | If no handler is registered for the task name. |
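Supporting both sync and async handlers usually comes down to calling the handler and awaiting the result only if it is awaitable. This is a sketch of that idea, not the library's code: `ToyTask`, the free function `execute`, and the locally defined `TaskNotFoundError` are illustrative stand-ins.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


class TaskNotFoundError(LookupError):
    """Local stand-in for the library's exception of the same name."""


@dataclass
class ToyTask:
    name: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


async def execute(handlers: Dict[str, Callable[..., Any]], task: ToyTask) -> Any:
    handler = handlers.get(task.name)
    if handler is None:
        raise TaskNotFoundError(task.name)
    result = handler(*task.args, **task.kwargs)
    # A sync handler returns a plain value; an async one returns an awaitable.
    if inspect.isawaitable(result):
        result = await result
    return result
```

Note that a sync handler runs on the event loop thread in this sketch, so a long-running one would block other coroutines.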
remove
def remove(name: str) -> bool

Remove handler from registry.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `str` | Task name. |

Returns

| Type | Description |
| --- | --- |
| `bool` | True if handler was removed. |
list_handlers
def list_handlers() -> list[str]

List all registered handler names.

Returns
| Type | Description |
| --- | --- |
| `list[str]` | List of task names. |
to_dict
def to_dict() -> dict[str, Callable[..., Awaitable[Any]]]

Get handlers as dictionary.

Returns
| Type | Description |
| --- | --- |
| `dict[str, Callable[..., Awaitable[Any]]]` | Dictionary of all handlers. |

In-memory progress store for development/testing.
__init__
def __init__() -> None
save
async def save(info: ProgressInfo) -> None

Persist a ProgressInfo entry in the in-memory store.

get
async def get(job_id: str) -> ProgressInfo | None

Return the ProgressInfo for the given job ID, or None if not found.

delete
async def delete(job_id: str) -> None

Remove the progress entry for the given job ID if it exists.

list_active
async def list_active() -> list[ProgressInfo]

Return all progress entries whose current count has not yet reached total.


Concurrency-safe (for coroutines on a single event loop), in-memory ProgressTrackerProtocol implementation.

Maintains the latest ProgressSnapshot for each tracked task and fans out every state change to all active subscribers via asyncio.Queue.

Subscriptions are kept alive until the task reaches a terminal state (COMPLETE or FAILED), at which point the iterator closes automatically. Subscribers that call subscribe after the task has already finished receive the terminal snapshot once and then stop.

All public methods are safe to call concurrently from multiple coroutines.
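The fan-out described above, where each subscriber owns an asyncio.Queue and every state change is pushed to all of them, can be sketched roughly as follows. `Snap` and `ToyTracker` are hypothetical, simplified stand-ins for ProgressSnapshot and the real tracker; failure handling and messages are omitted.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Snap:  # hypothetical, simplified ProgressSnapshot
    current: int
    total: int
    done: bool = False


class ToyTracker:
    def __init__(self) -> None:
        self._latest: dict = {}  # task_id -> latest Snap
        self._subs: dict = {}    # task_id -> list of subscriber queues

    async def _publish(self, task_id: str, snap: Snap) -> None:
        self._latest[task_id] = snap
        for q in self._subs.get(task_id, []):
            q.put_nowait(snap)  # queues are unbounded, so this never blocks

    async def update(self, task_id: str, current: int, total: int) -> None:
        await self._publish(task_id, Snap(current, total))

    async def complete(self, task_id: str) -> None:
        last = self._latest.get(task_id, Snap(0, 0))
        await self._publish(task_id, Snap(last.total, last.total, done=True))

    async def subscribe(self, task_id: str):
        latest = self._latest.get(task_id)
        if latest is not None and latest.done:
            yield latest  # already finished: replay the terminal snapshot once
            return
        q: asyncio.Queue = asyncio.Queue()
        self._subs.setdefault(task_id, []).append(q)
        try:
            while True:
                snap = await q.get()
                yield snap
                if snap.done:
                    return
        finally:
            self._subs[task_id].remove(q)
```

In this sketch the queue is registered on the first iteration of the generator, not when `subscribe` is called, so updates published before a consumer starts iterating are not replayed to it.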

__init__
def __init__() -> None
update
async def update(
    task_id: str,
    current: int,
    total: int,
    message: str = ''
) -> None

Record incremental progress and broadcast to all subscribers.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |
| `current` | `int` | Units completed so far. |
| `total` | `int` | Total units to process (0 means unknown). |
| `message` | `str` | Optional human-readable status line. |
complete
async def complete(
    task_id: str,
    result: str = ''
) -> None

Mark a task as successfully completed and close all subscriptions.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |
| `result` | `str` | Optional human-readable completion message. |
fail
async def fail(
    task_id: str,
    error: str
) -> None

Mark a task as failed and close all subscriptions.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |
| `error` | `str` | Description of the failure. |
get
async def get(task_id: str) -> ProgressSnapshot | None

Return the current progress state for a task.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task. |

Returns

| Type | Description |
| --- | --- |
| `ProgressSnapshot \| None` | The most recent ProgressSnapshot, or `None` if the task has not been seen by this tracker. |
subscribe
def subscribe(task_id: str) -> AsyncGenerator[ProgressSnapshot, None]

Subscribe to live progress updates for a task.

Returns an async generator that yields one ProgressSnapshot per state change. The generator stops automatically when the task reaches a terminal state. If the task is already finished, the terminal snapshot is yielded once and the generator stops immediately.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the task to observe. |

Returns

| Type | Description |
| --- | --- |
| `AsyncGenerator[ProgressSnapshot, None]` | An async generator of ProgressSnapshot objects. |

In-memory result store with TTL-based expiry.

Suitable for development and single-process deployments. For production, implement a Redis-backed store.

__init__
def __init__(
    *,
    ttl: int = 3600,
    max_size: int = 10000
) -> None
store
async def store(
    job_id: str,
    result: JobResult
) -> None

Persist a job result, evicting the oldest entry when at capacity.

get
async def get(job_id: str) -> JobResult | None

Return the result for the given job ID, or None if absent or expired.

delete
async def delete(job_id: str) -> bool

Remove the result for the given job ID and return True if it existed.

wait
async def wait(
    job_id: str,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.5
) -> JobResult | None

Wait for a result to appear, with timeout.

cleanup_expired
async def cleanup_expired() -> int

Remove all entries whose TTL has elapsed and return the count removed.
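The combination of TTL expiry and oldest-first eviction at capacity could be sketched with an ordered dictionary. `ToyResultStore` is hypothetical and synchronous for brevity; the real store is async, and the eviction order it actually uses is an assumption here (insertion order).

```python
import time
from collections import OrderedDict
from typing import Any, Optional


class ToyResultStore:
    def __init__(self, *, ttl: int = 3600, max_size: int = 10000,
                 clock=time.monotonic) -> None:
        self._ttl = ttl
        self._max_size = max_size
        self._clock = clock
        self._items: OrderedDict = OrderedDict()  # job_id -> (deadline, result)

    def store(self, job_id: str, result: Any) -> None:
        self._items.pop(job_id, None)  # re-storing moves the entry to the end
        while len(self._items) >= self._max_size:
            self._items.popitem(last=False)  # evict the oldest insertion
        self._items[job_id] = (self._clock() + self._ttl, result)

    def get(self, job_id: str) -> Optional[Any]:
        entry = self._items.get(job_id)
        if entry is None or entry[0] <= self._clock():
            return None  # absent or expired
        return entry[1]

    def cleanup_expired(self) -> int:
        now = self._clock()
        expired = [k for k, (deadline, _) in self._items.items() if deadline <= now]
        for k in expired:
            del self._items[k]
        return len(expired)
```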

get_stats
def get_stats() -> dict[str, Any]

Return store statistics including size, capacity, and waiter count.


Represents a background job with comprehensive status tracking

Jobs are executable work units that can be queued, scheduled, tracked, and retried. They support priority ordering, dependency management, and timeout control.

Attributes:

  • id: Unique job identifier
  • name: Human-readable job name/type
  • args: Positional arguments for job handler
  • kwargs: Keyword arguments for job handler
  • priority: Execution priority (higher = more important)
  • max_retries: Maximum retry attempts on failure
  • timeout: Execution timeout in seconds
  • depends_on: List of job IDs this job depends on
  • status: Current job status
  • created_at: Timestamp when job was created
  • started_at: Timestamp when job started executing
  • completed_at: Timestamp when job finished
  • retry_count: Current number of retry attempts
  • last_error: Most recent error message
  • result: JobProtocol execution result
  • scheduled_at: Timestamp for scheduled execution
  • cron_expression: Cron expression for recurring jobs

is_pending
property is_pending() -> bool

Check if job is pending execution

is_running
property is_running() -> bool

Check if job is currently running

is_completed
property is_completed() -> bool

Check if job completed successfully

is_failed
property is_failed() -> bool

Check if job failed

can_retry
property can_retry() -> bool

Check if job can be retried (hasn’t exceeded max_retries)

duration_ms
property duration_ms() -> float | None

Get job execution duration in milliseconds

mark_running
def mark_running() -> None

Mark job as running and record start time

mark_completed
def mark_completed(result: JobResult) -> None

Mark job as completed with result

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `result` | `JobResult` | JobProtocol execution result |
mark_failed
def mark_failed(error: str) -> None

Mark job as failed with error message

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `error` | `str` | Error message describing the failure |
mark_retrying
def mark_retrying() -> None

Mark job as retrying and increment retry count

mark_cancelled
def mark_cancelled() -> None

Mark job as cancelled

to_dict
def to_dict() -> dict[str, Any]

Convert job to dictionary for serialization

Returns
| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary representation of job |
from_dict
def from_dict(
    cls,
    data: dict[str, Any]
) -> JobProtocol

Create job from dictionary

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `data` | `dict[str, Any]` | Dictionary representation of job |

Returns

| Type | Description |
| --- | --- |
| `JobProtocol` | JobProtocol instance |

Result of a job execution

Captures the outcome of job execution including success status, return data, error information, and performance metrics.

ok
def ok(
    cls,
    data: Any = None,
    duration: float = 0.0
) -> JobResult

Create a successful result

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `data` | `Any` | Return value from the job |
| `duration` | `float` | Execution duration in seconds |

Returns

| Type | Description |
| --- | --- |
| `JobResult` | JobResult with success=True |
fail
def fail(
    cls,
    error: str,
    retry_count: int = 0,
    duration: float = 0.0
) -> JobResult

Create a failure result

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `error` | `str` | Error message describing the failure |
| `retry_count` | `int` | Number of retry attempts made |
| `duration` | `float` | Execution duration in seconds |

Returns

| Type | Description |
| --- | --- |
| `JobResult` | JobResult with success=False |
to_dict
def to_dict() -> dict[str, Any]

Convert result to dictionary for serialization

from_dict
def from_dict(
    cls,
    data: dict[str, Any]
) -> JobResult

Create result from dictionary


JobProtocol execution status.

Template for creating scheduled jobs

JobTemplateProtocol defines a reusable job configuration that can be instantiated multiple times with different IDs.

Attributes:

  • name: JobProtocol name/type
  • args: Positional arguments
  • kwargs: Keyword arguments
  • priority: Execution priority
  • max_retries: Maximum retry attempts
  • timeout: Execution timeout
  • depends_on: JobProtocol dependencies template

create_job
def create_job(job_id: str = '') -> JobProtocol

Create a JobProtocol instance from this template

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | JobProtocol ID (generated if empty) |

Returns

| Type | Description |
| --- | --- |
| `JobProtocol` | New JobProtocol instance |
from_job
def from_job(
    cls,
    job: JobProtocol
) -> JobTemplateProtocol

Create template from existing job

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `job` | `JobProtocol` | JobProtocol to convert to template |

Returns

| Type | Description |
| --- | --- |
| `JobTemplateProtocol` | JobTemplateProtocol instance |

Logs task start, completion, and failures.
before_execute
async def before_execute(ctx: TaskExecutionContext) -> None

Log task start before execution.

after_execute
async def after_execute(ctx: TaskExecutionContext) -> None

Log task completion status and duration after execution.

on_error
async def on_error(
    ctx: TaskExecutionContext,
    error: Exception
) -> None

Log the error when a task raises an exception.


In-memory task queue implementation

Uses Python's heapq for priority-based ordering. Tasks are stored in memory and are lost on process restart. An asyncio.Lock guards concurrent access from coroutines; it does not make the queue safe to share across OS threads.

Characteristics:

  • Storage: In-memory heap-based priority queue
  • Concurrency: asyncio.Lock guards access from concurrent coroutines
  • Persistence: No persistence (volatile)
  • Use Case: Development, testing, single-process applications
  • Limitations: No persistence, single-process only

Example

queue = MemoryTaskQueue()
await queue.enqueue(task)
task = await queue.dequeue()
__init__
def __init__() -> None

Initialize memory task queue

set_hook_registry
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None

Attach an optional hook registry after provider boot wiring.

enqueue
async def enqueue(task: JobProtocol) -> Result[str, TaskError]

Add a task to the queue, returning Ok(task_id) or Err(TaskError) on failure.

Tasks are stored in a min-heap with negative priority to achieve max-heap behavior (highest priority first).

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task` | `JobProtocol` | Task to enqueue |

Returns

| Type | Description |
| --- | --- |
| `Result[str, TaskError]` | Ok containing the enqueued task id, or Err containing a TaskError. |
dequeue
async def dequeue() -> JobProtocol | None

Remove and return the next task from the queue

Returns the highest priority task. If multiple tasks have the same priority, returns the oldest (FIFO). Respects task delay - tasks with future available_at are skipped.

Returns
| Type | Description |
| --- | --- |
| `JobProtocol \| None` | Next task or None if queue is empty or no tasks are available |
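The ordering rules described for enqueue and dequeue (negated priority on a min-heap, FIFO among equal priorities, delayed tasks skipped) can be illustrated with the standard heapq module. `ToyPriorityQueue`, its `push`/`pop` names, and the injectable `clock` are hypothetical; locking and in-flight tracking are left out.

```python
import heapq
import itertools
import time
from typing import Any, Optional


class ToyPriorityQueue:
    def __init__(self, clock=time.monotonic) -> None:
        self._heap: list = []  # entries: (-priority, seq, available_at, item)
        self._seq = itertools.count()  # insertion counter breaks priority ties
        self._clock = clock

    def push(self, item: Any, priority: int = 0, delay: float = 0.0) -> None:
        # heapq is a min-heap, so negate priority to pop the highest first.
        entry = (-priority, next(self._seq), self._clock() + delay, item)
        heapq.heappush(self._heap, entry)

    def pop(self) -> Optional[Any]:
        """Return the best item that is available now, or None."""
        now = self._clock()
        skipped = []
        found = None
        while self._heap:
            entry = heapq.heappop(self._heap)
            if entry[2] <= now:
                found = entry[3]
                break
            skipped.append(entry)  # not yet available; put back afterwards
        for entry in skipped:
            heapq.heappush(self._heap, entry)
        return found
```

The unique sequence number also guarantees that tuple comparison never reaches the item itself, so items do not need to be orderable.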
ack
async def ack(task_id: str) -> None

Acknowledge successful processing of a task

Removes the task from the in-flight tracking set.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | ID of the task to acknowledge |
nack
async def nack(
    task_id: str,
    requeue: bool = True
) -> None

Negative-acknowledge a task, optionally requeuing it

Removes the task from in-flight tracking. If requeue is True the task is pushed back onto the priority heap for retry.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | ID of the task to negative-acknowledge |
| `requeue` | `bool` | If True, return the task to the queue for retry |
get_task_count
async def get_task_count() -> int

Get the number of tasks in the queue

Returns
| Type | Description |
| --- | --- |
| `int` | Number of pending tasks |
clear
async def clear() -> None

Clear all tasks from the queue

close
async def close() -> None

Close the queue connection

For memory queue, there are no external resources to clean up.


Collects per-task-type execution metrics.
__init__
def __init__() -> None
after_execute
async def after_execute(ctx: TaskExecutionContext) -> None

Record execution metrics for the completed task.

on_error
async def on_error(
    ctx: TaskExecutionContext,
    error: Exception
) -> None

Increment failure counter for the task type that raised an error.

get_stats
def get_stats() -> dict[str, Any]

Return aggregated execution statistics keyed by task name.


Configuration for a single named task queue backend.

Used in TaskConfig.backends to declare multiple task queues that the framework registers as Named DI bindings.

Parameters
ParameterTypeDescription
`name`Unique backend identifier. Used as the Named() DI key.
`primary`Whether this is the primary backend. Primary backends also receive the unnamed TaskQueueProtocol binding.
`type`Backend type ('memory', 'redis', 'rabbitmq', 'postgres').
`redis_url`Redis connection URL (when type='redis').
`amqp_url`AMQP connection URL (when type='rabbitmq').
`postgres_dsn`Postgres DSN (when type='postgres').
`queue_name`Queue name for this backend instance.

Example

backends:
  - name: primary
    primary: true
    type: redis
    redis_url: "${REDIS_URL}"
  - name: notifications
    type: rabbitmq
    amqp_url: "${AMQP_URL}"

Inject by name

queue: Annotated[TaskQueueProtocol, Named("notifications")]

Controls what happens when ScheduledWorker.run_cycle raises.

Attributes:
  • LOG_AND_CONTINUE: Log the error, then resume the normal schedule.
  • BACKOFF: Log the error and double the sleep before the next cycle (up to 10 * interval_seconds).
  • STOP: Log the error and stop the worker permanently.
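
The BACKOFF arithmetic can be sketched as below. The next_sleep helper is hypothetical, and it assumes the doubling accumulates across consecutive failures and resets after a success; the source only states the doubling and the 10x cap.

```python
def next_sleep(interval_seconds: float, consecutive_failures: int) -> float:
    """Sleep before the next cycle: doubled per failure, capped at 10x the interval."""
    if consecutive_failures <= 0:
        return interval_seconds  # normal schedule
    doubled = interval_seconds * 2 ** consecutive_failures
    return min(doubled, 10 * interval_seconds)
```

With a 60-second interval this gives 120, 240, and 480 seconds after one, two, and three failures, then stays at the 600-second cap.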


Task priority levels for queue ordering (higher number = higher priority).

The scale is 0 to 20 and is separate from the kernel’s ProviderPriority (0 to 100). Workers dequeue higher-priority jobs before lower-priority ones.

Values:
  • LOW (0): Background tasks that can be deferred indefinitely.
  • NORMAL (5): Default priority for routine task processing.
  • HIGH (10): Time-sensitive tasks processed before NORMAL queue.
  • CRITICAL (20): Highest priority; reserved for urgent system tasks.
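
A small sketch of how these values order jobs. It assumes the priority behaves like an integer enum; TaskPrioritySketch is a stand-in defined for the example, and only the four names and numbers come from the list above.

```python
from enum import IntEnum


class TaskPrioritySketch(IntEnum):
    """Stand-in enum using the documented values."""
    LOW = 0
    NORMAL = 5
    HIGH = 10
    CRITICAL = 20


jobs = [
    ("cleanup", TaskPrioritySketch.LOW),
    ("alert", TaskPrioritySketch.CRITICAL),
    ("email", TaskPrioritySketch.NORMAL),
]
# Higher number means higher priority, so sort in descending order.
ordered = sorted(jobs, key=lambda job: job[1], reverse=True)
```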


Current progress state of a task.
is_complete
property is_complete() -> bool

Return True when the current count has reached the total.

elapsed_seconds
property elapsed_seconds() -> float

Return seconds elapsed since the task started, or 0 if not started.


Immutable point-in-time view of a task's progress.
Parameters
ParameterTypeDescription
`task_id`Unique identifier for the tracked task.
`current`Number of units completed so far.
`total`Total number of units to complete (0 means unknown).
`status`Current lifecycle state.
`message`Human-readable status message.
`error`Error description when status is FAILED; empty otherwise.
percent
property percent() -> float

Completion percentage in the range [0.0, 100.0].

Returns 0.0 when total is unknown (zero).
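
One way the percent rule could be expressed, as a sketch only. SnapshotSketch is a hypothetical frozen dataclass carrying a subset of the documented fields, and clamping to the stated [0.0, 100.0] range is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SnapshotSketch:
    """Hypothetical immutable snapshot with only the fields needed here."""
    task_id: str
    current: int
    total: int

    @property
    def percent(self) -> float:
        if self.total <= 0:
            return 0.0  # total unknown
        # Clamp so an overshooting counter never reports more than 100%.
        return max(0.0, min(100.0, 100.0 * self.current / self.total))
```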


Lifecycle states for a tracked task.

Values are lowercase strings so they round-trip cleanly through JSON.
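
The JSON round trip can be illustrated with a string-valued enum. StatusSketch is a stand-in: COMPLETE and FAILED are the terminal states named in this reference, while the RUNNING member and the exact string values are assumptions.

```python
import json
from enum import Enum


class StatusSketch(str, Enum):
    RUNNING = "running"    # hypothetical non-terminal member
    COMPLETE = "complete"
    FAILED = "failed"


# A str-based enum serialises as its plain string value.
payload = json.dumps({"status": StatusSketch.COMPLETE})
# Looking the value up again restores the original member.
restored = StatusSketch(json.loads(payload)["status"])
```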


Abstract storage for progress state.
save
async def save(info: ProgressInfo) -> None

Save progress state.

get
async def get(job_id: str) -> ProgressInfo | None

Get progress for a job.

delete
async def delete(job_id: str) -> None

Remove progress entry.

list_active
async def list_active() -> list[ProgressInfo]

List all active (incomplete) progress entries.


Track and report task progress.

Injected into task handlers to enable progress reporting.

__init__
def __init__(
    job_id: str,
    store: ProgressStore
) -> None
update
async def update(
    current: int,
    total: int,
    message: str = '',
    **metadata: Any
) -> None

Update progress.

Parameters
ParameterTypeDescription
`current`intCurrent progress count.
`total`intTotal items to process.
`message`strHuman-readable status message.
`**metadata`AnyAdditional metadata.
complete
async def complete(message: str = 'Complete') -> None

Mark progress as complete.

fail
async def fail(message: str) -> None

Mark progress as failed.

info
property info() -> ProgressInfo

Return the current ProgressInfo snapshot.


Per-queue rate limiter.

Manages independent rate limits for named task queues, delegating each limit to a TokenBucket.

Example

limiter = QueueRateLimiter()
limiter.add_limit("emails", rate=5, per=1.0) # 5 / sec
limiter.add_limit("reports", rate=1, per=60.0) # 1 / min
await limiter.acquire("emails")
add_limit
def add_limit(
    queue: str,
    rate: int,
    per: float = 1.0,
    burst: int | None = None
) -> None

Register a rate limit for a named queue.

Parameters
ParameterTypeDescription
`queue`strQueue name to apply the limit to.
`rate`intAllowed requests per time period.
`per`floatLength of the time period in seconds.
`burst`int | NoneMaximum burst size (defaults to rate).
acquire
async def acquire(queue: str) -> None

Block until a token is available for queue.

Parameters
ParameterTypeDescription
`queue`strQueue name; a no-op for unlisted queues.
try_acquire
async def try_acquire(queue: str) -> bool

Attempt a non-blocking token acquisition for queue.

Parameters
ParameterTypeDescription
`queue`strQueue name.
Returns
TypeDescription
boolTrue if a token was obtained or the queue has no limit; False if the queue is currently rate-limited.
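
For orientation, a generic token bucket with the same rate, per, and burst parameters might look like the sketch below. TokenBucketSketch and its clock parameter are hypothetical; the library's TokenBucket may differ in detail.

```python
import time
from typing import Callable, Optional


class TokenBucketSketch:
    """Hypothetical bucket: `rate` tokens are refilled every `per` seconds."""

    def __init__(self, rate: int, per: float = 1.0, burst: Optional[int] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = float(burst if burst is not None else rate)
        self.refill_per_second = rate / per
        self.tokens = self.capacity  # start full
        self._clock = clock
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill for the elapsed time, never beyond the burst capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A blocking acquire can be built on top of this by sleeping until the next token is due and retrying.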

Abstract result store backend.
store
async def store(
    job_id: str,
    result: JobResult
) -> None

Store a job result.

get
async def get(job_id: str) -> JobResult | None

Get a result by job ID. Returns None if not found or expired.

delete
async def delete(job_id: str) -> bool

Delete a result. Returns True if deleted.

wait
async def wait(
    job_id: str,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.5
) -> JobResult | None

Wait for a result with timeout.
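
A polling loop consistent with the timeout and poll_interval parameters could look like this sketch. DictResultStore is a hypothetical in-memory store, and returning None on timeout is an assumption based on the JobResult | None return type.

```python
import asyncio
from typing import Any, Optional


class DictResultStore:
    """Hypothetical in-memory store used only to illustrate polling."""

    def __init__(self) -> None:
        self._results = {}

    async def store(self, job_id: str, result: Any) -> None:
        self._results[job_id] = result

    async def get(self, job_id: str) -> Optional[Any]:
        return self._results.get(job_id)

    async def wait(self, job_id: str, *, timeout: float = 30.0,
                   poll_interval: float = 0.5) -> Optional[Any]:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            result = await self.get(job_id)
            if result is not None:
                return result
            remaining = deadline - loop.time()
            if remaining <= 0:
                return None  # timed out without a result
            # Never sleep past the deadline.
            await asyncio.sleep(min(poll_interval, remaining))
```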

cleanup_expired
async def cleanup_expired() -> int

Remove expired results. Returns count removed.


Represents a scheduled job with cron expression

Attributes:
  • job_template: Template for creating job instances.
  • cron_expression: Cron expression string.
  • next_run: Unix timestamp of next execution.
  • enabled: Whether job is enabled for execution.

calculate_next_run
def calculate_next_run() -> float

Calculate the next run time based on cron expression

Returns
TypeDescription
floatUnix timestamp of next execution

Periodic worker base class.

Subclass and implement run_cycle. The base class handles the scheduling loop, error policy, graceful shutdown, and task tracking via BackgroundTaskManager.

Class-level defaults (override per subclass or via constructor)

class MyWorker(ScheduledWorker):
    interval_seconds = 60.0
    initial_delay_seconds = 0.0
    max_jitter_seconds = 0.0
    on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE

Constructor injection is the preferred way to customise at runtime

worker = MyWorker(
    task_manager=task_manager,
    interval_seconds=30.0,
    on_error_policy=OnErrorPolicy.BACKOFF,
)
__init__
def __init__(
    task_manager: BackgroundTaskManager,
    *,
    interval_seconds: float | None = None,
    initial_delay_seconds: float | None = None,
    max_jitter_seconds: float | None = None,
    on_error_policy: OnErrorPolicy | None = None
) -> None

Initialise the worker.

Parameters
ParameterTypeDescription
`task_manager`BackgroundTaskManagerShared BackgroundTaskManager that owns the task handle and participates in framework shutdown.
`interval_seconds`float | NoneOverride interval_seconds.
`initial_delay_seconds`float | NoneOverride initial_delay_seconds.
`max_jitter_seconds`float | NoneOverride max_jitter_seconds.
`on_error_policy`OnErrorPolicy | NoneOverride on_error_policy.
start
async def start() -> None

Start the worker loop as a tracked background task.

Intended to be called once. Calling start() while the worker is already running is a no-op that logs a warning.

stop
async def stop(grace_seconds: float = 2.0) -> None

Signal the worker to stop and wait for it to finish.

Graceful: the stop event is set so the sleeping phase wakes up early, and the current cycle (if any) is allowed to finish. If the task has not completed within grace_seconds, it is forcibly cancelled.

Parameters
ParameterTypeDescription
`grace_seconds`floatSeconds to wait for a clean stop before cancelling. Default is 2 seconds.
run_cycle
async def run_cycle() -> None

Override to implement one unit of periodic work.

Called once per interval. Must not swallow asyncio.CancelledError.
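
The scheduling loop and early wake-up on stop can be pictured with the sketch below. LoopSketch is hypothetical and omits the error policy, jitter, initial delay, and BackgroundTaskManager integration that the real base class handles.

```python
import asyncio


class LoopSketch:
    """Hypothetical periodic loop; not the ScheduledWorker implementation."""

    def __init__(self, interval_seconds: float) -> None:
        self.interval_seconds = interval_seconds
        self.cycles = 0
        self._stop = asyncio.Event()

    async def run_cycle(self) -> None:
        self.cycles += 1  # one unit of periodic work

    async def run(self) -> None:
        while not self._stop.is_set():
            await self.run_cycle()
            try:
                # Sleep for one interval, but wake at once if stop is requested.
                await asyncio.wait_for(self._stop.wait(), timeout=self.interval_seconds)
            except asyncio.TimeoutError:
                pass  # interval elapsed normally; run the next cycle

    def request_stop(self) -> None:
        self._stop.set()
```

Waiting on the event instead of calling asyncio.sleep is what lets a stop request interrupt a long interval.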


Result from a single workflow step.

Configuration for task queue backends.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Sequential task execution — output of each step feeds the next.

Example

chain = TaskChain([
    TaskStep("parse", parse_data),
    TaskStep("validate", validate_data),
    TaskStep("save", save_data),
])
result = await chain.execute(raw_input)

__init__
def __init__(
    steps: list[TaskStep],
    *,
    stop_on_error: bool = True,
    state_store: WorkflowStateStore | None = None,
    workflow_id: str | None = None
) -> None
execute
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Execute all steps in sequence, threading output of each step into the next.

If a workflow_id and state_store were provided at construction, this method will attempt to resume from the last saved checkpoint — any step whose name appears in the persisted state’s completed steps will be skipped, and the last completed step’s output will seed the next step.
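
The threading and resume behaviour can be sketched with plain coroutines. The run_chain helper and its checkpoint dict are hypothetical simplifications: the real TaskChain works with TaskStep objects, a WorkflowStateStore, and a Result return value.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Step = tuple[str, Callable[[Any], Awaitable[Any]]]


async def run_chain(steps: list[Step], input_data: Any,
                    checkpoint: Optional[dict[str, Any]] = None) -> Any:
    """Run steps in order; skip names already recorded in `checkpoint`."""
    checkpoint = checkpoint if checkpoint is not None else {}
    value = input_data
    for name, handler in steps:
        if name in checkpoint:
            value = checkpoint[name]  # reuse the saved output, do not re-run
            continue
        value = await handler(value)  # output of this step feeds the next
        checkpoint[name] = value      # record progress after each step
    return value
```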


Fan-out then collect — parallel steps + callback with all results.

Example

chord = TaskChord(
    steps=[TaskStep("a", fetch_a), TaskStep("b", fetch_b)],
    callback=TaskStep("merge", merge_results),
)
result = await chord.execute()

__init__
def __init__(
    steps: list[TaskStep],
    callback: TaskStep
) -> None
execute
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Fan out to all group steps then call the callback with their collected results.
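
In plain asyncio terms, the fan-out and collect pattern resembles the sketch below. The run_chord helper is hypothetical, and passing the same input to every step is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_chord(steps: list[Callable[[Any], Awaitable[Any]]],
                    callback: Callable[[list[Any]], Awaitable[Any]],
                    input_data: Any = None) -> Any:
    # Fan out: every step receives the same input and runs concurrently.
    results = await asyncio.gather(*(step(input_data) for step in steps))
    # Collect: the callback sees the results in step order.
    return await callback(list(results))
```

asyncio.gather preserves argument order, so the callback receives results in the order the steps were declared, regardless of which finished first.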


Emitted when a task finishes successfully.

Attributes:
  • task_id: Unique identifier of the completed task.
  • worker_id: Identifier of the worker that executed the task.


Payload fired when a task finishes successfully.

Attributes:
  • task_name: Name or type label of the completed task.
  • task_id: Unique identifier of the task instance.


Hierarchical root configuration for Lexigram Tasks.

Attributes:
  • name: Configuration name (default: "tasks").
  • enabled: Whether tasks module is enabled.
  • backend: Task queue backend configuration.
  • worker: Task worker settings.
  • scheduler: Task scheduler settings.
  • retry: Task retry configuration.
  • rate_limit: Task rate limiting.
  • timeout: Task timeout settings.
  • extra: Extra configuration.

is_production
property is_production() -> bool

Check if running in production environment.

is_development
property is_development() -> bool

Check if running in development environment.

is_test
property is_test() -> bool

Check if running in test environment.

validate_for_environment
def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Validate task configuration for the given environment.

Parameters
ParameterTypeDescription
`env`Environment | NoneTarget environment; resolved from LEX_ENV when None.
Returns
TypeDescription
list[ConfigIssue]List of ConfigIssue instances.
from_named
def from_named(
    cls,
    entry: NamedTaskConfig
) -> TaskConfig

Build a single-backend TaskConfig from a NamedTaskConfig entry.

Used internally by TasksProvider to create per-backend configs.

Parameters
ParameterTypeDescription
`entry`NamedTaskConfigThe named backend entry to materialise.
Returns
TypeDescription
TaskConfigA TaskConfig configured for the single named backend.

Aggregated task observability dashboard.
__init__
def __init__(
    *,
    history_size: int = 1000,
    window_seconds: float = 300
) -> None
record_execution
def record_execution(
    task_name: str,
    *,
    job_id: str = '',
    duration_ms: float = 0.0,
    success: bool = True,
    error: str | None = None,
    worker_id: str | None = None
) -> None

Record a task execution.

get_summary
def get_summary() -> dict[str, Any]

Get dashboard summary with throughput and error rates.

get_per_task_stats
def get_per_task_stats() -> dict[str, dict[str, Any]]

Get per-task-type statistics with latency percentiles.

get_recent_executions
def get_recent_executions(
    *,
    limit: int = 50,
    task_name: str | None = None
) -> list[dict[str, Any]]

Get recent execution history.

get_error_breakdown
def get_error_breakdown() -> dict[str, list[dict[str, Any]]]

Get recent errors grouped by task type.

reset
def reset() -> None

Reset all counters and history.


Payload fired when a task is placed on the queue.

Attributes:
  • task_name: Name or type label of the enqueued task.
  • queue_name: Name of the queue the task was added to.


Context passed through the middleware pipeline.

Emitted when a task fails after all retry attempts are exhausted.

Attributes:
  • task_id: Unique identifier of the failed task.
  • error: Human-readable description of the failure reason.


Payload fired when a task raises an unhandled exception.

Attributes:
  • task_name: Name or type label of the failed task.
  • task_id: Unique identifier of the task instance.
  • reason: Short description or exception message.


Parallel task execution — all steps run concurrently.

Example

group = TaskGroup([
    TaskStep("email", send_email),
    TaskStep("sms", send_sms),
])
result = await group.execute(user_data)

__init__
def __init__(
    steps: list[TaskStep],
    *,
    max_concurrency: int | None = None,
    state_store: WorkflowStateStore | None = None,
    workflow_id: str | None = None
) -> None
execute
async def execute(input_data: Any = None) -> Result[WorkflowResult, WorkflowError]

Run all steps concurrently, respecting max_concurrency if set.
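
A common way to bound concurrency in asyncio is a semaphore, shown here as a sketch. The run_group helper is hypothetical; the source does not say how TaskGroup enforces max_concurrency.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_group(handlers: list[Callable[[Any], Awaitable[Any]]],
                    input_data: Any = None,
                    max_concurrency: Optional[int] = None) -> list[Any]:
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def run_one(handler: Callable[[Any], Awaitable[Any]]) -> Any:
        if semaphore is None:
            return await handler(input_data)  # unbounded
        async with semaphore:  # at most max_concurrency handlers run at once
            return await handler(input_data)

    return list(await asyncio.gather(*(run_one(h) for h in handlers)))
```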


Health status for task processing system

Aggregates health information from queue, workers, and scheduler.

is_healthy
property is_healthy() -> bool

Check if system is healthy

success_rate
property success_rate() -> float

Calculate job success rate

to_dict
def to_dict() -> dict[str, Any]

Convert to dictionary for serialization


Abstract base for task middleware.
before_execute
async def before_execute(ctx: TaskExecutionContext) -> None

Called before task execution. Override to modify or log.

after_execute
async def after_execute(ctx: TaskExecutionContext) -> None

Called after task execution. Override to observe or log.

on_error
async def on_error(
    ctx: TaskExecutionContext,
    error: Exception
) -> None

Called when task execution fails.


Chains middleware around task execution.
__init__
def __init__() -> None
add
def add(middleware: TaskMiddleware) -> None

Append a middleware instance to the pipeline.

execute
async def execute(
    job: JobProtocol,
    handler: Any
) -> JobResult

Execute a job through the middleware pipeline.

Parameters
ParameterTypeDescription
`job`JobProtocolThe job to execute.
`handler`AnyThe async handler function.
Returns
TypeDescription
JobResultJobResult from execution.
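
The hook sequence around a handler can be sketched as follows. MiddlewareSketch and PipelineSketch are hypothetical, the context is a plain dict rather than a TaskExecutionContext, and running after_execute in reverse order is an assumption about the pipeline's behaviour.

```python
import asyncio
from typing import Any, Awaitable, Callable


class MiddlewareSketch:
    """Hypothetical middleware base with no-op hooks."""

    async def before_execute(self, ctx: dict) -> None: ...
    async def after_execute(self, ctx: dict) -> None: ...
    async def on_error(self, ctx: dict, error: Exception) -> None: ...


class PipelineSketch:
    def __init__(self) -> None:
        self._middleware = []

    def add(self, middleware: MiddlewareSketch) -> None:
        self._middleware.append(middleware)

    async def execute(self, job: Any, handler: Callable[[Any], Awaitable[Any]]) -> Any:
        ctx = {"job": job}
        for m in self._middleware:
            await m.before_execute(ctx)
        try:
            result = await handler(job)
        except Exception as error:
            for m in self._middleware:
                await m.on_error(ctx, error)
            raise  # the failure still propagates to the caller
        for m in reversed(self._middleware):  # unwind in reverse (assumption)
            await m.after_execute(ctx)
        return result
```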

Task processing provider for Lexigram Framework

TaskProvider integrates task processing with the Lexigram Framework, providing dependency injection, lifecycle management, and health monitoring.

Example

from lexigram.app import Application
from lexigram.tasks import TaskProvider, MemoryTaskQueue
app = Application()
queue = MemoryTaskQueue()
provider = TaskProvider(queue, worker_count=4)
app.use(provider)
await app.start()
__init__
def __init__(
    queue: TaskQueueProtocol,
    worker_count: int = 1,
    enable_scheduler: bool = True
)

Initialize task provider

Parameters
ParameterTypeDescription
`queue`TaskQueueProtocolTaskQueueProtocol implementation to use
`worker_count`intNumber of workers to create
`enable_scheduler`boolWhether to enable job scheduling
from_config
def from_config(
    cls,
    config: TaskConfig,
    **context: Any
) -> TaskProvider

Create a TaskProvider from a TaskConfig.

Context kwargs may include a pre-built queue under the key 'queue'. If none is provided, a MemoryTaskQueue is created by default.

register
async def register(container: ContainerRegistrarProtocol) -> None

Register task services with the DI container.

Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolDI registrar to bind task services into.
boot
async def boot(container: ContainerResolverProtocol) -> None

Start the task provider.

Called by the framework on application startup after all registrations.

shutdown
async def shutdown() -> None

Shutdown the task provider gracefully

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check task provider health.

In multi-backend mode the overall status is the worst individual status across all named queues. TaskQueueProtocol does not expose a health_check() method, so liveness is probed via get_task_count().

Returns
TypeDescription
HealthCheckResultHealthCheckResult with current status and metrics.
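
The "worst individual status" rule amounts to taking a maximum over an ordered severity scale. The HealthSketch enum and its member names are hypothetical; the real HealthCheckResult status type is not shown in this reference.

```python
from enum import IntEnum


class HealthSketch(IntEnum):
    """Hypothetical severity scale: larger means worse."""
    HEALTHY = 0
    DEGRADED = 1
    UNHEALTHY = 2


def overall_status(per_queue: dict[str, HealthSketch]) -> HealthSketch:
    """Worst individual status wins; an empty mapping counts as healthy."""
    return max(per_queue.values(), default=HealthSketch.HEALTHY)
```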
register_handler
def register_handler(
    task_name: str,
    handler: Callable[..., Any]
) -> None

Register a task handler

Parameters
ParameterTypeDescription
`task_name`strName of the task type
`handler`Callable[..., Any]Async handler function
register_scheduled_task
def register_scheduled_task(task_func: Any) -> None

Register a decorated task function for scheduling.

Validates that the task has required metadata (_task_name and _cron). Registration is atomic: if scheduling fails, the handler is rolled back.

Parameters
ParameterTypeDescription
`task_func`AnyTask function decorated with @scheduled
Raises
ExceptionDescription
TaskRegistrationErrorIf _task_name or _cron is missing
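
The validate, register, and roll back sequence might be structured like this sketch. The register_scheduled function, its handlers dict, its schedule callback, and RegistrationErrorSketch are all hypothetical stand-ins for the provider's internals.

```python
from typing import Any, Callable


class RegistrationErrorSketch(Exception):
    """Hypothetical stand-in for TaskRegistrationError."""


def register_scheduled(task_func: Any,
                       handlers: dict[str, Callable[..., Any]],
                       schedule: Callable[[str, str], None]) -> None:
    name = getattr(task_func, "_task_name", None)
    cron = getattr(task_func, "_cron", None)
    if not name or not cron:
        raise RegistrationErrorSketch("task is missing _task_name or _cron")
    handlers[name] = task_func
    try:
        schedule(name, cron)
    except Exception:
        handlers.pop(name, None)  # roll back so no half-registered task remains
        raise
```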
enqueue_job
async def enqueue_job(job: JobProtocol) -> str

Enqueue a job for processing

schedule_job_sync
def schedule_job_sync(
    job_template: JobProtocol | JobTemplateProtocol,
    cron_expression: str,
    job_id: str | None = None
) -> str | None

Schedule a job with cron expression

unschedule_job_sync
def unschedule_job_sync(job_id: str) -> bool

Remove a scheduled job

get_worker_stats
def get_worker_stats() -> dict[str, Any] | None

Get worker pool statistics

get_scheduled_jobs
def get_scheduled_jobs() -> dict[str, Any] | None

Get scheduled jobs information

refresh_worker_handlers
def refresh_worker_handlers() -> None

Refresh handler mappings in all workers.

Call this after registering new handlers to ensure workers can execute jobs registered after pool creation.


Emitted when a task is accepted into a processing queue.

Attributes:
  • task_id: Unique identifier of the queued task.
  • queue_name: Name of the queue that accepted the task.


Configuration for rate limiting.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Task scheduler for cron-like job scheduling

TaskScheduler manages periodic job execution using cron expressions. Jobs are checked at regular intervals and enqueued when due.

Example

scheduler = TaskScheduler()
# Schedule job
template = JobTemplateProtocol(name="cleanup", args=())
await scheduler.schedule_job(template, "0 0 * * *") # Daily at midnight
# Start scheduler
await scheduler.start_scheduler(queue.enqueue)
__init__
def __init__(
    check_interval: float = 60.0,
    store: SchedulerStore | None = None
)

Initialize task scheduler.

Parameters
ParameterTypeDescription
`check_interval`floatHow often to check for due jobs (seconds).
`store`SchedulerStore | NoneOptional SchedulerStore for durable persistence of job definitions and next-run times. When provided, all schedule/unschedule operations are persisted and the scheduler can be restored after restart via restore_from_store.
schedule_job_sync
def schedule_job_sync(
    job_template: JobTemplateProtocol | JobProtocol,
    cron_expression: str,
    job_id: str | None = None
) -> str

Schedule a job with cron expression (sync, no persistence).

For the primary async version with store persistence, use schedule_job.

Parameters
ParameterTypeDescription
`job_template`JobTemplateProtocol | JobProtocolJob template or job to schedule
`cron_expression`strCron expression (e.g., "0 9 * * 1-5")
`job_id`str | NoneOptional job ID, generated if not provided
Returns
TypeDescription
strJob ID for the scheduled job
schedule_job
async def schedule_job(
    job_template: JobTemplateProtocol | JobProtocol,
    cron_expression: str,
    job_id: str | None = None
) -> str

Schedule a job and persist to store if one is configured.

This is the primary async method. For a synchronous variant without persistence, use schedule_job_sync.

Parameters
ParameterTypeDescription
`job_template`JobTemplateProtocol | JobProtocolJob template or job to schedule.
`cron_expression`strCron expression (e.g., "0 9 * * 1-5").
`job_id`str | NoneOptional job ID, generated if not provided.
Returns
TypeDescription
strJob ID for the scheduled job.
restore_from_store
async def restore_from_store() -> int

Restore all persisted job definitions from the configured store.

Loads every job persisted by previous scheduler instances and re-registers them in scheduled_jobs. Call this once during startup before invoking start_scheduler to resume the full scheduling state.

Returns
TypeDescription
intNumber of jobs restored.
Raises
ExceptionDescription
RuntimeErrorIf no SchedulerStore was provided at construction.
unschedule_job_sync
def unschedule_job_sync(job_id: str) -> bool

Remove a scheduled job (sync, no store deletion).

For the primary async version with store deletion, use unschedule_job.

Parameters
ParameterTypeDescription
`job_id`strJob ID to remove
Returns
TypeDescription
boolTrue if job was removed, False if not found
unschedule_job
async def unschedule_job(job_id: str) -> bool

Remove a scheduled job and delete it from the store.

This is the primary async method. For a synchronous variant without store deletion, use unschedule_job_sync.

Parameters
ParameterTypeDescription
`job_id`strJob ID to remove.
Returns
TypeDescription
boolTrue if job was removed, False if not found.
get_scheduled_jobs
def get_scheduled_jobs() -> list[ScheduledJob]

Get all scheduled jobs

Returns
TypeDescription
list[ScheduledJob]List of all scheduled jobs
get_scheduled_job
def get_scheduled_job(job_id: str) -> ScheduledJob | None

Get a specific scheduled job

Parameters
ParameterTypeDescription
`job_id`strJob ID
Returns
TypeDescription
ScheduledJob | NoneScheduledJob or None if not found
start_scheduler
async def start_scheduler(enqueue_callback: Callable[[JobProtocol], Any]) -> None

Start the scheduler loop

Parameters
ParameterTypeDescription
`enqueue_callback`Callable[[JobProtocol], Any]Async function to enqueue jobs when they are due
stop_scheduler
async def stop_scheduler() -> None

Stop the scheduler gracefully

enable_job
def enable_job(job_id: str) -> bool

Enable a scheduled job

Parameters
ParameterTypeDescription
`job_id`strJob ID
Returns
TypeDescription
boolTrue if job was enabled
disable_job
def disable_job(job_id: str) -> bool

Disable a scheduled job

Parameters
ParameterTypeDescription
`job_id`strJob ID
Returns
TypeDescription
boolTrue if job was disabled
get_next_run_times
def get_next_run_times() -> dict[str, float]

Get next run times for all enabled scheduled jobs

Returns
TypeDescription
dict[str, float]Dictionary mapping job ID to next run timestamp

Configuration for task scheduler.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Payload fired when a worker picks up and begins executing a task.

Attributes:
  • task_name: Name or type label of the task being executed.
  • task_id: Unique identifier of the task instance.


A single step in a workflow.
Parameters
ParameterTypeDescription
`name`Human-readable step name.
`handler`Async callable that takes input and returns output.
`on_error`Optional error handler.
`timeout`Optional timeout in seconds.

Configuration for task timeout behavior.

Individual task worker for job execution

TaskWorker processes jobs from a queue using registered handlers. Each worker runs independently and tracks its own statistics.

Example

worker = TaskWorker("worker-1", queue, handlers)
await worker.start()
# ... worker processes jobs ...
await worker.stop()
__init__
def __init__(
    worker_id: str,
    queue: TaskQueueProtocol,
    handler_registry: dict[str, Callable[..., Any]],
    config: TaskWorkerConfig | None = None,
    services: TaskWorkerServices | None = None,
    *,
    middleware_pipeline: TaskMiddlewarePipeline | None = None
)

Initialize task worker.

Parameters
ParameterTypeDescription
`worker_id`strUnique worker identifier.
`queue`TaskQueueProtocolTaskQueueProtocol to pull jobs from.
`handler_registry`dict[str, Callable[..., Any]]Dict mapping job names to handler functions.
`config`TaskWorkerConfig | NoneOptional worker configuration controlling timeouts and execution behaviour. Defaults to TaskWorkerConfig() when not provided.
`services`TaskWorkerServices | NoneOptional bundle of infrastructure services (DLQ, result store, progress store, dashboard, retry policy, task manager, idempotency manager, logger). Use TaskWorkerServices to compose only the services you need; absent services degrade gracefully. Pass a logger via services.logger to inject a custom bound logger; otherwise a module logger is created and bound to worker_id automatically.
`middleware_pipeline`TaskMiddlewarePipeline | NoneOptional middleware pipeline for cross-cutting concerns.
get_stats
def get_stats() -> WorkerJobStats

Get worker statistics

Returns
TypeDescription
WorkerJobStatsCurrent worker statistics
is_busy
def is_busy() -> bool

Check if worker is currently processing a job

Returns
TypeDescription
boolTrue if worker has a job in progress
get_progress_tracker
def get_progress_tracker(job_id: str) -> ProgressTracker | None

Get a ProgressTracker for a job (opt-in for handlers).

Handlers can use this to report progress. The worker must have a progress_store configured.

Parameters
ParameterTypeDescription
`job_id`strThe job ID to track progress for
Returns
TypeDescription
ProgressTracker | NoneProgressTracker instance if progress_store is configured, None otherwise

Configuration for task workers.

model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")


Optional infrastructure services injected into a TaskWorker.

Groups all optional infrastructure dependencies so that the TaskWorker constructor stays focused on the mandatory parameters. All fields default to None — omitted services degrade gracefully.

Attributes:
  • container: DI container for resolving task dependencies.
  • dead_letter_queue: DLQ for routing permanently-failed jobs.
  • result_store: Persistent store for job execution results.
  • progress_store: Store for in-progress task progress updates.
  • dashboard: Observability dashboard for worker and job metrics.
  • retry_policy: Retry policy implementation (from lexigram-resilience).
  • task_manager: Kernel task manager for critical shutdown registration.
  • idempotency_manager: Manager for at-most-once job execution.
  • logger: Optional logger instance.


Background task workers, scheduling, and async job queues.

Call configure to configure the task-processing subsystem.

Usage (in-memory queue for development)

from lexigram.tasks.backends.memory import MemoryTaskQueue

@module(
    imports=[TasksModule.configure(queue=MemoryTaskQueue())]
)
class AppModule(Module):
    pass

Usage (Redis queue for production)

from lexigram.tasks.backends.redis import RedisTaskQueue

@module(
    imports=[
        TasksModule.configure(
            queue=RedisTaskQueue(url="redis://localhost"),
            worker_count=4,
        )
    ]
)
class AppModule(Module):
    pass
configure
def configure(
    cls,
    queue: Any | None = None,
    worker_count: int = 1,
    enable_scheduler: bool = True
) -> DynamicModule

Create a TasksModule with explicit configuration.

Parameters
ParameterTypeDescription
`queue`Any | NoneTaskQueueProtocol implementation. Defaults to MemoryTaskQueue.
`worker_count`intNumber of concurrent task workers.
`enable_scheduler`boolWhether to enable the job scheduler.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
scope
def scope(
    cls,
    *handlers: type,
    **kwargs: Any
) -> DynamicModule

Scope task handler classes into a feature module.

Registers the given task handler classes as providers so they are discovered and wired by the task executor. The parent module graph must already include TasksModule.configure — this does not create a new queue or worker pool.

Uses the anonymous token pattern so both configure() and scope() can coexist in the same compiled graph without a ModuleDuplicateError.

Example

@module(
    imports=[
        TasksModule.configure(queue=RedisTaskQueue(url=...)),
        TasksModule.scope(SendEmailTask, GenerateReportTask),
    ]
)
class NotificationFeatureModule(Module):
    pass
Returns
TypeDescription
DynamicModuleA DynamicModule scoped to this feature.
stub
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return an in-memory TasksModule for unit testing.

Uses a MemoryTaskQueue with a single worker and no job scheduler.

Returns
TypeDescription
DynamicModuleA DynamicModule backed by an in-memory task queue.

Enforces execution timeout on tasks.

Uses asyncio.wait_for to enforce timeout on task execution.

__init__
def __init__(
    default_timeout: float = 300.0,
    max_timeout: float | None = None
) -> None
before_execute
async def before_execute(ctx: TaskExecutionContext) -> None

Compute and store the effective timeout in the execution context metadata.

execute_with_timeout
async def execute_with_timeout(
    handler: Any,
    job: JobProtocol,
    ctx: TaskExecutionContext,
    resolved_args: list[Any] | None = None,
    resolved_kwargs: dict[str, Any] | None = None
) -> JobResult

Execute handler with timeout enforcement.

Parameters
ParameterTypeDescription
`handler`AnyThe handler to execute
`job`JobProtocolThe job being executed
`ctx`TaskExecutionContextExecution context
`resolved_args`list[Any] | NonePre-resolved positional arguments (from DI)
`resolved_kwargs`dict[str, Any] | NonePre-resolved keyword arguments (from DI)
Returns
TypeDescription
JobResultJobResult from execution
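
A sketch of timeout enforcement with asyncio.wait_for. Both helpers are hypothetical, and the way a per-task request, default_timeout, and max_timeout combine is an assumption based on the constructor parameters.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


def effective_timeout(requested: Optional[float],
                      default_timeout: float = 300.0,
                      max_timeout: Optional[float] = None) -> float:
    timeout = requested if requested is not None else default_timeout
    if max_timeout is not None:
        timeout = min(timeout, max_timeout)  # never exceed the ceiling
    return timeout


async def run_with_timeout(handler: Callable[[], Awaitable[Any]], timeout: float) -> Any:
    try:
        return await asyncio.wait_for(handler(), timeout=timeout)
    except asyncio.TimeoutError:
        # A real worker would build a failed JobResult here.
        return "timed out"
```

asyncio.wait_for cancels the wrapped coroutine when the timeout expires, so handlers should not swallow asyncio.CancelledError.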

Decorator for unique task execution.

Prevents duplicate execution of the same task using in-memory locks.

Example

lock_manager = LockManager()

@unique_task(
    lock_manager=lock_manager,
    key_func=lambda user_id: f"sync_user:{user_id}",
    timeout=3600
)
async def sync_user(user_id: str):
    await sync_user_data(user_id)
__init__
def __init__(
    lock_manager: LockManager,
    key_func: Callable[[Any], str],
    timeout: float = 60.0,
    skip_if_locked: bool = True
)

Initialize unique task decorator.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `lock_manager` | `LockManager` | LockManager instance used to create locks. |
| `key_func` | `Callable[[Any], str]` | Function that generates the lock key from the task arguments. |
| `timeout` | `float` | Lock timeout in seconds. |
| `skip_if_locked` | `bool` | If True, skip execution when the lock is already held; if False, wait for the lock. |
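To make the `skip_if_locked` behaviour concrete, here is a minimal sketch of a unique-execution decorator built on `asyncio.Lock`. `SimpleLockManager` and `unique` are hypothetical stand-ins, not the library's `LockManager` or `unique_task`, and the lock timeout is left out for brevity.

```python
from __future__ import annotations

import asyncio
import functools
from typing import Any, Awaitable, Callable


class SimpleLockManager:
    """Hypothetical stand-in for LockManager: one asyncio.Lock per key."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}

    def get(self, key: str) -> asyncio.Lock:
        return self._locks.setdefault(key, asyncio.Lock())


def unique(
    manager: SimpleLockManager,
    key_func: Callable[..., str],
    skip_if_locked: bool = True,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            lock = manager.get(key_func(*args, **kwargs))
            if skip_if_locked and lock.locked():
                # A call with the same key is already running: skip this one.
                return None
            async with lock:  # With skip_if_locked=False this waits its turn.
                return await fn(*args, **kwargs)

        return wrapper

    return decorator
```

In this sketch a skipped call returns `None`; whether the real decorator returns a sentinel or raises is not stated in the reference above.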

Worker statistics for monitoring performance.

Tracks execution metrics for an individual worker, including job counts, timing, and activity.

uptime
property uptime() -> float

Get worker uptime in seconds.

record_job
def record_job(result: JobResult) -> None

Record job execution statistics.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `result` | `JobResult` | Result of the job execution. |
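A per-worker statistics object of this kind can be sketched as a small dataclass. All field names below (`jobs_succeeded`, `jobs_failed`, `total_duration`, `last_activity`) and the `SketchJobResult` shape are assumptions for illustration; the real `JobResult` and stats attributes may differ.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class SketchJobResult:
    """Hypothetical result shape; the real JobResult fields may differ."""

    success: bool
    duration: float


@dataclass
class SketchWorkerStats:
    started_at: float = field(default_factory=time.monotonic)
    jobs_succeeded: int = 0
    jobs_failed: int = 0
    total_duration: float = 0.0
    last_activity: float | None = None

    @property
    def uptime(self) -> float:
        """Seconds since the stats object was created."""
        return time.monotonic() - self.started_at

    def record_job(self, result: SketchJobResult) -> None:
        """Update counters and timing from one finished job."""
        if result.success:
            self.jobs_succeeded += 1
        else:
            self.jobs_failed += 1
        self.total_duration += result.duration
        self.last_activity = time.monotonic()
```

`time.monotonic` is used rather than wall-clock time so that uptime is unaffected by system clock adjustments.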

Pool of task workers with monitoring and scaling.

WorkerPool manages multiple TaskWorker instances for concurrent job processing. Supports dynamic scaling and aggregate statistics.

Example

pool = WorkerPool(queue, handlers, size=4)
await pool.start()
stats = pool.get_pool_stats()
await pool.scale_to(8) # Scale up
await pool.stop()
__init__
def __init__(
    queue: TaskQueueProtocol,
    handler_registry: dict[str, Callable[..., Any]] | HandlerRegistry,
    size: int = 1,
    logger: Logger | None = None,
    task_manager: TaskManagerProtocol | None = None,
    dead_letter_queue: DeadLetterQueue | None = None,
    hooks: HookRegistryProtocol | None = None,
    container: Any = None
)

Initialize worker pool

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `queue` | `TaskQueueProtocol` | Queue that workers pull jobs from. |
| `handler_registry` | `dict[str, Callable[..., Any]] \| HandlerRegistry` | Dict mapping job names to handlers, or a HandlerRegistry for dynamic lookups (allows handlers registered after pool creation). |
| `size` | `int` | Number of workers to create. |
| `task_manager` | `TaskManagerProtocol \| None` | Optional kernel task manager for critical task registration during the worker lifecycle. |
| `dead_letter_queue` | `DeadLetterQueue \| None` | Optional shared DLQ for permanently failed jobs. |
| `container` | `Any` | Optional DI container for resolving task dependencies at runtime. |

refresh_handlers
def refresh_handlers() -> dict[str, Callable[..., Any]]

Refresh handlers from registry if using dynamic registry.

start
async def start() -> None

Start the worker pool

Creates and starts all workers in the pool.

stop
async def stop(
    drain: bool = False,
    timeout: float | None = 30.0
) -> None

Stop the worker pool gracefully.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `drain` | `bool` | If `True`, wait for each worker to finish its current job before stopping. Defaults to `False` for a fast cancel-and-stop. |
| `timeout` | `float \| None` | Per-worker seconds to wait when `drain` is `True`. `None` means wait indefinitely. Defaults to 30 seconds. |
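The difference between a draining stop and a fast cancel can be sketched with a single worker. `SketchWorker` is a hypothetical class written for this illustration; jobs are simply numbers of seconds to sleep, and the way the real workers detect a stop request is not documented here.

```python
from __future__ import annotations

import asyncio


class SketchWorker:
    """Hypothetical worker: each job is a number of seconds to sleep."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self.queue = queue
        self.completed: list[float] = []
        self._stopping = asyncio.Event()
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while not self._stopping.is_set():
            try:
                job = await asyncio.wait_for(self.queue.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue  # Nothing queued; re-check the stop flag.
            await asyncio.sleep(job)
            self.completed.append(job)

    async def stop(self, drain: bool = False, timeout: float | None = 30.0) -> None:
        if self._task is None:
            return
        self._stopping.set()
        if drain:
            try:
                # Let the current job finish, waiting at most `timeout` seconds.
                await asyncio.wait_for(self._task, timeout)
                return
            except asyncio.TimeoutError:
                pass  # wait_for cancelled the task; fall through to cleanup.
        self._task.cancel()
        await asyncio.gather(self._task, return_exceptions=True)
```

With `drain=True` the in-flight job is recorded as completed; with `drain=False` it is cancelled mid-sleep and never reaches `completed`.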
get_worker_stats
def get_worker_stats() -> list[WorkerJobStats]

Get statistics for all workers

Returns

| Type | Description |
| --- | --- |
| `list[WorkerJobStats]` | List of WorkerJobStats, one per worker. |

get_pool_stats
def get_pool_stats() -> dict[str, Any]

Get aggregate pool statistics

Returns

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with pool-wide statistics. |

scale_to
async def scale_to(new_size: int) -> None

Scale the worker pool to a new size

Dynamically add or remove workers.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `new_size` | `int` | Target number of workers. |
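Dynamic scaling can be sketched as growing or trimming a list of worker tasks. `SketchPool` is a hypothetical, heavily simplified pool: its workers only sleep, and cancelling surplus workers immediately is an assumption about how scale-down behaves.

```python
from __future__ import annotations

import asyncio
from typing import Any


class SketchPool:
    """Hypothetical pool that only manages worker task lifetimes."""

    def __init__(self, size: int = 1) -> None:
        self._initial_size = size
        self._workers: list[asyncio.Task] = []

    async def _work(self, worker_id: int) -> None:
        while True:
            await asyncio.sleep(3600)  # Placeholder for pulling and running jobs.

    async def start(self) -> None:
        await self.scale_to(self._initial_size)

    async def scale_to(self, new_size: int) -> None:
        if new_size < 0:
            raise ValueError("new_size must be non-negative")
        # Scale up: spawn workers until the target is reached.
        while len(self._workers) < new_size:
            worker_id = len(self._workers)
            self._workers.append(asyncio.create_task(self._work(worker_id)))
        # Scale down: cancel the surplus workers and wait for them to exit.
        surplus = self._workers[new_size:]
        del self._workers[new_size:]
        for task in surplus:
            task.cancel()
        await asyncio.gather(*surplus, return_exceptions=True)

    def get_pool_stats(self) -> dict[str, Any]:
        return {"size": len(self._workers)}

    async def stop(self) -> None:
        await self.scale_to(0)
```

Treating `stop` as `scale_to(0)` keeps the sketch short; the real pool's `stop` takes `drain` and `timeout` arguments as documented above.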

Result from a complete workflow execution.

Enumeration of possible workflow execution states.

chain
def chain(
    *steps: TaskStep,
    stop_on_error: bool = True
) -> TaskChain
Create a sequential TaskChain from the given steps.

Each step receives the output of the previous step as its input.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stop_on_error` | `bool` | Whether to abort the pipeline on the first failure. Defaults to `True`. |

Returns

| Type | Description |
| --- | --- |
| `TaskChain` | A configured TaskChain ready to `await .execute()`. |

Example

result = await chain(
    TaskStep("validate", validate_input),
    TaskStep("process", process_data),
    TaskStep("persist", save_to_db),
).execute(raw_data)
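The sequential hand-off between steps can be sketched as a simple loop. `SketchStep`, `SketchChainResult`, and `run_chain` are hypothetical names for this illustration; in particular, passing the last successful output to the next step when `stop_on_error` is `False` is an assumption, not documented behaviour.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class SketchStep:
    name: str
    fn: Callable[[Any], Awaitable[Any]]


@dataclass
class SketchChainResult:
    output: Any = None
    errors: dict[str, Exception] = field(default_factory=dict)


async def run_chain(
    steps: list[SketchStep],
    initial: Any,
    stop_on_error: bool = True,
) -> SketchChainResult:
    """Feed each step the previous step's output, collecting failures by name."""
    result = SketchChainResult(output=initial)
    for step in steps:
        try:
            result.output = await step.fn(result.output)
        except Exception as exc:
            result.errors[step.name] = exc
            if stop_on_error:
                break  # Abort the pipeline on the first failure.
    return result
```

Recording errors by step name instead of re-raising lets the caller inspect which step failed even when the chain keeps going.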

delay
def delay(fn: Callable[_P, Coroutine[Any, Any, _R]]) -> _DelayedCallable[_P, _R]
Decorate an async function to add a `.delay()` background-dispatch method.

The decorated function is unchanged when called directly: it still returns a coroutine that must be awaited. The .delay() method is the only addition; it schedules the coroutine as a background asyncio.Task and stores a reference to that task to prevent premature garbage collection.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fn` | `Callable[_P, Coroutine[Any, Any, _R]]` | An async function (coroutine function) to wrap. |

Returns

| Type | Description |
| --- | --- |
| `_DelayedCallable[_P, _R]` | A _DelayedCallable that is callable exactly like `fn` but also exposes `.delay(*args, **kwargs) -> asyncio.Task`. |

Example

@delay
async def flush_audit_log(batch: list[AuditEntry]) -> None:
    await audit_store.save_many(batch)

# Direct call (awaited in the current task):
await flush_audit_log(batch=entries)

# Background dispatch (does not block):
flush_audit_log.delay(batch=entries)

distributed_lock
async def distributed_lock(
    key: str,
    timeout: float = 60.0,
    lock_manager: LockManager | None = None
) -> AsyncGenerator[InMemoryLock, None]
Context manager for in-memory locks.

Deprecated: Use LockManager.acquire() instead for better DI integration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `key` | `str` | Unique lock key. |
| `timeout` | `float` | Lock timeout in seconds. |
| `lock_manager` | `LockManager \| None` | Optional LockManager instance (a new one is created if None). |

Yields

| Type | Description |
| --- | --- |
| `AsyncGenerator[InMemoryLock, None]` | InMemoryLock instance. |

Example

manager = LockManager()  # Should be from DI container
async with distributed_lock("resource:123", lock_manager=manager):
    await process_resource(123)

scheduled
def scheduled(
    cron: str,
    name: str | None = None,
    priority: Priority | int = Priority.NORMAL,
    max_retries: int = 3,
    timeout: float | None = None
) -> Callable[[Callable[..., Any]], Any]
Decorator to define a scheduled task.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `cron` | `str` | Cron expression (e.g., "0 9 * * *"). |
| `name` | `str \| None` | Task name. |
| `priority` | `Priority \| int` | Task priority. |
| `max_retries` | `int` | Maximum retry attempts. |
| `timeout` | `float \| None` | Execution timeout. |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Any]` | Decorated task function with scheduling metadata. |

task
def task(
    name: str | None = None,
    priority: Priority | int = Priority.NORMAL,
    max_retries: int = 3,
    timeout: float | None = None,
    queue: str = 'default',
    idempotency_key: str | None = None
) -> Callable[[Callable[..., Any]], Any]
Decorator to define a task.

Converts a regular function into a task that can be enqueued and executed by workers.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `str \| None` | Task name (defaults to the function name). |
| `priority` | `Priority \| int` | Task priority (HIGH, NORMAL, LOW). |
| `max_retries` | `int` | Maximum retry attempts on failure. |
| `timeout` | `float \| None` | Execution timeout in seconds. |
| `queue` | `str` | Queue name for this task. |
| `idempotency_key` | `str \| None` | Optional key for idempotent execution. |

Returns

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Any]` | Decorated function with .delay() and .apply_async() methods. |
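A decorator factory of this shape usually records its options as metadata on the function and attaches dispatch helpers. The sketch below shows the idea only: `sketch_task`, `SketchTaskMeta`, and the `PENDING` list are hypothetical, and having `.delay()` append to an in-memory list is an assumption standing in for a real enqueue.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchTaskMeta:
    name: str
    priority: int = 5
    max_retries: int = 3
    timeout: float | None = None
    queue: str = "default"


# Stand-in for a real queue: (metadata, args, kwargs) tuples awaiting a worker.
PENDING: list[tuple[SketchTaskMeta, tuple, dict]] = []


def sketch_task(name: str | None = None, **options: Any) -> Callable[[Callable[..., Any]], Any]:
    def decorator(fn: Callable[..., Any]) -> Any:
        # The task name defaults to the function name.
        meta = SketchTaskMeta(name=name or fn.__name__, **options)

        def delay(*args: Any, **kwargs: Any) -> None:
            PENDING.append((meta, args, kwargs))

        fn.task_meta = meta
        fn.delay = delay
        return fn

    return decorator
```

The function itself is returned unchanged, so it can still be called and awaited directly, for example in tests.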

The queue has reached its maximum capacity and cannot accept new tasks.

The caller should back off and retry after a delay, or apply backpressure.

__init__
def __init__(
    message: str = 'Task queue is full',
    *,
    queue_name: str | None = None,
    capacity: int | None = None,
    **kwargs: Any
) -> None
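The back-off advice can be sketched as a retry loop with exponential delay and jitter. `SketchQueueFullError` is a hypothetical stand-in for this exception (its real class name is not shown in this section), and `enqueue` is any coroutine function supplied by the caller.

```python
from __future__ import annotations

import asyncio
import random
from typing import Any, Awaitable, Callable


class SketchQueueFullError(Exception):
    """Hypothetical stand-in for the library's queue-full exception."""


async def enqueue_with_backoff(
    enqueue: Callable[[Any], Awaitable[Any]],
    job: Any,
    attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
) -> Any:
    """Retry enqueue on a full queue, doubling the delay ceiling each time."""
    for attempt in range(attempts):
        try:
            return await enqueue(job)
        except SketchQueueFullError:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the error to the caller.
            ceiling = min(max_delay, base_delay * 2**attempt)
            # Full jitter spreads retries from competing producers.
            await asyncio.sleep(random.uniform(0, ceiling))
```

Re-raising after the final attempt lets the caller apply its own backpressure, such as rejecting the upstream request.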

Raised when a task is cancelled.

Base exception for all task-related errors.

Raised when a task execution fails.
__init__
def __init__(
    message: str,
    original_error: Exception | None = None,
    details: dict[str, Any] | None = None,
    hint: str | None = None
) -> None

Raised when a task cannot be found.

Raised when a task execution exceeds its timeout.

Raised when task parameters are invalid.