Guide
Requirements
| Package | Required | Purpose |
|---|---|---|
| lexigram | Yes | Core framework |
| lexigram-contracts | Yes | Protocol definitions |
| redis | Recommended | Redis queue backend |
| aio-pika | Optional | RabbitMQ queue backend |
| boto3 | Optional | SQS queue backend |
Problem
Applications need to exchange messages between services, decouple producers from consumers, and handle backpressure, retries, and dead-letter routing. lexigram-queue provides a unified, DI-friendly abstraction over seven queue backends (memory, Redis, RabbitMQ, Kafka, SQS, Azure Service Bus, and GCP Pub/Sub), so you can switch backends without changing application code.
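As a rough illustration of what "switching backends without changing application code" means, the sketch below has application code depend only on a small interface. Everything in it (`Publisher`, `ListBackend`, `CountingBackend`, `place_order`) is a hypothetical stand-in written for this example and is not part of lexigram-queue.

```python
import asyncio
from typing import Any, Protocol


class Publisher(Protocol):
    """Hypothetical minimal interface (the real contract is QueueProtocol)."""

    async def publish(self, topic: str, payload: dict[str, Any]) -> None: ...


class ListBackend:
    """Toy backend that appends every message to a list."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, dict[str, Any]]] = []

    async def publish(self, topic: str, payload: dict[str, Any]) -> None:
        self.sent.append((topic, payload))


class CountingBackend:
    """Second toy backend that only counts messages per topic."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = {}

    async def publish(self, topic: str, payload: dict[str, Any]) -> None:
        self.counts[topic] = self.counts.get(topic, 0) + 1


async def place_order(queue: Publisher, order_id: str) -> None:
    # Application code sees only the interface, never a concrete backend.
    await queue.publish("orders.created", {"order_id": order_id})


list_backend = ListBackend()
counting_backend = CountingBackend()
asyncio.run(place_order(list_backend, "ord-1"))
asyncio.run(place_order(counting_backend, "ord-1"))
```

The same `place_order` function runs unchanged against both backends; only the object passed in differs.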
Mental Model
The queue system is structured around three layers:
```
┌──────────────┐   publish/subscribe      ┌──────────────┐
│   Producer   │ ───────────────────────▶ │   Consumer   │
│  (any code)  │   BusMessage(topic,      │  (handler)   │
│              │   payload, headers)      │              │
└──────────────┘                          └──────────────┘
       │                                         ▲
       │                                         │
       ▼                                         │
┌────────────────────────────────────────────────────────┐
│             QueueProtocol implementations              │
│   ┌────────┐ ┌──────────┐ ┌───────┐ ┌─────────┐        │
│   │ Memory │ │  Redis   │ │ Kafka │ │ RabbitMQ│ ...    │
│   └────────┘ └──────────┘ └───────┘ └─────────┘        │
│               Named multi-backend via DI               │
└────────────────────────────────────────────────────────┘
```

Core Concepts
QueueProtocol
QueueProtocol (from lexigram.contracts.queue) defines the interface every backend implements:
```python
class QueueProtocol:
    async def connect(self) -> None: ...
    async def close(self) -> None: ...
    async def publish(self, topic: str, message: BusMessage) -> None: ...
    async def subscribe(
        self,
        topic: str,
        handler: Callable[[BusMessage], Coroutine],
    ) -> None: ...
    async def health_check(self, timeout: float = 5.0) -> HealthCheckResult: ...
```

BusMessage
Messages are BusMessage instances (from lexigram.contracts.queue):
```python
from lexigram.contracts.queue import BusMessage, DeliveryGuarantee

msg = BusMessage(
    topic="orders.created",
    payload={"order_id": "ord-1"},
    headers={"source": "api"},
    delivery_guarantee=DeliveryGuarantee.AT_LEAST_ONCE,
)
```

QueueModule
Use QueueModule to register backends into the DI container:
```python
from lexigram.queue import QueueModule
from lexigram.queue.config import QueueConfig

config = QueueConfig(backends=[...])
module = QueueModule.configure(config)  # returns DynamicModule
```

Named Multi-Backend
You can run multiple queue backends in the same application. Each backend is registered under its name and can be injected with Named():
```python
from typing import Annotated

from lexigram.contracts.queue import QueueProtocol
from lexigram.di import Named


class OrderService:
    def __init__(
        self,
        primary: QueueProtocol,  # primary backend
        events: Annotated[QueueProtocol, Named("events")],  # named backend
    ) -> None: ...
```

The primary backend (the one marked primary=True, or the first entry) also gets the unnamed QueueProtocol binding.
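To show how a container could tell an unnamed parameter from a named one, here is a toy resolver that reads `Annotated` metadata with the standard `typing` helpers. It is a sketch only: `Named` and `Queue` below are local stand-ins (assuming `Named` carries nothing but a name), `build` is a hypothetical helper, and none of this reflects how lexigram's container is actually implemented.

```python
from dataclasses import dataclass
from typing import Annotated, Any, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Named:
    """Local stand-in for lexigram.di.Named (assumed to hold only a name)."""

    name: str


class Queue:
    """Local stand-in for a QueueProtocol implementation."""

    def __init__(self, label: str) -> None:
        self.label = label


class OrderService:
    def __init__(
        self,
        primary: Queue,
        events: Annotated[Queue, Named("events")],
    ) -> None:
        self.primary = primary
        self.events = events


def build(cls: type, unnamed: Queue, named: dict[str, Queue]) -> Any:
    """Hypothetical resolver: choose a binding per constructor parameter."""
    hints = get_type_hints(cls.__init__, include_extras=True)
    hints.pop("return", None)
    kwargs = {}
    for param, hint in hints.items():
        key = None
        if get_origin(hint) is Annotated:
            # get_args() yields (base_type, *metadata) for Annotated hints.
            metadata = get_args(hint)[1:]
            key = next((m.name for m in metadata if isinstance(m, Named)), None)
        # Parameters without a Named marker receive the unnamed (primary) binding.
        kwargs[param] = named[key] if key is not None else unnamed
    return cls(**kwargs)


service = build(
    OrderService,
    unnamed=Queue("default"),
    named={"events": Queue("events")},
)
```

Here `service.primary` receives the unnamed binding and `service.events` receives the backend registered as "events".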
Transactional Outbox
TransactionalOutbox guarantees at-least-once delivery by storing messages in-process before publishing:
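```python
from lexigram.contracts.queue import BusMessage
from lexigram.queue import TransactionalOutbox

# `queue` is an already-resolved QueueProtocol and `data` is the payload;
# both are assumed to exist, and this runs inside an async function.
outbox = TransactionalOutbox(queue, flush_interval=1.0)
await outbox.enqueue(BusMessage(topic="orders", payload=data))
```

Dead Letter Queue
DeadLetterQueue stores messages that exceeded their retry limit: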
```python
from lexigram.queue import DeadLetterQueue

dlq = DeadLetterQueue(queue, max_retries=3)
dlq.monitor_failures()  # routes to DLQ topic
```

Message Pipeline
MessagePipeline chains middleware around message processing:
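```python
from lexigram.queue import MessagePipeline, MiddlewareBase


class LoggingMiddleware(MiddlewareBase):
    async def __call__(self, message, next_handler):
        # `logger` is assumed to be a structured logger defined elsewhere
        # that accepts keyword fields.
        logger.info("processing", message_id=message.id)
        # Hand the message to the next middleware (or the final handler).
        return await next_handler(message)


pipeline = MessagePipeline([LoggingMiddleware()])
```

Typical Usage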
```python
import asyncio

from lexigram import Application
from lexigram.contracts.queue import BusMessage, QueueProtocol
from lexigram.di.module import module, Module
from lexigram.queue import QueueModule
from lexigram.queue.config import QueueConfig, NamedQueueConfig

config = QueueConfig(backends=[
    NamedQueueConfig(name="default", driver="memory", primary=True),
])


@module(imports=[QueueModule.configure(config)])
class AppModule(Module):
    pass


async def main() -> None:
    async with Application.boot(name="app", modules=[AppModule]) as app:
        queue = await app.container.resolve(QueueProtocol)

        async def handler(msg: BusMessage) -> None:
            print(f"Handled: {msg.payload}")

        await queue.subscribe("notifications", handler)
        await queue.publish("notifications", BusMessage(
            topic="notifications",
            payload={"text": "Hello"},
        ))
        await asyncio.sleep(0.1)


asyncio.run(main())
```

Best Practices
- Always subscribe before publishing in the same process to avoid race conditions (in-memory backend).
- Use named backends when different message types have different delivery requirements (e.g., Redis for fast notifications, Kafka for durable event streaming).
- Set max_retries per backend to control dead-letter behavior.
- Wrap cleanup in try/finally when manually managing the QueueProtocol lifecycle.
- Use TransactionalOutbox for reliable multi-service message emission.
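
The try/finally advice above can be sketched as follows. `FakeQueue` is a hypothetical stand-in that mimics only the connect, close, and publish methods listed for QueueProtocol, and the "boom" topic exists purely to simulate a failure; with a real backend the same shape would apply to whatever object the container hands you.

```python
import asyncio


class FakeQueue:
    """Stand-in with the connect/close/publish methods of QueueProtocol."""

    def __init__(self) -> None:
        self.connected = False
        self.published: list[str] = []

    async def connect(self) -> None:
        self.connected = True

    async def close(self) -> None:
        self.connected = False

    async def publish(self, topic: str, message: object) -> None:
        if not self.connected:
            raise RuntimeError("queue is not connected")
        if topic == "boom":
            raise ValueError("simulated publish failure")
        self.published.append(topic)


async def send(queue: FakeQueue, topic: str) -> None:
    await queue.connect()
    try:
        await queue.publish(topic, {"text": "Hello"})
    finally:
        # Runs on success and on failure, so the connection is not leaked.
        await queue.close()


queue = FakeQueue()
asyncio.run(send(queue, "notifications"))

try:
    asyncio.run(send(queue, "boom"))
except ValueError:
    failed = True
else:
    failed = False
```

Even when the second publish raises, `close()` still runs and the queue ends up disconnected.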