API Reference
Protocols
QueueProtocol
Structural protocol for message queue / bus backends.
Implementations include in-memory, Redis Pub/Sub, RabbitMQ, Kafka,
and SQS. connect() / close() manage the connection lifecycle;
publish() and subscribe() handle message delivery.
connect(): Establish a connection to the broker.
close(): Close the connection and release resources.
async def publish(topic: str, message: BusMessage) -> None
Publish a message to a topic.
| Parameter | Type | Description |
|---|---|---|
| `topic` | str | Destination topic or queue name. |
| `message` | BusMessage | The message to publish. |
| Exception | Description |
|---|---|
| QueueError | For expected publish failures (quota, auth, etc.). |
async def subscribe(topic: str, handler: Callable[[BusMessage], Coroutine[Any, Any, None]]) -> None
Subscribe a handler to a topic.
The handler is called for each message received. The backend is responsible for acknowledging or rejecting messages.
| Parameter | Type | Description |
|---|---|---|
| `topic` | str | Topic or queue name to subscribe to. |
| `handler` | Callable[[BusMessage], Coroutine[Any, Any, None]] | Async callable invoked per message. |
Check broker connectivity.
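
To make the protocol shape above concrete, here is a minimal in-memory sketch. It is not the library's implementation: `SketchMessage` and `InMemoryQueue` are hypothetical stand-ins, and the `health_check` method name is an assumption, since this reference only names `connect()`, `close()`, `publish()`, and `subscribe()`.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Coroutine


@dataclass
class SketchMessage:
    """Stand-in for BusMessage (assumption: only topic and payload are modelled)."""
    topic: str
    payload: Any


Handler = Callable[[SketchMessage], Coroutine[Any, Any, None]]


class InMemoryQueue:
    """Hypothetical backend that mirrors the documented method names."""

    def __init__(self) -> None:
        self._connected = False
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    async def connect(self) -> None:
        self._connected = True

    async def close(self) -> None:
        self._connected = False
        self._handlers.clear()

    async def publish(self, topic: str, message: SketchMessage) -> None:
        if not self._connected:
            raise RuntimeError("publish() called before connect()")
        # Deliver to every handler subscribed to this topic, in order.
        for handler in list(self._handlers[topic]):
            await handler(message)

    async def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def health_check(self) -> bool:  # method name is an assumption
        return self._connected


async def demo() -> list[Any]:
    received: list[Any] = []

    async def on_order(message: SketchMessage) -> None:
        received.append(message.payload)

    queue = InMemoryQueue()
    await queue.connect()
    await queue.subscribe("orders", on_order)
    await queue.publish("orders", SketchMessage("orders", {"id": 1}))
    await queue.close()
    return received
```

Running `asyncio.run(demo())` delivers the single published payload to the subscribed handler. A real backend would also handle acknowledgement and rejection, which this sketch leaves out.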
Classes
BusMessage
A message transported through the queue backend.
id defaults to a new UUID4. timestamp defaults to the current
wall-clock time. Both are set via field defaults so callers do not need
to supply them.
Attributes:
id: Unique message identifier.
topic: Destination topic or queue name.
payload: Arbitrary message body; must be serialisable by the backend.
headers: Opaque string key-value headers passed to the broker.
timestamp: Unix epoch timestamp when the message was created.
ttl: Time-to-live in seconds; None uses the backend default.
priority: Message priority (higher = more urgent); backend-dependent.
delivery_guarantee: Delivery semantics for this message.
retry_count: Number of times this message has been retried.
max_retries: Maximum retries before routing to DLQ.
Return True if the message TTL has elapsed.
Return True if the message may be retried.
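
The field defaults and the two checks described above could look roughly like the following dataclass. This is a sketch under stated assumptions: `SketchBusMessage` is a stand-in covering only a subset of the attributes, the method names `is_expired` and `can_retry` are hypothetical (the reference does not name them), the `max_retries` default of 3 is invented for illustration, and a `ttl` of `None` is treated here as "never expires at the message level".

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SketchBusMessage:
    """Stand-in mirroring a subset of the documented attributes."""
    topic: str
    payload: Any
    # Both defaults are produced per instance, so callers can omit them.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    ttl: Optional[float] = None
    retry_count: int = 0
    max_retries: int = 3  # default value is an assumption

    def is_expired(self, now: Optional[float] = None) -> bool:  # hypothetical name
        """True once ttl seconds have passed since creation; False if ttl is None."""
        if self.ttl is None:
            return False
        current = time.time() if now is None else now
        return current - self.timestamp >= self.ttl

    def can_retry(self) -> bool:  # hypothetical name
        """True while the retry budget has not been used up."""
        return self.retry_count < self.max_retries
```

Passing `now` explicitly keeps the expiry check deterministic in tests; production code would normally rely on the wall clock.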
ConsumerRegisteredEvent
Consumer was registered to a queue.
Consumed by: consumer tracking, lifecycle management, monitoring.
DeadLetterQueue
In-memory dead letter queue for storing failed messages.
Initialize DLQ.
| Parameter | Type | Description |
|---|---|---|
| `max_size` | int | Maximum number of entries to store. |
async def push(message: BusMessage, error: str) -> None
Add a failed message to the DLQ.
| Parameter | Type | Description |
|---|---|---|
| `message` | BusMessage | The message that failed. |
| `error` | str | Error description. |
Remove and return all entries from the DLQ.
| Type | Description |
|---|---|
| list[DeadLetterEntry] | List of dead letter entries. |
Return current number of entries in DLQ.
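
A bounded in-memory dead letter queue along the lines described above might be sketched as follows. The names `SketchDeadLetterQueue`, `SketchEntry`, `drain`, and `size` are hypothetical, as is the default `max_size`. The reference does not say what happens when the queue is full; this sketch assumes the oldest entry is evicted.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchEntry:
    """Stand-in for DeadLetterEntry; the field names are assumptions."""
    message: Any
    error: str


class SketchDeadLetterQueue:
    def __init__(self, max_size: int = 1000) -> None:  # default is an assumption
        # Assumption: once full, the oldest entry is evicted to make room.
        self._entries: deque[SketchEntry] = deque(maxlen=max_size)

    async def push(self, message: Any, error: str) -> None:
        self._entries.append(SketchEntry(message, error))

    async def drain(self) -> list[SketchEntry]:  # hypothetical method name
        # Copy the entries out, then empty the queue.
        drained = list(self._entries)
        self._entries.clear()
        return drained

    def size(self) -> int:  # hypothetical method name
        return len(self._entries)


async def demo() -> tuple[list[str], int]:
    dlq = SketchDeadLetterQueue(max_size=2)
    await dlq.push("m1", "timeout")
    await dlq.push("m2", "bad payload")
    await dlq.push("m3", "handler raised")  # evicts "m1" in this sketch
    errors = [entry.error for entry in await dlq.drain()]
    return errors, dlq.size()
```

After draining, the queue is empty, so a monitoring job can call the drain method periodically without reprocessing entries.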
MessageConsumedEvent
Message was successfully consumed from the queue.
Consumed by: message tracking, audit logging, metrics collection.
MessageConsumedHook
Payload fired when a message is successfully consumed by a handler.
Attributes:
queue_name: Name of the queue or topic the message came from.
message_type: Type label of the consumed message.
MessageConsumer
Base class for message consumer workers.
Subclass this and implement handle() to create a worker that subscribes to a topic and processes incoming messages.
def __init__(queue: QueueProtocol) -> None
Initialize consumer.
| Parameter | Type | Description |
|---|---|---|
| `queue` | QueueProtocol | Queue protocol instance for subscribing. |
Start consuming messages from the subscribed topic.
Stop consuming messages.
async def handle(message: BusMessage) -> None
Handle a received message. Subclasses must implement this.
| Parameter | Type | Description |
|---|---|---|
| `message` | BusMessage | Message to handle. |
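
The subclass-and-implement-`handle()` pattern described above can be sketched with stand-in classes. Nothing here is the library's code: `SketchQueue` and `SketchConsumer` are hypothetical, the `start` and `stop` method names are assumed, and declaring the topic as a class attribute is a guess at how a consumer might choose its subscription.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SketchQueue:
    """Tiny stand-in for a QueueProtocol backend (assumption, not the real class)."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[Any], Awaitable[None]]]] = {}

    async def subscribe(self, topic: str, handler: Callable[[Any], Awaitable[None]]) -> None:
        self._handlers.setdefault(topic, []).append(handler)

    async def publish(self, topic: str, message: Any) -> None:
        for handler in self._handlers.get(topic, []):
            await handler(message)


class SketchConsumer:
    """Stand-in base class; `topic`, `start`, and `stop` are assumed names."""

    topic: str = ""

    def __init__(self, queue: SketchQueue) -> None:
        self._queue = queue
        self._running = False

    async def start(self) -> None:
        self._running = True
        await self._queue.subscribe(self.topic, self._dispatch)

    async def stop(self) -> None:
        self._running = False

    async def _dispatch(self, message: Any) -> None:
        # Ignore deliveries that arrive after stop() in this sketch.
        if self._running:
            await self.handle(message)

    async def handle(self, message: Any) -> None:
        raise NotImplementedError


class OrderConsumer(SketchConsumer):
    topic = "orders"

    def __init__(self, queue: SketchQueue) -> None:
        super().__init__(queue)
        self.seen: list[Any] = []

    async def handle(self, message: Any) -> None:
        self.seen.append(message)


async def demo() -> list[Any]:
    queue = SketchQueue()
    consumer = OrderConsumer(queue)
    await consumer.start()
    await queue.publish("orders", "order-1")
    await consumer.stop()
    await queue.publish("orders", "order-2")  # dropped: consumer stopped
    return consumer.seen
```

Only the message published while the consumer is running reaches `handle()`.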
MessageDeadLetteredEvent
Message failed and was moved to dead-letter queue.
Consumed by: error handling, retry management, incident tracking.
MessagePipeline
Ordered chain of middleware around a terminal handler.
Initialize pipeline.
def add(middleware: MiddlewareBase) -> None
Add middleware to the pipeline.
| Parameter | Type | Description |
|---|---|---|
| `middleware` | MiddlewareBase | Middleware instance to add. |
async def execute(message: BusMessage, handler: Any) -> None
Execute the middleware chain and terminal handler.
| Parameter | Type | Description |
|---|---|---|
| `message` | BusMessage | The message to process. |
| `handler` | Any | Terminal async callable to invoke after all middleware. |
MessagePublishedHook
Payload fired when a message is published to a queue or topic.
Attributes:
queue_name: Name of the queue or topic the message was published to.
message_type: Type label of the published message.
MiddlewareBase
Base class for message pipeline middleware.
async def process(message: BusMessage, next_handler: Any) -> None
Process a message and pass to the next handler.
| Parameter | Type | Description |
|---|---|---|
| `message` | BusMessage | The message to process. |
| `next_handler` | Any | Async callable to invoke the next middleware or terminal handler. |
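
How `add()`, `execute()`, and `process()` fit together can be illustrated with a small stand-in pipeline. `SketchMiddleware`, `TraceMiddleware`, and `SketchPipeline` are hypothetical, and the ordering rule (the first middleware added runs outermost) is an assumption; the reference only states that the chain is ordered.

```python
import asyncio
from typing import Any, Awaitable, Callable

Next = Callable[[Any], Awaitable[None]]


class SketchMiddleware:
    """Stand-in for MiddlewareBase: default behaviour just forwards the message."""

    async def process(self, message: Any, next_handler: Next) -> None:
        await next_handler(message)


class TraceMiddleware(SketchMiddleware):
    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def process(self, message: Any, next_handler: Next) -> None:
        self.log.append(f"{self.name}:before")
        await next_handler(message)
        self.log.append(f"{self.name}:after")


class SketchPipeline:
    def __init__(self) -> None:
        self._middleware: list[SketchMiddleware] = []

    def add(self, middleware: SketchMiddleware) -> None:
        self._middleware.append(middleware)

    async def execute(self, message: Any, handler: Next) -> None:
        # Wrap from the inside out so the first middleware added runs first.
        call = handler
        for mw in reversed(self._middleware):
            call = self._bind(mw, call)
        await call(message)

    @staticmethod
    def _bind(mw: SketchMiddleware, nxt: Next) -> Next:
        async def wrapped(message: Any) -> None:
            await mw.process(message, nxt)
        return wrapped


async def demo() -> list[str]:
    log: list[str] = []

    async def terminal(message: Any) -> None:
        log.append(f"handled:{message}")

    pipeline = SketchPipeline()
    pipeline.add(TraceMiddleware("outer", log))
    pipeline.add(TraceMiddleware("inner", log))
    await pipeline.execute("m1", terminal)
    return log
```

Each middleware sees the message before and after the rest of the chain, which is what makes this shape suitable for logging, timing, or retry wrappers.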
QueueDrainedHook
Payload fired when a queue is fully drained (all pending messages processed).
Attributes:
queue_name: Name of the queue that was drained.
QueueModule
Message queue/bus integration with Named DI multi-backend support.
Registers QueueProtocol for constructor injection.
Usage
```python
from lexigram.queue.config import QueueConfig, NamedQueueConfig
from lexigram.queue.module import QueueModule

@module(
    imports=[QueueModule.configure(QueueConfig(backends=[...]))]
)
class AppModule(Module):
    pass
```
Scope consumers into a feature module
```python
@module(
    imports=[
        QueueModule.configure(QueueConfig(...)),
        QueueModule.scope(OrderConsumer, PaymentConsumer),
    ]
)
class OrdersFeatureModule(Module):
    pass
```
Named injection
```python
class MyService:
    def __init__(
        self,
        queue: QueueProtocol,  # primary
        events: Annotated[QueueProtocol, Named("events")],  # named
    ) -> None: ...
```
def configure(cls, config: QueueConfig | Any | None = None) -> DynamicModule
Create a QueueModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | `QueueConfig \| Any \| None` | `QueueConfig`, or `None` to use defaults (reads from environment variables). |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def scope(cls, *consumers: type, **kwargs: Any) -> DynamicModule
Scope queue consumer classes into a feature module.
Registers the given consumer classes as providers so they are discovered and wired by the queue backend. The parent module graph must already include QueueModule.configure — this does not create a new queue connection.
Uses the anonymous token pattern so both configure() and
scope() can coexist in the same compiled graph without a
ModuleDuplicateError.
Example
```python
@module(
    imports=[
        QueueModule.configure(QueueConfig(...)),
        QueueModule.scope(OrderConsumer, PaymentConsumer),
    ]
)
class OrdersFeatureModule(Module):
    pass
```
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule scoped to this feature. |
def stub(cls, config: QueueConfig | None = None) -> DynamicModule
Return an in-memory QueueModule for unit testing.
Uses a memory backend so no external broker is required.
| Parameter | Type | Description |
|---|---|---|
| `config` | `QueueConfig \| None` | Optional config override; defaults to an empty `QueueConfig` (which uses the memory backend). |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
TransactionalOutbox
Stage messages within a transaction, flush atomically.
def __init__(queue: QueueProtocol) -> None
Initialize outbox.
| Parameter | Type | Description |
|---|---|---|
| `queue` | QueueProtocol | Queue protocol to publish messages to. |
def stage(topic: str, message: BusMessage) -> None
Stage a message for publish within a transaction.
| Parameter | Type | Description |
|---|---|---|
| `topic` | str | Destination topic. |
| `message` | BusMessage | Message to stage. |
Publish all staged messages that haven’t been published yet.
Clear all staged entries.
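
The stage-then-flush flow described above could be sketched as follows. `SketchOutbox` and `RecordingQueue` are hypothetical stand-ins, and the method names `flush` and `clear`, plus the integer return value of `flush`, are assumptions. The sketch only shows how already-published entries are skipped on a second flush; it does not model a database transaction or atomicity.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


class RecordingQueue:
    """Stand-in backend that records what was published."""

    def __init__(self) -> None:
        self.published: list[tuple[str, Any]] = []

    async def publish(self, topic: str, message: Any) -> None:
        self.published.append((topic, message))


@dataclass
class _Staged:
    topic: str
    message: Any
    published: bool = False


class SketchOutbox:
    def __init__(self, queue: RecordingQueue) -> None:
        self._queue = queue
        self._staged: list[_Staged] = []

    def stage(self, topic: str, message: Any) -> None:
        self._staged.append(_Staged(topic, message))

    async def flush(self) -> int:  # name and return value are assumptions
        sent = 0
        for entry in self._staged:
            if not entry.published:
                await self._queue.publish(entry.topic, entry.message)
                entry.published = True  # a second flush() skips this entry
                sent += 1
        return sent

    def clear(self) -> None:  # hypothetical name
        self._staged.clear()


async def demo() -> tuple[int, int, int]:
    queue = RecordingQueue()
    outbox = SketchOutbox(queue)
    outbox.stage("orders", "created")
    outbox.stage("orders", "paid")
    first = await outbox.flush()
    second = await outbox.flush()  # nothing new to publish
    outbox.clear()
    return first, second, len(queue.published)
```

Tracking a per-entry published flag is one simple way to make a repeated flush safe; a persistent outbox would typically store that flag alongside the business data.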
Exceptions
AzureServiceBusQueueError
Azure Service Bus queue failure.
GCPPubSubQueueError
GCP Pub/Sub queue failure.
KafkaQueueError
Kafka queue failure.
QueueError
Base for expected, recoverable queue / bus failures.
Attributes:
backend: Name of the queue backend (e.g. "redis", "kafka").
topic: Topic or queue name where the failure occurred.
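
A base error carrying `backend` and `topic` might be used as in the sketch below. The classes are hypothetical stand-ins: the constructor signature is an assumption, and so is the idea that backend-specific errors derive from the base class, which the reference implies but does not state.

```python
from typing import Optional


class SketchQueueError(Exception):
    """Stand-in for QueueError carrying the two documented attributes."""

    def __init__(self, message: str, *, backend: Optional[str] = None,
                 topic: Optional[str] = None) -> None:
        super().__init__(message)
        self.backend = backend
        self.topic = topic


class SketchRedisQueueError(SketchQueueError):
    """Stand-in for a backend-specific subclass (assumed hierarchy)."""


def describe(error: SketchQueueError) -> str:
    """Format an error for logging using its backend and topic."""
    return f"[{error.backend}] {error.topic}: {error}"


try:
    raise SketchRedisQueueError("connection reset", backend="redis", topic="orders")
except SketchQueueError as exc:  # catching the base also catches subclasses
    summary = describe(exc)
```

Under that assumed hierarchy, callers that only care about "expected, recoverable" failures can catch the base class and still read `backend` and `topic` for logging or retry decisions.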
RabbitMQQueueError
RabbitMQ queue failure.
RedisQueueError
Redis queue failure.
SQSQueueError
SQS queue failure.