Queue & Message Bus
lexigram-queue provides async publish/subscribe messaging behind a single protocol. Application code depends on QueueProtocol; the backend (in-memory, Redis, RabbitMQ, Kafka, SQS, Azure Service Bus, or GCP Pub/Sub) is chosen in configuration. You can swap backends, run several side-by-side, and substitute an in-memory stub in tests without touching the services that use them.
For the full configuration reference and advanced features (transactional outbox, middleware pipelines, dead-letter queues), see the lexigram-queue package docs.
1. The Contract
Section titled “1. The Contract”All backends implement QueueProtocol. Messages are modeled as BusMessage — an immutable dataclass with headers, TTL, priority, and delivery guarantees:
from typing import Any, Protocol, runtime_checkablefrom lexigram.contracts.queue.protocols import QueueProtocolfrom lexigram.contracts.queue.types import BusMessage, DeliveryGuarantee
@runtime_checkableclass MyQueue(QueueProtocol, Protocol): async def connect(self) -> None: ... async def close(self) -> None: ... async def publish(self, topic: str, message: BusMessage) -> None: ... async def subscribe(self, topic: str, handler: Callable[[BusMessage], Coroutine[Any, Any, None]]) -> None: ... async def health_check(self, timeout: float = 5.0) -> HealthCheckResult: ...Your services depend on the protocol — never on a concrete backend:
graph LR
Svc[Your Service] -- depends on --> P[QueueProtocol]
P -- resolved to --> R[RedisBackend]
P -- or --> K[KafkaBackend]
P -- or --> S[SQSBackend]
P -- or --> M[MemoryBackend]
2. Configuration
Section titled “2. Configuration”Queue backends are declared through NamedQueueConfig. Exactly one should be marked primary: true:
from lexigram import Applicationfrom lexigram.queue import QueueModule
app = Application(name="my-app")queue: backends: - name: "default" driver: "redis" primary: true delivery_guarantee: "at_least_once" max_retries: 3 redis: url: "${REDIS_URL:redis://localhost:6379/0}" max_connections: 10 socket_timeout: 5.0Each driver has its own config block:
queue: backends: - name: "events" driver: "kafka" primary: true kafka: bootstrap_servers: "${KAFKA_BROKERS:localhost:9092}" client_id: "myapp" group_id: "myapp-consumers" auto_offset_reset: "earliest"queue: backends: - name: "orders" driver: "sqs" sqs: region: "${AWS_REGION:us-east-1}" queue_url: "${SQS_QUEUE_URL}" visibility_timeout: 303. Publishing & Subscribing
Section titled “3. Publishing & Subscribing”Inject QueueProtocol and call publish() with a BusMessage:
from lexigram.contracts.queue.protocols import QueueProtocolfrom lexigram.contracts.queue.types import BusMessage, DeliveryGuarantee
class OrderService: def __init__(self, queue: QueueProtocol) -> None: self._queue = queue
async def place_order(self, order_id: str) -> None: message = BusMessage( topic="orders.placed", payload={"order_id": order_id}, delivery_guarantee=DeliveryGuarantee.AT_LEAST_ONCE, ttl=300.0, ) await self._queue.publish("orders.placed", message)Subscribe by registering an async handler:
async def handle_order_placed(message: BusMessage) -> None: print(f"Processing order {message.payload['order_id']}")
await queue.subscribe("orders.placed", handle_order_placed)4. Message Consumers
Section titled “4. Message Consumers”For structured consumers, extend MessageConsumer and declare the topic:
from lexigram.contracts.queue.types import BusMessagefrom lexigram.queue import MessageConsumer
class OrderConsumer(MessageConsumer): topic = "orders.placed"
async def handle(self, message: BusMessage) -> None: order_id = message.payload["order_id"]Scope consumers into your feature module:
from lexigram.di.module import modulefrom lexigram.queue import QueueModule
@module(imports=[ QueueModule.configure(), QueueModule.scope(OrderConsumer, PaymentConsumer),])class OrdersFeatureModule: pass5. Dead-Letter Queue & Outbox
Section titled “5. Dead-Letter Queue & Outbox”Messages that exhaust their retries are routed to the DeadLetterQueue:
from lexigram.queue import DeadLetterQueue
dlq = DeadLetterQueue()dlq.store(message)failed = dlq.drain()For atomic staging of messages alongside database writes, use TransactionalOutbox:
from lexigram.queue import TransactionalOutbox
outbox = TransactionalOutbox(queue)outbox.stage("order.created", BusMessage(topic="order.created", payload=data))await outbox.flush()6. Middleware Pipelines
Section titled “6. Middleware Pipelines”Chain middleware around message handlers with MessagePipeline:
from lexigram.queue import MessagePipeline, MiddlewareBase
class LoggingMiddleware(MiddlewareBase): async def handle(self, message: BusMessage, next: callable) -> None: logger.info("processing", topic=message.topic) await next(message) logger.info("completed", topic=message.topic)
pipeline = MessagePipeline()pipeline.use(LoggingMiddleware())pipeline.set_handler(my_handler)await pipeline.run(message)7. Multiple Backends
Section titled “7. Multiple Backends”Declare multiple named backends, each registered under Annotated[T, Named(name)]:
queue: backends: - name: "events" driver: "kafka" primary: true - name: "notifications" driver: "redis"from typing import Annotatedfrom lexigram.contracts.queue.protocols import QueueProtocolfrom lexigram.di.markers import Named
class NotificationService: def __init__( self, queue: QueueProtocol, notifications: Annotated[QueueProtocol, Named("notifications")], ) -> None: self._queue = queue self._notifications = notifications8. Testing
Section titled “8. Testing”QueueModule.stub() returns an in-memory backend with no external broker:
from lexigram import Applicationfrom lexigram.queue import QueueModulefrom lexigram.contracts.queue.protocols import QueueProtocolfrom lexigram.contracts.queue.types import BusMessage
async def test_publishes_and_consumes() -> None: received = []
async def handler(msg: BusMessage) -> None: received.append(msg)
async with Application.boot(modules=[QueueModule.stub()]) as app: queue = await app.container.resolve(QueueProtocol) await queue.connect() await queue.subscribe("test", handler) await queue.publish("test", BusMessage(payload="hello")) assert len(received) == 1Next Steps
Section titled “Next Steps”- Dependency Injection — binding protocols to implementations
- Providers — how
QueueProviderhooks into application boot - Testing — substituting stubs for infrastructure
lexigram-queuepackage — full config reference, driver options, CLI tools