Skip to content
GitHubDiscord

Queue (lexigram-queue)

Message bus and queue with Named DI multi-backend support for the Lexigram Framework.


lexigram-queue provides async message queue and bus functionality with Redis, RabbitMQ, Kafka, SQS, and in-memory backends. It includes consumer discovery, dead-letter queue routing after max retries, transactional outbox for atomic DB+message publishing, and composable middleware pipelines — all wired through the DI container.


Terminal window
uv add lexigram lexigram-queue
# With Redis support
uv add "lexigram-queue[redis]"
# With RabbitMQ support
uv add "lexigram-queue[rabbitmq]"
# With Kafka support
uv add "lexigram-queue[kafka]"
# With AWS SQS support
uv add "lexigram-queue[sqs]"
from lexigram import Application
from lexigram.di.module import Module, module
from lexigram.queue import MessageConsumer, QueueModule
from lexigram.queue.config import KafkaDriverConfig, NamedQueueConfig, QueueConfig
from lexigram.contracts.queue.protocols import QueueProtocol
class OrderConsumer(MessageConsumer):
topic = "orders"
async def handle(self, message: dict) -> None:
print(f"Processing order: {message}")
@module(
imports=[
QueueModule.configure(
QueueConfig(
backends=[
NamedQueueConfig(
name="primary",
primary=True,
driver="kafka",
kafka=KafkaDriverConfig(
bootstrap_servers="localhost:9092",
),
)
]
)
),
QueueModule.scope(OrderConsumer),
]
)
class AppModule(Module):
pass
async def main() -> None:
async with Application.boot(modules=[AppModule]) as app:
queue = await app.container.resolve(QueueProtocol)
await queue.publish("orders", {"order_id": "12345", "total": 99.99})
if __name__ == "__main__":
import asyncio
asyncio.run(main())

Zero-config usage: Call QueueModule.configure() with no arguments to use all defaults (memory backend).

application.yaml
queue:
backends:
- name: default
primary: true
driver: kafka
max_retries: 3
kafka:
bootstrap_servers: "localhost:9092"
group_id: "lexigram-consumers"
Section titled “Option 2 — Profiles + Environment Variables (recommended)”
Terminal window
export LEX_QUEUE__ENABLED=true
export LEX_QUEUE__BACKENDS__0__DRIVER=kafka
from lexigram.queue import QueueModule
from lexigram.queue.config import QueueConfig, NamedQueueConfig, KafkaDriverConfig
QueueModule.configure(
QueueConfig(
backends=[
NamedQueueConfig(
name="default",
primary=True,
driver="kafka",
kafka=KafkaDriverConfig(
bootstrap_servers="localhost:9092",
),
),
]
)
)
FieldDefaultEnv varDescription
backends[]LEX_QUEUE__BACKENDSList of named queue backend configurations
backends[n].name(required)LEX_QUEUE__BACKENDS__N__NAMEUnique identifier used for Named() injection
backends[n].driver"memory"LEX_QUEUE__BACKENDS__N__DRIVERDriver: memory, redis, rabbitmq, kafka, sqs
backends[n].primaryfalseLEX_QUEUE__BACKENDS__N__PRIMARYAlso register as unnamed QueueProtocol binding
backends[n].max_retries3LEX_QUEUE__BACKENDS__N__MAX_RETRIESRetries before routing to DLQ
backends[n].redis.urlnullLEX_QUEUE__BACKENDS__N__REDIS__URLRedis connection URL
backends[n].kafka.bootstrap_serversnullLEX_QUEUE__BACKENDS__N__KAFKA__BOOTSTRAP_SERVERSKafka broker addresses (comma-separated)
backends[n].kafka.group_id"lexigram-consumers"LEX_QUEUE__BACKENDS__N__KAFKA__GROUP_IDKafka consumer group ID
backends[n].rabbitmq.urlnullLEX_QUEUE__BACKENDS__N__RABBITMQ__URLRabbitMQ connection URL
backends[n].sqs.queue_urlnullLEX_QUEUE__BACKENDS__N__SQS__QUEUE_URLSQS queue URL
MethodDescription
QueueModule.configure(config=None)Register queue backends; exports QueueProtocol
QueueModule.scope(*consumers)Scope consumer classes into a feature module
QueueModule.stub(config=None)In-memory backend for testing
  • Multi-backend messaging — Redis Pub/Sub, RabbitMQ, Kafka, AWS SQS, and in-memory
  • Consumer discoveryMessageConsumer subclasses auto-discovered and registered
  • Dead-letter queue (DLQ) — failed messages routed to DLQ after max_retries
  • Transactional outbox — atomic DB transaction + message publish via TransactionalOutbox
  • Message middleware pipelines — composable logging, validation, transformation middleware
  • Named DI multi-backendAnnotated[QueueProtocol, Named("events")] for multiple backends
  • Retry logic — exponential backoff before DLQ routing
  • Consumer groups — Kafka consumer groups and RabbitMQ consumer tags for load balancing
BackendDurabilityOrderingThroughputUse Case
MemoryNoneFIFOVery HighDevelopment, testing
Redis Pub/SubAt-most-onceNo guaranteeVery HighReal-time events, ephemeral messages
RabbitMQAt-least-oncePer-queueHighTask queues, work distribution
KafkaAt-least-oncePer-partitionVery HighEvent streams, audit logs
SQSAt-least-onceBest-effort (FIFO available)HighAWS-native, decoupled systems
from lexigram import Application
from lexigram.queue import QueueModule
from lexigram.contracts.queue.protocols import QueueProtocol
async def test_message_consumer():
async with Application.boot(
modules=[QueueModule.stub()]
) as app:
queue = await app.container.resolve(QueueProtocol)
await queue.publish("test-topic", {"key": "value"})
# Test with in-memory backend
FileWhat it contains
src/lexigram/queue/module.pyQueueModule.configure(), .scope(), .stub()
src/lexigram/queue/config.pyQueueConfig, NamedQueueConfig, backend configs
src/lexigram/queue/di/provider.pyQueueProvider boot and registration
src/lexigram/queue/consumer.pyMessageConsumer base class
src/lexigram/queue/core/dlq.pyDeadLetterQueue implementation
src/lexigram/queue/core/outbox.pyTransactionalOutbox implementation
src/lexigram/queue/core/pipeline.pyMessagePipeline and MiddlewareBase
src/lexigram/queue/backends/kafka.pyKafka backend implementation
src/lexigram/queue/backends/rabbitmq.pyRabbitMQ backend implementation
src/lexigram/queue/backends/redis.pyRedis backend implementation