Skip to content
GitHub

API Reference

Protocol for command bus implementations.

Example

class CommandBusProtocol:
async def dispatch(self, command: Command) -> Any:
handler = self._handlers[type(command)]
return await handler.handle(command)
dispatch
async def dispatch(command: Any) -> Any

Dispatch a command to its handler.

Parameters
ParameterTypeDescription
`command`AnyCommand to dispatch.
Returns
TypeDescription
AnyResult from the command handler.

Protocol for publishing domain events.
publish
async def publish(event: Any) -> None

Publish a domain event.

Parameters
ParameterTypeDescription
`event`AnyDomainEvent instance.

Protocol for event bus implementations.

The event bus manages event publication and subscription.

Example

```python
class InMemoryEventBus:
async def publish(self, event: DomainEvent) -> "Result[None, EventError]":
for handler in self._handlers.get(type(event), []):
await handler.handle(event)
return Ok(None)
def subscribe(self, event_type, handler):
self._handlers.setdefault(event_type, []).append(handler)
<div style='padding-left:1rem;border-left:1px solid var(--sl-color-gray-5);margin-top:2rem;margin-bottom:2rem;'>
<div style='border-radius:8px;border:1px solid var(--color-border-weak);overflow:hidden;box-shadow:0 4px 12px rgba(0,0,0,0.35);margin-bottom:1rem;'><div style='background:var(--color-background-weak);border-bottom:1px solid var(--color-border-weak);padding:0 1rem;min-height:36px;display:flex;align-items:center;padding-left:70px;position:relative;'><span style='position:absolute;top:50%;left:16px;transform:translateY(-50%);display:inline-block;width:12px;height:12px;border-radius:50%;background-color:#ff5f56;box-shadow:20px 0 0 #ffbd2e,40px 0 0 #27c93f;'></span><span style='font-family:var(--sl-font-mono);font-size:0.72em;color:var(--color-text-weaker);'>publish</span></div><pre style='margin:0;background:var(--color-background-weak);font-family:var(--sl-font-mono);font-size:0.875em;line-height:1.65;white-space:pre-wrap;word-break:break-all;padding:0.75rem 1rem;'><span style='color: var(--lex-color-keyword)'>async </span><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>publish</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>Any</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></span></pre></div>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L97' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.7rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='12' height='12' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
Publish an event to all subscribers.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Any</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>DomainEvent to publish.</td></tr></tbody></table></div>
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Returns</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:45%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Ok(None) when the event is successfully enqueued for dispatch. Err(EventError) when the event cannot be accepted (e.g.\ no handlers registered and the bus requires at least one).</td></tr></tbody></table></div>
<div style='border-radius:8px;border:1px solid var(--color-border-weak);overflow:hidden;box-shadow:0 4px 12px rgba(0,0,0,0.35);margin-bottom:1rem;'><div style='background:var(--color-background-weak);border-bottom:1px solid var(--color-border-weak);padding:0 1rem;min-height:36px;display:flex;align-items:center;padding-left:70px;position:relative;'><span style='position:absolute;top:50%;left:16px;transform:translateY(-50%);display:inline-block;width:12px;height:12px;border-radius:50%;background-color:#ff5f56;box-shadow:20px 0 0 #ffbd2e,40px 0 0 #27c93f;'></span><span style='font-family:var(--sl-font-mono);font-size:0.72em;color:var(--color-text-weaker);'>subscribe</span></div><pre style='margin:0;background:var(--color-background-weak);font-family:var(--sl-font-mono);font-size:0.875em;line-height:1.65;white-space:pre-wrap;word-break:break-all;padding:0.75rem 1rem;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>subscribe</span><span style='color: var(--lex-color-colon)'>(</span>
<span style='color: var(--lex-color-name)'>event_type</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>type</span><span style='color: var(--lex-color-colon)'>,</span>
<span style='color: var(--lex-color-name)'>handler</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'><a href='/packages/events/lexigram-events/api/#eventhandlerprotocol' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>EventHandlerProtocol</a></span>
<span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-default) !important'>None</span></pre></div>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L110' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.7rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='12' height='12' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
Subscribe a handler to an event type.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event_type`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>type</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Type of event to subscribe to.</td></tr><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`handler`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'><a href='/packages/events/lexigram-events/api/#eventhandlerprotocol' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>EventHandlerProtocol</a></td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Handler to call when event is published.</td></tr></tbody></table></div>
<div style='border-radius:8px;border:1px solid var(--color-border-weak);overflow:hidden;box-shadow:0 4px 12px rgba(0,0,0,0.35);margin-bottom:1rem;'><div style='background:var(--color-background-weak);border-bottom:1px solid var(--color-border-weak);padding:0 1rem;min-height:36px;display:flex;align-items:center;padding-left:70px;position:relative;'><span style='position:absolute;top:50%;left:16px;transform:translateY(-50%);display:inline-block;width:12px;height:12px;border-radius:50%;background-color:#ff5f56;box-shadow:20px 0 0 #ffbd2e,40px 0 0 #27c93f;'></span><span style='font-family:var(--sl-font-mono);font-size:0.72em;color:var(--color-text-weaker);'>unsubscribe</span></div><pre style='margin:0;background:var(--color-background-weak);font-family:var(--sl-font-mono);font-size:0.875em;line-height:1.65;white-space:pre-wrap;word-break:break-all;padding:0.75rem 1rem;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>unsubscribe</span><span style='color: var(--lex-color-colon)'>(</span>
<span style='color: var(--lex-color-name)'>event_type</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>type</span><span style='color: var(--lex-color-colon)'>,</span>
<span style='color: var(--lex-color-name)'>handler</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'><a href='/packages/events/lexigram-events/api/#eventhandlerprotocol' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>EventHandlerProtocol</a></span>
<span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-default) !important'>None</span></pre></div>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L119' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.7rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='12' height='12' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
Remove a handler subscription for an event type.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event_type`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>type</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Type of event to unsubscribe from.</td></tr><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`handler`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'><a href='/packages/events/lexigram-events/api/#eventhandlerprotocol' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>EventHandlerProtocol</a></td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Handler to remove.</td></tr></tbody></table></div>
</div>
<hr style='border:none;border-top:1px solid rgba(200,255,0,0.2);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventFilterProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-events/src/lexigram/events/protocols.py#L22' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.75rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='14' height='14' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
Predicate that determines whether an event should be dispatched.
<div style='padding-left:1rem;border-left:1px solid var(--sl-color-gray-5);margin-top:2rem;margin-bottom:2rem;'>
<div style='border-radius:8px;border:1px solid var(--color-border-weak);overflow:hidden;box-shadow:0 4px 12px rgba(0,0,0,0.35);margin-bottom:1rem;'><div style='background:var(--color-background-weak);border-bottom:1px solid var(--color-border-weak);padding:0 1rem;min-height:36px;display:flex;align-items:center;padding-left:70px;position:relative;'><span style='position:absolute;top:50%;left:16px;transform:translateY(-50%);display:inline-block;width:12px;height:12px;border-radius:50%;background-color:#ff5f56;box-shadow:20px 0 0 #ffbd2e,40px 0 0 #27c93f;'></span><span style='font-family:var(--sl-font-mono);font-size:0.72em;color:var(--color-text-weaker);'>matches</span></div><pre style='margin:0;background:var(--color-background-weak);font-family:var(--sl-font-mono);font-size:0.875em;line-height:1.65;white-space:pre-wrap;word-break:break-all;padding:0.75rem 1rem;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>matches</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'><a href='/packages/events/lexigram-events/api/#domainevent' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>DomainEvent</a></span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>bool</span></pre></div>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-events/src/lexigram/events/protocols.py#L25' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.7rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='12' height='12' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
</div>
<hr style='border:none;border-top:1px solid rgba(200,255,0,0.2);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventSerializerProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-events/src/lexigram/events/protocols.py#L14' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.75rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='14' height='14' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
Serializes and deserializes domain events for transport.
<div style='padding-left:1rem;border-left:1px solid var(--sl-color-gray-5);margin-top:2rem;margin-bottom:2rem;'>
<div style='border-radius:8px;border:1px solid var(--color-border-weak);overflow:hidden;box-shadow:0 4px 12px rgba(0,0,0,0.35);margin-bottom:1rem;'><div style='background:var(--color-background-weak);border-bottom:1px solid var(--color-border-weak);padding:0 1rem;min-height:36px;display:flex;align-items:center;padding-left:70px;position:relative;'><span style='position:absolute;top:50%;left:16px;transform:translateY(-50%);display:inline-block;width:12px;height:12px;border-radius:50%;background-color:#ff5f56;box-shadow:20px 0 0 #ffbd2e,40px 0 0 #27c93f;'></span><span style='font-family:var(--sl-font-mono);font-size:0.72em;color:var(--color-text-weaker);'>serialize</span></div><pre style='margin:0;background:var(--color-background-weak);font-family:var(--sl-font-mono);font-size:0.875em;line-height:1.65;white-space:pre-wrap;word-break:break-all;padding:0.75rem 1rem;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>serialize</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'><a href='/packages/events/lexigram-events/api/#domainevent' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>DomainEvent</a></span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>bytes</span></pre></div>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-events/src/lexigram/events/protocols.py#L17' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.7rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='12' height='12' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
<div style='border-radius:8px;border:1px solid var(--color-border-weak);overflow:hidden;box-shadow:0 4px 12px rgba(0,0,0,0.35);margin-bottom:1rem;'><div style='background:var(--color-background-weak);border-bottom:1px solid var(--color-border-weak);padding:0 1rem;min-height:36px;display:flex;align-items:center;padding-left:70px;position:relative;'><span style='position:absolute;top:50%;left:16px;transform:translateY(-50%);display:inline-block;width:12px;height:12px;border-radius:50%;background-color:#ff5f56;box-shadow:20px 0 0 #ffbd2e,40px 0 0 #27c93f;'></span><span style='font-family:var(--sl-font-mono);font-size:0.72em;color:var(--color-text-weaker);'>deserialize</span></div><pre style='margin:0;background:var(--color-background-weak);font-family:var(--sl-font-mono);font-size:0.875em;line-height:1.65;white-space:pre-wrap;word-break:break-all;padding:0.75rem 1rem;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>deserialize</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>data</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>bytes</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'><a href='/packages/events/lexigram-events/api/#domainevent' style='color:inherit;text-decoration:underline;text-decoration-color:rgba(128,128,128,0.3);text-underline-offset:2px;'>DomainEvent</a></span></pre></div>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-events/src/lexigram/events/protocols.py#L18' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.7rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='12' height='12' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
</div>
<hr style='border:none;border-top:1px solid rgba(200,255,0,0.2);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventStoreProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display:flex;justify-content:flex-end;margin-top:-0.5rem;margin-bottom:0.5rem;'><a href='https://github.com/lexigram-framework/lexigram/blob/main/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L253' target='_blank' rel='noopener noreferrer' style='display:inline-flex;align-items:center;gap:0.3rem;font-size:0.75rem;color:var(--sl-color-gray-3);text-decoration:none;'><svg viewBox='0 0 16 16' width='14' height='14' fill='currentColor'><path d='M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z'/></svg>source</a></div>
Protocol for event store implementations.
The event store persists and retrieves domain events.
**Example**
```python
class PostgresEventStore:
async def append(self, stream_id, events, expected_version=None):
async with self.db.transaction():
for event in events:
await self.db.insert("events", event.to_dict())
append
async def append(
    stream_id: str,
    events: list[Any],
    expected_version: int | None = None
) -> int

Append events to a stream.

Parameters
ParameterTypeDescription
`stream_id`strUnique stream identifier.
`events`list[Any]List of events to append.
`expected_version`int | NoneExpected stream version for optimistic concurrency.
Returns
TypeDescription
intNew stream version.
Raises
ExceptionDescription
ConcurrencyErrorIf expected version doesn't match.
read
async def read(
    stream_id: str,
    start: int = 0,
    count: int | None = None
) -> list[Any]

Read events from a stream.

Parameters
ParameterTypeDescription
`stream_id`strUnique stream identifier.
`start`intStarting position.
`count`int | NoneMaximum events to read.
Returns
TypeDescription
list[Any]List of events.
read_all
async def read_all(
    position: int = 0,
    count: int | None = None
) -> list[Any]

Read events from all streams.

Parameters
ParameterTypeDescription
`position`intStarting global position.
`count`int | NoneMaximum events to read.
Returns
TypeDescription
list[Any]List of events.

Protocol for handlers that handle multiple event types.
handles
def handles() -> list[type]

Get list of event types handled.

Returns
TypeDescription
list[type]List of event classes.
handle
async def handle(event: Any) -> Result[None, EventError]

Handle an event.

Parameters
ParameterTypeDescription
`event`AnyDomainEvent to handle.
Returns
TypeDescription
Result[None, EventError]``Ok(None)`` on success, ``Err(EventError)`` if handling fails in an expected, recoverable way.

ProjectionProtocol protocol for building read models from events.
apply
def apply(event: Any) -> None

Apply an event to the projection state.

Parameters
ParameterTypeDescription
`event`AnyDomain event to apply.

Protocol for query bus implementations.

Example

class QueryBusProtocol:
async def execute(self, query: Query) -> Any:
handler = self._handlers[type(query)]
return await handler.handle(query)
execute
async def execute(query: Any) -> Any

Execute a query through its handler.

Parameters
ParameterTypeDescription
`query`AnyQuery to execute.
Returns
TypeDescription
AnyQuery result.

Protocol for read-only repository operations.

Use this for query-only access patterns (CQRS query side).

Example

class UserQueryRepository:
async def get(self, id: str) -> User | None:
return await self.db.query("users").where(id=id).first()
async def list(self, skip: int = 0, limit: int = 100) -> list[User]:
return await self.db.query("users").offset(skip).limit(limit).all()
get
async def get(item_id: str) -> T | None

Get entity by ID.

Parameters
ParameterTypeDescription
`item_id`strEntity identifier.
Returns
TypeDescription
T | NoneEntity if found, None otherwise.
list
async def list(
    skip: int = 0,
    limit: int = 100,
    **filters: Any
) -> list[T]

List entities with pagination.

Parameters
ParameterTypeDescription
`skip`intNumber of records to skip.
`limit`intMaximum records to return. **filters: Optional filter criteria.
Returns
TypeDescription
list[T]List of entities.
find_by_spec
async def find_by_spec(spec: SpecificationProtocol[T]) -> list[T]

Find entities matching a complex specification.

Parameters
ParameterTypeDescription
`spec`SpecificationProtocol[T]The DDD specification to evaluate.
Returns
TypeDescription
list[T]List of matching entities.
count
async def count(**filters: Any) -> int

Count entities matching filters.

Parameters
ParameterTypeDescription
Returns
TypeDescription
intTotal count of matching entities.

Protocol for full repository operations.

Extends ReadOnlyRepositoryProtocol with write operations.

Example

class UserRepository:
async def save(self, entity: User) -> User:
if entity.id:
await self.db.update("users", entity.dict()).where(id=entity.id)
else:
entity.id = await self.db.insert("users", entity.dict())
return entity
async def delete(self, id: str) -> bool:
result = await self.db.delete("users").where(id=id)
return result.affected_rows > 0
save
async def save(entity: T) -> T

Save (create or update) an entity.

Parameters
ParameterTypeDescription
`entity`TEntity to save.
Returns
TypeDescription
TSaved entity with any generated fields populated.
delete
async def delete(item_id: str) -> bool

Delete entity by ID.

Parameters
ParameterTypeDescription
`item_id`strEntity identifier.
Returns
TypeDescription
boolTrue if deleted, False if not found.
save_many
async def save_many(entities: list[T]) -> list[T]

Save (create or update) multiple entities in a single operation.

Implementations SHOULD execute this as a batch for efficiency.

Parameters
ParameterTypeDescription
`entities`list[T]Entities to save.
Returns
TypeDescription
list[T]Saved entities with any generated fields populated.
delete_many
async def delete_many(item_ids: list[str]) -> int

Delete multiple entities by ID.

Implementations SHOULD execute this as a batch for efficiency.

Parameters
ParameterTypeDescription
`item_ids`list[str]Entity identifiers to delete.
Returns
TypeDescription
intNumber of entities actually deleted.

Protocol for saga lifecycle management.

The manager routes incoming events to all registered sagas and coordinates their execution.

process
async def process(event: Any) -> None

Process an event through all relevant sagas.

Parameters
ParameterTypeDescription
`event`AnyDomain event to route and process.

Protocol for saga / process-manager implementations.

Sagas coordinate long-running business processes that span multiple aggregates by reacting to domain events and dispatching commands.

handle
async def handle(event: Any) -> list[Any]

Handle a domain event and produce commands.

Parameters
ParameterTypeDescription
`event`AnyDomain event to handle.
Returns
TypeDescription
list[Any]List of commands to dispatch to the command bus.

Protocol for aggregate snapshot storage.
save
async def save(
    aggregate_id: str,
    snapshot: Any,
    version: int
) -> None

Save an aggregate snapshot.

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate identifier.
`snapshot`AnyAggregate state snapshot.
`version`intAggregate version at snapshot.
load
async def load(aggregate_id: str) -> tuple[Any, int] | None

Load the latest snapshot.

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate identifier.
Returns
TypeDescription
tuple[Any, int] | NoneTuple of (snapshot, version) or None if not found.

Abstract base class for event storage.

Inherits stream/cursor/replay helpers from EventStreamMixin. Snapshot storage is provided by AbstractSnapshotStore.

Example

# Append events
await store.append(
stream_id="order-123",
events=[OrderCreated(...), ItemAdded(...)],
expected_version=0
)
# Read events
events = await store.read("order-123")
# Append events
await store.append(
stream_id="order-123",
events=[OrderCreated(...), ItemAdded(...)],
expected_version=0
)
# Read events
events = await store.read("order-123")
__init__
def __init__(schema_evolution: SchemaEvolution | None = None) -> None

Initialise event store, optionally with schema evolution (M-03).

append
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to a stream with optimistic concurrency.

Returns
TypeDescription
intNew stream version.
Parameters
ParameterTypeDescription
`stream_id`strUnique identifier for the event stream
`events`list[Event]List of events to save
`expected_version`int | NoneExpected current version for concurrency control
Raises
ExceptionDescription
ConcurrencyErrorIf version conflict occurs
EventPersistenceErrorIf save operation fails
read
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream (M-15).

Parameters
ParameterTypeDescription
`stream_id`strStream to load events from
`from_version`intStarting version (inclusive)
`to_version`int | NoneEnding version (inclusive)
`limit`int | NoneMax number of events to read
Returns
TypeDescription
list[Event]List of events in chronological order
Raises
ExceptionDescription
EventLoadErrorIf load operation fails
get_stream_version
async def get_stream_version(stream_id: str) -> int

Get the current version of a stream.

Parameters
ParameterTypeDescription
`stream_id`strStream to check
Returns
TypeDescription
intCurrent version (0 if stream doesn't exist)
stream_exists
async def stream_exists(stream_id: str) -> bool

Check if a stream exists.

Parameters
ParameterTypeDescription
`stream_id`strStream to check
Returns
TypeDescription
boolTrue if stream has events
get_stream_info
async def get_stream_info(stream_id: str) -> StreamInfo | None

Get information about a stream.

Parameters
ParameterTypeDescription
`stream_id`strStream to get info for
Returns
TypeDescription
StreamInfo | NoneStreamInfo or None if stream doesn't exist
stream_all
def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events from the store in global order.

Parameters
ParameterTypeDescription
`from_position`intStarting global sequence number.
`batch_size`intNumber of events to fetch per batch.
`partition`int | NoneOptional partition index (0 to total_partitions - 1).
`total_partitions`int | NoneTotal number of partitions for sharding.
compact
async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for a stream up to (and including) the given version (MF-04).

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
`up_to_version`intMaximum version to delete.
Returns
TypeDescription
intNumber of events purged.
find_by_aggregate
async def find_by_aggregate(aggregate_id: str) -> list[Event]

Get all events for an aggregate (for reconstitution).

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate identifier (same as stream_id).
Returns
TypeDescription
list[Event]All events for the aggregate in chronological order.
close
async def close() -> None

Close the event store and release resources.


Abstract base for middleware components.

Middleware wraps around handlers to add cross-cutting concerns like logging, validation, transactions, etc.

Example

class TimingMiddleware(AbstractMiddleware[Message, Any]):
async def __call__(
self,
message: Message,
next_handler: NextHandler
) -> Any:
start = time.time()
result = await next_handler(message)
duration = time.time() - start
logger.info("Handler took %.3fs", duration)
return result

Read-only repository interface.

Useful for query-side repositories in CQRS.

get
async def get(aggregate_id: UUID | str) -> TAggregate | None

Get an aggregate by ID.

exists
async def exists(aggregate_id: UUID | str) -> bool

Check if an aggregate exists.

get_all
async def get_all(
    limit: int | None = None,
    offset: int | None = None
) -> list[TAggregate]

Get all aggregates with pagination.


Abstract base repository for aggregate persistence.

Repositories provide collection-like access to aggregates, abstracting away the persistence mechanism.

Example

async def save(self, aggregate: Order) -> None:
...
```python
get
async def get(aggregate_id: UUID | str) -> TAggregate | None

Get an aggregate by ID.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
TAggregate | NoneThe aggregate if found, None otherwise
save
async def save(aggregate: TAggregate) -> None

Save an aggregate.

This persists any pending changes and clears the pending events.

Parameters
ParameterTypeDescription
`aggregate`TAggregateThe aggregate to save
exists
async def exists(aggregate_id: UUID | str) -> bool

Check if an aggregate exists.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
boolTrue if the aggregate exists

Abstract base class for snapshot storage.

Provides optimised aggregate loading by storing periodic snapshots. Avoids replaying the entire event history for aggregates with many events.

Strategy: 1. Load the latest snapshot (if any). 2. Load events after the snapshot version. 3. Apply those events to the snapshot state.

This reduces loading from O(N) to O(1) + O(events since snapshot).

Example (M-14 — uses Snapshot object throughout)

from lexigram.events.types import Snapshot
# Save a snapshot
await store.save(Snapshot(
aggregate_id=order.id,
aggregate_type="Order",
version=order.version,
state=order.to_dict()
))
# Load the latest snapshot
snapshot = await store.get_latest("order-123")
if snapshot:
order = Order.from_snapshot(snapshot.state)
events = await event_store.read("order-123", start=snapshot.version)
from lexigram.events.types import Snapshot
# Save a snapshot
await store.save(Snapshot(
aggregate_id=order.id,
aggregate_type="Order",
version=order.version,
state=order.to_dict()
))
# Load the latest snapshot
snapshot = await store.get_latest("order-123")
if snapshot:
order = Order.from_snapshot(snapshot.state)
events = await event_store.read("order-123", start=snapshot.version)
save_snapshot
async def save_snapshot(snapshot: Snapshot) -> None

Save an aggregate snapshot.

Parameters
ParameterTypeDescription
`snapshot`SnapshotSnapshot object containing aggregate_id, version, and state.
get_latest
async def get_latest(aggregate_id: str) -> Snapshot | None

Load the latest snapshot for an aggregate.

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate to load snapshot for.
Returns
TypeDescription
Snapshot | NoneSnapshot object or None if no snapshot exists.
get_by_version
async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get a specific snapshot by version.

Parameters
ParameterTypeDescription
`aggregate_id`str | UUIDAggregate identifier.
`version`intExact version to retrieve.
Returns
TypeDescription
Snapshot | NoneSnapshot at that version, or None.
delete_old_snapshots
async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

Parameters
ParameterTypeDescription
`aggregate_id`str | UUIDID of the aggregate.
`keep_count`intNumber of snapshots to retain (default: 3).
Returns
TypeDescription
intNumber of snapshots deleted.
close
async def close() -> None

Close the snapshot store and release resources.


Factory for creating aggregates.
create
def create(aggregate_id: UUID | str) -> TAggregate_co

Create a new aggregate instance.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strAggregate ID.
Returns
TypeDescription
TAggregate_coNew aggregate instance.

Base class for event-sourced aggregates.

Aggregates:

  • Maintain consistency boundaries
  • Generate domain events on state changes
  • Can be reconstructed from events (event sourcing)
  • Support snapshotting for performance

Example

class Order(AggregateRoot):
status: str = "draft"
items: list[OrderItem] = Field(default_factory=list)
total: Decimal = Decimal("0.00")
def add_item(self, product_id: str, quantity: int, price: Decimal):
if self.status != "draft":
raise InvalidOperationError("Cannot modify non-draft order")
self._apply(ItemAddedEvent(
order_id=self.id,
product_id=product_id,
quantity=quantity,
price=price
))
def _handle_item_added(self, event: ItemAddedEvent):
self.items.append(OrderItem(
product_id=event.product_id,
quantity=event.quantity,
price=event.price
))
self.total += event.price * event.quantity
__init__
def __init__(
    id: UUID | None = None,
    **kwargs: Any
) -> None

Initialise the aggregate, auto-generating a UUID id if none is given.

add_event
def add_event(event: Any) -> None

Buffer a domain event for persistence, calling invariant checks first.

pull_events
def pull_events() -> list[Any]

Return a copy of pending events without clearing them.

clear_events
def clear_events() -> None

Discard all buffered events.

has_uncommitted_events
property has_uncommitted_events() -> bool

True when there are buffered events awaiting persistence.

get_pending_events
def get_pending_events() -> list[Event]

Get events pending persistence.

Returns
TypeDescription
list[Event]List of uncommitted events
clear_pending_events
def clear_pending_events() -> None

Clear pending events after persistence.

load_from_history
def load_from_history(events: list[Event]) -> None

Reconstruct aggregate state from event history.

This is used during aggregate loading to replay all events and rebuild the current state.

Parameters
ParameterTypeDescription
`events`list[Event]Historical events to replay
load_from_snapshot
def load_from_snapshot(snapshot: Snapshot) -> None

Restore aggregate state from a snapshot.

Parameters
ParameterTypeDescription
`snapshot`SnapshotThe snapshot to restore from
create_snapshot
def create_snapshot() -> Snapshot

Create a snapshot of current aggregate state.

Returns
TypeDescription
SnapshotSnapshot instance

Status of an aggregate root.

Checkpoint for tracking projection progress.

Command extended with aggregate-targeting helpers.
target_aggregate_id
property target_aggregate_id() -> UUID | None
expected_version
property expected_version() -> int | None
for_aggregate
def for_aggregate(
    aggregate_id: UUID,
    version: int | None = None
) -> Command

Target a specific aggregate.


Configuration for command bus.

Command bus for dispatching commands.

Handlers are registered explicitly via register() or through HandlerRegistry.register_with_buses() during provider registration.

Example

# Commands are automatically routed to handlers
result = await bus.dispatch(CreateOrderCommand(
customer_id="cust-123",
items=[...]
))
# Commands are automatically routed to handlers
result = await bus.dispatch(CreateOrderCommand(
customer_id="cust-123",
items=[...]
))
__init__
def __init__(
    middlewares: list[Any] | None = None,
    config: Any | None = None
) -> None

Initialize the command bus.

dispatch
async def dispatch(command: Command) -> Any

Dispatch a command for execution.

Parameters
ParameterTypeDescription
`command`CommandThe command to execute.
Returns
TypeDescription
AnyCommand execution result.
Raises
ExceptionDescription
HandlerNotFoundErrorIf no handler is registered.
CommandExecutionErrorIf command execution fails.

Base class for command handlers.

Command handlers process commands and optionally return results. Each command type should have exactly one handler.

Example

class CreateOrderHandler(CommandHandlerProtocol[CreateOrderCommand, OrderId]):
def __init__(self, repository: OrderRepository):
self.repository = repository
async def handle(self, command: CreateOrderCommand) -> OrderId:
order = Order.create(
customer_id=command.customer_id,
items=command.items
)
await self.repository.save(order)
return order.id
handle
async def handle(command: TCommand) -> TResult

Handle a command.

Parameters
ParameterTypeDescription
`command`TCommandThe command to handle
Returns
TypeDescription
TResultCommand execution result

Result of command execution.

Result of an event dispatch operation.

Attributes: success: True if all handlers completed successfully. handler_results: List of return values from each handler. errors: List of (exception_type, exception_instance) tuples.

raise_if_errors
def raise_if_errors() -> None

Raise if any handler failed. Call explicitly when needed.

has_errors
property has_errors() -> bool

Return True if any handler failed.


Base class for domain events.
__init__
def __init__(**kwargs: Any) -> None

Initialize the event setting base fields and any extra subclass fields.

Declared dataclass fields receive their default values when not supplied. Extra keyword arguments (e.g. fields declared on plain subclasses without @dataclass) are set directly on the instance via object.__setattr__, which bypasses the frozen=True guard safely during construction.

to_dict
def to_dict() -> dict[str, Any]

Return a JSON-serializable dictionary representation.

from_dict
def from_dict(
    cls,
    data: dict[str, Any]
) -> DomainEvent
for_aggregate
def for_aggregate(
    aggregate_id: UUID,
    aggregate_type: str
) -> DomainEvent

Base class for entities within aggregates.

Entities:

  • Have identity (id field)
  • Are mutable
  • Belong to an aggregate root
  • Don’t have their own event streams
touch
def touch() -> None

Update the modified timestamp.


Represents a fact that happened in the past.

We override __init__ to accept arbitrary extra keyword arguments, which are attached as attributes on the instance. This preserves the historical ability to create Event(data=...) without declaring a new subclass. event_type handling is performed by the contract base class (which now includes actor_id).

The event version is an aggregate-specific concept and therefore is declared on this subclass rather than the canonical DomainEvent base class. with_version and related helpers operate on this field.

__init__
def __init__(
    *args: Any,
    **kwargs: Any
) -> None
with_version
def with_version(version: int) -> Event

Create a copy with a specific version (aggregate version).

for_aggregate
def for_aggregate(
    aggregate_id: UUID,
    aggregate_type: str
) -> Event

Create a copy for a specific aggregate.


Configuration for event bus.

Event bus for publishing domain events.

Orchestrates subscription management, per-event-type BoundedChannel backpressure (MAJ-11), and background drain tasks. All handler dispatch logic (lookup, parallel/sequential dispatch, retry) is provided by HandlerDispatchMixin.

Attributes: DEFAULT_MAX_CONCURRENT_HANDLERS: Maximum concurrent handler executions. DEFAULT_HANDLER_TIMEOUT_SECONDS: Timeout for each handler execution. DEFAULT_RETRY_FAILED_HANDLERS: Whether to retry failed handlers. DEFAULT_MAX_HANDLER_RETRIES: Maximum retry attempts per handler. DEFAULT_ENABLE_DEAD_LETTER: Whether to enable dead letter queue. DEFAULT_ALLOW_NO_HANDLERS: Whether to allow events with no handlers. DEFAULT_PARALLEL_DISPATCH: Whether to dispatch handlers in parallel. DEFAULT_CONTINUE_ON_ERROR: Whether to continue on handler errors.

__init__
def __init__(
    middlewares: list[MiddlewareFunc] | None = None,
    config: EventBusConfig | None = None,
    parallel: ParallelProtocol | None = None,
    tracer: TracerProtocol | None = None,
    hooks: HookRegistryProtocol | None = None
) -> None

Initialize the event bus.

Parameters
ParameterTypeDescription
`middlewares`list[MiddlewareFunc] | NoneList of middleware functions.
`config`EventBusConfig | NoneEvent bus configuration.
`parallel`ParallelProtocol | NoneParallel execution protocol for concurrent handler execution.
`tracer`TracerProtocol | NoneOptional tracer for distributed tracing.
subscribe
def subscribe(
    event_type: type[Event],
    handler: Any
) -> None

Subscribe a handler to an event type.

Parameters
ParameterTypeDescription
`event_type`type[Event]The event class to subscribe to.
`handler`AnyHandler to call when event is published.
set_tracer
def set_tracer(tracer: TracerProtocol | None) -> None

Attach an optional tracer after provider boot wiring.

Parameters
ParameterTypeDescription
`tracer`TracerProtocol | NoneTracer to attach, or None to clear.
set_hook_registry
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None

Attach an optional hook registry after provider boot wiring.

handler
def handler(event_type: type[Event]) -> Callable[[Callable], Callable]

Decorator to subscribe a handler to an event type.

subscribe_all
def subscribe_all(handler: Any) -> None

Subscribe a handler to all events.

Parameters
ParameterTypeDescription
`handler`AnyHandler to call for every published event.
unsubscribe
def unsubscribe(
    event_type: type[Event],
    handler: Any
) -> bool

Unsubscribe a handler from an event type.

Parameters
ParameterTypeDescription
`event_type`type[Event]The event class.
`handler`AnyHandler to remove.
Returns
TypeDescription
boolTrue if handler was found and removed.
publish
async def publish(event: Event) -> Result[None, EventError]

Publish an event to all subscribers via a bounded channel.

Enqueues the event into a per-event-type BoundedChannel; a background drain task dequeues and dispatches it to all handlers. Blocks when the channel is at capacity providing natural backpressure (MAJ-11).

Parameters
ParameterTypeDescription
`event`EventThe event to publish.
Returns
TypeDescription
Result[None, EventError]Ok(None) on successful enqueue. Err(EventHandlerError) when no handlers are registered and allow_no_handlers is False.
flush
async def flush() -> None

Wait until all pending events in every channel have been dispatched.

Yields control repeatedly until every BoundedChannel is empty and all in-progress handler dispatches have completed. Useful in tests.


Wrapper for events with metadata for storage and transmission.

Payload fired after an event handler successfully processes an event.

Attributes: event_type: Fully-qualified type name of the handled event. handler: Qualified name of the handler class or function.


Base class for event handlers.

Event handlers process events and perform side effects. Unlike command handlers, multiple handlers can subscribe to the same event type.

Example

class SendOrderConfirmationHandler(EventHandlerProtocol[OrderCreatedEvent]):
def __init__(self, email_service: EmailService):
self.email_service = email_service
async def handle(self, event: OrderCreatedEvent) -> None:
await self.email_service.send_confirmation(
order_id=event.order_id,
customer_email=event.customer_email
)
handle
async def handle(event: TEvent) -> Result[None, EventError]

Handle an event.

Parameters
ParameterTypeDescription
`event`TEventThe event to handle.
Returns
TypeDescription
Result[None, EventError]``Ok(None)`` on success, ``Err(EventError)`` if handling fails in an expected, recoverable way.

Emitted when the event bus successfully publishes a domain event.

Attributes: source_event_type: Fully-qualified type name of the published event. handler_count: Number of registered handlers that were notified.


Payload fired when an event is published onto the event bus.

Attributes: event_type: Fully-qualified type name of the published event. aggregate_id: Identifier of the aggregate that emitted the event, if known.


AbstractRepository using event sourcing with snapshot acceleration.

This repository:

  1. Loads aggregates using snapshots when available (O(1) vs O(N))
  2. Persists changes as events
  3. Automatically creates snapshots based on policy
  4. Publishes events to the event bus

Example

# Create repository with snapshot support
repository = EventSourcingRepository(
aggregate_type=Order,
event_store=event_store,
snapshot_store=snapshot_store,
event_bus=event_bus,
snapshot_policy=EventCountPolicy(100)
)
# Load aggregate - uses snapshot if available
order = await repository.get(order_id)
# Modify aggregate
order.add_item(product_id, quantity, price)
# Save - stores events, maybe creates snapshot, publishes events
await repository.save(order)

Performance comparison: Without snapshots (1000 events): ~100ms to load With snapshots (1000 events): ~1ms to load (snapshot + ~0 events)

__init__
def __init__(
    aggregate_type: type[TAggregate],
    event_store: AbstractEventStore,
    snapshot_store: AbstractSnapshotStore | None = None,
    event_bus: EventBusImpl | None = None,
    snapshot_policy: SnapshotPolicy | None = None,
    snapshot_every: int = 100
)

Initialize the event sourcing repository.

Parameters
ParameterTypeDescription
`aggregate_type`type[TAggregate]The aggregate class to manage
`event_store`AbstractEventStoreEvent store for event persistence
`snapshot_store`AbstractSnapshotStore | NoneSnapshot store (optional, enables O(1) loading)
`event_bus`EventBusImpl | NoneEvent bus for publishing events (optional)
`snapshot_policy`SnapshotPolicy | NoneCustom snapshot policy (default: every N events)
`snapshot_every`intEvents between snapshots (default: 100)
get
async def get(aggregate_id: UUID | str) -> TAggregate | None

Load an aggregate by ID using snapshot acceleration.

This method:

  1. Tries to load the latest snapshot
  2. Loads only events after the snapshot version
  3. Reconstructs the aggregate
Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
TAggregate | NoneThe aggregate if found, None otherwise
save
async def save(aggregate: TAggregate) -> None

Save an aggregate’s pending events.

This method:

  1. Persists pending events to the event store
  2. Creates a snapshot if policy triggers
  3. Publishes events to the event bus
  4. Clears pending events from the aggregate
Parameters
ParameterTypeDescription
`aggregate`TAggregateThe aggregate to save
Raises
ExceptionDescription
ConcurrencyErrorIf version conflict detected
exists
async def exists(aggregate_id: UUID | str) -> bool

Check if an aggregate exists.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
boolTrue if the aggregate has any events
get_or_raise
async def get_or_raise(aggregate_id: UUID | str) -> TAggregate

Get an aggregate or raise an error if not found.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
TAggregateThe aggregate
Raises
ExceptionDescription
AggregateNotFoundErrorIf aggregate not found
force_snapshot
async def force_snapshot(aggregate: TAggregate) -> None

Force creation of a snapshot for an aggregate.

Useful for bulk operations or maintenance.

Parameters
ParameterTypeDescription
`aggregate`TAggregateThe aggregate to snapshot

Supported event store backends.

Payload fired when an event is persisted to the event store.

Attributes: event_type: Fully-qualified type name of the stored event. stream_id: Identifier of the event stream the event was written to.


Top-level Events configuration.

This consolidated class combines all event system configuration. No TYPE_CHECKING guards or cast() wrappers — all classes defined in this module, so runtime type resolution and dict→model coercion work.

is_production
property is_production() -> bool

Check if running in production environment.

is_development
property is_development() -> bool

Check if running in development environment.

is_test
property is_test() -> bool

Check if running in test environment.

validate_backend_config
def validate_backend_config() -> list[str]

Validate that the selected backend has a matching config.

Returns a list of validation error messages. Empty list means valid.


CQRS + Event Sourcing: CommandBusProtocol, QueryBusProtocol, EventBusProtocol, EventStoreProtocol, sagas.

Call configure to configure the events subsystem.

Usage

from lexigram.events.config import EventsConfig
from lexigram.events.types import EventStoreBackend
@module(
imports=[
EventsModule.configure(
EventsConfig(event_store_backend=EventStoreBackend.MEMORY)
)
]
)
class AppModule(Module):
pass
from lexigram.events.config import EventsConfig
from lexigram.events.types import EventStoreBackend
@module(
imports=[
EventsModule.configure(
EventsConfig(event_store_backend=EventStoreBackend.MEMORY)
)
]
)
class AppModule(Module):
pass
configure
def configure(
    cls,
    config: Any | None = None,
    handler_modules: list[str] | None = None
) -> DynamicModule

Create an EventsModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`Any | NoneEventsConfig or ``None`` for framework defaults (in-memory event store).
`handler_modules`list[str] | NonePython module paths to auto-discover event/command handlers from (e.g. ``["myapp.handlers"]``).
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
scope
def scope(
    cls,
    *handlers: type,
    **kwargs: Any
) -> DynamicModule

Scope event/command/query handler classes into a feature module.

Registers the given handler classes as providers so they are resolved by the bus implementations. The parent module graph must already include EventsModule.configure — this does not create a new event store or bus.

Uses the anonymous token pattern so both configure() and scope() can coexist in the same compiled graph without a ModuleDuplicateError.

Example

@module(
imports=[
EventsModule.configure(config),
EventsModule.scope(
CreateOrderHandler,
GetOrderHandler,
OrderShippedHandler,
),
]
)
class OrderFeatureModule(Module):
pass
@module(
imports=[
EventsModule.configure(config),
EventsModule.scope(
CreateOrderHandler,
GetOrderHandler,
OrderShippedHandler,
),
]
)
class OrderFeatureModule(Module):
pass
Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleA DynamicModule scoped to this feature.
stub
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return an in-memory EventsModule for unit testing.

Registers an in-memory event store with no external broker connections. Suitable for testing event handlers and CQRS logic.

Returns
TypeDescription
DynamicModuleA DynamicModule backed by in-memory event storage.

Thin orchestrator that coordinates Events sub-providers (M-01).

Delegates all setup, lifecycle, and container-registration work to:

  • StoreSubProvider
  • BusSubProvider
  • HandlerSubProvider
  • ManagerSubProvider

Public API is unchanged from the monolithic version.

__init__
def __init__(
    config: EventsConfig | dict[str, Any] | None = None,
    handler_modules: list[str] | None = None
) -> None
from_config
def from_config(
    cls,
    config: EventsConfig,
    **context: Any
) -> EventsProvider

Create EventsProvider from config.

config
property config() -> EventsConfig
config
def config(value: EventsConfig) -> None

Allow orchestrator to auto-inject config from LexigramConfig.

event_store
property event_store() -> Any
snapshot_manager
property snapshot_manager() -> Any
command_bus
property command_bus() -> Any
query_bus
property query_bus() -> Any
event_bus
property event_bus() -> Any
projection_manager
property projection_manager() -> Any
saga_manager
property saga_manager() -> Any
register
async def register(container: ContainerRegistrarProtocol) -> None

Register all events components with the DI container.

boot
async def boot(container: ContainerResolverProtocol) -> None

Start buses, managers, and wire optional tracer.

shutdown
async def shutdown() -> None

Shutdown all components.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Aggregate health across stores and buses.


Information about a registered handler.

Registry for handler discovery and registration.

The registry provides:

  • Auto-discovery of handlers from modules
  • Registration with command, query, and event buses
  • Tracking of handler-message relationships

Example

# Create registry
registry = HandlerRegistry()
# Discover handlers from modules
registry.discover("app.handlers")
# Register with buses
registry.register_with_buses(
command_bus=command_bus,
query_bus=query_bus,
event_bus=event_bus
)
# Or use DI container for handler instantiation
registry.register_with_container(container)
__init__
def __init__() -> None

Initialize the handler registry.

load_decorators
def load_decorators() -> None

Load handlers from the global decorator registry (M-17).

Call this explicitly to register handlers that were annotated with @command_handler, @event_handler, etc.

register_command_handler
def register_command_handler(
    command_type: type,
    handler_type: Any
) -> None

Register a command handler.

Parameters
ParameterTypeDescription
`command_type`typeThe command class
`handler_type`AnyThe handler class
register_query_handler
def register_query_handler(
    query_type: type,
    handler_type: Any
) -> None

Register a query handler.

Parameters
ParameterTypeDescription
`query_type`typeThe query class
`handler_type`AnyThe handler class
register_event_handler
def register_event_handler(
    event_type: type,
    handler_type: Any
) -> None

Register an event handler.

Parameters
ParameterTypeDescription
`event_type`typeThe event class
`handler_type`AnyThe handler class
discover
def discover(
    module_path: str,
    recursive: bool = True
) -> int

Discover handlers from a module or package.

Scans the module for classes that inherit from handler base classes and registers them automatically.

Parameters
ParameterTypeDescription
`module_path`strPython module path (e.g., "app.handlers")
`recursive`boolWhether to scan submodules
Returns
TypeDescription
intNumber of handlers discovered
register_with_buses
def register_with_buses(
    command_bus: CommandBusProtocol | None = None,
    query_bus: QueryBusProtocol | None = None,
    event_bus: EventBusProtocol | None = None,
    handler_factory: Callable[[type], Any] | None = None
) -> None

Register all discovered handlers with buses.

Parameters
ParameterTypeDescription
`command_bus`CommandBusProtocol | NoneCommand bus to register with
`query_bus`QueryBusProtocol | NoneQuery bus to register with
`event_bus`EventBusProtocol | NoneEvent bus to register with
`handler_factory`Callable[[type], Any] | NoneFactory function to instantiate handlers
get_command_handlers
def get_command_handlers() -> dict[type, type[CommandHandlerProtocol[Any, Any]]]

Get all registered command handlers.

get_query_handlers
def get_query_handlers() -> dict[type, type[QueryHandlerProtocol[Any, Any]]]

Get all registered query handlers.

get_event_handlers
def get_event_handlers() -> dict[type, list[type[EventHandlerProtocol[Any]]]]

Get all registered event handlers.

with_defaults
def with_defaults(cls) -> HandlerRegistry

Create a registry with no pre-registered handlers.

Returns a fresh instance ready for handler registration via the @event_handler decorator or register() method.

Returns
TypeDescription
HandlerRegistryA new HandlerRegistry instance.

Idempotent command extended with aggregate-targeting helpers.

In-memory event store for testing and development.

M-19: Uses a global log and type index for O(n) and O(matching) streaming performance.

Example

store = InMemoryEventStore()
# Save events
await store.append(
stream_id="order-123",
events=[OrderCreated(...)],
expected_version=0
)
# Load events
events = await store.read("order-123")
store = InMemoryEventStore()
# Save events
await store.append(
stream_id="order-123",
events=[OrderCreated(...)],
expected_version=0
)
# Load events
events = await store.read("order-123")
__init__
def __init__(
    schema_evolution: SchemaEvolution | None = None,
    max_events_per_stream: int = 10000
) -> None

Initialize the in-memory store.

Parameters
ParameterTypeDescription
`schema_evolution`SchemaEvolution | NoneOptional schema evolution handler.
`max_events_per_stream`intMaximum events retained per stream. Oldest events are evicted when the limit is exceeded. Defaults to 10,000 — set to 0 for unbounded.
append
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to an in-memory stream with optimistic concurrency.

Returns
TypeDescription
intNew stream version.
read
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream (M-15).

get_stream_version
async def get_stream_version(stream_id: str) -> int

Get the current version of a stream.

get_stream_ids
async def get_stream_ids() -> list[str]

Get all stream IDs (for testing/debugging).

compact
async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for a stream up to (and including) the given version (MF-04).

stream_all
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events across all streams.

M-19: O(n) from position using _global_log — no merging required.

stream_by_type
async def stream_by_type(
    event_types: list[str],
    from_position: int = 0
) -> AsyncIterator[Event]

Stream events filtered by type.

M-19: O(matching_events) not O(total_events) using type index.

get_all_streams
def get_all_streams() -> list[str]

Get all stream IDs (for testing/debugging).

get_global_position
def get_global_position() -> int

Get current global position.

clear
def clear() -> None

Clear all events (for testing).

close
async def close() -> None

Close the in-memory event store (no-op).


Configuration for in-memory event store.

In-memory snapshot store for testing and development.

M-14: Uses Snapshot objects throughout (save_snapshot / get_latest / get_by_version). The legacy tuple-based save/load methods are provided by the AbstractSnapshotStore base class via backward-compat shims.

Example

from lexigram.events.types import Snapshot
store = InMemorySnapshotStore()
# Save snapshot
await store.save_snapshot(Snapshot(
aggregate_id="order-123",
aggregate_type="Order",
version=100,
state={"status": "completed", "total": 150.00}
))
# Load latest snapshot
snapshot = await store.get_latest("order-123")
from lexigram.events.types import Snapshot
store = InMemorySnapshotStore()
# Save snapshot
await store.save_snapshot(Snapshot(
aggregate_id="order-123",
aggregate_type="Order",
version=100,
state={"status": "completed", "total": 150.00}
))
# Load latest snapshot
snapshot = await store.get_latest("order-123")
__init__
def __init__(max_snapshots_per_aggregate: int = 5) -> None

Initialize the in-memory snapshot store.

Parameters
ParameterTypeDescription
`max_snapshots_per_aggregate`intMaximum snapshots to keep per aggregate
save_snapshot
async def save_snapshot(snapshot: Snapshot) -> None

Save a snapshot (M-14: accepts Snapshot object).

get_latest
async def get_latest(aggregate_id: str) -> Snapshot | None

Load the latest snapshot for an aggregate.

get_by_version
async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get the snapshot for a specific version.

delete_old_snapshots
async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

clear
def clear() -> None

Clear all snapshots (for testing).

close
async def close() -> None

Close the in-memory snapshot store (no-op).


Event for cross-service communication.

Base class for all messages in the Events system.
with_metadata
def with_metadata(metadata: MessageMetadata) -> Message

Create a copy of the message with new metadata.

with_correlation_id
def with_correlation_id(correlation_id: UUID) -> Message

Create a copy with a correlation ID.

with_causation_id
def with_causation_id(causation_id: UUID) -> Message

Create a copy with a causation ID.

correlation_id
property correlation_id() -> UUID | None

Get correlation ID from metadata.

causation_id
property causation_id() -> UUID | None

Get causation ID from metadata.

to_dict
def to_dict() -> dict[str, Any]

Convert message to dictionary.

from_dict
def from_dict(
    cls,
    data: dict[str, Any]
) -> Message

Create message from dictionary.


Metadata attached to messages for tracking and correlation.
with_correlation
def with_correlation(correlation_id: UUID) -> MessageMetadata

Create new metadata with correlation ID.

with_causation
def with_causation(causation_id: UUID) -> MessageMetadata

Create new metadata with causation ID.

with_user
def with_user(user_id: str) -> MessageMetadata

Create new metadata with user ID.

with_tenant
def with_tenant(tenant_id: str) -> MessageMetadata

Create new metadata with tenant ID.

with_custom
def with_custom(
    key: str,
    value: Any
) -> MessageMetadata

Create new metadata with additional custom field.


Types of messages in the event-driven system.

Information about registered middleware.

MongoDB connection configuration for store implementations.

Used directly by MongoDBSnapshotStore for motor connection lifecycle management.


MongoDB event store implementation using ``DocumentStoreProtocol``.

Uses the lexigram-nosql collection abstraction for standard CRUD while keeping direct motor access for change streams and transactions.

Features: - Optimistic concurrency with version checking - Global event ordering with sequence counters - Efficient event streaming with cursors - Automatic index creation - Change streams support for real-time events

Example

store = MongoDBEventStore(document_store, config)
await store.connect()
try:
await store.save("order-123", [event], expected_version=0)
finally:
await store.close()
__init__
def __init__(
    document_store: DocumentStoreProtocol,
    config: Any,
    event_serializer: Any | None = None
) -> None

Initialize MongoDB event store.

Parameters
ParameterTypeDescription
`document_store`DocumentStoreProtocolA connected ``DocumentStoreProtocol`` (e.g. MongoDBDocumentStore).
`config`AnyEvent store configuration with collection names.
`event_serializer`Any | NoneOptional custom event serializer.
events
property events() -> CollectionProtocol

Events collection.

counters
property counters() -> CollectionProtocol

Counters collection.

connect
async def connect() -> None

Connect to MongoDB and set up collections.

append
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to a stream with optimistic concurrency.

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
`events`list[Event]Events to save.
`expected_version`int | NoneExpected current version.
Returns
TypeDescription
intNew stream version.
Raises
ExceptionDescription
ConcurrencyErrorIf version conflict occurs.
read
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream.

get_stream_version
async def get_stream_version(stream_id: str) -> int

Get current stream version.

Parameters
ParameterTypeDescription
`stream_id`strStream to check.
Returns
TypeDescription
intCurrent version (0 if stream doesn't exist).
stream_all
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events from a global position.

stream_by_type
async def stream_by_type(
    event_types: list[str],
    from_position: int = 0
) -> AsyncIterator[Event]

Stream events filtered by type.

watch_events
async def watch_events(event_types: list[str] | None = None) -> AsyncIterator[Event]

Watch for new events using MongoDB change streams.

Note: This requires direct motor access for change stream support. Falls back to polling if the underlying store doesn’t support watch().

Parameters
ParameterTypeDescription
`event_types`list[str] | NoneOptional list of event types to filter.
Yields
TypeDescription
AsyncIterator[Event]New events as they are inserted.
find_by_type
async def find_by_type(
    event_type: str,
    limit: int = 100,
    offset: int = 0
) -> list[Event]

Get events of a specific type.

Parameters
ParameterTypeDescription
`event_type`strEvent type name.
`limit`intMaximum events to return.
`offset`intNumber of events to skip.
Returns
TypeDescription
list[Event]List of events.
get_events_count
async def get_events_count(stream_id: str | None = None) -> int

Get total event count.

Parameters
ParameterTypeDescription
`stream_id`str | NoneOptional stream to count (None for all).
Returns
TypeDescription
intEvent count.
delete_stream
async def delete_stream(stream_id: str) -> int

Delete all events for a stream.

Parameters
ParameterTypeDescription
`stream_id`strStream to delete.
Returns
TypeDescription
intNumber of deleted events.
close
async def close() -> None

Close the MongoDB connection.


Configuration for MongoDB event store (top-level user config).

Used by EventsConfig.mongodb <lexigram.events.config.EventsConfig.mongodb>. For the store-level connection config used by MongoDBSnapshotStore, see lexigram.events.stores.mongodb.MongoDBConfig.

validate_connection_string
def validate_connection_string(
    cls,
    v: SecretStr | str
) -> SecretStr

Validate and coerce MongoDB connection string to SecretStr.


MongoDB snapshot store implementation.

This store manages aggregate snapshots for optimized loading.

Example

config = MongoDBConfig(uri="mongodb://localhost:27017")
store = MongoDBSnapshotStore(config)
await store.connect()
# Save snapshot
await store.save(Snapshot(
aggregate_id="order-123",
aggregate_type="Order",
version=100,
state=order.to_dict(),
))
# Load latest
snapshot = await store.get_latest("order-123")
__init__
def __init__(config: MongoDBConfig) -> None

Initialize MongoDB snapshot store.

Parameters
ParameterTypeDescription
`config`MongoDBConfigMongoDB configuration.
snapshots
property snapshots() -> AsyncIOMotorCollection
connect
async def connect() -> None

Connect to MongoDB.

save_snapshot
async def save_snapshot(snapshot: Snapshot) -> None

Save a snapshot (M-14 standardized API).

get_latest
async def get_latest(aggregate_id: str) -> Snapshot | None

Get latest snapshot (M-14 standardized API).

get_by_version
async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get a specific snapshot version (M-14).

delete_old_snapshots
async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

Parameters
ParameterTypeDescription
`aggregate_id`str | UUIDAggregate identifier.
`keep_count`intNumber of snapshots to keep.
Returns
TypeDescription
intNumber of deleted snapshots.
close
async def close() -> None

Close the MongoDB connection.


Container for paginated results.

Attributes: items: The items in the current page. total: Total number of items across all pages. page: Current page number. page_size: Number of items per page.

Example

result = PagedResult(
items=[...],
total=150,
page=2,
page_size=20
)
logger.info(result.total_pages) # 8
logger.info(result.has_next) # True
__init__
def __init__(
    items: list[TResult],
    total: int,
    page: int,
    page_size: int
) -> None

Initialize paged result.

Parameters
ParameterTypeDescription
`items`list[TResult]Items in the current page.
`total`intTotal item count.
`page`intCurrent page number.
`page_size`intItems per page.
total_pages
property total_pages() -> int

Get total number of pages.

has_next
property has_next() -> bool

Check if there is a next page.

Returns
TypeDescription
boolTrue if there are more pages after current.
has_prev
property has_prev() -> bool

Check if there is a previous page.

Returns
TypeDescription
boolTrue if there are pages before current.
to_dict
def to_dict() -> dict[str, Any]

Convert to dictionary.

Returns
TypeDescription
dict[str, Any]Dictionary representation of paged result.

Query with built-in pagination support.
offset
property offset() -> int

Get query offset based on page.

limit
property limit() -> int

Get query limit.


PostgreSQL connection configuration.

Attributes: dsn: Database connection string (treated as a secret). pool_min_size: Minimum connection pool size. pool_max_size: Maximum connection pool size. command_timeout: Command timeout in seconds. events_table: Name of the events table. snapshots_table: Name of the snapshots table. auto_create_tables: Whether to create tables automatically.

validate_dsn
def validate_dsn(
    cls,
    v: SecretStr | str | None
) -> SecretStr | None

Validate and normalize the DSN. Returns None when not provided.


States of a projection.

Emitted when a read-model projection is rebuilt or updated.

Attributes: projection_name: Canonical name of the projection that was updated. events_processed: Number of domain events processed in this update.


Represents a request for data.

Configuration for query bus.

Query bus for dispatching queries.

Handlers are registered explicitly via register() or through HandlerRegistry.register_with_buses() during provider registration.

Example

# Queries are automatically routed to handlers
orders = await bus.execute(GetOrdersByCustomerQuery(
customer_id="cust-123",
status="pending"
))
# Queries are automatically routed to handlers
orders = await bus.execute(GetOrdersByCustomerQuery(
customer_id="cust-123",
status="pending"
))
__init__
def __init__(
    middlewares: list[Any] | None = None,
    config: Any | None = None
) -> None

Initialize the query bus.

execute
async def execute(query: Query[TResult]) -> TResult

Execute a query.

Parameters
ParameterTypeDescription
`query`Query[TResult]The query to execute.
Returns
TypeDescription
TResultQuery result.
Raises
ExceptionDescription
HandlerNotFoundErrorIf no handler is registered.
QueryExecutionErrorIf query execution fails.

Base class for query handlers.

Query handlers process queries and return results. Each query type should have exactly one handler. Queries should not modify state.

Example

class GetOrdersHandler(QueryHandlerProtocol[GetOrdersQuery, list[OrderDTO]]):
def __init__(self, read_model: OrderReadModel):
self.read_model = read_model
async def handle(self, query: GetOrdersQuery) -> list[OrderDTO]:
return await self.read_model.get_orders(
customer_id=query.customer_id,
status=query.status
)
handle
async def handle(query: TQuery) -> TResult

Handle a query.

Parameters
ParameterTypeDescription
`query`TQueryThe query to handle
Returns
TypeDescription
TResultQuery result

Result of query execution.

Redis-backed AbstractEventStore for lightweight deployments.

Uses the StateStoreProtocol protocol for storage, so no direct Redis dependency is introduced into this package. Suitable for single-node or low-throughput deployments where a full relational or document store is not required.

Key layout

{namespace}:{stream_id}:events — JSON array of serialised event dicts
{namespace}:{stream_id}:version — current integer version for the stream
{namespace}:_streams — JSON array of all known stream IDs
{namespace}:{stream_id}:events — JSON array of serialised event dicts
{namespace}:{stream_id}:version — current integer version for the stream
{namespace}:_streams — JSON array of all known stream IDs

Example

from lexigram.events.stores.redis import RedisEventStore
store = RedisEventStore(state_store, namespace="myapp:events")
await store.append("order-123", [OrderCreated(...)])
events = await store.read("order-123")
from lexigram.events.stores.redis import RedisEventStore
store = RedisEventStore(state_store, namespace="myapp:events")
await store.append("order-123", [OrderCreated(...)])
events = await store.read("order-123")
__init__
def __init__(
    store: StateStoreProtocol,
    namespace: str = 'lexigram:events'
) -> None

Initialise the Redis-backed event store.

Parameters
ParameterTypeDescription
`store`StateStoreProtocolA StateStoreProtocol instance (e.g. a Redis-backed implementation).
`namespace`strKey prefix used for all stored keys.
append
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to stream_id with optimistic concurrency control.

Parameters
ParameterTypeDescription
`stream_id`strUnique stream identifier.
`events`list[Event]Events to append. Empty list is a no-op.
`expected_version`int | NoneIf provided, the current stream version must equal this value or ConcurrencyError is raised.
Returns
TypeDescription
intNew stream version after the append.
Raises
ExceptionDescription
ConcurrencyErrorIf *expected_version* does not match the actual current version of the stream.
read
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from stream_id.

Parameters
ParameterTypeDescription
`stream_id`strStream to read from.
`from_version`intLowest version to include (inclusive, default 0 = all).
`to_version`int | NoneHighest version to include (inclusive).
`limit`int | NoneMaximum number of events to return.
Returns
TypeDescription
list[Event]Ordered list of Event objects.
load
async def load(
    stream_id: str,
    after_version: int = 0
) -> list[Event]

Load events after after_version (non-inclusive lower bound).

This alias is used by several base-class helpers.

Parameters
ParameterTypeDescription
`stream_id`strStream to load events from.
`after_version`intEvents with version <= this value are excluded.
Returns
TypeDescription
list[Event]Ordered list of events.
get_stream_version
async def get_stream_version(stream_id: str) -> int

Return the current version of stream_id (0 if the stream is empty).

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
Returns
TypeDescription
intCurrent integer version.
get_version
async def get_version(stream_id: str) -> int

Public alias for get_stream_version.

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
Returns
TypeDescription
intCurrent integer version.
stream_all
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events across all known streams.

Note: Cross-stream global ordering is not guaranteed by this implementation. Streams are iterated in registration order and events within each stream are returned in version order. For strict global ordering, use the PostgreSQL or MongoDB stores.

Parameters
ParameterTypeDescription
`from_position`intEvents with ``sequence_number`` <= this value are skipped.
`batch_size`intUnused; present for interface compatibility.
`partition`int | NoneOptional partition index for sharding.
`total_partitions`int | NoneTotal number of partitions.
Yields
TypeDescription
AsyncIterator[Event]Events in stream-registration order, version order within each stream.
compact
async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for stream_id up to and including up_to_version.

Parameters
ParameterTypeDescription
`stream_id`strStream to compact.
`up_to_version`intEvents at or below this version are removed.
Returns
TypeDescription
intNumber of events deleted.
close
async def close() -> None

Close the event store.

No-op for this implementation; the underlying StateStoreProtocol manages its own lifecycle.


Aggregate state snapshot for performance optimization.

Configuration for snapshotting behavior.

Manages snapshot creation and retrieval for aggregates.

This is the CRITICAL component that transforms O(N) aggregate loading into O(1) by storing and loading snapshots.

Example

manager = SnapshotManager(
event_store=event_store,
snapshot_store=snapshot_store,
policy=EventCountPolicy(100)
)
# Load aggregate with snapshot acceleration
state, events = await manager.load_with_snapshot(
aggregate_id="order-123",
aggregate_type="Order"
)
# Auto-snapshot after save if policy triggers
await manager.save_and_maybe_snapshot(
aggregate_id="order-123",
aggregate_type="Order",
events=[...],
current_state=state,
expected_version=100
)
__init__
def __init__(
    event_store: EventStoreProtocol,
    snapshot_store: SnapshotStoreProtocol,
    policy: SnapshotPolicy | None = None,
    config: SnapshotConfig | None = None
)

Initialize the snapshot manager.

Parameters
ParameterTypeDescription
`event_store`EventStoreProtocolThe event store instance
`snapshot_store`SnapshotStoreProtocolThe snapshot store instance
`policy`SnapshotPolicy | NoneSnapshot creation policy (default: EventCountPolicy(100))
`config`SnapshotConfig | NoneSnapshot configuration
load_with_snapshot
async def load_with_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str
) -> tuple[dict[str, Any] | None, list[Any], int]
save_and_maybe_snapshot
async def save_and_maybe_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str,
    events: list[Any],
    current_state: dict[str, Any],
    expected_version: int
) -> bool

Save events and create snapshot if policy triggers.

Returns
TypeDescription
boolTrue if a snapshot was created
create_snapshot
async def create_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str,
    version: int,
    state: dict[str, Any]
) -> Snapshot

Create and store a snapshot.

force_snapshot
async def force_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str,
    state_extractor: Callable[[], dict[str, Any]]
) -> Snapshot

Force creation of a snapshot regardless of policy.

cleanup_old_snapshots
async def cleanup_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int | None = None
) -> int

Clean up old snapshots for an aggregate.


Snapshot creation strategies.

SQLite configuration for event stores.

SQLite event store implementation.

This store uses the generic DatabaseProviderProtocol for async database access. Suitable for development, testing, and single-instance deployments.

__init__
def __init__(
    config: SqliteConfig | None = None,
    provider: DatabaseProviderProtocol | None = None,
    event_serializer: Any | None = None
) -> None

Initialize SQLite event store.

Parameters
ParameterTypeDescription
`config`SqliteConfig | NoneSQLite configuration.
`provider`DatabaseProviderProtocol | NoneThe generic database provider injected via DI.
`event_serializer`Any | NoneOptional custom event serializer.
register_event_type
def register_event_type(
    event_type: str,
    event_class: type
) -> None

Register an event type for proper deserialization.

connect
async def connect() -> None

Connection is managed by the provider. Tables are created if configured.

append
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to a stream with optimistic concurrency.

read
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream (M-15).

get_stream_version
async def get_stream_version(stream_id: str) -> int

Get current stream version.

stream_all
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events from a global position.

stream_by_type
async def stream_by_type(
    event_types: list[str],
    from_position: int = 0
) -> AsyncIterator[Event]

Stream events filtered by type.

find_by_type
async def find_by_type(
    event_type: str,
    count: int = 100,
    start: int = 0
) -> list[Event]

Get events of a specific type.

get_events_count
async def get_events_count(stream_id: str | None = None) -> int

Get total event count.

delete_stream
async def delete_stream(stream_id: str) -> int

Delete all events for a stream.

get_stream_ids
async def get_stream_ids() -> list[str]

Get all stream IDs in the store.

compact
async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for a stream up to (and including) the given version (MF-04).

close
async def close() -> None

Close the database connection.


SQLite snapshot store implementation.

This store manages aggregate snapshots using a generic DatabaseProviderProtocol.

__init__
def __init__(
    config: SqliteConfig | None = None,
    provider: DatabaseProviderProtocol | None = None
) -> None

Initialize SQLite snapshot store.

Parameters
ParameterTypeDescription
`config`SqliteConfig | NoneSQLite configuration.
`provider`DatabaseProviderProtocol | NoneThe generic database provider injected via DI.
connect
async def connect() -> None

Connection is managed by the provider.

save_snapshot
async def save_snapshot(snapshot: Snapshot) -> None

Save a snapshot (M-14 standardized API).

get_latest
async def get_latest(aggregate_id: str) -> Snapshot | None

Get latest snapshot (M-14 standardized API).

get_by_version
async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get a specific snapshot version (M-14).

delete_old_snapshots
async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

close
async def close() -> None

Close the database connection.


Information about an event stream.

Starting position for event streams.

Base class for value objects.

Value objects:

  • Are immutable (frozen=True)
  • Have no identity
  • Are compared by value (all fields)
  • Can be freely shared

Delivers domain events to registered external webhook endpoints.

Signs each outbound POST with a cryptographic signature via the injected WebhookSignatureVerifierProtocol so that receivers can verify authenticity. Retries transient 5xx failures according to retry_policy. Delivery failures for one endpoint are logged and do not prevent fan-out to remaining endpoints.

Example

dispatcher = WebhookDispatcher(
http_client=http_client,
signature_verifier=HMACWebhookVerifier(),
retry_policy=RetryConfig(max_attempts=3),
)
await dispatcher.dispatch(user_created_event, registered_endpoints)
dispatcher = WebhookDispatcher(
http_client=http_client,
signature_verifier=HMACWebhookVerifier(),
retry_policy=RetryConfig(max_attempts=3),
)
await dispatcher.dispatch(user_created_event, registered_endpoints)
__init__
def __init__(
    http_client: HTTPClientProtocol,
    signature_verifier: WebhookSignatureVerifierProtocol,
    retry_policy: RetryConfig | None = None
) -> None

Initialise the dispatcher.

Parameters
ParameterTypeDescription
`http_client`HTTPClientProtocolHTTP client used for outbound POST requests.
`signature_verifier`WebhookSignatureVerifierProtocolUsed to compute the outbound HMAC signature included in ``X-Webhook-Signature``.
`retry_policy`RetryConfig | NoneRetry configuration; defaults to 3 attempts with exponential backoff when not provided.
dispatch
async def dispatch(
    event: DomainEvent,
    endpoints: list[WebhookEndpoint]
) -> None

Sign and POST event to each eligible registered endpoint.

Only endpoints whose event_types filter matches the event are delivered to. Each delivery is retried per retry_policy on transient (5xx) failures. Failures are logged but do not abort delivery to remaining endpoints.

Parameters
ParameterTypeDescription
`event`DomainEventDomain event to fan-out.
`endpoints`list[WebhookEndpoint]Registered endpoints to consider for delivery.

An external endpoint registered to receive domain events.

Attributes: url: HTTP(S) URL to POST events to. secret: Shared secret used to compute the HMAC signature. event_types: Set of event type names this endpoint subscribes to. When None, all events are delivered.

accepts
def accepts(event: DomainEvent) -> bool

Return True when this endpoint should receive event.

Parameters
ParameterTypeDescription
`event`DomainEventDomain event to test against the filter.
Returns
TypeDescription
bool``True`` when ``event_types`` is ``None`` (all events) or the event's type name is contained in ``event_types``.

clear_handler_registry
def clear_handler_registry() -> None
Clear all registered handlers from the global handler registry.

Intended for use in tests to reset handler state between test runs. Clears both the HandlerRegistry instances and the global decorator registry.


command_handler
def command_handler(
    command_type: type[Command],
    *,
    name: str | None = None,
    **metadata: Any
) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to register a function as a command handler.
Parameters
ParameterTypeDescription
`command_type`type[Command]The Command class this handler handles.
`name`str | NoneOptional custom name for the handler. **metadata: Additional metadata to attach to the handler.
Returns
TypeDescription
Callable[[Callable[P, R]], Callable[P, R]]Decorator function.

Example

@command_handler(CreateUserCommand)
async def handle_create_user(command: CreateUserCommand) -> str:
user = await user_service.create(command.username, command.email)
return user.user_id

event_handler
def event_handler(
    event_type: type[Event],
    *,
    name: str | None = None,
    priority: int = 0,
    **metadata: Any
) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to register a function as an event handler.
Parameters
ParameterTypeDescription
`event_type`type[Event]The Event class this handler handles.
`name`str | NoneOptional custom name for the handler.
`priority`intHandler priority (higher = executed first). **metadata: Additional metadata to attach to the handler.
Returns
TypeDescription
Callable[[Callable[P, R]], Callable[P, R]]Decorator function.

Example

@event_handler(UserCreatedEvent, priority=10)
async def send_welcome_email(event: UserCreatedEvent) -> None:
await email_service.send_welcome(event.email)

query_handler
def query_handler(
    query_type: type[Query],
    *,
    name: str | None = None,
    cacheable: bool = False,
    cache_ttl: int | None = None,
    **metadata: Any
) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to register a function as a query handler.
Parameters
ParameterTypeDescription
`query_type`type[Query]The Query class this handler handles.
`name`str | NoneOptional custom name for the handler.
`cacheable`boolWhether results can be cached.
`cache_ttl`int | NoneCache time-to-live in seconds. **metadata: Additional metadata to attach to the handler.
Returns
TypeDescription
Callable[[Callable[P, R]], Callable[P, R]]Decorator function.

Example

@query_handler(GetUserQuery, cacheable=True, cache_ttl=300)
async def handle_get_user(query: GetUserQuery) -> UserDTO:
return await user_repository.get(query.user_id)

Raised when aggregate is not found.

Command execution error.
__init__
def __init__(
    message: str = 'Command execution failed',
    command_type: str | None = None,
    error: str | None = None,
    **kwargs: Any
) -> None

Concurrency/optimistic locking error.

Raised when event version conflicts indicate concurrent modifications.

__init__
def __init__(
    message: str = 'Concurrency error',
    expected_version: int | None = None,
    actual_version: int | None = None,
    **kwargs: Any
) -> None

Duplicate handler registration.

Raised when attempting to register a handler for a type that already has one.

__init__
def __init__(
    message: str = 'Duplicate handler',
    message_type: str | None = None,
    **kwargs: Any
) -> None

Base events/messaging error.
__init__
def __init__(
    message: str = 'Events error',
    **kwargs: Any
) -> None

Raised when event loading fails.
__init__
def __init__(
    message: str = 'Event load error',
    **kwargs: Any
) -> None

Raised when event persistence fails.

Raised when event store connection fails.
__init__
def __init__(
    message: str = 'Event store connection error',
    **kwargs: Any
) -> None

Raised for event store errors.
__init__
def __init__(
    message: str = 'Event store error',
    **kwargs: Any
) -> None

Event/command handler not found.

Raised when no handler is registered for a specific event or command type.

__init__
def __init__(
    message: str = 'Handler not found',
    handler_type: str | None = None,
    message_type: str | None = None,
    **kwargs: Any
) -> None

Raised when projection building fails.

Raised when a projection is not found.

Query execution error.
__init__
def __init__(
    message: str = 'Query execution failed',
    query_type: str | None = None,
    error: str | None = None,
    **kwargs: Any
) -> None

Raised for schema-related errors.

Raised for security-related errors.

Raised when stream is not found.

Parameters mirror the helper used in tests: (stream_type, stream_id).

__init__
def __init__(
    stream_type: str,
    stream_id: str,
    message: str | None = None,
    **kwargs: Any
) -> None

Raised for streaming-related errors.

Raised when an outbound webhook delivery fails after all retry attempts.

Carries the target url and the HTTP status code returned by the remote endpoint so callers can log structured failure context.

__init__
def __init__(
    url: str,
    status: int,
    message: str = 'Webhook delivery failed',
    **kwargs: Any
) -> None