Skip to content
GitHubDiscord

API Reference

Structural protocol for message queue / bus backends.

Implementations include in-memory, Redis Pub/Sub, RabbitMQ, Kafka, and SQS. connect() / close() manage the connection lifecycle; publish() and subscribe() handle message delivery.

async def connect() -> None

Establish connection to the broker.

async def close() -> None

Close the connection and release resources.

async def publish(
    topic: str,
    message: BusMessage
) -> None

Publish a message to a topic.

Parameters
ParameterTypeDescription
`topic`strDestination topic or queue name.
`message`BusMessageThe message to publish.
Raises
ExceptionDescription
QueueErrorFor expected publish failures (quota, auth, etc.).
async def subscribe(
    topic: str,
    handler: Callable[[BusMessage], Coroutine[Any, Any, None]]
) -> None

Subscribe a handler to a topic.

The handler is called for each message received. The backend is responsible for acknowledging or rejecting messages.

Parameters
ParameterTypeDescription
`topic`strTopic or queue name to subscribe to.
`handler`Callable[[BusMessage], Coroutine[Any, Any, None]]Async callable invoked per message.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check broker connectivity.


A message transported through the queue backend.

id defaults to a new UUID4. timestamp defaults to the current wall-clock time. Both are set via field defaults so callers do not need to supply them.

Attributes: id: Unique message identifier. topic: Destination topic or queue name. payload: Arbitrary message body; must be serialisable by the backend. headers: Opaque string key-value headers passed to the broker. timestamp: Unix epoch timestamp when the message was created. ttl: Time-to-live in seconds; None uses the backend default. priority: Message priority (higher = more urgent); backend-dependent. delivery_guarantee: Delivery semantics for this message. retry_count: Number of times this message has been retried. max_retries: Maximum retries before routing to DLQ.

def is_expired() -> bool

Return True if the message TTL has elapsed.

def should_retry() -> bool

Return True if the message may be retried.


Consumer was registered to a queue.

Consumed by: consumer tracking, lifecycle management, monitoring.


In-memory dead letter queue for storing failed messages.
def __init__(max_size: int = 1000) -> None

Initialize DLQ.

Parameters
ParameterTypeDescription
`max_size`intMaximum number of entries to store.
async def push(
    message: BusMessage,
    error: str
) -> None

Add a failed message to the DLQ.

Parameters
ParameterTypeDescription
`message`BusMessageThe message that failed.
`error`strError description.
async def drain() -> list[DeadLetterEntry]

Remove and return all entries from the DLQ.

Returns
TypeDescription
list[DeadLetterEntry]List of dead letter entries.
property size() -> int

Return current number of entries in DLQ.


Message was successfully consumed from the queue.

Consumed by: message tracking, audit logging, metrics collection.


Payload fired when a message is successfully consumed by a handler.

Attributes: queue_name: Name of the queue or topic the message came from. message_type: Type label of the consumed message.


Base class for message consumer workers.

Subclass this and implement handle() to create a worker that subscribes to a topic and processes incoming messages.

def __init__(queue: QueueProtocol) -> None

Initialize consumer.

Parameters
ParameterTypeDescription
`queue`QueueProtocolQueue protocol instance for subscribing.
async def start() -> None

Start consuming messages from the subscribed topic.

async def stop() -> None

Stop consuming messages.

async def handle(message: BusMessage) -> None

Handle a received message. Subclasses must implement this.

Parameters
ParameterTypeDescription
`message`BusMessageMessage to handle.

Message failed and was moved to dead-letter queue.

Consumed by: error handling, retry management, incident tracking.


Ordered chain of middleware around a terminal handler.
def __init__() -> None

Initialize pipeline.

def add(middleware: MiddlewareBase) -> None

Add middleware to the pipeline.

Parameters
ParameterTypeDescription
`middleware`MiddlewareBaseMiddleware instance to add.
async def execute(
    message: BusMessage,
    handler: Any
) -> None

Execute the middleware chain and terminal handler.

Parameters
ParameterTypeDescription
`message`BusMessageThe message to process.
`handler`AnyTerminal async callable to invoke after all middleware.

Payload fired when a message is published to a queue or topic.

Attributes: queue_name: Name of the queue or topic the message was published to. message_type: Type label of the published message.


Base class for message pipeline middleware.
async def process(
    message: BusMessage,
    next_handler: Any
) -> None

Process a message and pass to the next handler.

Parameters
ParameterTypeDescription
`message`BusMessageThe message to process.
`next_handler`AnyAsync callable to invoke the next middleware or terminal handler.

Payload fired when a queue is fully drained (all pending messages processed).

Attributes: queue_name: Name of the queue that was drained.


Message queue/bus integration with Named DI multi-backend support.

Registers QueueProtocol for constructor injection.

Usage

from lexigram.queue.config import QueueConfig, NamedQueueConfig
from lexigram.queue.module import QueueModule
@module(
imports=[QueueModule.configure(QueueConfig(backends=[...]))]
)
class AppModule(Module):
pass

Scope consumers into a feature module

@module(
imports=[
QueueModule.configure(QueueConfig(...)),
QueueModule.scope(OrderConsumer, PaymentConsumer),
]
)
class OrdersFeatureModule(Module):
pass

Named injection

class MyService:
def __init__(
self,
queue: QueueProtocol, # primary
events: Annotated[QueueProtocol, Named("events")], # named
) -> None: ...
def configure(
    cls,
    config: QueueConfig | Any | None = None
) -> DynamicModule

Create a QueueModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`QueueConfig | Any | NoneQueueConfig or ``None`` to use defaults (reads from environment variables).
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def scope(
    cls,
    *consumers: type,
    **kwargs: Any
) -> DynamicModule

Scope queue consumer classes into a feature module.

Registers the given consumer classes as providers so they are discovered and wired by the queue backend. The parent module graph must already include QueueModule.configure — this does not create a new queue connection.

Uses the anonymous token pattern so both configure() and scope() can coexist in the same compiled graph without a ModuleDuplicateError.

Example

@module(
imports=[
QueueModule.configure(QueueConfig(...)),
QueueModule.scope(OrderConsumer, PaymentConsumer),
]
)
class OrdersFeatureModule(Module):
pass
Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleA DynamicModule scoped to this feature.
def stub(
    cls,
    config: QueueConfig | None = None
) -> DynamicModule

Return an in-memory QueueModule for unit testing.

Uses a memory backend so no external broker is required.

Parameters
ParameterTypeDescription
`config`QueueConfig | NoneOptional config override; defaults to empty QueueConfig (which uses memory backend).
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Stage messages within a transaction, flush atomically.
def __init__(queue: QueueProtocol) -> None

Initialize outbox.

Parameters
ParameterTypeDescription
`queue`QueueProtocolQueue protocol to publish messages to.
def stage(
    topic: str,
    message: BusMessage
) -> None

Stage a message for publish within a transaction.

Parameters
ParameterTypeDescription
`topic`strDestination topic.
`message`BusMessageMessage to stage.
async def flush() -> None

Publish all staged messages that haven’t been published yet.

def clear() -> None

Clear all staged entries.


Azure Service Bus queue failure.
def __init__(
    message: str = 'Azure Service Bus error',
    **kwargs: Any
) -> None

GCP Pub/Sub queue failure.
def __init__(
    message: str = 'GCP Pub/Sub error',
    **kwargs: Any
) -> None

Kafka queue failure.
def __init__(
    message: str = 'Kafka error',
    **kwargs: Any
) -> None

Base for expected, recoverable queue / bus failures.

Attributes: backend: Name of the queue backend (e.g. “redis”, “kafka”). topic: Topic or queue name where the failure occurred.

def __init__(
    message: str = 'Queue error',
    *,
    backend: str = 'unknown',
    topic: str = 'unknown',
    **kwargs: Any
) -> None

RabbitMQ queue failure.
def __init__(
    message: str = 'RabbitMQ error',
    **kwargs: Any
) -> None

Redis queue failure.
def __init__(
    message: str = 'Redis queue error',
    **kwargs: Any
) -> None

SQS queue failure.
def __init__(
    message: str = 'SQS error',
    **kwargs: Any
) -> None