
Architecture

Internal design of the lexigram-tasks package.


lexigram-tasks provides background asynchronous job processing within a Lexigram application. It is an extension package that depends on the core lexigram package and lexigram-contracts, and it imports from lexigram-resilience for fault-tolerance primitives (the documented exception to the rule against cross-extension imports).

flowchart LR
    App[Application Code]
    Tasks[lexigram-tasks]
    Core[lexigram-core]
    Contracts[lexigram-contracts]
    Resilience[lexigram-resilience]

    App -->|schedules / enqueues| Tasks
    Tasks -->|depends on| Core
    Tasks -->|depends on| Contracts
    Tasks -->|retry + circuit breaker| Resilience
    Tasks -->|writes results| Results[(Result Store)]
    Tasks -->|persists to| Backend[(Queue Backend)]

Tasks operate outside the request-response cycle — they are deferred, retried, monitored, and managed through a dedicated worker pool.


The system follows a four-component pipeline:

scheduler → queue → worker → result
flowchart LR
    subgraph App[Application]
        Dec["@task decorator"]
        Ts[TaskScheduler]
    end
    subgraph Queue[Queue Layer]
        Q[TaskQueueProtocol]
        DLQ[DeadLetterQueue]
    end
    subgraph Workers[Worker Pool]
        W1[Worker 1]
        W2[Worker 2]
        WN[Worker N]
    end
    subgraph Storage[Storage]
        RS[ResultStore]
        PS[ProgressStore]
    end

    Dec -->|enqueue| Q
    Ts -->|enqueue at cron time| Q
    Q -->|dequeue| W1
    Q -->|dequeue| W2
    Q -->|dequeue| WN
    W1 -->|success| RS
    W1 -->|permanent failure| DLQ
    W2 -->|progress updates| PS

Dispatch (dispatch/) provides the public delay() helper that resolves the queue from DI and enqueues a job in one call.


Tasks are defined with the @task decorator, which wraps a function with enqueue metadata and registry methods:

```python
from lexigram.tasks import task

@task(name="send_email", max_retries=3, timeout=30)
async def send_email(to: str, subject: str, body: str):
    # handler logic
    ...
```
| Attribute | Default | Description |
|---|---|---|
| name | function name | Unique task identifier |
| priority | Priority.NORMAL | Queue ordering (LOW=0, NORMAL=5, HIGH=10, CRITICAL=20) |
| max_retries | 3 | Retry attempts before the job is sent to the DLQ |
| timeout | None | Execution timeout in seconds |
| queue | "default" | Target queue name |
| idempotency_key | None | Key for deduplication |
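```python
# Direct call: runs the handler in-process, no queue involved
result = await send_email(to="a@b.com", subject="Hi", body="...")

# Build a signature without executing
job = send_email.signature(to="a@b.com", subject="Hi", body="...")

# Enqueue to a specific queue instance (async)
job = await send_email.apply_async(queue_instance, to="a@b.com", subject="Hi", body="...")
```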
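To make the three call styles concrete, here is a minimal in-process sketch of how a decorator can attach metadata plus signature() and apply_async() helpers to a coroutine. The Job dataclass, the attribute names, and the use of asyncio.Queue as the target queue are assumptions for illustration, not lexigram's implementation.

```python
import asyncio
import functools
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional


@dataclass
class Job:
    """Hypothetical job record: the task name plus its call arguments."""
    name: str
    kwargs: dict[str, Any] = field(default_factory=dict)
    queue: str = "default"


def task(name: Optional[str] = None, *, max_retries: int = 3,
         timeout: Optional[float] = None, queue: str = "default"):
    """Illustrative decorator: attach metadata and enqueue helpers to a coroutine."""
    def decorator(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Direct call: run in-process, no queue involved.
            return await fn(*args, **kwargs)

        wrapper.task_name = name or fn.__name__
        wrapper.max_retries = max_retries
        wrapper.timeout = timeout

        def signature(**kwargs: Any) -> Job:
            # Describe the call as a Job without executing it.
            return Job(wrapper.task_name, kwargs, queue)

        async def apply_async(target: asyncio.Queue, **kwargs: Any) -> Job:
            # Build the Job and put it on the given queue.
            job = signature(**kwargs)
            await target.put(job)
            return job

        wrapper.signature = signature
        wrapper.apply_async = apply_async
        return wrapper
    return decorator


@task(name="send_email", max_retries=3, timeout=30)
async def send_email(to: str, subject: str, body: str) -> str:
    return f"sent {subject!r} to {to}"


print(asyncio.run(send_email(to="a@b.com", subject="Hi", body="...")))  # sent 'Hi' to a@b.com
print(send_email.signature(to="a@b.com").name)  # send_email
```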

The HandlerRegistry maps task names to handler functions and satisfies TaskExecutorProtocol:

```python
registry = HandlerRegistry()

# As a decorator
@registry.register("send_email")
async def handle_send_email(to: str, subject: str, body: str):
    ...

# Or by direct registration
registry.register("send_email", handle_send_email)

# Execute a job by its task name
result = await registry.execute(job)
```

The registry supports both sync and async handlers and raises TaskNotFoundError for unregistered tasks. It is injected into WorkerPool, so workers also pick up handlers registered after the pool is created.
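A minimal sketch of a name-to-handler registry that accepts both sync and async handlers and raises on unknown names. SimpleHandlerRegistry, its execute(name, **kwargs) signature, and the local TaskNotFoundError are hypothetical stand-ins; the real HandlerRegistry executes a job object.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


class TaskNotFoundError(LookupError):
    """Raised when no handler is registered under a task name."""


class SimpleHandlerRegistry:
    """Illustrative task-name -> handler map (not lexigram's HandlerRegistry)."""

    def __init__(self) -> None:
        self._handlers: dict[str, Callable[..., Any]] = {}

    def register(self, name: str, handler: Optional[Callable[..., Any]] = None):
        if handler is not None:          # direct registration
            self._handlers[name] = handler
            return handler

        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            self._handlers[name] = fn    # decorator form
            return fn
        return decorator

    async def execute(self, name: str, **kwargs: Any) -> Any:
        handler = self._handlers.get(name)  # looked up on every call
        if handler is None:
            raise TaskNotFoundError(name)
        result = handler(**kwargs)
        # Async handlers return an awaitable; sync handlers return the value.
        if inspect.isawaitable(result):
            result = await result
        return result


registry = SimpleHandlerRegistry()


@registry.register("add")
def add(a: int, b: int) -> int:         # sync handler
    return a + b


async def mul(a: int, b: int) -> int:   # async handler
    return a * b


registry.register("mul", mul)
print(asyncio.run(registry.execute("add", a=2, b=3)))  # 5
print(asyncio.run(registry.execute("mul", a=2, b=3)))  # 6
```

Because the lookup happens on every execute() call, any component holding a reference to the same registry object sees handlers registered later, which is the property that lets a pool created early still run late-registered handlers.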


Recurring tasks use cron expressions via the @scheduled decorator:

```python
from lexigram.tasks import scheduled

# Runs at 09:00, Monday through Friday
@scheduled(cron="0 9 * * 1-5", name="daily_report", timeout=120)
async def generate_daily_report():
    ...
```

The scheduler maintains a dict of ScheduledJob entries, each with a cron expression and next-run timestamp. On each tick (default 60s interval) it checks all enabled jobs, enqueues any that are due, and recalculates their next run.
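The tick logic can be sketched as below. To stay self-contained, each hypothetical ScheduledJob carries a next_run_fn callable in place of a real cron parser such as CronExpression; the enqueue callback and the run_scheduler() loop are likewise assumptions.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class ScheduledJob:
    """Hypothetical schedule entry; next_run_fn stands in for a cron parser."""
    name: str
    next_run_fn: Callable[[float], float]  # given "now", return the next run time
    next_run: float
    enabled: bool = True


def tick(jobs: dict[str, ScheduledJob], enqueue: Callable[[str], None],
         now: float) -> list[str]:
    """One scheduler pass: enqueue every due, enabled job and reschedule it."""
    fired = []
    for job in jobs.values():
        if job.enabled and job.next_run <= now:
            enqueue(job.name)
            job.next_run = job.next_run_fn(now)
            fired.append(job.name)
    return fired


async def run_scheduler(jobs: dict[str, ScheduledJob],
                        enqueue: Callable[[str], None],
                        check_interval: float = 60.0) -> None:
    """Call tick() forever, sleeping check_interval seconds between passes."""
    while True:
        tick(jobs, enqueue, time.time())
        await asyncio.sleep(check_interval)


jobs = {
    "daily_report": ScheduledJob("daily_report", lambda t: t + 60, next_run=100.0),
    "cleanup": ScheduledJob("cleanup", lambda t: t + 60, next_run=500.0),
}
enqueued: list[str] = []
print(tick(jobs, enqueued.append, now=120.0))  # ['daily_report']
```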

sequenceDiagram
    participant App as Application
    participant TP as TaskProvider
    participant TS as TaskScheduler
    participant Q as TaskQueue
    participant W as WorkerPool

    App->>TP: register_scheduled_task(func)
    TP->>TS: schedule_job(job_template, cron)
    TS->>TS: calculate_next_run()
    activate TS
    Note over TS: loop every check_interval
    TS->>TS: cron.get_next() < time.now()
    TS-->>Q: enqueue(Job)
    deactivate TS
    Q-->>W: dequeue → execute
    W-->>TS: (next tick)
    TS->>TS: calculate_next_run() again
| Feature | Implementation |
|---|---|
| Cron expressions | CronExpression parser; supports standard 5-field cron |
| Dependency ordering | DependencyResolver validates that the dependency graph is acyclic at schedule time |
| Persistence | SchedulerStore interface for durable schedule state |
| Enable/disable | Per-job toggle via enable_job() / disable_job() |
| Restore on restart | restore_from_store() reloads persisted schedules |
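Validating that job dependencies form an acyclic graph is a topological-sort problem. This sketch uses the standard library's graphlib.TopologicalSorter rather than lexigram's DependencyResolver; the validate_dependencies() name and the dict-of-sets input are assumptions.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dependencies(depends_on: dict[str, set[str]]) -> list[str]:
    """Return a run order for the jobs, or raise ValueError on a cycle.

    depends_on maps each job name to the names it must run after.
    """
    try:
        return list(TopologicalSorter(depends_on).static_order())
    except CycleError as exc:
        # CycleError.args[1] lists the nodes of the detected cycle.
        raise ValueError(f"cyclic job dependencies: {exc.args[1]}") from exc


print(validate_dependencies({"report": {"extract"}, "extract": set()}))  # ['extract', 'report']
```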

One-off delayed jobs combine asyncio.create_task with the dispatch module. There is no separate “delay” queue: delaying the enqueue is the caller’s responsibility.
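A caller-side delay can be as small as a task that sleeps and then enqueues. This is a sketch assuming the enqueue step is any awaitable callable; here asyncio.Queue.put stands in for a real queue's enqueue, and enqueue_later() is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable


def enqueue_later(enqueue: Callable[..., Awaitable[Any]], delay_s: float,
                  *args: Any, **kwargs: Any) -> "asyncio.Task[Any]":
    """Schedule enqueue(*args, **kwargs) to run after delay_s seconds."""
    async def _delayed() -> Any:
        await asyncio.sleep(delay_s)
        return await enqueue(*args, **kwargs)
    # Keep a reference to the returned task (or await it): otherwise it can be
    # garbage-collected and any enqueue error goes unnoticed.
    return asyncio.create_task(_delayed())


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    pending = enqueue_later(queue.put, 0.01, {"name": "send_email"})
    print(queue.qsize())        # 0
    await pending
    print(queue.get_nowait())   # {'name': 'send_email'}


asyncio.run(main())
```

Since the timer lives in process memory, a pending delay is lost if the process restarts before it fires.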


lexigram-tasks supports four built-in queue backends via a registry pattern (backends/registry.py):

```python
registry = TaskBackendRegistry.with_defaults()
# Registered: memory, redis, rabbitmq, postgres
queue = registry.create_backend(config)  # → TaskQueueProtocol instance
```
| Backend | Factory | Extra | Pros | Cons |
|---|---|---|---|---|
| Memory | _create_memory | none | Zero dependencies, instant setup | Volatile: a restart loses jobs |
| Redis | _create_redis | lexigram-tasks[redis] | Fast, persistent, pub/sub patterns | Requires a Redis server |
| RabbitMQ | _create_rabbitmq | lexigram-tasks[rabbitmq] | Durable, routing, ACK semantics | Heavier infrastructure |
| Postgres | _create_postgres | lexigram-tasks[postgres] | No extra service, transactional | Poll-based, higher latency |
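The registry pattern itself is small: a map from backend name to factory. The sketch below is illustrative only; BackendRegistry, the dict-shaped config with a "backend" key, and the deque-backed memory queue are assumptions, not the contents of backends/registry.py.

```python
from collections import deque
from typing import Any, Callable

Factory = Callable[[dict[str, Any]], Any]


class BackendRegistry:
    """Illustrative name -> factory registry for queue backends."""

    def __init__(self) -> None:
        self._factories: dict[str, Factory] = {}

    def register(self, name: str, factory: Factory) -> None:
        self._factories[name] = factory

    def create_backend(self, config: dict[str, Any]) -> Any:
        # Look up the factory by the configured backend name.
        name = config["backend"]
        factory = self._factories.get(name)
        if factory is None:
            known = ", ".join(sorted(self._factories))
            raise ValueError(f"unknown backend {name!r}; registered: {known}")
        return factory(config)

    @classmethod
    def with_defaults(cls) -> "BackendRegistry":
        registry = cls()
        # Only an in-memory default here; a fuller registry would also add
        # factories that import their optional client libraries lazily.
        registry.register("memory", lambda config: deque())
        return registry


registry = BackendRegistry.with_defaults()
queue = registry.create_backend({"backend": "memory"})
print(type(queue).__name__)  # deque
```

One common reason for this design is that an optional client library only needs to be importable when its backend is actually selected, which matches the per-backend extras in the table.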

TaskConfig.backends supports declaring multiple named queues. Each gets registered under Annotated[TaskQueueProtocol, Named(name)]:

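```python
from typing import Annotated

from lexigram.contracts.infra.tasks import TaskQueueProtocol

queue = await container.resolve(
    Annotated[TaskQueueProtocol, Named("notifications")]
)
```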

The primary backend (the entry marked primary=True, or the first entry if none is marked) also receives the unnamed TaskQueueProtocol binding for backward compatibility.
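One way to express the primary-selection rule, assuming an explicit primary=True flag takes precedence over declaration order. BackendSpec and select_primary() are hypothetical, and rejecting more than one flagged entry is an added assumption.

```python
from dataclasses import dataclass


@dataclass
class BackendSpec:
    """Hypothetical named backend entry (stand-in for a TaskConfig.backends item)."""
    name: str
    primary: bool = False


def select_primary(backends: list[BackendSpec]) -> BackendSpec:
    """Return the backend flagged primary=True, else the first declared one."""
    if not backends:
        raise ValueError("at least one backend must be declared")
    flagged = [b for b in backends if b.primary]
    if len(flagged) > 1:
        raise ValueError("only one backend may set primary=True")
    return flagged[0] if flagged else backends[0]


backends = [BackendSpec("default"), BackendSpec("notifications", primary=True), BackendSpec("batch")]
print(select_primary(backends).name)  # notifications
print(select_primary([BackendSpec("default"), BackendSpec("batch")]).name)  # default
```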

When a job exhausts its retries it is routed to the DeadLetterQueue for manual inspection or automated replay:

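```python
from typing import Protocol

class DLQProtocol(Protocol):
    # Excerpt: the full protocol also defines get()
    async def add(self, job, error, attempt_count) -> None: ...
    async def retry(self, job_id) -> bool: ...  # move the job back to the main queue
    async def purge(self, job_id) -> bool: ...  # delete the job permanently
```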
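For tests or local development, an in-memory implementation of the three methods shown could look like the sketch below. InMemoryDLQ, the DeadLetter record, and the assumption that jobs are dicts with an "id" key are all illustrative; an asyncio.Queue stands in for the main queue.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class DeadLetter:
    job_id: str
    job: Any
    error: str
    attempt_count: int


class InMemoryDLQ:
    """Illustrative dead-letter queue that hands retried jobs back to a main queue."""

    def __init__(self, main_queue: asyncio.Queue) -> None:
        self._main = main_queue
        self._entries: dict[str, DeadLetter] = {}

    async def add(self, job: Any, error: BaseException, attempt_count: int) -> None:
        job_id = job["id"]  # assumption: jobs are dicts with an "id" key
        self._entries[job_id] = DeadLetter(job_id, job, repr(error), attempt_count)

    async def retry(self, job_id: str) -> bool:
        entry = self._entries.pop(job_id, None)
        if entry is None:
            return False
        await self._main.put(entry.job)  # back to the main queue
        return True

    async def purge(self, job_id: str) -> bool:
        # Permanent delete; False if the job was not in the DLQ.
        return self._entries.pop(job_id, None) is not None


async def main() -> None:
    main_q: asyncio.Queue = asyncio.Queue()
    dlq = InMemoryDLQ(main_q)
    await dlq.add({"id": "j1", "name": "send_email"}, TimeoutError("slow"), 3)
    print(await dlq.retry("j1"), main_q.qsize())  # True 1
    print(await dlq.purge("j1"))                  # False


asyncio.run(main())
```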

The WorkerPool manages N concurrent TaskWorker instances:

flowchart TB
    subgraph Pool[WorkerPool]
        direction LR
        W1[Worker 1]
        W2[Worker 2]
        WN[Worker N]
    end
    subgraph PerWorker[Per-Job Execution]
        subgraph M[Middleware Pipeline]
            L[LoggingMiddleware]
            Mw[MetricsMiddleware]
            T[TimeoutMiddleware]
        end
        H[Handler]
        R[ResultStore]
        DLQ[DeadLetterQueue]
    end

    Pool -->|dequeue| W1
    W1 --> M
    M --> H
    H -->|success| R
    H -->|retries exhausted| DLQ
```python
pool = WorkerPool(
    queue=redis_queue,
    handler_registry=registry,
    size=4,
    dead_letter_queue=DeadLetterQueue(),
)
await pool.start()
await pool.scale_to(8)  # dynamic scaling
await pool.stop(drain=True, timeout=30)  # graceful shutdown
```
| Feature | Mechanism |
|---|---|
| Concurrency | asyncio: N workers, each pulling from the same queue |
| Dynamic scaling | scale_to(N) adds or removes workers at runtime |
| Graceful drain | stop(drain=True) lets each worker finish its current job before exiting |
| Metrics | get_pool_stats(): active workers, jobs succeeded/failed, average processing time |
| System metrics | Optional psutil integration for CPU/memory per worker |
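The concurrency, scaling, and drain rows can be sketched with plain asyncio tasks sharing one asyncio.Queue. MiniPool is hypothetical and simplified: stop(drain=True) waits on queue.join() before cancelling the now-idle workers, but scale_to() cancels workers outright, so shrinking a busy pool abandons in-flight jobs.

```python
import asyncio
from typing import Any, Awaitable, Callable


class MiniPool:
    """Illustrative pool: N asyncio tasks pulling from one shared queue."""

    def __init__(self, queue: asyncio.Queue,
                 handle: Callable[[Any], Awaitable[None]]) -> None:
        self._queue = queue
        self._handle = handle
        self._workers: list[asyncio.Task] = []
        self.failed = 0

    async def _worker(self) -> None:
        while True:
            job = await self._queue.get()
            try:
                await self._handle(job)
            except Exception:
                self.failed += 1  # keep the worker alive after a failed job
            finally:
                self._queue.task_done()  # lets queue.join() see completion

    async def scale_to(self, n: int) -> None:
        # Grow by spawning tasks; shrink by cancelling the newest workers.
        while len(self._workers) < n:
            self._workers.append(asyncio.create_task(self._worker()))
        while len(self._workers) > n:
            worker = self._workers.pop()
            worker.cancel()
            await asyncio.gather(worker, return_exceptions=True)

    async def stop(self, drain: bool = True, timeout: float = 30.0) -> None:
        try:
            if drain:
                # Wait until every queued job has been processed.
                await asyncio.wait_for(self._queue.join(), timeout)
        finally:
            await self.scale_to(0)


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    done: list[int] = []

    async def handle(job: int) -> None:
        await asyncio.sleep(0.001)
        done.append(job)

    pool = MiniPool(queue, handle)
    await pool.scale_to(4)
    for i in range(10):
        queue.put_nowait(i)
    await pool.stop(drain=True, timeout=5)
    return sorted(done)


print(asyncio.run(main()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```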

Each worker loops: dequeue → resolve dependencies → middleware pipeline → execute handler → record result.

```python
# Retry policy from lexigram-resilience (injected via DI)
if self._retry_policy is not None:
    result = await self._retry_policy.execute(_execute)
else:
    result = await _execute()  # single attempt
```
| Failure scenario | Handling |
|---|---|
| TimeoutError | Caught by TimeoutMiddleware and recorded as TaskTimeoutError |
| TaskNotFoundError | No handler registered: the job fails immediately |
| TaskExecutionError | The handler raised: retried if a retry policy is configured |
| Permanent failure | After retries are exhausted: DeadLetterQueue + TaskFailedHook |
| Infrastructure failure | Raised as an exception (not returned as a Result) and propagates up |
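The retry-then-dead-letter path can be sketched as a loop with exponential backoff that invokes a callback once retries are exhausted. run_with_retries() and its parameters are hypothetical and are not the lexigram-resilience retry policy API; the sketch assumes max_retries counts retries after the first attempt.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_with_retries(
    execute: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 0.1,
    on_exhausted: Optional[Callable[[BaseException, int], Awaitable[None]]] = None,
) -> Any:
    """Try once plus up to max_retries retries, doubling the delay each time."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return await execute()
        except Exception as exc:  # CancelledError is a BaseException and propagates
            if attempts > max_retries:
                if on_exhausted is not None:
                    await on_exhausted(exc, attempts)  # e.g. add the job to a DLQ
                raise
            await asyncio.sleep(base_delay * 2 ** (attempts - 1))


async def main() -> None:
    calls = 0

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient")
        return "ok"

    result = await run_with_retries(flaky, max_retries=3, base_delay=0.001)
    print(result, calls)  # ok 3


asyncio.run(main())
```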

The built-in middleware stack runs once per job:

  1. LoggingMiddleware: structured log with job id, name, and duration
  2. MetricsMiddleware: counters and timings sent to TaskMetricsCollector
  3. TimeoutMiddleware: asyncio.wait_for, with the timeout capped at max_timeout

Custom middleware implements TaskMiddleware and is added to the pipeline.
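A middleware pipeline is typically built by wrapping the handler from the inside out, so the first middleware in the list runs outermost. The sketch below assumes a hypothetical (job, call_next) middleware signature and dict-shaped jobs rather than the real TaskMiddleware interface; a list stands in for the structured logger.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[Any]]
# A middleware receives the job and the next callable in the chain.
Middleware = Callable[..., Awaitable[Any]]

log: list[str] = []  # stands in for a structured logger


def build_pipeline(middlewares: list[Middleware], handler: Handler) -> Handler:
    """Wrap handler so that middlewares[0] runs outermost."""
    call = handler
    for mw in reversed(middlewares):
        call = functools.partial(mw, call_next=call)
    return call


async def logging_middleware(job: dict, call_next: Handler) -> Any:
    log.append(f"start {job['name']}")
    try:
        return await call_next(job)
    finally:
        log.append(f"end {job['name']}")


def timeout_middleware(max_timeout: float) -> Middleware:
    async def middleware(job: dict, call_next: Handler) -> Any:
        # Use the job's own timeout if set, but never exceed max_timeout.
        limit = min(job.get("timeout") or max_timeout, max_timeout)
        return await asyncio.wait_for(call_next(job), limit)
    return middleware


async def handler(job: dict) -> str:
    await asyncio.sleep(job.get("work", 0))
    return f"done {job['name']}"


pipeline = build_pipeline([logging_middleware, timeout_middleware(0.05)], handler)
print(asyncio.run(pipeline({"name": "report", "timeout": 1.0})))  # done report
print(log)  # ['start report', 'end report']
```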


TaskProvider integrates with the Lexigram DI lifecycle:

sequenceDiagram
    participant App as Application
    participant TP as TaskProvider
    participant C as Container
    participant Q as TaskQueue
    participant WP as WorkerPool
    participant TS as TaskScheduler
    participant HC as HealthChecker

    App->>TP: __init__(queue, worker_count)
    App->>C: freeze()
    C->>TP: register(container)
    TP->>C: singleton(TaskQueueProtocol)
    TP->>C: singleton(TaskExecutorProtocol)
    TP->>C: singleton(HandlerRegistry)
    TP->>C: singleton(WorkerPool)
    TP->>C: singleton(TaskScheduler)
    alt multi-backend
        TP->>C: singleton(TaskQueueProtocol, name="notifications")
        TP->>C: singleton(TaskQueueProtocol, name="batch")
    end
    TP->>C: discover entry points
    TP->>C: register admin widgets
    C-->>App: container ready

    App->>TP: boot(container)
    TP->>TP: connect named queues (parallel)
    TP->>TP: upgrade ResultStore to CacheBackendResultStore
    TP->>WP: WorkerPool(queue, handlers, N)
    WP->>WP: start workers (parallel ALL_SETTLED)
    alt scheduler enabled
        TP->>TS: TaskScheduler()
        TP->>TS: start_scheduler(enqueue_callback)
    end
    TP->>HC: register health check (READINESS)

    App->>TP: shutdown()
    TP->>TS: stop_scheduler()
    TP->>WP: stop(drain=True)
    TP->>Q: close (LIFO for named queues)

TaskProvider runs at ProviderPriority.INFRASTRUCTURE (value 10), so it boots early, after the config and diagnostics providers.

The provider exposes a READINESS health check that probes get_task_count() on the queue and aggregates worker pool stats. In multi-backend mode the overall status is the worst individual status across all named queues.
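Taking the worst status reduces to a max() over an ordered enum. The Status levels and the choice to report UNHEALTHY when no queues are configured are assumptions for this sketch.

```python
from enum import IntEnum


class Status(IntEnum):
    """Hypothetical health levels, ordered from best to worst."""
    HEALTHY = 0
    DEGRADED = 1
    UNHEALTHY = 2


def overall_status(per_queue: dict[str, Status]) -> Status:
    """Worst status across all named queues (no queues at all counts as UNHEALTHY)."""
    return max(per_queue.values(), default=Status.UNHEALTHY)


statuses = {"default": Status.HEALTHY, "notifications": Status.DEGRADED, "batch": Status.HEALTHY}
print(overall_status(statuses).name)  # DEGRADED
```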


All protocols consumed by lexigram-tasks live in lexigram.contracts.infra.tasks:

| Protocol | Import path | Usage |
|---|---|---|
| TaskQueueProtocol | lexigram.contracts.infra.tasks | Core queue contract: enqueue, dequeue, ack, nack, close |
| TaskExecutorProtocol | lexigram.contracts.infra.tasks | Handler dispatch: execute, register_handler, get_handler |
| TaskProviderProtocol | lexigram.contracts.infra.tasks | Provider lifecycle: register, boot, shutdown, health_check |
| JobProtocol | lexigram.contracts.infra.tasks | Job data model: id, name, args, kwargs, priority, status |
| JobTemplateProtocol | lexigram.contracts.infra.tasks | Recurring job template: name, args, kwargs, depends_on |
| TaskWorkerProtocol | lexigram.contracts.infra.tasks | Worker contract: start, stop |
| DLQProtocol | lexigram.contracts.infra.tasks | Dead-letter queue: add, get, retry, purge |
| ProgressTrackerProtocol | lexigram.contracts.infra.tasks.progress | Progress reporting for long-running jobs |
| CacheBackendProtocol | lexigram.contracts.infra.cache | Optional upgrade for distributed result storage |
| RetryConfig | lexigram.contracts.infra.resilience.models | Retry policy configuration |
| HookRegistryProtocol | lexigram.contracts.core | Lifecycle hooks for task events |
| HealthCheckResult | lexigram.contracts.core.health | Health check data structure |

| Extension point | Mechanism |
|---|---|
| Custom backend | Implement TaskQueueProtocol and register it via the lexigram.tasks.backends entry point, or use TaskBackendRegistry.register() |
| Custom middleware | Subclass TaskMiddleware and add it to TaskMiddlewarePipeline |
| Custom result store | Implement ResultStore and inject it via DI |
| Custom progress store | Implement ProgressTrackerProtocol and inject it via DI |
| Custom error handler | Use RetryPolicyProtocol from lexigram-resilience for custom retry/backoff |
| Workflows | Use TaskChain, TaskGroup, TaskChord for composed task graphs |
| Admin widgets | Extend TasksAdminContributor or register widget handlers |
| CLI commands | Extend TasksCliContributor |
| Scheduler store | Implement SchedulerStore for durable schedule persistence |
| Named multi-backend | Declare backends in TaskConfig; each gets a named DI binding |
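The conventional meaning of chain, group, and chord (as in Celery's canvas primitives) can be sketched in-process with asyncio. These functions are hypothetical illustrations only: TaskChain, TaskGroup, and TaskChord compose queued tasks, whereas this sketch simply awaits coroutines directly.

```python
import asyncio
from typing import Any, Awaitable, Callable

Step = Callable[[Any], Awaitable[Any]]


async def chain(steps: list[Step], value: Any) -> Any:
    """Run steps in order, feeding each result into the next step."""
    for step in steps:
        value = await step(value)
    return value


async def group(steps: list[Step], value: Any) -> list[Any]:
    """Run steps concurrently on the same input; results keep list order."""
    return list(await asyncio.gather(*(step(value) for step in steps)))


async def chord(steps: list[Step], callback: Step, value: Any) -> Any:
    """Run a group, then pass the list of its results to a callback."""
    return await callback(await group(steps, value))


async def double(x: int) -> int:
    return x * 2


async def inc(x: int) -> int:
    return x + 1


async def total(xs: list[int]) -> int:
    return sum(xs)


print(asyncio.run(chain([double, inc], 5)))          # 11
print(asyncio.run(group([double, inc], 5)))          # [10, 6]
print(asyncio.run(chord([double, inc], total, 5)))   # 16
```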