Skip to content
GitHub

Tasks (lexigram-tasks)

Background task processing for Lexigram Framework — Scheduling, workers, and job queues


lexigram-tasks provides background task execution, scheduling, and async job queues with support for Redis, RabbitMQ, PostgreSQL, and in-memory backends. It features configurable retry policies, dead-letter queues, priority queues, rate limiting, cron scheduling, and Prometheus metrics. All services are wired via TasksProvider, which registers task protocols with the DI container.


Full documentation: docs.lexigram.dev

Terminal window
uv add lexigram-tasks
# Optional extras
uv add "lexigram-tasks[redis]" # Redis backend
uv add "lexigram-tasks[rabbitmq]" # RabbitMQ backend
from lexigram import Application
from lexigram.di.module import Module, module
# Import the module from the package
from lexigram.tasks import TasksModule
@module(imports=[TasksModule.configure(...)])
class AppModule(Module):
pass
app = Application(modules=[AppModule])
if __name__ == "__main__":
app.run()

Zero-config usage: Call TasksModule.configure() with no arguments to use defaults.

application.yaml
tasks:
backend:
type: redis
redis_url: redis://localhost:6379/0
worker:
worker_count: 4
max_concurrent_tasks: 10
scheduler:
enabled: true
timezone: UTC
Section titled “Option 2 — Profiles + Environment Variables (recommended)”
Terminal window
export LEX_TASKS__ENABLED=true
# Environment variables for each field
from lexigram.tasks.config import TaskConfig
from lexigram.tasks import TasksModule
config = TaskConfig(
backend=TaskBackendConfig(type="redis", redis_url="redis://localhost:6379/0"),
worker=TaskWorkerConfig(worker_count=4, max_concurrent_tasks=10),
)
TasksModule.configure(queue=queue, worker_count=4, enable_scheduler=True)
FieldDefaultEnv varDescription
backend.typememoryLEX_TASKS__BACKEND__TYPEQueue backend (redis, rabbitmq, postgres, memory)
backend.redis_urlredis://localhost:6379/0LEX_TASKS__BACKEND__REDIS_URLRedis connection URL
backend.amqp_urlamqp://localhostLEX_TASKS__BACKEND__AMQP_URLAMQP connection URL
worker.worker_count4LEX_TASKS__WORKER__WORKER_COUNTNumber of worker processes
worker.max_concurrent_tasks10LEX_TASKS__WORKER__MAX_CONCURRENT_TASKSMax tasks executed in parallel per worker
worker.default_timeout300LEX_TASKS__WORKER__DEFAULT_TIMEOUTTask execution timeout in seconds
worker.max_retries3LEX_TASKS__WORKER__MAX_RETRIESMaximum retry attempts per task
scheduler.enabledtrueLEX_TASKS__SCHEDULER__ENABLEDEnable cron-based task scheduling
scheduler.timezoneUTCLEX_TASKS__SCHEDULER__TIMEZONETimezone for cron schedule evaluation
MethodDescription
TasksModule.configure(queue, worker_count, enable_scheduler)Configure with explicit queue and worker settings
TasksModule.stub()Minimal config for testing
  • Multiple backends — Redis, RabbitMQ, PostgreSQL (transactional), in-memory
  • Retry policies — Configurable retries, backoff, max_delay per task
  • Dead-letter queue — Failed jobs routed to DLQ for inspection / replay
  • Priority queuesurgent, default, bulk built-in; custom queues supported
  • Rate limiting — Per-queue and per-task throughput caps
  • Concurrency — Bounded worker pool with backpressure
  • Cron scheduling — Cron-expression task scheduling via @scheduled decorator
  • Observability — Prometheus metrics for queue depth, latency, error rate
  • Health checks/health/tasks endpoint via lexigram-monitor
async with Application.boot(modules=[TasksModule.stub()]) as app:
# your test code
...

A container-injectable service for fire-and-go tasks that ensures no task handle is lost and all pending work is cancelled on framework shutdown.

from lexigram.tasks import BackgroundTaskManager
class MyService:
def __init__(self, task_manager: BackgroundTaskManager) -> None:
self._tasks = task_manager
async def kick_off_work(self) -> None:
self._tasks.track(self._do_something())
async def kick_off_named_work(self) -> None:
self._tasks.track_named("my-named-job", self._do_something())
async def check_pending(self) -> int:
return self._tasks.pending_count
# In your Provider.shutdown():
await task_manager.shutdown(timeout=30.0)

Register as a singleton in your provider:

from lexigram.tasks import BackgroundTaskManager
from lexigram.di.provider import Provider
class MyProvider(Provider):
async def register(self, container):
container.singleton(BackgroundTaskManager, BackgroundTaskManager())
async def shutdown(self):
mgr = await self._container.resolve(BackgroundTaskManager)
await mgr.shutdown(timeout=30.0)

A base class for services that run a cycle of work on a fixed interval — replacing hand-rolled while not stop_event loops.

from lexigram.tasks import BackgroundTaskManager, OnErrorPolicy, ScheduledWorker
class RetentionWorker(ScheduledWorker):
interval_seconds = 3600.0 # run every hour
initial_delay_seconds = 5.0 # wait 5 s before the first cycle
on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE # (default)
async def run_cycle(self) -> None:
await self._repo.delete_expired_records()
# In your provider.boot():
task_manager = await container.resolve(BackgroundTaskManager)
self._worker = RetentionWorker(task_manager=task_manager)
await self._worker.start()
# In your provider.shutdown():
await self._worker.stop()

Override at construction time to tune per-instance without subclassing:

worker = RetentionWorker(
task_manager=task_manager,
interval_seconds=300.0,
max_jitter_seconds=30.0,
on_error_policy=OnErrorPolicy.BACKOFF,
)

OnErrorPolicy values:

ValueBehaviour on run_cycle error
LOG_AND_CONTINUELog the exception and resume on the next interval (default).
BACKOFFLog and double the sleep time (up to 10× interval).
STOPLog and stop the worker permanently.
FileWhat it contains
src/lexigram/tasks/module.pyTasksModule class with factory methods
src/lexigram/tasks/di/provider.pyTasksProvider — wires task protocols into DI container
src/lexigram/tasks/config.pyTaskConfig and sub-config classes
src/lexigram/tasks/backends/Backend implementations (memory, redis, rabbitmq, postgres)
src/lexigram/tasks/scheduling/Cron scheduler and scheduled task decorators
src/lexigram/tasks/background_task_manager.pyBackgroundTaskManager — lifecycle-aware task tracking (LEX-006)
src/lexigram/tasks/scheduled_worker.pyScheduledWorker + OnErrorPolicy — periodic worker base class (LEX-005)