Tasks (lexigram-tasks)
Background task processing for Lexigram Framework — Scheduling, workers, and job queues
Overview
`lexigram-tasks` provides background task execution, scheduling, and async job queues with support for Redis, RabbitMQ, PostgreSQL, and in-memory backends. It features configurable retry policies, dead-letter queues, priority queues, rate limiting, cron scheduling, and Prometheus metrics. All services are wired via `TasksProvider`, which registers task protocols with the DI container.
Full documentation: docs.lexigram.dev
Install
```bash
uv add lexigram-tasks

# Optional extras
uv add "lexigram-tasks[redis]"     # Redis backend
uv add "lexigram-tasks[rabbitmq]"  # RabbitMQ backend
```

Quick Start
```python
from lexigram import Application
from lexigram.di.module import Module, module

# Import the module from the package
from lexigram.tasks import TasksModule


@module(imports=[TasksModule.configure(...)])
class AppModule(Module):
    pass


app = Application(modules=[AppModule])

if __name__ == "__main__":
    app.run()
```

Configuration
Zero-config usage: call `TasksModule.configure()` with no arguments to use defaults.
Option 1: YAML file

```yaml
tasks:
  backend:
    type: redis
    redis_url: redis://localhost:6379/0
  worker:
    worker_count: 4
    max_concurrent_tasks: 10
  scheduler:
    enabled: true
    timezone: UTC
```

Option 2: Profiles + Environment Variables (recommended)
```bash
export LEX_TASKS__ENABLED=true
# Environment variables for each field
```

Option 3: Python
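```python
from lexigram.tasks import TasksModule

# The sub-config classes are assumed to live next to TaskConfig in
# lexigram.tasks.config; the original snippet used them without importing them.
from lexigram.tasks.config import TaskBackendConfig, TaskConfig, TaskWorkerConfig

config = TaskConfig(
    backend=TaskBackendConfig(type="redis", redis_url="redis://localhost:6379/0"),
    worker=TaskWorkerConfig(worker_count=4, max_concurrent_tasks=10),
)

# `queue` is assumed to be a queue backend instance created elsewhere.
TasksModule.configure(queue=queue, worker_count=4, enable_scheduler=True)
```

Config reference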
| Field | Default | Env var | Description |
|---|---|---|---|
| `backend.type` | `memory` | `LEX_TASKS__BACKEND__TYPE` | Queue backend (`redis`, `rabbitmq`, `postgres`, `memory`) |
| `backend.redis_url` | `redis://localhost:6379/0` | `LEX_TASKS__BACKEND__REDIS_URL` | Redis connection URL |
| `backend.amqp_url` | `amqp://localhost` | `LEX_TASKS__BACKEND__AMQP_URL` | AMQP connection URL |
| `worker.worker_count` | `4` | `LEX_TASKS__WORKER__WORKER_COUNT` | Number of worker processes |
| `worker.max_concurrent_tasks` | `10` | `LEX_TASKS__WORKER__MAX_CONCURRENT_TASKS` | Max tasks executed in parallel per worker |
| `worker.default_timeout` | `300` | `LEX_TASKS__WORKER__DEFAULT_TIMEOUT` | Task execution timeout in seconds |
| `worker.max_retries` | `3` | `LEX_TASKS__WORKER__MAX_RETRIES` | Maximum retry attempts per task |
| `scheduler.enabled` | `true` | `LEX_TASKS__SCHEDULER__ENABLED` | Enable cron-based task scheduling |
| `scheduler.timezone` | `UTC` | `LEX_TASKS__SCHEDULER__TIMEZONE` | Timezone for cron schedule evaluation |
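The env var names in the table follow a visible pattern: the `LEX_TASKS__` prefix, then the field path in upper case with `__` between levels. The sketch below only illustrates that naming convention with the standard library. The function `env_to_nested` is hypothetical and is not the framework's loader; how lexigram actually parses and type-converts these values is not shown in this document.

```python
PREFIX = "LEX_TASKS__"


def env_to_nested(environ: dict, prefix: str = PREFIX) -> dict:
    """Turn PREFIX + A__B=value entries into {"a": {"b": "value"}}."""
    result: dict = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue  # ignore unrelated variables
        parts = key[len(prefix):].lower().split("__")
        node = result
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value  # values stay strings in this sketch
    return result


example_env = {
    "LEX_TASKS__BACKEND__TYPE": "redis",
    "LEX_TASKS__WORKER__WORKER_COUNT": "8",
    "PATH": "/usr/bin",
}
settings = env_to_nested(example_env)
print(settings)
```

In this sketch `LEX_TASKS__BACKEND__TYPE` lands at `settings["backend"]["type"]`, matching the `backend.type` row of the table.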
Module Factory Methods
| Method | Description |
|---|---|
| `TasksModule.configure(queue, worker_count, enable_scheduler)` | Configure with explicit queue and worker settings |
| `TasksModule.stub()` | Minimal config for testing |
Key Features
- Multiple backends: Redis, RabbitMQ, PostgreSQL (transactional), in-memory
- Retry policies: configurable `retries`, `backoff`, `max_delay` per task
- Dead-letter queue: failed jobs routed to a DLQ for inspection / replay
- Priority queues: `urgent`, `default`, `bulk` built-in; custom queues supported
- Rate limiting: per-queue and per-task throughput caps
- Concurrency: bounded worker pool with backpressure
- Cron scheduling: cron-expression task scheduling via the `@scheduled` decorator
- Observability: Prometheus metrics for queue depth, latency, error rate
- Health checks: `/health/tasks` endpoint via `lexigram-monitor`
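To make the retry-policy item concrete, here is a minimal sketch of how `retries`, `backoff`, and `max_delay` could interact. It assumes exponential doubling capped at `max_delay`; the document does not state the formula lexigram-tasks uses, and `backoff_delays` is a hypothetical helper, not part of the package.

```python
def backoff_delays(retries: int, backoff: float, max_delay: float) -> list:
    """Delay before each retry: backoff * 2**attempt, capped at max_delay."""
    return [min(backoff * (2 ** attempt), max_delay) for attempt in range(retries)]


delays = backoff_delays(retries=5, backoff=1.0, max_delay=10.0)
print(delays)  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

The cap matters for long retry chains: without it the fifth delay here would be 16 seconds rather than 10.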
Testing
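```python
from lexigram import Application
from lexigram.tasks import TasksModule


# The test function name is illustrative.
async def test_with_stub_tasks() -> None:
    async with Application.boot(modules=[TasksModule.stub()]) as app:
        # your test code
        ...
```

BackgroundTaskManager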
A container-injectable service for fire-and-forget tasks. It ensures that no task handle is lost and that all pending work is cancelled on framework shutdown.
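The underlying problem can be sketched with plain `asyncio`: the event loop keeps only weak references to tasks, so a fire-and-forget task needs a strong reference held somewhere until it finishes. The `TrackedTasks` class below is a hypothetical stand-in written for illustration, not the lexigram implementation. In particular, treating `timeout` as a grace period before cancellation is an assumption.

```python
import asyncio


class TrackedTasks:
    """Hypothetical stand-in for a task manager, for illustration only."""

    def __init__(self) -> None:
        self._tasks: set = set()

    def track(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)  # strong reference keeps the task alive
        task.add_done_callback(self._tasks.discard)  # drop it once finished
        return task

    @property
    def pending_count(self) -> int:
        return len(self._tasks)

    async def shutdown(self, timeout: float) -> None:
        if not self._tasks:
            return
        # Assumption: give in-flight work up to `timeout` seconds, then cancel.
        _, pending = await asyncio.wait(list(self._tasks), timeout=timeout)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> tuple:
    manager = TrackedTasks()
    manager.track(asyncio.sleep(0.01))  # finishes within the grace period
    manager.track(asyncio.sleep(60))    # still running, cancelled at shutdown
    before = manager.pending_count
    await manager.shutdown(timeout=0.2)
    await asyncio.sleep(0)  # let any remaining done-callbacks run
    return before, manager.pending_count


result = asyncio.run(demo())
print(result)
```

The done-callback is what keeps the set from growing without bound: each task removes itself as soon as it completes or is cancelled.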
```python
from lexigram.tasks import BackgroundTaskManager


class MyService:
    def __init__(self, task_manager: BackgroundTaskManager) -> None:
        self._tasks = task_manager

    async def kick_off_work(self) -> None:
        self._tasks.track(self._do_something())

    async def kick_off_named_work(self) -> None:
        self._tasks.track_named("my-named-job", self._do_something())

    async def check_pending(self) -> int:
        return self._tasks.pending_count


# In your Provider.shutdown():
await task_manager.shutdown(timeout=30.0)
```

Register as a singleton in your provider:
```python
from lexigram.di.provider import Provider
from lexigram.tasks import BackgroundTaskManager


class MyProvider(Provider):
    async def register(self, container):
        container.singleton(BackgroundTaskManager, BackgroundTaskManager())

    async def shutdown(self):
        mgr = await self._container.resolve(BackgroundTaskManager)
        await mgr.shutdown(timeout=30.0)
```

ScheduledWorker
A base class for services that run a cycle of work on a fixed interval, replacing hand-rolled `while not stop_event` loops.
```python
from lexigram.tasks import BackgroundTaskManager, OnErrorPolicy, ScheduledWorker


class RetentionWorker(ScheduledWorker):
    interval_seconds = 3600.0      # run every hour
    initial_delay_seconds = 5.0    # wait 5 s before the first cycle
    on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE  # (default)

    async def run_cycle(self) -> None:
        await self._repo.delete_expired_records()


# In your provider.boot():
task_manager = await container.resolve(BackgroundTaskManager)
self._worker = RetentionWorker(task_manager=task_manager)
await self._worker.start()

# In your provider.shutdown():
await self._worker.stop()
```

Override at construction time to tune per-instance without subclassing:
```python
worker = RetentionWorker(
    task_manager=task_manager,
    interval_seconds=300.0,
    max_jitter_seconds=30.0,
    on_error_policy=OnErrorPolicy.BACKOFF,
)
```

OnErrorPolicy values:
| Value | Behaviour on `run_cycle` error |
|---|---|
| `LOG_AND_CONTINUE` | Log the exception and resume on the next interval (default). |
| `BACKOFF` | Log and double the sleep time (up to 10× interval). |
| `STOP` | Log and stop the worker permanently. |
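The three policies in the table can be read as a rule for choosing the next sleep after a cycle. The sketch below uses a hypothetical `Policy` enum and `next_sleep` function that mirror the table; it is not the `ScheduledWorker` source. Resetting to the base interval after a successful cycle is an assumption that the table does not confirm.

```python
from enum import Enum
from typing import Optional


class Policy(Enum):  # hypothetical mirror of OnErrorPolicy
    LOG_AND_CONTINUE = "log_and_continue"
    BACKOFF = "backoff"
    STOP = "stop"


def next_sleep(
    policy: Policy, interval: float, current_sleep: float, failed: bool
) -> Optional[float]:
    """Return seconds to sleep before the next cycle, or None to stop."""
    if not failed:
        return interval  # assumption: a successful cycle resets the sleep
    if policy is Policy.STOP:
        return None
    if policy is Policy.BACKOFF:
        return min(current_sleep * 2, interval * 10)  # double, capped at 10x
    return interval  # LOG_AND_CONTINUE: resume on the normal interval


sleep = 60.0
history = []
for _ in range(5):  # five consecutive failures under BACKOFF
    sleep = next_sleep(Policy.BACKOFF, 60.0, sleep, failed=True)
    history.append(sleep)
print(history)  # [120.0, 240.0, 480.0, 600.0, 600.0]
```

With a 60-second interval the sleep reaches the 10× ceiling of 600 seconds on the fourth consecutive failure and stays there.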
Key Source Files
| File | What it contains |
|---|---|
| `src/lexigram/tasks/module.py` | `TasksModule` class with factory methods |
| `src/lexigram/tasks/di/provider.py` | `TasksProvider`: wires task protocols into the DI container |
| `src/lexigram/tasks/config.py` | `TaskConfig` and sub-config classes |
| `src/lexigram/tasks/backends/` | Backend implementations (memory, redis, rabbitmq, postgres) |
| `src/lexigram/tasks/scheduling/` | Cron scheduler and scheduled task decorators |
| `src/lexigram/tasks/background_task_manager.py` | `BackgroundTaskManager`: lifecycle-aware task tracking (LEX-006) |
| `src/lexigram/tasks/scheduled_worker.py` | `ScheduledWorker` + `OnErrorPolicy`: periodic worker base class (LEX-005) |