Backends

lexigram-tasks provides background job processing through a unified TaskQueueProtocol interface. Each backend implements enqueue, dequeue, task count, and connection lifecycle management. The TaskBackendRegistry manages the type-string to factory mapping with built-in support for four backends.

| Backend | Extra / Package | Production Ready | Best For |
| --- | --- | --- | --- |
| Redis | [redis] | Yes | High-throughput, persistent queues, pub/sub |
| RabbitMQ | [rabbitmq] | Yes | Reliable delivery, routing, DLQ |
| Postgres | (core) | Yes | ACID queues, no extra infra |
| Memory | (none) | No | Unit tests, prototyping |
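
To make the idea of a unified interface plus a type-string registry concrete, here is a minimal, self-contained sketch. It does not reproduce the real TaskQueueProtocol or TaskBackendRegistry. The names `TaskQueue`, `InMemoryQueue`, `BackendRegistry`, and all method signatures below are assumptions chosen for illustration.

```python
import asyncio
from typing import Any, Callable, Dict, Protocol


class TaskQueue(Protocol):
    """Hypothetical stand-in for the interface each backend implements."""

    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def enqueue(self, task: Any) -> None: ...
    async def dequeue(self) -> Any: ...
    async def count(self) -> int: ...


class InMemoryQueue:
    """Illustrative backend built on asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def connect(self) -> None:
        pass  # nothing to open for an in-process queue

    async def disconnect(self) -> None:
        pass  # nothing to close either

    async def enqueue(self, task: Any) -> None:
        await self._queue.put(task)

    async def dequeue(self) -> Any:
        return await self._queue.get()

    async def count(self) -> int:
        return self._queue.qsize()


class BackendRegistry:
    """Maps a type string such as "memory" to a factory callable."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[..., TaskQueue]] = {}

    def register(self, type_name: str, factory: Callable[..., TaskQueue]) -> None:
        self._factories[type_name] = factory

    def create(self, type_name: str, **options: Any) -> TaskQueue:
        try:
            factory = self._factories[type_name]
        except KeyError:
            raise ValueError(f"unknown backend type: {type_name!r}") from None
        return factory(**options)


async def demo() -> int:
    registry = BackendRegistry()
    registry.register("memory", InMemoryQueue)
    queue = registry.create("memory")
    await queue.connect()
    await queue.enqueue({"name": "send_email"})
    pending = await queue.count()
    await queue.disconnect()
    return pending
```

Running `asyncio.run(demo())` enqueues one task and returns the pending count. A registry of this shape keeps configuration (the `type` string) decoupled from the concrete backend class.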

A Redis-backed task queue using the redis and rq libraries. Provides persistent queue storage with optional result persistence via CacheBackendResultStore. Best for high-throughput task pipelines where Redis is already part of your stack. Supports rate limiting and global rate limiters.

tasks:
  backend:
    type: redis
    redis_url: "${REDIS_URL}"
    queue_name: default
  worker:
    worker_count: 4
    poll_interval: 0.5

from lexigram.tasks import TaskProvider, RedisTaskQueue

queue = RedisTaskQueue(redis_url="redis://localhost:6379", queue_name="default")
provider = TaskProvider(queue=queue, worker_count=4)

An AMQP-backed task queue using aio-pika. Provides reliable message delivery with acknowledgements, dead-letter exchanges, and flexible routing. Best for workflows that need guaranteed delivery, message TTL, or integration with non-Python consumers.

tasks:
  backend:
    type: rabbitmq
    amqp_url: "${AMQP_URL}"
    queue_name: default
  worker:
    worker_count: 4

from lexigram.tasks import TaskProvider, RabbitMQTaskQueue

queue = RabbitMQTaskQueue(amqp_url="amqp://guest:guest@localhost:5672/", queue_name="default")

A PostgreSQL-backed task queue that uses the database itself as the queue substrate. Requires postgres_dsn configuration. Best for applications that want ACID guarantees without maintaining a separate message broker, or for lightweight queue needs alongside an existing Postgres deployment.

tasks:
  backend:
    type: postgres
    postgres_dsn: "${DATABASE_URL}"
    queue_name: default
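
The following sketch illustrates the general idea of using a database table as a queue: jobs are rows, and claiming a job is a transactional status update. It uses the standard-library `sqlite3` module as a stand-in so it runs without a server. It is not the lexigram-tasks implementation, and the table layout and function names are assumptions. Against a real Postgres deployment, a common pattern for concurrent workers is `SELECT ... FOR UPDATE SKIP LOCKED`; the source does not say which approach the library takes.

```python
import sqlite3
from typing import Optional, Tuple


def make_queue_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical jobs table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    conn.commit()
    return conn


def enqueue(conn: sqlite3.Connection, payload: str) -> int:
    """Insert a pending job and return its id."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute("INSERT INTO jobs (payload) VALUES (?)", (payload,))
    return cur.lastrowid


def dequeue(conn: sqlite3.Connection) -> Optional[Tuple[int, str]]:
    """Claim the oldest pending job, or return None if the queue is empty."""
    with conn:
        row = conn.execute(
            "SELECT id, payload FROM jobs"
            " WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # Mark the job as claimed so it is not handed out twice.
        conn.execute("UPDATE jobs SET status = 'running' WHERE id = ?", (row[0],))
    return row
```

Because the claim is a committed database write, a claimed job stays claimed if the process crashes. This is the kind of durability the ACID framing above refers to.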

An in-process asyncio.Queue-backed store. All enqueued tasks are lost on process restart. The provider logs a warning if a MemoryTaskQueue is used outside of development or testing environments.

from lexigram.tasks import TaskProvider, MemoryTaskQueue
provider = TaskProvider(queue=MemoryTaskQueue(), worker_count=2)

lexigram-tasks includes a TaskScheduler that runs cron-based job scheduling alongside the worker pool. Scheduled tasks are defined with the @scheduled decorator:

from lexigram.tasks import scheduled

@scheduled(cron="0 */6 * * *")
async def sync_products():
    """Refresh product catalogue every 6 hours."""
    ...

The scheduler supports cron expressions (via croniter), timezone configuration, and per-job template management. Scheduling is backend-independent — it works with any task queue backend.
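
As a rough illustration of decorator-based scheduling, the sketch below records a cron expression for each decorated function and computes the next firing time for the `0 */6 * * *` expression used above. The decorator `scheduled_sketch`, the `SCHEDULE` registry, and `next_six_hourly` are hypothetical local names, not part of lexigram-tasks. The hand-written time calculation covers only this one expression, whereas the library is described as delegating cron parsing to croniter.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict

# Hypothetical registry: function name -> cron expression.
SCHEDULE: Dict[str, str] = {}


def scheduled_sketch(cron: str) -> Callable:
    """Record the cron expression and return the function unchanged."""

    def decorator(func: Callable) -> Callable:
        SCHEDULE[func.__name__] = cron
        return func

    return decorator


def next_six_hourly(after: datetime) -> datetime:
    """Next time matching '0 */6 * * *' that is strictly later than `after`."""
    # Start from the next whole hour, then step forward hour by hour
    # until the hour is a multiple of six (00:00, 06:00, 12:00, 18:00).
    candidate = after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    while candidate.hour % 6 != 0:
        candidate += timedelta(hours=1)
    return candidate


@scheduled_sketch(cron="0 */6 * * *")
async def sync_products() -> None:
    """Refresh product catalogue every 6 hours."""
```

For example, `next_six_hourly(datetime(2024, 1, 1, 5, 30))` returns `datetime(2024, 1, 1, 6, 0)`. Timezone handling is left out of this sketch.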

Failed tasks can be routed to a DeadLetterQueue for inspection and replay. Each FailureRecord captures the original job, error message, and failure timestamp. The DLQ pairs naturally with RabbitMQ’s dead-letter exchange mechanism.

from lexigram.tasks import DeadLetterQueue
dlq = DeadLetterQueue()
record = await dlq.get("task-123")
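
To show what a failure record and a replay step might look like, here is a small in-memory sketch. The classes `FailureRecordSketch` and `InMemoryDLQ`, their field names, and the `add` and `replay` methods are assumptions for illustration. Only the three captured items (original job, error message, failure timestamp) come from the description above. The real `DeadLetterQueue.get` is awaited, while this sketch is synchronous for brevity.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class FailureRecordSketch:
    """Hypothetical record: original job, error message, failure timestamp."""

    job: Any
    error: str
    failed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class InMemoryDLQ:
    """Illustrative dead-letter store keyed by task ID."""

    def __init__(self) -> None:
        self._records: Dict[str, FailureRecordSketch] = {}

    def add(self, task_id: str, job: Any, error: Exception) -> FailureRecordSketch:
        record = FailureRecordSketch(job=job, error=str(error))
        self._records[task_id] = record
        return record

    def get(self, task_id: str) -> Optional[FailureRecordSketch]:
        return self._records.get(task_id)

    def replay(self, task_id: str) -> Optional[Any]:
        """Remove the record and return its job so the caller can re-enqueue it."""
        record = self._records.pop(task_id, None)
        return None if record is None else record.job
```

A caller would typically inspect `get("task-123")`, fix the underlying problem, and then pass the result of `replay("task-123")` back to the queue's enqueue method.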

The WorkerPool manages concurrent task execution with configurable concurrency, polling intervals, and graceful shutdown. Workers pick up tasks from the queue, execute registered handlers, and emit lifecycle events (TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent).
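
The worker behaviour described above can be sketched with plain asyncio. This is not the WorkerPool implementation. The function names, the `results` list, and the parameter names `task_timeout` and `shutdown_timeout` are assumptions. The sketch also blocks on `queue.get()` instead of polling at an interval, and it records results in a list instead of emitting lifecycle events.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List

Handler = Callable[[str], Awaitable[str]]


async def worker(
    queue: asyncio.Queue, handler: Handler, results: List[str], task_timeout: float
) -> None:
    """Pull jobs forever; the pool cancels this coroutine on shutdown."""
    while True:
        job = await queue.get()
        try:
            # Enforce a per-task timeout around the handler call.
            results.append(await asyncio.wait_for(handler(job), timeout=task_timeout))
        except Exception as exc:  # handler errors and timeouts alike
            results.append(f"failed:{job}:{type(exc).__name__}")
        finally:
            queue.task_done()


async def run_pool(
    jobs: Iterable[str],
    handler: Handler,
    worker_count: int = 2,
    task_timeout: float = 1.0,
    shutdown_timeout: float = 5.0,
) -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)
    results: List[str] = []
    workers = [
        asyncio.create_task(worker(queue, handler, results, task_timeout))
        for _ in range(worker_count)
    ]
    try:
        # Wait until every queued job is processed, bounded by the shutdown
        # window; a timeout error propagates to the caller if it is exceeded.
        await asyncio.wait_for(queue.join(), timeout=shutdown_timeout)
    finally:
        for task in workers:
            task.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    return results
```

With `worker_count=2`, two handlers can be in flight at once, so the order of entries in the returned list depends on how long each handler takes.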

| Setting | Default | Description |
| --- | --- | --- |
| worker_count | 1 | Number of concurrent workers |
| poll_interval | 0.5s | Interval between queue polls |
| default_timeout | 300s | Default per-task timeout |
| shutdown_timeout | 30s | Graceful shutdown window |

| If you need… | Choose… |
| --- | --- |
| High throughput, existing Redis | Redis |
| Guaranteed delivery, routing, DLX | RabbitMQ |
| ACID guarantees, no broker infra | Postgres |
| Unit tests, local dev | Memory |

tasks:
  backends:
    - name: primary
      primary: true
      type: redis
      redis_url: "${REDIS_URL}"
    - name: notifications
      type: rabbitmq
      amqp_url: "${AMQP_URL}"

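
One way to think about a multi-backend configuration like this is as a list of dictionaries that is expanded against the environment and then indexed by name. The sketch below does that with the standard library. The helper names `expand` and `resolve_backends`, and the rule that exactly one entry must set `primary: true`, are assumptions. How lexigram-tasks actually resolves `${...}` placeholders and selects the primary backend is not stated in the source.

```python
import os
from string import Template
from typing import Any, Dict, List, Mapping


def expand(value: Any, env: Mapping[str, str]) -> Any:
    """Replace ${VAR} placeholders in string values; leave other types alone."""
    if isinstance(value, str):
        # Template.substitute raises KeyError when a variable is missing.
        return Template(value).substitute(env)
    return value


def resolve_backends(
    backends: List[Dict[str, Any]], env: Mapping[str, str] = os.environ
) -> Dict[str, Any]:
    """Expand placeholders and index the backend entries by name."""
    resolved = [{key: expand(val, env) for key, val in b.items()} for b in backends]
    primaries = [b for b in resolved if b.get("primary")]
    if len(primaries) != 1:  # assumed rule, see the lead-in
        raise ValueError("exactly one backend must set primary: true")
    return {
        "primary": primaries[0]["name"],
        "by_name": {b["name"]: b for b in resolved},
    }
```

Passing an explicit `env` mapping instead of relying on `os.environ` keeps the function easy to test.
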
from lexigram.tasks import MemoryTaskQueue, TaskProvider
queue = MemoryTaskQueue()
provider = TaskProvider(queue=queue, worker_count=1)