Backends

lexigram-queue provides pluggable queue backends behind the QueueProtocol contract from lexigram-contracts. All backends support the same publish/consume/dead-letter API with per-backend configuration.

| Backend | Extra | Driver | Production-ready | Best for |
| --- | --- | --- | --- | --- |
| Redis | lexigram-queue[redis] | redis[asyncio]>=5.0 | Yes | Simple queues, task distribution |
| RabbitMQ | lexigram-queue[rabbitmq] | aio-pika>=9.0 | Yes | Reliable messaging, routing, RPC |
| Kafka | lexigram-queue[kafka] | aiokafka>=0.10 | Yes | High-throughput event streaming |
| SQS | lexigram-queue[sqs] | aiobotocore>=2.0 | Yes | AWS-managed queues, serverless |
| Memory | (core) | in-process queue.Queue | Development / test | Local dev, unit tests |

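Install only the extras you need, for example: pip install "lexigram-queue[redis,rabbitmq,kafka,sqs]" (the quotes keep shells such as zsh from treating the square brackets as glob characters).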
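To see which optional drivers are importable in the current environment, a small check like the one below may help. It is only a sketch: the helper `available_drivers` is hypothetical and not part of lexigram-queue, and the import names (`redis`, `aio_pika`, `aiokafka`, `aiobotocore`) are assumed from the driver packages listed in the table.

```python
from importlib.util import find_spec

# Assumed mapping from lexigram-queue driver name to the top-level module
# that the corresponding extra installs.
DRIVER_MODULES = {
    "redis": "redis",
    "rabbitmq": "aio_pika",
    "kafka": "aiokafka",
    "sqs": "aiobotocore",
}


def available_drivers() -> dict[str, bool]:
    """Report, per driver, whether its client library can be imported."""
    return {
        driver: find_spec(module) is not None
        for driver, module in DRIVER_MODULES.items()
    }


if __name__ == "__main__":
    for driver, installed in available_drivers().items():
        print(f"{driver}: {'installed' if installed else 'missing'}")
```

The memory backend is not listed because it ships with the core package and needs no extra.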

:::tip Azure Service Bus & GCP Pub/Sub
Config models (AzureServiceBusDriverConfig, GCPPubSubDriverConfig) and backend implementations (azure_servicebus.py, gcp_pubsub.py) exist in the source tree but are not yet wired in QueueProvider._create_backend(). PRs welcome on GitHub.
:::

Redis

Simple queue backed by Redis lists and pub/sub channels.

  • Strengths: Low latency, minimal configuration, good for small-to-medium throughput. Reuses the same Redis you may already run for caching.
  • Weaknesses: No native routing, no message acknowledgments (consumer must handle failures). Limited durability — messages can be lost on Redis restart without persistence.
  • When to choose: Simple task queues, internal background jobs, or when you already run Redis and don’t need Kafka/RabbitMQ-level guarantees.

queue:
  backends:
    - name: jobs
      driver: redis
      primary: true
      redis:
        url: redis://localhost:6379/0
        max_connections: 10
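Because a list-based queue removes a message as soon as it is popped, the consumer has to decide what happens when processing fails. The sketch below illustrates one common approach, requeue with a retry budget and then park the message, using a `collections.deque` as a stand-in for a Redis list. The names `drain` and `MAX_ATTEMPTS` are hypothetical, and this is not necessarily how lexigram-queue implements its dead-letter handling.

```python
from collections import deque
from typing import Callable

MAX_ATTEMPTS = 3  # assumed retry budget, not a lexigram-queue setting


def drain(queue: deque, dead_letters: list, handler: Callable[[str], None]) -> None:
    """Pop messages until the queue is empty, retrying failed ones.

    Each entry is a (payload, attempts) tuple. The deque stands in for a
    Redis list: once an entry is popped it is gone unless the consumer
    pushes it back.
    """
    while queue:
        payload, attempts = queue.popleft()
        try:
            handler(payload)
        except Exception:
            attempts += 1
            if attempts >= MAX_ATTEMPTS:
                dead_letters.append(payload)       # give up: park the message
            else:
                queue.append((payload, attempts))  # requeue for another try
```

A handler that always fails for a given payload ends up with that payload in `dead_letters` after three attempts, while the remaining messages are processed normally.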

RabbitMQ

Full-featured AMQP message broker via aio-pika.

  • Strengths: Reliable delivery, exchange-based routing, consumer acknowledgments, dead-letter exchanges, and per-queue TTL. Well-suited for RPC patterns and work queues.
  • Weaknesses: Heavier than Redis — requires a RabbitMQ server. Performance degrades under very high throughput compared to Kafka.
  • When to choose: You need reliable delivery, flexible routing (direct/fanout/topic exchanges), or consumer acknowledgments.

queue:
  backends:
    - name: events
      driver: rabbitmq
      primary: true
      rabbitmq:
        url: amqp://guest:guest@localhost/
        exchange: lexigram
        prefetch_count: 10
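To make the topic-exchange routing mentioned above more concrete, here is a minimal sketch of AMQP-style topic matching, where `*` matches exactly one dot-separated word and `#` matches zero or more. The broker performs this matching itself; the function `topic_matches` is purely illustrative and is not part of lexigram-queue or aio-pika.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style topic pattern matches a routing key.

    '*' matches exactly one dot-separated word; '#' matches zero or more.
    """
    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or one word and then try again.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))
```

For example, `orders.*.created` matches `orders.eu.created`, while `orders.*` does not, because `*` covers only a single word.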

Kafka

Distributed event streaming via aiokafka.

  • Strengths: Extreme throughput, log-based persistence, replayability, fault tolerance, and consumer group scaling. Messages are persisted on disk with configurable retention.
  • Weaknesses: Higher operational complexity (requires ZooKeeper/KRaft). Higher latency per-message than Redis or RabbitMQ. Topic/partition management adds conceptual overhead.
  • When to choose: High-throughput event streams, audit logs, analytics pipelines, or any workload that needs message replay and long-term retention.

queue:
  backends:
    - name: events
      driver: kafka
      primary: true
      kafka:
        bootstrap_servers: localhost:9092
        group_id: my-consumers
        auto_offset_reset: earliest
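The replay and consumer-group behavior described above comes from the log model: records stay in the log, and each group only tracks its own read position. The toy class below sketches that idea for a single partition. `ToyLog` and its methods are hypothetical teaching aids; real Kafka tracks offsets per partition, applies retention, and exposes a different API through aiokafka.

```python
class ToyLog:
    """Append-only log with per-group offsets (a single 'partition')."""

    def __init__(self) -> None:
        self._records: list[str] = []
        self._offsets: dict[str, int] = {}  # group_id -> next offset to read

    def append(self, record: str) -> int:
        self._records.append(record)
        return len(self._records) - 1  # offset of the new record

    def poll(self, group_id: str, max_records: int = 10) -> list[str]:
        # A group with no stored offset starts at 0, similar in spirit
        # to auto_offset_reset: earliest.
        start = self._offsets.get(group_id, 0)
        batch = self._records[start:start + max_records]
        self._offsets[group_id] = start + len(batch)
        return batch

    def seek(self, group_id: str, offset: int) -> None:
        """Move a group's read position; records are never deleted."""
        self._offsets[group_id] = offset
```

Reading never removes records, so a second group can consume the same data independently, and an existing group can rewind with `seek` to reprocess it.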

SQS

AWS-managed queue service via aiobotocore.

  • Strengths: Fully managed, auto-scaling, no servers to operate. Supports DLQ, visibility timeouts, and FIFO queues for exactly-once processing.
  • Weaknesses: Dependence on AWS. Higher latency than Redis/RabbitMQ. Standard queues have nearly unlimited throughput, but FIFO queues are capped at 300 API calls per second per action by default (higher with batching or high-throughput mode).
  • When to choose: You’re on AWS and want a zero-ops queue, need exactly-once processing (FIFO), or want to decouple microservices without managing middleware.

queue:
  backends:
    - name: tasks
      driver: sqs
      primary: true
      sqs:
        region: us-east-1
        queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
        visibility_timeout: 60
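The `visibility_timeout` setting is easier to reason about with a small model. In the sketch below, receiving a message hides it for the timeout period instead of removing it; if the consumer does not delete it in time, it becomes visible again and is redelivered. `ToyVisibilityQueue` is a hypothetical in-memory simulation with an injectable clock, not the SQS or aiobotocore API.

```python
import itertools
from typing import Callable, Optional


class ToyVisibilityQueue:
    """In-memory model of receive / delete with a visibility timeout."""

    def __init__(self, visibility_timeout: float, clock: Callable[[], float]) -> None:
        self._timeout = visibility_timeout
        self._clock = clock
        self._ids = itertools.count(1)
        # message id -> (body, time at which it becomes visible again)
        self._messages: dict[int, tuple[str, float]] = {}

    def send(self, body: str) -> int:
        msg_id = next(self._ids)
        self._messages[msg_id] = (body, 0.0)  # visible immediately
        return msg_id

    def receive(self) -> Optional[tuple[int, str]]:
        now = self._clock()
        for msg_id, (body, visible_at) in self._messages.items():
            if visible_at <= now:
                # Hide the message instead of removing it.
                self._messages[msg_id] = (body, now + self._timeout)
                return msg_id, body
        return None

    def delete(self, msg_id: int) -> None:
        """Acknowledge successful processing by removing the message."""
        self._messages.pop(msg_id, None)
```

With a 60-second timeout, a message that is received but never deleted is returned again by the first `receive` call made after the 60 seconds have elapsed, which is why handlers should finish (or extend the timeout) before it expires.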

Memory

Zero-dependency in-process queue backed by asyncio.Queue.

  • Strengths: No server, no config. Instant setup for testing and local development.
  • Weaknesses: Data lost on process restart. Cannot communicate across processes or machines.
  • When to choose: Unit tests, CI pipelines, local development, single-process apps.
queue:
  backends:
    - name: local
      driver: memory
      primary: true

Choosing a backend

  • I need simple, fast task queues → redis.
  • I need reliable delivery with routing → rabbitmq.
  • I need high-throughput event streaming → kafka.
  • I’m on AWS and want zero-ops → sqs.
  • I’m writing unit tests → memory. No extra needed.

Named backends allow multiple queue systems in one app:

queue:
  backends:
    - name: commands
      driver: rabbitmq
      primary: true
      rabbitmq:
        url: amqp://guest:guest@localhost/
    - name: events
      driver: kafka
      kafka:
        bootstrap_servers: localhost:9092
    - name: tasks
      driver: redis
      redis:
        url: redis://localhost:6379/1

Resolve each backend by name through dependency injection:

from typing import Annotated

from lexigram.contracts.queue import QueueProtocol
from lexigram.di import Named


class WorkflowOrchestrator:
    def __init__(
        self,
        # Named("...") selects the backend configured under that name.
        commands: Annotated[QueueProtocol, Named("commands")],
        events: Annotated[QueueProtocol, Named("events")],
    ):
        ...
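For readers unfamiliar with `Annotated`, the sketch below shows how a container could read a name qualifier from constructor annotations using only the standard library. The `Named`, `Queue`, and `Orchestrator` classes here are local stand-ins defined for the example; `lexigram.di.Named` and the real resolution logic may work differently.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_type_hints


@dataclass(frozen=True)
class Named:
    """Stand-in for a DI qualifier; lexigram.di.Named may differ."""
    name: str


class Queue:
    """Placeholder type standing in for QueueProtocol."""


class Orchestrator:
    def __init__(
        self,
        commands: Annotated[Queue, Named("commands")],
        events: Annotated[Queue, Named("events")],
    ) -> None:
        self.commands = commands
        self.events = events


def qualifiers(cls: type) -> dict[str, str]:
    """Map each annotated constructor parameter to its Named qualifier."""
    hints = get_type_hints(cls.__init__, include_extras=True)
    found = {}
    for param, hint in hints.items():
        for meta in get_args(hint)[1:]:  # metadata follows the base type
            if isinstance(meta, Named):
                found[param] = meta.name
    return found
```

A container built along these lines would look up the backend registered under each returned name and pass it to the constructor.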

Use the memory backend for tests:

from lexigram.queue import QueueModule, QueueConfig, NamedQueueConfig

config = QueueConfig(
    backends=[
        NamedQueueConfig(name="test", driver="memory", primary=True),
    ]
)
module = QueueModule.configure(config)
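To give a feel for what an in-process backend does in a test, here is a minimal sketch built directly on `asyncio.Queue`. `InMemoryQueue` is a hypothetical stand-in, and its `publish` and `consume` method names are assumptions modeled on the publish/consume wording at the top of this page; the actual QueueProtocol signatures are not shown here and may differ.

```python
import asyncio


class InMemoryQueue:
    """Hypothetical stand-in for an in-process backend; not lexigram's class."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, message: str) -> None:
        await self._queue.put(message)

    async def consume(self) -> str:
        # Waits until a message is available, then returns it in FIFO order.
        return await self._queue.get()


async def roundtrip() -> list[str]:
    queue = InMemoryQueue()
    await queue.publish("first")
    await queue.publish("second")
    return [await queue.consume(), await queue.consume()]


if __name__ == "__main__":
    print(asyncio.run(roundtrip()))
```

Everything lives in one event loop and one process, which keeps tests fast and deterministic but also means nothing survives a restart.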