
API Reference

Protocol for command bus implementations.

Example

    class CommandBusProtocol:
        async def dispatch(self, command: Command) -> Any:
            handler = self._handlers[type(command)]
            return await handler.handle(command)

async def dispatch(command: Any) -> Any

Dispatch a command to its handler.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `command` | Any | Command to dispatch. |

Returns

| Type | Description |
| --- | --- |
| Any | Result from the command handler. |
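
To make the dispatch contract concrete, here is a minimal in-memory sketch. It assumes one handler per command type; `CreateUser`, `CreateUserHandler`, `InMemoryCommandBus`, and the `register` method are hypothetical names introduced for illustration and are not part of the documented protocol.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class CreateUser:
    """Hypothetical command used only for this illustration."""
    name: str


class CreateUserHandler:
    """Hypothetical handler; returns a string instead of touching storage."""

    async def handle(self, command: CreateUser) -> str:
        return f"created {command.name}"


class InMemoryCommandBus:
    """Maps each command type to exactly one handler (an assumption of this sketch)."""

    def __init__(self) -> None:
        self._handlers: dict[type, Any] = {}

    def register(self, command_type: type, handler: Any) -> None:
        # `register` is not in the documented protocol; it only wires up the sketch.
        self._handlers[command_type] = handler

    async def dispatch(self, command: Any) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise LookupError(f"no handler registered for {type(command).__name__}")
        return await handler.handle(command)


async def main() -> str:
    bus = InMemoryCommandBus()
    bus.register(CreateUser, CreateUserHandler())
    return await bus.dispatch(CreateUser(name="ada"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Raising `LookupError` for an unregistered command is a choice made for this sketch; the protocol itself only specifies that `dispatch` returns the handler's result.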

Protocol for publishing domain events.
async def publish(event: Any) -> None

Publish a domain event.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event` | Any | DomainEvent instance. |

Protocol for event bus implementations.

The event bus manages event publication and subscription.
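
As a rough illustration of how publication and subscription fit together, the sketch below implements `publish`, `subscribe`, and `unsubscribe` in memory. `Ok`, `Err`, and `EventError` here are local stand-ins with an assumed shape, not the library's own types; `UserRegistered`, `RecordingHandler`, and `SimpleEventBus` are hypothetical names.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Stand-in for the library's success variant (assumed shape)."""
    value: Any = None


@dataclass(frozen=True)
class Err:
    """Stand-in for the library's failure variant (assumed shape)."""
    error: Exception


class EventError(Exception):
    """Stand-in for the library's EventError."""


@dataclass(frozen=True)
class UserRegistered:
    """Hypothetical domain event."""
    user_id: str


class RecordingHandler:
    """Hypothetical handler that remembers every event it receives."""

    def __init__(self) -> None:
        self.seen: list[Any] = []

    async def handle(self, event: Any) -> Ok:
        self.seen.append(event)
        return Ok()


class SimpleEventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[Any]] = {}

    def subscribe(self, event_type: type, handler: Any) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: type, handler: Any) -> None:
        handlers = self._handlers.get(event_type, [])
        if handler in handlers:
            handlers.remove(handler)

    async def publish(self, event: Any) -> Ok | Err:
        # Iterate over a copy so a handler may unsubscribe while being called.
        for handler in list(self._handlers.get(type(event), [])):
            try:
                await handler.handle(event)
            except Exception as exc:  # sketch only: wrap any failure in Err
                return Err(EventError(str(exc)))
        return Ok()


async def main() -> tuple[int, int]:
    bus = SimpleEventBus()
    handler = RecordingHandler()
    bus.subscribe(UserRegistered, handler)
    await bus.publish(UserRegistered(user_id="u1"))
    seen_while_subscribed = len(handler.seen)
    bus.unsubscribe(UserRegistered, handler)
    await bus.publish(UserRegistered(user_id="u2"))  # no longer delivered
    return seen_while_subscribed, len(handler.seen)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This sketch treats an event with no subscribers as a success. A stricter bus could return `Err(EventError(...))` in that case instead.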

Example

```python
class InMemoryEventBus:
    async def publish(self, event: DomainEvent) -> "Result[None, EventError]":
        for handler in self._handlers.get(type(event), []):
            await handler.handle(event)
        return Ok(None)

    def subscribe(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)
```

async def publish(event: Any) -> Result[None, EventError]

Publish an event to all subscribers.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event` | Any | DomainEvent to publish. |

Returns

| Type | Description |
| --- | --- |
| Result[None, EventError] | Ok(None) when the event is successfully enqueued for dispatch. Err(EventError) when the event cannot be accepted (e.g. no handlers registered and the bus requires at least one). |

def subscribe(
    event_type: type,
    handler: EventHandlerProtocol
) -> None

Subscribe a handler to an event type.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event_type` | type | Type of event to subscribe to. |
| `handler` | EventHandlerProtocol | Handler to call when event is published. |

def unsubscribe(
    event_type: type,
    handler: EventHandlerProtocol
) -> None

Remove a handler subscription for an event type.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event_type` | type | Type of event to unsubscribe from. |
| `handler` | EventHandlerProtocol | Handler to remove. |

### `EventFilterProtocol`

Predicate that determines whether an event should be dispatched.

def matches(event: DomainEvent) -> bool

### `EventSerializerProtocol`

Serializes and deserializes domain events for transport.

def serialize(event: DomainEvent) -> bytes

def deserialize(data: bytes) -> DomainEvent

### `EventStoreProtocol`

Protocol for event store implementations.

The event store persists and retrieves domain events.

Example

```python
class PostgresEventStore:
    async def append(self, stream_id, events, expected_version=None):
        async with self.db.transaction():
            for event in events:
                await self.db.insert("events", event.to_dict())
```

async def append(
    stream_id: str,
    events: list[Any],
    expected_version: int | None = None
) -> int

Append events to a stream.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stream_id` | str | Unique stream identifier. |
| `events` | list[Any] | List of events to append. |
| `expected_version` | int \| None | Expected stream version for optimistic concurrency. |

Returns

| Type | Description |
| --- | --- |
| int | New stream version. |

Raises

| Exception | Description |
| --- | --- |
| ConcurrencyError | If expected version doesn't match. |

async def read(
    stream_id: str,
    start: int = 0,
    count: int | None = None
) -> list[Any]

Read events from a stream.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stream_id` | str | Unique stream identifier. |
| `start` | int | Starting position. |
| `count` | int \| None | Maximum events to read. |

Returns

| Type | Description |
| --- | --- |
| list[Any] | List of events. |

async def read_all(
    position: int = 0,
    count: int | None = None
) -> list[Any]

Read events from all streams.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `position` | int | Starting global position. |
| `count` | int \| None | Maximum events to read. |

Returns

| Type | Description |
| --- | --- |
| list[Any] | List of events. |
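
The interplay between `append`, `expected_version`, and the two read methods can be sketched with an in-memory store. This is an illustration under stated assumptions, not the library's implementation: a stream's version is taken to be the number of events it holds, the global position is an index into a single append-ordered log, and `ConcurrencyError` is a local stand-in. `InMemoryEventStore` is a hypothetical name.

```python
from __future__ import annotations

import asyncio
from typing import Any


class ConcurrencyError(Exception):
    """Stand-in for the library's ConcurrencyError."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[Any]] = {}
        self._log: list[Any] = []  # global order across all streams

    async def append(
        self,
        stream_id: str,
        events: list[Any],
        expected_version: int | None = None,
    ) -> int:
        # Assumption: version == number of events currently in the stream.
        current = len(self._streams.get(stream_id, []))
        if expected_version is not None and expected_version != current:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, found {current}"
            )
        self._streams.setdefault(stream_id, []).extend(events)
        self._log.extend(events)
        return current + len(events)

    async def read(
        self, stream_id: str, start: int = 0, count: int | None = None
    ) -> list[Any]:
        stream = self._streams.get(stream_id, [])
        end = None if count is None else start + count
        return stream[start:end]

    async def read_all(self, position: int = 0, count: int | None = None) -> list[Any]:
        end = None if count is None else position + count
        return self._log[position:end]


async def main() -> tuple[int, bool, list[Any], list[Any]]:
    store = InMemoryEventStore()
    version = await store.append("order-1", ["created", "paid"], expected_version=0)
    await store.append("order-2", ["created"])
    try:
        # Stale writer: it still believes order-1 is at version 1.
        await store.append("order-1", ["shipped"], expected_version=1)
        conflict = False
    except ConcurrencyError:
        conflict = True
    tail = await store.read("order-1", start=1)
    everything = await store.read_all()
    return version, conflict, tail, everything


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing `expected_version=None` skips the check entirely, which matches the optional parameter in the signature above.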

Protocol for handlers that handle multiple event types.
def handles() -> list[type]

Get list of event types handled.

Returns

| Type | Description |
| --- | --- |
| list[type] | List of event classes. |

async def handle(event: Any) -> Result[None, EventError]

Handle an event.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event` | Any | DomainEvent to handle. |

Returns

| Type | Description |
| --- | --- |
| Result[None, EventError] | `Ok(None)` on success, `Err(EventError)` if handling fails in an expected, recoverable way. |
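
A handler of this kind might look like the following sketch, where `handles()` drives registration and `handle()` branches on the event type. `Ok`, `Err`, and `EventError` are local stand-ins with an assumed shape; `OrderPlaced`, `OrderCancelled`, `OrderAuditHandler`, and `register_all` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Stand-in for the library's success variant (assumed shape)."""
    value: Any = None


@dataclass(frozen=True)
class Err:
    """Stand-in for the library's failure variant (assumed shape)."""
    error: Exception


class EventError(Exception):
    """Stand-in for the library's EventError."""


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str


@dataclass(frozen=True)
class OrderCancelled:
    order_id: str


class OrderAuditHandler:
    """Hypothetical handler that audits two kinds of order events."""

    def __init__(self) -> None:
        self.log: list[str] = []

    def handles(self) -> list[type]:
        return [OrderPlaced, OrderCancelled]

    async def handle(self, event: Any) -> "Ok | Err":
        if isinstance(event, OrderPlaced):
            self.log.append(f"placed {event.order_id}")
        elif isinstance(event, OrderCancelled):
            self.log.append(f"cancelled {event.order_id}")
        else:
            # Expected, recoverable failure: report it instead of raising.
            return Err(EventError(f"unsupported event: {type(event).__name__}"))
        return Ok()


def register_all(registry: dict, handler: Any) -> None:
    """Register the handler once for every event type it declares."""
    for event_type in handler.handles():
        registry.setdefault(event_type, []).append(handler)


async def main() -> tuple:
    handler = OrderAuditHandler()
    registry: dict = {}
    register_all(registry, handler)
    for event in (OrderPlaced("o1"), OrderCancelled("o1")):
        for subscriber in registry.get(type(event), []):
            await subscriber.handle(event)
    rejected = await handler.handle("not an event")
    return handler.log, sorted(t.__name__ for t in registry), isinstance(rejected, Err)


if __name__ == "__main__":
    print(asyncio.run(main()))
```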

ProjectionProtocol: protocol for building read models from events.

def apply(event: Any) -> None

Apply an event to the projection state.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event` | Any | Domain event to apply. |
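
For instance, a projection that maintains account balances could fold events into a dictionary. The event classes, `BalanceProjection`, and the `rebuild` helper are hypothetical and exist only for this sketch.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    account_id: str
    amount: int


class BalanceProjection:
    """Read model: current balance per account, derived purely from events."""

    def __init__(self) -> None:
        self.balances: dict[str, int] = {}

    def apply(self, event: Any) -> None:
        if isinstance(event, Deposited):
            self.balances[event.account_id] = (
                self.balances.get(event.account_id, 0) + event.amount
            )
        elif isinstance(event, Withdrawn):
            self.balances[event.account_id] = (
                self.balances.get(event.account_id, 0) - event.amount
            )
        # Any other event type is ignored by this projection.


def rebuild(events: Iterable[Any]) -> BalanceProjection:
    """Replay a sequence of events into a fresh projection."""
    projection = BalanceProjection()
    for event in events:
        projection.apply(event)
    return projection


if __name__ == "__main__":
    history = [Deposited("a", 100), Withdrawn("a", 30), Deposited("b", 5)]
    print(rebuild(history).balances)
```

Because `apply` depends only on the event and the current state, the read model can be discarded and rebuilt by replaying the stored events in order.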

Protocol for query bus implementations.

Example

    class QueryBusProtocol:
        async def execute(self, query: Query) -> Any:
            handler = self._handlers[type(query)]
            return await handler.handle(query)

async def execute(query: Any) -> Any

Execute a query through its handler.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | Any | Query to execute. |

Returns

| Type | Description |
| --- | --- |
| Any | Query result. |

Protocol for read-only repository operations.

Use this for query-only access patterns (CQRS query side).

Example

    class UserQueryRepository:
        async def get(self, id: str) -> User | None:
            return await self.db.query("users").where(id=id).first()

        async def list(self, skip: int = 0, limit: int = 100) -> list[User]:
            return await self.db.query("users").offset(skip).limit(limit).all()

async def get(item_id: str) -> T | None

Get entity by ID.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `item_id` | str | Entity identifier. |

Returns

| Type | Description |
| --- | --- |
| T \| None | Entity if found, None otherwise. |

async def list(
    skip: int = 0,
    limit: int = 100,
    **filters: Any
) -> list[T]

List entities with pagination.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `skip` | int | Number of records to skip. |
| `limit` | int | Maximum records to return. |
| `**filters` | Any | Optional filter criteria. |

Returns

| Type | Description |
| --- | --- |
| list[T] | List of entities. |

async def find_by_spec(spec: SpecificationProtocol[T]) -> list[T]

Find entities matching a complex specification.

Parameters
ParameterTypeDescription
`spec`SpecificationProtocol[T]The DDD specification to evaluate.
Returns
TypeDescription
list[T]List of matching entities.
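
The shape of `SpecificationProtocol` is not shown on this page, so the following is only a sketch under assumptions: it uses the conventional DDD method name `is_satisfied_by`, and `PredicateSpec`, `User`, and `InMemoryUserQueries` are hypothetical names made up for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PredicateSpec(Generic[T]):
    """Hypothetical specification that wraps a boolean predicate."""

    def __init__(self, predicate: Callable[[T], bool]) -> None:
        self._predicate = predicate

    def is_satisfied_by(self, candidate: T) -> bool:  # assumed method name
        return self._predicate(candidate)

    def __and__(self, other: "PredicateSpec[T]") -> "PredicateSpec[T]":
        # Combine two specifications; both must hold for a candidate to match.
        return PredicateSpec(
            lambda c: self.is_satisfied_by(c) and other.is_satisfied_by(c)
        )


@dataclass
class User:  # hypothetical entity
    name: str
    age: int
    active: bool


class InMemoryUserQueries:
    def __init__(self, users: list[User]) -> None:
        self._users = users

    async def find_by_spec(self, spec: PredicateSpec[User]) -> list[User]:
        # Evaluate the specification against every stored entity.
        return [user for user in self._users if spec.is_satisfied_by(user)]


adults = PredicateSpec(lambda u: u.age >= 18)
active = PredicateSpec(lambda u: u.active)
repo = InMemoryUserQueries(
    [User("ada", 36, True), User("bob", 17, True), User("cy", 40, False)]
)
matches = asyncio.run(repo.find_by_spec(adults & active))
```

A database-backed repository would more likely translate the specification into a query than filter in memory; the in-memory filter is used here only to keep the sketch runnable.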
async def count(**filters: Any) -> int

Count entities matching filters.

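Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `**filters` | `Any` | Filter criteria that entities must match. |

Returns

| Type | Description |
| --- | --- |
| `int` | Total count of matching entities. |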

Protocol for full repository operations.

Extends ReadOnlyRepositoryProtocol with write operations.

Example

class UserRepository:
    async def save(self, entity: User) -> User:
        if entity.id:
            await self.db.update("users", entity.dict()).where(id=entity.id)
        else:
            entity.id = await self.db.insert("users", entity.dict())
        return entity

    async def delete(self, item_id: str) -> bool:
        result = await self.db.delete("users").where(id=item_id)
        return result.affected_rows > 0
async def save(entity: T) -> T

Save (create or update) an entity.

Parameters
ParameterTypeDescription
`entity`TEntity to save.
Returns
TypeDescription
TSaved entity with any generated fields populated.
async def delete(item_id: str) -> bool

Delete entity by ID.

Parameters
ParameterTypeDescription
`item_id`strEntity identifier.
Returns
TypeDescription
boolTrue if deleted, False if not found.
async def save_many(entities: list[T]) -> list[T]

Save (create or update) multiple entities in a single operation.

Implementations SHOULD execute this as a batch for efficiency.

Parameters
ParameterTypeDescription
`entities`list[T]Entities to save.
Returns
TypeDescription
list[T]Saved entities with any generated fields populated.
async def delete_many(item_ids: list[str]) -> int

Delete multiple entities by ID.

Implementations SHOULD execute this as a batch for efficiency.

Parameters
ParameterTypeDescription
`item_ids`list[str]Entity identifiers to delete.
Returns
TypeDescription
intNumber of entities actually deleted.

Protocol for saga lifecycle management.

The manager routes incoming events to all registered sagas and coordinates their execution.

async def process(event: Any) -> None

Process an event through all relevant sagas.

Parameters
ParameterTypeDescription
`event`AnyDomain event to route and process.

Protocol for saga / process-manager implementations.

Sagas coordinate long-running business processes that span multiple aggregates by reacting to domain events and dispatching commands.

async def handle(event: Any) -> list[Any]

Handle a domain event and produce commands.

Parameters
ParameterTypeDescription
`event`AnyDomain event to handle.
Returns
TypeDescription
list[Any]List of commands to dispatch to the command bus.
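
To make the interplay between the two saga protocols more concrete, here is a minimal sketch. `PaymentReceived`, `ShipOrder`, `ShippingSaga`, and `SimpleSagaManager` are hypothetical; the sketch also assumes the manager forwards every command a saga returns to a dispatch callable, which this page does not spell out.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass(frozen=True)
class PaymentReceived:  # hypothetical event
    order_id: str


@dataclass(frozen=True)
class ShipOrder:  # hypothetical command
    order_id: str


class ShippingSaga:
    async def handle(self, event: Any) -> list[Any]:
        # React to the events this saga cares about; return no commands otherwise.
        if isinstance(event, PaymentReceived):
            return [ShipOrder(order_id=event.order_id)]
        return []


class SimpleSagaManager:
    def __init__(
        self,
        sagas: list[Any],
        dispatch: Callable[[Any], Awaitable[None]],
    ) -> None:
        self._sagas = list(sagas)
        self._dispatch = dispatch

    async def process(self, event: Any) -> None:
        # Offer the event to every registered saga and dispatch what they produce.
        for saga in self._sagas:
            for command in await saga.handle(event):
                await self._dispatch(command)


dispatched: list[Any] = []


async def record(command: Any) -> None:
    dispatched.append(command)


manager = SimpleSagaManager([ShippingSaga()], record)
asyncio.run(manager.process(PaymentReceived("o-1")))
```

In a real setup, `dispatch` would presumably be the command bus's `dispatch` method rather than a recording function.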

Protocol for aggregate snapshot storage.
async def save(
    aggregate_id: str,
    snapshot: Any,
    version: int
) -> None

Save an aggregate snapshot.

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate identifier.
`snapshot`AnyAggregate state snapshot.
`version`intAggregate version at snapshot.
async def load(aggregate_id: str) -> tuple[Any, int] | None

Load the latest snapshot.

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate identifier.
Returns
TypeDescription
tuple[Any, int] | NoneTuple of (snapshot, version) or None if not found.

Abstract base class for event storage.

Inherits stream/cursor/replay helpers from EventStreamMixin. Snapshot storage is provided by AbstractSnapshotStore.

Example

# Append events
await store.append(
    stream_id="order-123",
    events=[OrderCreated(...), ItemAdded(...)],
    expected_version=0,
)
# Read events
events = await store.read("order-123")
def __init__(schema_evolution: SchemaEvolution | None = None) -> None

Initialise event store, optionally with schema evolution (M-03).

async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to a stream with optimistic concurrency.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `stream_id` | `str` | Unique identifier for the event stream. |
| `events` | `list[Event]` | List of events to save. |
| `expected_version` | `int \| None` | Expected current version for concurrency control. |

Returns

| Type | Description |
| --- | --- |
| `int` | New stream version. |

Raises

| Exception | Description |
| --- | --- |
| `ConcurrencyError` | If a version conflict occurs. |
| `EventPersistenceError` | If the save operation fails. |

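
The optimistic concurrency check can be sketched as follows. This is not the library's implementation: `TinyEventStore` is hypothetical, the local `ConcurrencyError` is a stand-in for the real exception, and the sketch assumes a stream's version equals the number of events it holds.

```python
import asyncio
from typing import Any, Optional


class ConcurrencyError(Exception):
    """Local stand-in for the library's ConcurrencyError."""


class TinyEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[Any]] = {}

    async def append(
        self,
        stream_id: str,
        events: list[Any],
        expected_version: Optional[int] = None,
    ) -> int:
        # Assumption: the stream version is simply its event count.
        current_version = len(self._streams.get(stream_id, []))
        # Reject the write if another writer moved the stream since we read it.
        if expected_version is not None and expected_version != current_version:
            raise ConcurrencyError(
                f"{stream_id}: expected v{expected_version}, found v{current_version}"
            )
        self._streams.setdefault(stream_id, []).extend(events)
        return current_version + len(events)


store = TinyEventStore()
first_version = asyncio.run(
    store.append("order-123", ["created", "item-added"], expected_version=0)
)
```

A second writer that still believes the stream is at version 0 would get a `ConcurrencyError` and would need to reload and retry; passing `expected_version=None` skips the check.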
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream (M-15).

Parameters
ParameterTypeDescription
`stream_id`strStream to load events from
`from_version`intStarting version (inclusive)
`to_version`int | NoneEnding version (inclusive)
`limit`int | NoneMax number of events to read
Returns
TypeDescription
list[Event]List of events in chronological order
Raises
ExceptionDescription
EventLoadErrorIf load operation fails
async def get_stream_version(stream_id: str) -> int

Get the current version of a stream.

Parameters
ParameterTypeDescription
`stream_id`strStream to check
Returns
TypeDescription
intCurrent version (0 if stream doesn't exist)
async def stream_exists(stream_id: str) -> bool

Check if a stream exists.

Parameters
ParameterTypeDescription
`stream_id`strStream to check
Returns
TypeDescription
boolTrue if stream has events
async def get_stream_info(stream_id: str) -> StreamInfo | None

Get information about a stream.

Parameters
ParameterTypeDescription
`stream_id`strStream to get info for
Returns
TypeDescription
StreamInfo | NoneStreamInfo or None if stream doesn't exist
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events from the store in global order.

Parameters
ParameterTypeDescription
`from_position`intStarting global sequence number.
`batch_size`intNumber of events to fetch per batch.
`partition`int | NoneOptional partition index (0 to total_partitions - 1).
`total_partitions`int | NoneTotal number of partitions for sharding.
async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for a stream up to (and including) the given version (MF-04).

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
`up_to_version`intMaximum version to delete.
Returns
TypeDescription
intNumber of events purged.
async def find_by_aggregate(aggregate_id: str) -> list[Event]

Get all events for an aggregate (for reconstitution).

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate identifier (same as stream_id).
Returns
TypeDescription
list[Event]All events for the aggregate in chronological order.
async def close() -> None

Close the event store and release resources.


Abstract base for middleware components.

Middleware wraps around handlers to add cross-cutting concerns like logging, validation, transactions, etc.

Example

class TimingMiddleware(AbstractMiddleware[Message, Any]):
    async def __call__(
        self,
        message: Message,
        next_handler: NextHandler,
    ) -> Any:
        # perf_counter() is monotonic, so the duration is unaffected by clock changes
        start = time.perf_counter()
        result = await next_handler(message)
        duration = time.perf_counter() - start
        logger.info("Handler took %.3fs", duration)
        return result
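
How middlewares end up wrapped around a handler is not described here. One common way to compose them, shown purely as a sketch with hypothetical names (`build_pipeline`, `tracing`), is to fold the list from the inside out so the first middleware becomes the outermost layer.

```python
import asyncio
from typing import Any, Awaitable, Callable

NextHandler = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, NextHandler], Awaitable[Any]]


def build_pipeline(middlewares: list[Middleware], handler: NextHandler) -> NextHandler:
    """Wrap `handler` so that middlewares[0] runs first and last."""
    pipeline = handler
    for middleware in reversed(middlewares):
        # Bind the current middleware and the pipeline built so far as defaults,
        # so each wrapper keeps its own pair instead of the loop's last values.
        def wrapped(message: Any, _mw: Middleware = middleware, _next: NextHandler = pipeline):
            return _mw(message, _next)

        pipeline = wrapped
    return pipeline


calls: list[str] = []


def tracing(name: str) -> Middleware:
    async def middleware(message: Any, next_handler: NextHandler) -> Any:
        calls.append(f"{name}:before")
        result = await next_handler(message)
        calls.append(f"{name}:after")
        return result

    return middleware


async def handler(message: str) -> str:
    calls.append("handler")
    return message.upper()


pipeline = build_pipeline([tracing("outer"), tracing("inner")], handler)
result = asyncio.run(pipeline("ping"))
```

The recorded `calls` show the nesting: each middleware's "before" step runs on the way in and its "after" step on the way out.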

Read-only repository interface.

Useful for query-side repositories in CQRS.

async def get(aggregate_id: UUID | str) -> TAggregate | None

Get an aggregate by ID.

async def exists(aggregate_id: UUID | str) -> bool

Check if an aggregate exists.

async def get_all(
    limit: int | None = None,
    offset: int | None = None
) -> list[TAggregate]

Get all aggregates with pagination.


Abstract base repository for aggregate persistence.

Repositories provide collection-like access to aggregates, abstracting away the persistence mechanism.

Example

class OrderRepository(AbstractRepository[Order]):
    async def get(self, aggregate_id: UUID | str) -> Order | None:
        ...

    async def save(self, aggregate: Order) -> None:
        ...
async def get(aggregate_id: UUID | str) -> TAggregate | None

Get an aggregate by ID.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
TAggregate | NoneThe aggregate if found, None otherwise
async def save(aggregate: TAggregate) -> None

Save an aggregate.

This persists any pending changes and clears the pending events.

Parameters
ParameterTypeDescription
`aggregate`TAggregateThe aggregate to save
async def exists(aggregate_id: UUID | str) -> bool

Check if an aggregate exists.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
boolTrue if the aggregate exists

Abstract base class for snapshot storage.

Provides optimised aggregate loading by storing periodic snapshots. Avoids replaying the entire event history for aggregates with many events.

Strategy:

  1. Load the latest snapshot (if any).
  2. Load events after the snapshot version.
  3. Apply those events to the snapshot state.

This reduces loading from O(N) to O(1) + O(events since snapshot).
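
The saving can be illustrated with a toy aggregate. In this sketch the aggregate is a plain counter, each event is an integer delta, and versions are 1-based; `CounterSnapshot`, `events_after`, and `load_counter` are hypothetical helpers, not library APIs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CounterSnapshot:  # hypothetical snapshot of a counter aggregate
    version: int
    value: int


def events_after(history: list[int], version: int) -> list[int]:
    # Stand-in for an event-store read; the event at index i has version i + 1.
    return history[version:]


def load_counter(
    snapshot: Optional[CounterSnapshot], history: list[int]
) -> tuple[int, int, int]:
    """Return (value, version, number_of_events_replayed)."""
    value, version = (snapshot.value, snapshot.version) if snapshot else (0, 0)
    tail = events_after(history, version)
    for delta in tail:
        value += delta
        version += 1
    return value, version, len(tail)


history = [1] * 1000
full_replay = load_counter(None, history)
from_snapshot = load_counter(CounterSnapshot(version=990, value=990), history)
```

Both calls arrive at the same state, but the second replays only the 10 events recorded after the snapshot instead of all 1000.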

Example (M-14 — uses Snapshot object throughout)

from lexigram.events.types import Snapshot

# Save a snapshot
await store.save_snapshot(Snapshot(
    aggregate_id=order.id,
    aggregate_type="Order",
    version=order.version,
    state=order.to_dict(),
))

# Load the latest snapshot, then only the events recorded after it
snapshot = await store.get_latest("order-123")
if snapshot:
    order = Order.from_snapshot(snapshot.state)
    # from_version is inclusive, so start one past the snapshot version
    events = await event_store.read("order-123", from_version=snapshot.version + 1)
async def save_snapshot(snapshot: Snapshot) -> None

Save an aggregate snapshot.

Parameters
ParameterTypeDescription
`snapshot`SnapshotSnapshot object containing aggregate_id, version, and state.
async def get_latest(aggregate_id: str) -> Snapshot | None

Load the latest snapshot for an aggregate.

Parameters
ParameterTypeDescription
`aggregate_id`strAggregate to load snapshot for.
Returns
TypeDescription
Snapshot | NoneSnapshot object or None if no snapshot exists.
async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get a specific snapshot by version.

Parameters
ParameterTypeDescription
`aggregate_id`str | UUIDAggregate identifier.
`version`intExact version to retrieve.
Returns
TypeDescription
Snapshot | NoneSnapshot at that version, or None.
async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

Parameters
ParameterTypeDescription
`aggregate_id`str | UUIDID of the aggregate.
`keep_count`intNumber of snapshots to retain (default: 3).
Returns
TypeDescription
intNumber of snapshots deleted.
async def close() -> None

Close the snapshot store and release resources.


Factory for creating aggregates.
def create(aggregate_id: UUID | str) -> TAggregate_co

Create a new aggregate instance.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strAggregate ID.
Returns
TypeDescription
TAggregate_coNew aggregate instance.

Base class for event-sourced aggregates.

Aggregates:

  • Maintain consistency boundaries
  • Generate domain events on state changes
  • Can be reconstructed from events (event sourcing)
  • Support snapshotting for performance

Example

class Order(AggregateRoot):
    status: str = "draft"
    items: list[OrderItem] = Field(default_factory=list)
    total: Decimal = Decimal("0.00")

    def add_item(self, product_id: str, quantity: int, price: Decimal):
        if self.status != "draft":
            raise InvalidOperationError("Cannot modify non-draft order")
        self._apply(ItemAddedEvent(
            order_id=self.id,
            product_id=product_id,
            quantity=quantity,
            price=price,
        ))

    def _handle_item_added(self, event: ItemAddedEvent):
        self.items.append(OrderItem(
            product_id=event.product_id,
            quantity=event.quantity,
            price=event.price,
        ))
        self.total += event.price * event.quantity
def __init__(
    id: UUID | None = None,
    **kwargs: Any
) -> None

Initialise the aggregate, auto-generating a UUID id if none is given.

def add_event(event: Any) -> None

Buffer a domain event for persistence, calling invariant checks first.

def pull_events() -> list[Any]

Return a copy of pending events without clearing them.

def clear_events() -> None

Discard all buffered events.

property has_uncommitted_events() -> bool

True when there are buffered events awaiting persistence.

def get_pending_events() -> list[Event]

Get events pending persistence.

Returns
TypeDescription
list[Event]List of uncommitted events
def clear_pending_events() -> None

Clear pending events after persistence.

def load_from_history(events: list[Event]) -> None

Reconstruct aggregate state from event history.

This is used during aggregate loading to replay all events and rebuild the current state.

Parameters
ParameterTypeDescription
`events`list[Event]Historical events to replay
def load_from_snapshot(snapshot: Snapshot) -> None

Restore aggregate state from a snapshot.

Parameters
ParameterTypeDescription
`snapshot`SnapshotThe snapshot to restore from
def create_snapshot() -> Snapshot

Create a snapshot of current aggregate state.

Returns
TypeDescription
SnapshotSnapshot instance

Checkpoint for tracking projection progress.

Command extended with aggregate-targeting helpers.
property target_aggregate_id() -> UUID | None
property expected_version() -> int | None
def for_aggregate(
    aggregate_id: UUID,
    version: int | None = None
) -> Command

Target a specific aggregate.


Configuration for command bus.

Command bus for dispatching commands.

Handlers are registered explicitly via register() or through HandlerRegistry.register_with_buses() during provider registration.

Example

# Commands are automatically routed to handlers
result = await bus.dispatch(CreateOrderCommand(
    customer_id="cust-123",
    items=[...],
))
def __init__(
    middlewares: list[Any] | None = None,
    config: Any | None = None
) -> None

Initialize the command bus.

async def dispatch(command: Command) -> Any

Dispatch a command for execution.

Parameters
ParameterTypeDescription
`command`CommandThe command to execute.
Returns
TypeDescription
AnyCommand execution result.
Raises
ExceptionDescription
HandlerNotFoundErrorIf no handler is registered.
CommandExecutionErrorIf command execution fails.

Base class for command handlers.

Command handlers process commands and optionally return results. Each command type should have exactly one handler.

Example

class CreateOrderHandler(CommandHandlerProtocol[CreateOrderCommand, OrderId]):
    def __init__(self, repository: OrderRepository):
        self.repository = repository

    async def handle(self, command: CreateOrderCommand) -> OrderId:
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
        )
        await self.repository.save(order)
        return order.id
async def handle(command: TCommand) -> TResult

Handle a command.

Parameters
ParameterTypeDescription
`command`TCommandThe command to handle
Returns
TypeDescription
TResultCommand execution result

Result of command execution.

Result of an event dispatch operation.

Attributes:

  • success: True if all handlers completed successfully.
  • handler_results: List of return values from each handler.
  • errors: List of (exception_type, exception_instance) tuples.

def raise_if_errors() -> None

Raise if any handler failed. Call explicitly when needed.

property has_errors() -> bool

Return True if any handler failed.
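
The collect-then-raise pattern behind this result type can be sketched as below. `DispatchOutcome` and `run_handlers` are hypothetical stand-ins, and the choice to re-raise the first recorded exception is an assumption; the page does not say which exception `raise_if_errors()` raises.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class DispatchOutcome:  # hypothetical stand-in for the dispatch result type
    handler_results: list[Any] = field(default_factory=list)
    errors: list[tuple[type, BaseException]] = field(default_factory=list)

    @property
    def success(self) -> bool:
        return not self.errors

    @property
    def has_errors(self) -> bool:
        return bool(self.errors)

    def raise_if_errors(self) -> None:
        # Assumption: surface the first failure; callers opt in explicitly.
        if self.errors:
            _, first_exception = self.errors[0]
            raise first_exception


def run_handlers(handlers: list[Callable[[Any], Any]], event: Any) -> DispatchOutcome:
    outcome = DispatchOutcome()
    for handler in handlers:
        try:
            outcome.handler_results.append(handler(event))
        except Exception as exc:
            # Record the failure and keep going so other handlers still run.
            outcome.errors.append((type(exc), exc))
    return outcome


def send_email(event: Any) -> str:
    return "sent"


def update_index(event: Any) -> str:
    raise ValueError("index unavailable")


outcome = run_handlers([send_email, update_index], "order-created")
```

Because failures are recorded rather than raised, the caller decides whether a partial failure is fatal.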


Base class for domain events.
def __init__(**kwargs: Any) -> None

Initialize the event, setting base fields and any extra subclass fields.

Declared dataclass fields receive their default values when not supplied. Extra keyword arguments (e.g. fields declared on plain subclasses without @dataclass) are set directly on the instance via object.__setattr__, which bypasses the frozen=True guard safely during construction.
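
The construction technique described above can be sketched with the standard library alone. `BaseEvent` and `OrderPlaced` are hypothetical classes written for this example and are not the library's `DomainEvent`.

```python
from dataclasses import MISSING, FrozenInstanceError, dataclass, field, fields
from typing import Any


@dataclass(frozen=True, init=False)
class BaseEvent:  # hypothetical stand-in for a frozen event base class
    event_type: str = "event"
    metadata: dict = field(default_factory=dict)

    def __init__(self, **kwargs: Any) -> None:
        # Declared fields: take the supplied value or fall back to the default.
        for f in fields(self):
            if f.name in kwargs:
                value = kwargs.pop(f.name)
            elif f.default is not MISSING:
                value = f.default
            elif f.default_factory is not MISSING:
                value = f.default_factory()
            else:
                raise TypeError(f"missing required field {f.name!r}")
            # object.__setattr__ sidesteps the frozen guard during construction.
            object.__setattr__(self, f.name, value)
        # Extra keyword arguments become plain instance attributes.
        for name, value in kwargs.items():
            object.__setattr__(self, name, value)


class OrderPlaced(BaseEvent):  # plain subclass, no @dataclass decorator
    pass


event = OrderPlaced(order_id="o-1")
```

After construction, assigning to a declared field such as `event.event_type` raises `FrozenInstanceError`, while `event.order_id` is available as an ordinary attribute.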

def to_dict() -> dict[str, Any]

Return a JSON-serializable dictionary representation.

def from_dict(
    cls,
    data: dict[str, Any]
) -> DomainEvent
def for_aggregate(
    aggregate_id: UUID,
    aggregate_type: str
) -> DomainEvent

Base class for entities within aggregates.

Entities:

  • Have identity (id field)
  • Are mutable
  • Belong to an aggregate root
  • Don’t have their own event streams
def touch() -> None

Update the modified timestamp.


Represents a fact that happened in the past.

We override __init__ to accept arbitrary extra keyword arguments, which are attached as attributes on the instance. This preserves the historical ability to create Event(data=...) without declaring a new subclass. event_type handling is performed by the contract base class (which now includes actor_id).

The event version is an aggregate-specific concept and therefore is declared on this subclass rather than the canonical DomainEvent base class. with_version and related helpers operate on this field.

def __init__(
    *args: Any,
    **kwargs: Any
) -> None
def with_version(version: int) -> Event

Create a copy with a specific version (aggregate version).

def for_aggregate(
    aggregate_id: UUID,
    aggregate_type: str
) -> Event

Create a copy for a specific aggregate.


Configuration for event bus.

Event bus for publishing domain events.

Orchestrates subscription management, per-event-type BoundedChannel backpressure (MAJ-11), and background drain tasks. All handler dispatch logic (lookup, parallel/sequential dispatch, retry) is provided by HandlerDispatchMixin.

Attributes:

  • DEFAULT_MAX_CONCURRENT_HANDLERS: Maximum concurrent handler executions.
  • DEFAULT_HANDLER_TIMEOUT_SECONDS: Timeout for each handler execution.
  • DEFAULT_RETRY_FAILED_HANDLERS: Whether to retry failed handlers.
  • DEFAULT_MAX_HANDLER_RETRIES: Maximum retry attempts per handler.
  • DEFAULT_ENABLE_DEAD_LETTER: Whether to enable the dead letter queue.
  • DEFAULT_ALLOW_NO_HANDLERS: Whether to allow events with no handlers.
  • DEFAULT_PARALLEL_DISPATCH: Whether to dispatch handlers in parallel.
  • DEFAULT_CONTINUE_ON_ERROR: Whether to continue on handler errors.

def __init__(
    middlewares: list[MiddlewareFunc] | None = None,
    config: EventBusConfig | None = None,
    parallel: ParallelProtocol | None = None,
    tracer: TracerProtocol | None = None,
    hooks: HookRegistryProtocol | None = None
) -> None

Initialize the event bus.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `middlewares` | `list[MiddlewareFunc] \| None` | List of middleware functions. |
| `config` | `EventBusConfig \| None` | Event bus configuration. |
| `parallel` | `ParallelProtocol \| None` | Parallel execution protocol for concurrent handler execution. |
| `tracer` | `TracerProtocol \| None` | Optional tracer for distributed tracing. |
| `hooks` | `HookRegistryProtocol \| None` | Optional hook registry. |

def subscribe(
    event_type: type[Event],
    handler: Any
) -> None

Subscribe a handler to an event type.

Parameters
ParameterTypeDescription
`event_type`type[Event]The event class to subscribe to.
`handler`AnyHandler to call when event is published.
def set_tracer(tracer: TracerProtocol | None) -> None

Attach an optional tracer after provider boot wiring.

Parameters
ParameterTypeDescription
`tracer`TracerProtocol | NoneTracer to attach, or None to clear.
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None

Attach an optional hook registry after provider boot wiring.

def handler(event_type: type[Event]) -> Callable[[Callable], Callable]

Decorator to subscribe a handler to an event type.

def subscribe_all(handler: Any) -> None

Subscribe a handler to all events.

Parameters
ParameterTypeDescription
`handler`AnyHandler to call for every published event.
def unsubscribe(
    event_type: type[Event],
    handler: Any
) -> bool

Unsubscribe a handler from an event type.

Parameters
ParameterTypeDescription
`event_type`type[Event]The event class.
`handler`AnyHandler to remove.
Returns
TypeDescription
boolTrue if handler was found and removed.
async def publish(event: Event) -> Result[None, EventError]

Publish an event to all subscribers via a bounded channel.

Enqueues the event into a per-event-type BoundedChannel; a background drain task dequeues and dispatches it to all handlers. Blocks when the channel is at capacity, providing natural backpressure (MAJ-11).

Parameters
ParameterTypeDescription
`event`EventThe event to publish.
Returns
TypeDescription
Result[None, EventError]Ok(None) on successful enqueue. Err(EventHandlerError) when no handlers are registered and allow_no_handlers is False.
async def flush() -> None

Wait until all pending events in every channel have been dispatched.

Yields control repeatedly until every BoundedChannel is empty and all in-progress handler dispatches have completed. Useful in tests.
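
The enqueue, drain, and flush behaviour can be approximated with a bounded `asyncio.Queue`. This is a sketch only: `BoundedBus` is hypothetical, it uses one queue for all events instead of one channel per event type, and it omits retries, timeouts, and `Result` values.

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable, Optional


class BoundedBus:
    def __init__(self, capacity: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
        self._handlers: list[Callable[[Any], Awaitable[None]]] = []
        self._drain_task: Optional[asyncio.Task] = None

    def subscribe(self, handler: Callable[[Any], Awaitable[None]]) -> None:
        self._handlers.append(handler)

    async def publish(self, event: Any) -> None:
        if self._drain_task is None:
            self._drain_task = asyncio.create_task(self._drain())
        # put() suspends the publisher while the queue is full: backpressure.
        await self._queue.put(event)

    async def _drain(self) -> None:
        while True:
            event = await self._queue.get()
            try:
                for handler in self._handlers:
                    await handler(event)
            finally:
                self._queue.task_done()

    async def flush(self) -> None:
        # Returns once every enqueued event has been fully dispatched.
        await self._queue.join()

    async def close(self) -> None:
        if self._drain_task is not None:
            self._drain_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._drain_task


async def demo() -> list[str]:
    seen: list[str] = []

    async def record(event: str) -> None:
        await asyncio.sleep(0)  # simulate a handler that yields control
        seen.append(event)

    bus = BoundedBus(capacity=2)
    bus.subscribe(record)
    for i in range(5):
        await bus.publish(f"event-{i}")  # waits whenever 2 events are pending
    await bus.flush()
    await bus.close()
    return seen
```

In tests, awaiting `flush()` before asserting avoids racing the background drain task.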


Wrapper for events with metadata for storage and transmission.

Payload fired after an event handler successfully processes an event.

Attributes:

  • event_type: Fully-qualified type name of the handled event.
  • handler: Qualified name of the handler class or function.


Base class for event handlers.

Event handlers process events and perform side effects. Unlike command handlers, multiple handlers can subscribe to the same event type.

Example

class SendOrderConfirmationHandler(EventHandlerProtocol[OrderCreatedEvent]):
    def __init__(self, email_service: EmailService):
        self.email_service = email_service

    async def handle(self, event: OrderCreatedEvent) -> Result[None, EventError]:
        await self.email_service.send_confirmation(
            order_id=event.order_id,
            customer_email=event.customer_email,
        )
        return Ok(None)
async def handle(event: TEvent) -> Result[None, EventError]

Handle an event.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `event` | `TEvent` | The event to handle. |

Returns

| Type | Description |
| --- | --- |
| `Result[None, EventError]` | `Ok(None)` on success, `Err(EventError)` if handling fails in an expected, recoverable way. |

Emitted when the event bus successfully publishes a domain event.

Attributes:

  • source_event_type: Fully-qualified type name of the published event.
  • handler_count: Number of registered handlers that were notified.


Payload fired when an event is published onto the event bus.

Attributes:

  • event_type: Fully-qualified type name of the published event.
  • aggregate_id: Identifier of the aggregate that emitted the event, if known.


AbstractRepository using event sourcing with snapshot acceleration.

This repository:

  1. Loads aggregates using snapshots when available (O(1) vs O(N))
  2. Persists changes as events
  3. Automatically creates snapshots based on policy
  4. Publishes events to the event bus

Example

# Create repository with snapshot support
repository = EventSourcingRepository(
    aggregate_type=Order,
    event_store=event_store,
    snapshot_store=snapshot_store,
    event_bus=event_bus,
    snapshot_policy=EventCountPolicy(100),
)
# Load aggregate - uses snapshot if available
order = await repository.get(order_id)
# Modify aggregate
order.add_item(product_id, quantity, price)
# Save - stores events, maybe creates snapshot, publishes events
await repository.save(order)

Performance comparison (1000 events):

  • Without snapshots: ~100ms to load
  • With snapshots: ~1ms to load (snapshot + ~0 events)

def __init__(
    aggregate_type: type[TAggregate],
    event_store: AbstractEventStore,
    snapshot_store: AbstractSnapshotStore | None = None,
    event_bus: EventBusImpl | None = None,
    snapshot_policy: SnapshotPolicy | None = None,
    snapshot_every: int = 100
)

Initialize the event sourcing repository.

Parameters
ParameterTypeDescription
`aggregate_type`type[TAggregate]The aggregate class to manage
`event_store`AbstractEventStoreEvent store for event persistence
`snapshot_store`AbstractSnapshotStore | NoneSnapshot store (optional, enables O(1) loading)
`event_bus`EventBusImpl | NoneEvent bus for publishing events (optional)
`snapshot_policy`SnapshotPolicy | NoneCustom snapshot policy (default: every N events)
`snapshot_every`intEvents between snapshots (default: 100)
async def get(aggregate_id: UUID | str) -> TAggregate | None

Load an aggregate by ID using snapshot acceleration.

This method:

  1. Tries to load the latest snapshot
  2. Loads only events after the snapshot version
  3. Reconstructs the aggregate
Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
TAggregate | NoneThe aggregate if found, None otherwise
async def save(aggregate: TAggregate) -> None

Save an aggregate’s pending events.

This method:

  1. Persists pending events to the event store
  2. Creates a snapshot if policy triggers
  3. Publishes events to the event bus
  4. Clears pending events from the aggregate
Parameters
ParameterTypeDescription
`aggregate`TAggregateThe aggregate to save
Raises
ExceptionDescription
ConcurrencyErrorIf version conflict detected
async def exists(aggregate_id: UUID | str) -> bool

Check if an aggregate exists.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
boolTrue if the aggregate has any events
async def get_or_raise(aggregate_id: UUID | str) -> TAggregate

Get an aggregate or raise an error if not found.

Parameters
ParameterTypeDescription
`aggregate_id`UUID | strThe aggregate identifier
Returns
TypeDescription
TAggregateThe aggregate
Raises
ExceptionDescription
AggregateNotFoundErrorIf aggregate not found
async def force_snapshot(aggregate: TAggregate) -> None

Force creation of a snapshot for an aggregate.

Useful for bulk operations or maintenance.

Parameters
ParameterTypeDescription
`aggregate`TAggregateThe aggregate to snapshot

Payload fired when an event is persisted to the event store.

Attributes:

  • event_type: Fully-qualified type name of the stored event.
  • stream_id: Identifier of the event stream the event was written to.


Top-level Events configuration.

This consolidated class combines all event system configuration. Because every class is defined in this module, no TYPE_CHECKING guards or cast() wrappers are needed, and runtime type resolution and dict-to-model coercion both work.

def validate_backend_config() -> list[str]

Validate that the selected backend has a matching config.

Returns a list of validation error messages. Empty list means valid.
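
The "return messages instead of raising" contract can be sketched as follows. `SketchConfig` and its `backend` and `backend_settings` fields are hypothetical; the real `EventsConfig` fields are not listed on this page.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchConfig:  # hypothetical config, not the library's EventsConfig
    backend: str = "memory"
    backend_settings: dict[str, dict[str, Any]] = field(default_factory=dict)

    def validate_backend_config(self) -> list[str]:
        errors: list[str] = []
        # Assumption: only the in-memory backend works without its own settings.
        if self.backend != "memory" and self.backend not in self.backend_settings:
            errors.append(
                f"backend {self.backend!r} is selected but has no matching settings"
            )
        return errors


valid = SketchConfig().validate_backend_config()
invalid = SketchConfig(backend="sql").validate_backend_config()
```

Returning a list lets the caller report every problem at once, for example by joining the messages into a single startup error.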


CQRS + Event Sourcing: CommandBusProtocol, QueryBusProtocol, EventBusProtocol, EventStoreProtocol, sagas.

Call configure() to set up the events subsystem.

Usage

from lexigram.events.config import EventsConfig
from lexigram.events.types import EventStoreBackend

@module(
    imports=[
        EventsModule.configure(
            EventsConfig(event_store_backend=EventStoreBackend.MEMORY)
        )
    ]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: Any | None = None,
    handler_modules: list[str] | None = None
) -> DynamicModule

Create an EventsModule with explicit configuration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `Any \| None` | EventsConfig or `None` for framework defaults (in-memory event store). |
| `handler_modules` | `list[str] \| None` | Python module paths to auto-discover event/command handlers from (e.g. `["myapp.handlers"]`). |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule descriptor. |

def scope(
    cls,
    *handlers: type
) -> DynamicModule

Scope event/command/query handler classes into a feature module.

Registers the given handler classes as providers so they are resolved by the bus implementations. The parent module graph must already include EventsModule.configure — this does not create a new event store or bus.

Uses the anonymous token pattern so both configure() and scope() can coexist in the same compiled graph without a ModuleDuplicateError.

Example

@module(
    imports=[
        EventsModule.configure(config),
        EventsModule.scope(
            CreateOrderHandler,
            GetOrderHandler,
            OrderShippedHandler,
        ),
    ]
)
class OrderFeatureModule(Module):
    pass
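
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `*handlers` | `type` | Handler classes to register as providers. |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule scoped to this feature. |
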
def stub(cls) -> DynamicModule

Return an in-memory EventsModule for unit testing.

Registers an in-memory event store with no external broker connections. Suitable for testing event handlers and CQRS logic.

Returns
TypeDescription
DynamicModuleA DynamicModule backed by in-memory event storage.

Thin orchestrator that coordinates Events sub-providers (M-01).

Delegates all setup, lifecycle, and container-registration work to:

  • StoreSubProvider
  • BusSubProvider
  • HandlerSubProvider
  • ManagerSubProvider

Public API is unchanged from the monolithic version.

def __init__(
    config: EventsConfig | dict[str, Any] | None = None,
    handler_modules: list[str] | None = None
) -> None
def from_config(
    cls,
    config: EventsConfig,
    **context: Any
) -> EventsProvider

Create EventsProvider from config.

property config() -> EventsConfig
def config(value: EventsConfig) -> None

Allow orchestrator to auto-inject config from LexigramConfig.

property event_store() -> Any
property snapshot_manager() -> Any
property command_bus() -> Any
property query_bus() -> Any
property event_bus() -> Any
property projection_manager() -> Any
property saga_manager() -> Any
async def register(container: ContainerRegistrarProtocol) -> None

Register all events components with the DI container.

async def boot(container: ContainerResolverProtocol) -> None

Start buses, managers, and wire optional tracer.

async def shutdown() -> None

Shutdown all components.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Aggregate health across stores and buses.


Information about a registered handler.

Registry for handler discovery and registration.

The registry provides:

  • Auto-discovery of handlers from modules
  • Registration with command, query, and event buses
  • Tracking of handler-message relationships

Example

# Create registry
registry = HandlerRegistry()
# Discover handlers from modules
registry.discover("app.handlers")
# Register with buses
registry.register_with_buses(
    command_bus=command_bus,
    query_bus=query_bus,
    event_bus=event_bus,
)
# Or use DI container for handler instantiation
registry.register_with_container(container)
def __init__() -> None

Initialize the handler registry.

def load_decorators() -> None

Load handlers from the global decorator registry (M-17).

Call this explicitly to register handlers that were annotated with @command_handler, @event_handler, etc.

def register_command_handler(
    command_type: type,
    handler_type: Any
) -> None

Register a command handler.

Parameters
ParameterTypeDescription
`command_type`typeThe command class
`handler_type`AnyThe handler class
def register_query_handler(
    query_type: type,
    handler_type: Any
) -> None

Register a query handler.

Parameters
ParameterTypeDescription
`query_type`typeThe query class
`handler_type`AnyThe handler class
def register_event_handler(
    event_type: type,
    handler_type: Any
) -> None

Register an event handler.

Parameters
ParameterTypeDescription
`event_type`typeThe event class
`handler_type`AnyThe handler class
def discover(
    module_path: str,
    recursive: bool = True
) -> int

Discover handlers from a module or package.

Scans the module for classes that inherit from handler base classes and registers them automatically.

Parameters
ParameterTypeDescription
`module_path`strPython module path (e.g., "app.handlers")
`recursive`boolWhether to scan submodules
Returns
TypeDescription
intNumber of handlers discovered
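
Class-based discovery of this kind can be sketched with `inspect`. `BaseCommandHandler` and `discover_handlers` are hypothetical, and a throwaway module object replaces a real package so the example runs without files on disk; how the library imports and walks packages is not shown here.

```python
import inspect
import types


class BaseCommandHandler:  # hypothetical handler base class
    pass


def discover_handlers(module: types.ModuleType, base: type) -> list[type]:
    """Return the classes in `module` that subclass `base` (excluding `base`)."""
    found = []
    for _, candidate in inspect.getmembers(module, inspect.isclass):
        if issubclass(candidate, base) and candidate is not base:
            found.append(candidate)
    return found


class CreateOrderHandler(BaseCommandHandler):
    pass


class CancelOrderHandler(BaseCommandHandler):
    pass


class NotAHandler:
    pass


# Build a throwaway module and attach the classes as if they were defined in it.
fake_module = types.ModuleType("fake_handlers")
for cls in (BaseCommandHandler, CreateOrderHandler, CancelOrderHandler, NotAHandler):
    setattr(fake_module, cls.__name__, cls)

discovered = discover_handlers(fake_module, BaseCommandHandler)
```

`inspect.getmembers` returns members sorted by name, so the discovered handlers come back in alphabetical order.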
def register_with_buses(
    command_bus: CommandBusProtocol | None = None,
    query_bus: QueryBusProtocol | None = None,
    event_bus: EventBusProtocol | None = None,
    handler_factory: Callable[[type], Any] | None = None
) -> None

Register all discovered handlers with buses.

Parameters
ParameterTypeDescription
`command_bus`CommandBusProtocol | NoneCommand bus to register with
`query_bus`QueryBusProtocol | NoneQuery bus to register with
`event_bus`EventBusProtocol | NoneEvent bus to register with
`handler_factory`Callable[[type], Any] | NoneFactory function to instantiate handlers
def get_command_handlers() -> dict[type, type[CommandHandlerProtocol[Any, Any]]]

Get all registered command handlers.

def get_query_handlers() -> dict[type, type[QueryHandlerProtocol[Any, Any]]]

Get all registered query handlers.

def get_event_handlers() -> dict[type, list[type[EventHandlerProtocol[Any]]]]

Get all registered event handlers.

def with_defaults(cls) -> HandlerRegistry

Create a registry with no pre-registered handlers.

Returns a fresh instance ready for handler registration via the @event_handler decorator or register() method.

Returns
TypeDescription
HandlerRegistryA new HandlerRegistry instance.

Idempotent command extended with aggregate-targeting helpers.

In-memory event store for testing and development.

M-19: Uses a global log and a type index, so stream_all is O(n) in the events read from the starting position and stream_by_type is O(matching events) rather than O(total events).

Example

store = InMemoryEventStore()
# Save events
await store.append(
    stream_id="order-123",
    events=[OrderCreated(...)],
    expected_version=0
)
# Load events
events = await store.read("order-123")
def __init__(
    schema_evolution: SchemaEvolution | None = None,
    max_events_per_stream: int = 10000
) -> None

Initialize the in-memory store.

Parameters
ParameterTypeDescription
`schema_evolution`SchemaEvolution | NoneOptional schema evolution handler.
`max_events_per_stream`intMaximum events retained per stream. Oldest events are evicted when the limit is exceeded. Defaults to 10,000 — set to 0 for unbounded.
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to an in-memory stream with optimistic concurrency.

Returns
TypeDescription
intNew stream version.
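A minimal sketch of an optimistic-concurrency check on append, assuming the stream version equals the number of events already stored. `TinyEventStore` and the local `ConcurrencyError` are illustrative stand-ins, not the library's classes.

```python
import asyncio


class ConcurrencyError(Exception):
    """Local stand-in for the library's ConcurrencyError."""


class TinyEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[object]] = {}

    async def append(self, stream_id, events, expected_version=None) -> int:
        stream = self._streams.setdefault(stream_id, [])
        current = len(stream)  # assumption: version == number of stored events
        if expected_version is not None and expected_version != current:
            raise ConcurrencyError(f"expected {expected_version}, found {current}")
        stream.extend(events)
        return len(stream)


async def demo() -> tuple[int, bool]:
    store = TinyEventStore()
    version = await store.append("order-123", ["created"], expected_version=0)
    try:
        # A second writer still believes the stream is empty.
        await store.append("order-123", ["paid"], expected_version=0)
        conflict = False
    except ConcurrencyError:
        conflict = True
    return version, conflict


result = asyncio.run(demo())
```

Passing `expected_version=None` skips the check in this sketch, which matches the optional parameter in the signature above.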
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream (M-15).

async def get_stream_version(stream_id: str) -> int

Get the current version of a stream.

async def get_stream_ids() -> list[str]

Get all stream IDs (for testing/debugging).

async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for a stream up to (and including) the given version (MF-04).

async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events across all streams.

M-19: O(n) from position using _global_log — no merging required.

async def stream_by_type(
    event_types: list[str],
    from_position: int = 0
) -> AsyncIterator[Event]

Stream events filtered by type.

M-19: O(matching_events) not O(total_events) using type index.
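The global log and type index mentioned in the M-19 notes could look roughly like this. It is a sketch under assumptions: `IndexedLog` is a hypothetical name, events are reduced to `(event_type, payload)` tuples, and `from_position` is treated as an inclusive list index.

```python
from collections import defaultdict


class IndexedLog:
    def __init__(self) -> None:
        # Every event in append order: (event_type, payload).
        self.global_log: list[tuple[str, str]] = []
        # event_type -> positions in global_log, in ascending order.
        self.type_index: dict[str, list[int]] = defaultdict(list)

    def append(self, event_type: str, payload: str) -> int:
        position = len(self.global_log)
        self.global_log.append((event_type, payload))
        self.type_index[event_type].append(position)
        return position

    def stream_all(self, from_position: int = 0):
        # One slice of the global log; no per-stream merging needed.
        yield from self.global_log[from_position:]

    def stream_by_type(self, event_types: list[str], from_position: int = 0):
        # Only positions recorded for the requested types are visited.
        positions = sorted(
            p
            for t in event_types
            for p in self.type_index.get(t, [])
            if p >= from_position
        )
        for p in positions:
            yield self.global_log[p]


log = IndexedLog()
log.append("OrderCreated", "a")
log.append("OrderPaid", "b")
log.append("OrderCreated", "c")

matching = list(log.stream_by_type(["OrderCreated"]))
tail = list(log.stream_all(from_position=1))
```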

def get_all_streams() -> list[str]

Get all stream IDs (for testing/debugging).

def get_global_position() -> int

Get current global position.

def clear() -> None

Clear all events (for testing).

async def close() -> None

Close the in-memory event store (no-op).


Configuration for in-memory event store.

In-memory snapshot store for testing and development.

M-14: Uses Snapshot objects throughout (save_snapshot / get_latest / get_by_version). The legacy tuple-based save/load methods are provided by the AbstractSnapshotStore base class via backward-compat shims.

Example

from lexigram.events.types import Snapshot
store = InMemorySnapshotStore()
# Save snapshot
await store.save_snapshot(Snapshot(
    aggregate_id="order-123",
    aggregate_type="Order",
    version=100,
    state={"status": "completed", "total": 150.00}
))
# Load latest snapshot
snapshot = await store.get_latest("order-123")
def __init__(max_snapshots_per_aggregate: int = 5) -> None

Initialize the in-memory snapshot store.

Parameters
ParameterTypeDescription
`max_snapshots_per_aggregate`intMaximum snapshots to keep per aggregate
async def save_snapshot(snapshot: Snapshot) -> None

Save a snapshot (M-14: accepts Snapshot object).

async def get_latest(aggregate_id: str) -> Snapshot | None

Load the latest snapshot for an aggregate.

async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get the snapshot for a specific version.

async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

def clear() -> None

Clear all snapshots (for testing).

async def close() -> None

Close the in-memory snapshot store (no-op).


Event for cross-service communication.

Base class for all messages in the Events system.
def with_metadata(metadata: MessageMetadata) -> Message

Create a copy of the message with new metadata.

def with_correlation_id(correlation_id: UUID) -> Message

Create a copy with a correlation ID.

def with_causation_id(causation_id: UUID) -> Message

Create a copy with a causation ID.

property correlation_id() -> UUID | None

Get correlation ID from metadata.

property causation_id() -> UUID | None

Get causation ID from metadata.

def to_dict() -> dict[str, Any]

Convert message to dictionary.

def from_dict(
    cls,
    data: dict[str, Any]
) -> Message

Create message from dictionary.


Metadata attached to messages for tracking and correlation.
def with_correlation(correlation_id: UUID) -> MessageMetadata

Create new metadata with correlation ID.

def with_causation(causation_id: UUID) -> MessageMetadata

Create new metadata with causation ID.

def with_user(user_id: str) -> MessageMetadata

Create new metadata with user ID.

def with_tenant(tenant_id: str) -> MessageMetadata

Create new metadata with tenant ID.

def with_custom(
    key: str,
    value: Any
) -> MessageMetadata

Create new metadata with additional custom field.


Information about registered middleware.

MongoDB connection configuration for store implementations.

Used directly by MongoDBSnapshotStore for motor connection lifecycle management.


MongoDB event store implementation using DocumentStoreProtocol.

Uses the lexigram-nosql collection abstraction for standard CRUD while keeping direct motor access for change streams and transactions.

Features:

  • Optimistic concurrency with version checking
  • Global event ordering with sequence counters
  • Efficient event streaming with cursors
  • Automatic index creation
  • Change streams support for real-time events

Example

store = MongoDBEventStore(document_store, config)
await store.connect()
try:
    # append() is the documented write method on this store
    await store.append("order-123", [event], expected_version=0)
finally:
    await store.close()
def __init__(
    document_store: DocumentStoreProtocol,
    config: Any,
    event_serializer: Any | None = None
) -> None

Initialize MongoDB event store.

Parameters
ParameterTypeDescription
`document_store`DocumentStoreProtocolA connected DocumentStoreProtocol (e.g. MongoDBDocumentStore).
`config`AnyEvent store configuration with collection names.
`event_serializer`Any | NoneOptional custom event serializer.
property events() -> CollectionProtocol

Events collection.

property counters() -> CollectionProtocol

Counters collection.

async def connect() -> None

Connect to MongoDB and set up collections.

async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to a stream with optimistic concurrency.

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
`events`list[Event]Events to save.
`expected_version`int | NoneExpected current version.
Returns
TypeDescription
intNew stream version.
Raises
ExceptionDescription
ConcurrencyErrorIf version conflict occurs.
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream.

async def get_stream_version(stream_id: str) -> int

Get current stream version.

Parameters
ParameterTypeDescription
`stream_id`strStream to check.
Returns
TypeDescription
intCurrent version (0 if stream doesn't exist).
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events from a global position.

async def stream_by_type(
    event_types: list[str],
    from_position: int = 0
) -> AsyncIterator[Event]

Stream events filtered by type.

async def watch_events(event_types: list[str] | None = None) -> AsyncIterator[Event]

Watch for new events using MongoDB change streams.

Note: This requires direct motor access for change stream support. Falls back to polling if the underlying store doesn’t support watch().

Parameters
ParameterTypeDescription
`event_types`list[str] | NoneOptional list of event types to filter.
Yields
TypeDescription
AsyncIterator[Event]New events as they are inserted.
async def find_by_type(
    event_type: str,
    limit: int = 100,
    offset: int = 0
) -> list[Event]

Get events of a specific type.

Parameters
ParameterTypeDescription
`event_type`strEvent type name.
`limit`intMaximum events to return.
`offset`intNumber of events to skip.
Returns
TypeDescription
list[Event]List of events.
async def get_events_count(stream_id: str | None = None) -> int

Get total event count.

Parameters
ParameterTypeDescription
`stream_id`str | NoneOptional stream to count (None for all).
Returns
TypeDescription
intEvent count.
async def delete_stream(stream_id: str) -> int

Delete all events for a stream.

Parameters
ParameterTypeDescription
`stream_id`strStream to delete.
Returns
TypeDescription
intNumber of deleted events.
async def close() -> None

Close the MongoDB connection.


Configuration for MongoDB event store (top-level user config).

Used by EventsConfig.mongodb (lexigram.events.config.EventsConfig.mongodb). For the store-level connection config used by MongoDBSnapshotStore, see lexigram.events.stores.mongodb.MongoDBConfig.

def validate_connection_string(
    cls,
    v: SecretStr | str
) -> SecretStr

Validate and coerce MongoDB connection string to SecretStr.


MongoDB snapshot store implementation.

This store manages aggregate snapshots for optimized loading.

Example

config = MongoDBConfig(uri="mongodb://localhost:27017")
store = MongoDBSnapshotStore(config)
await store.connect()
# Save snapshot (save_snapshot accepts a Snapshot object)
await store.save_snapshot(Snapshot(
    aggregate_id="order-123",
    aggregate_type="Order",
    version=100,
    state=order.to_dict(),
))
# Load latest
snapshot = await store.get_latest("order-123")
def __init__(config: MongoDBConfig) -> None

Initialize MongoDB snapshot store.

Parameters
ParameterTypeDescription
`config`MongoDBConfigMongoDB configuration.
property snapshots() -> AsyncIOMotorCollection
async def connect() -> None

Connect to MongoDB.

async def save_snapshot(snapshot: Snapshot) -> None

Save a snapshot (M-14 standardized API).

async def get_latest(aggregate_id: str) -> Snapshot | None

Get latest snapshot (M-14 standardized API).

async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get a specific snapshot version (M-14).

async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

Parameters
ParameterTypeDescription
`aggregate_id`str | UUIDAggregate identifier.
`keep_count`intNumber of snapshots to keep.
Returns
TypeDescription
intNumber of deleted snapshots.
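The retention rule can be illustrated on a plain list of snapshot versions. This is only a sketch of the "keep the most recent `keep_count`" idea. `prune_snapshots` is a hypothetical helper, and it assumes "most recent" means "highest version".

```python
def prune_snapshots(versions: list[int], keep_count: int = 3) -> tuple[list[int], int]:
    """Keep the `keep_count` highest versions; return (kept, deleted_count)."""
    ordered = sorted(versions, reverse=True)
    kept = ordered[:keep_count]
    return kept, len(ordered) - len(kept)


kept, deleted = prune_snapshots([100, 200, 300, 400, 500], keep_count=3)
```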
async def close() -> None

Close the MongoDB connection.


Container for paginated results.

Attributes:

  • items: The items in the current page.
  • total: Total number of items across all pages.
  • page: Current page number.
  • page_size: Number of items per page.

Example

result = PagedResult(
    items=[...],
    total=150,
    page=2,
    page_size=20
)
logger.info(result.total_pages)  # 8
logger.info(result.has_next)  # True
def __init__(
    items: list[TResult],
    total: int,
    page: int,
    page_size: int
) -> None

Initialize paged result.

Parameters
ParameterTypeDescription
`items`list[TResult]Items in the current page.
`total`intTotal item count.
`page`intCurrent page number.
`page_size`intItems per page.
property total_pages() -> int

Get total number of pages.

property has_next() -> bool

Check if there is a next page.

Returns
TypeDescription
boolTrue if there are more pages after current.
property has_prev() -> bool

Check if there is a previous page.

Returns
TypeDescription
boolTrue if there are pages before current.
def to_dict() -> dict[str, Any]

Convert to dictionary.

Returns
TypeDescription
dict[str, Any]Dictionary representation of paged result.
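The derived properties above follow from simple arithmetic. The sketch below assumes 1-based page numbers and ceiling division. `Page` is a hypothetical stand-in, not the real PagedResult.

```python
import math
from dataclasses import dataclass


@dataclass
class Page:
    total: int
    page: int  # assumed to be 1-based
    page_size: int

    @property
    def total_pages(self) -> int:
        # Guard against a zero page size to avoid dividing by zero.
        return math.ceil(self.total / self.page_size) if self.page_size > 0 else 0

    @property
    def has_next(self) -> bool:
        return self.page < self.total_pages

    @property
    def has_prev(self) -> bool:
        return self.page > 1


p = Page(total=150, page=2, page_size=20)
```

With these inputs `p.total_pages` is 8 and `p.has_next` is true, which agrees with the example earlier in this section.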

Query with built-in pagination support.
property offset() -> int

Get query offset based on page.

property limit() -> int

Get query limit.
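A paginated query usually maps its page number to an offset and limit as shown here. This is a sketch that assumes 1-based pages. `page_to_offset_limit` is a hypothetical helper, not part of the documented API.

```python
def page_to_offset_limit(page: int, page_size: int) -> tuple[int, int]:
    """Translate a 1-based page number into (offset, limit)."""
    offset = (max(page, 1) - 1) * page_size
    return offset, page_size


offset, limit = page_to_offset_limit(page=3, page_size=20)
```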


PostgreSQL connection configuration.

Attributes:

  • dsn: Database connection string (treated as a secret).
  • pool_min_size: Minimum connection pool size.
  • pool_max_size: Maximum connection pool size.
  • command_timeout: Command timeout in seconds.
  • events_table: Name of the events table.
  • snapshots_table: Name of the snapshots table.
  • auto_create_tables: Whether to create tables automatically.

def validate_dsn(
    cls,
    v: SecretStr | str | None
) -> SecretStr | None

Validate and normalize the DSN. Returns None when not provided.


Emitted when a read-model projection is rebuilt or updated.

Attributes:

  • projection_name: Canonical name of the projection that was updated.
  • events_processed: Number of domain events processed in this update.


Represents a request for data.

Configuration for query bus.

Query bus for dispatching queries.

Handlers are registered explicitly via register() or through HandlerRegistry.register_with_buses() during provider registration.

Example

# Queries are automatically routed to handlers
orders = await bus.execute(GetOrdersByCustomerQuery(
    customer_id="cust-123",
    status="pending"
))
def __init__(
    middlewares: list[Any] | None = None,
    config: Any | None = None
) -> None

Initialize the query bus.

async def execute(query: Query[TResult]) -> TResult

Execute a query.

Parameters
ParameterTypeDescription
`query`Query[TResult]The query to execute.
Returns
TypeDescription
TResultQuery result.
Raises
ExceptionDescription
HandlerNotFoundErrorIf no handler is registered.
QueryExecutionErrorIf query execution fails.
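The routing behaviour can be sketched as a lookup keyed on the query's type. `TinyQueryBus`, its `register` signature, and the local `HandlerNotFoundError` are assumptions made for illustration. The real bus also supports middleware and configuration, which are omitted here.

```python
import asyncio
from dataclasses import dataclass


class HandlerNotFoundError(Exception):
    """Local stand-in for the library exception of the same name."""


@dataclass
class GetGreeting:
    name: str


class GetGreetingHandler:
    async def handle(self, query: GetGreeting) -> str:
        return f"hello {query.name}"


class TinyQueryBus:
    def __init__(self) -> None:
        self._handlers: dict[type, object] = {}

    def register(self, query_type: type, handler: object) -> None:
        self._handlers[query_type] = handler

    async def execute(self, query):
        # Exactly one handler per query type, looked up by the query's class.
        handler = self._handlers.get(type(query))
        if handler is None:
            raise HandlerNotFoundError(type(query).__name__)
        return await handler.handle(query)


bus = TinyQueryBus()
bus.register(GetGreeting, GetGreetingHandler())
greeting = asyncio.run(bus.execute(GetGreeting(name="Ada")))
```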

Base class for query handlers.

Query handlers process queries and return results. Each query type should have exactly one handler. Queries should not modify state.

Example

class GetOrdersHandler(QueryHandlerProtocol[GetOrdersQuery, list[OrderDTO]]):
    def __init__(self, read_model: OrderReadModel):
        self.read_model = read_model

    async def handle(self, query: GetOrdersQuery) -> list[OrderDTO]:
        return await self.read_model.get_orders(
            customer_id=query.customer_id,
            status=query.status
        )
async def handle(query: TQuery) -> TResult

Handle a query.

Parameters
ParameterTypeDescription
`query`TQueryThe query to handle
Returns
TypeDescription
TResultQuery result

Result of query execution.

Redis-backed AbstractEventStore for lightweight deployments.

Uses StateStoreProtocol for storage, so no direct Redis dependency is introduced into this package. Suitable for single-node or low-throughput deployments where a full relational or document store is not required.

Key layout

{namespace}:{stream_id}:events — JSON array of serialised event dicts
{namespace}:{stream_id}:version — current integer version for the stream
{namespace}:_streams — JSON array of all known stream IDs
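The key layout above can be expressed as a small helper. `stream_keys` is a hypothetical function that simply formats the three documented key patterns.

```python
def stream_keys(namespace: str, stream_id: str) -> dict[str, str]:
    """Build the keys used for one stream under the documented layout."""
    return {
        "events": f"{namespace}:{stream_id}:events",
        "version": f"{namespace}:{stream_id}:version",
        "streams": f"{namespace}:_streams",
    }


keys = stream_keys("myapp:events", "order-123")
```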

Example

from lexigram.events.stores.redis import RedisEventStore
store = RedisEventStore(state_store, namespace="myapp:events")
await store.append("order-123", [OrderCreated(...)])
events = await store.read("order-123")
def __init__(
    store: StateStoreProtocol,
    namespace: str = 'lexigram:events'
) -> None

Initialise the Redis-backed event store.

Parameters
ParameterTypeDescription
`store`StateStoreProtocolA StateStoreProtocol instance (e.g. a Redis-backed implementation).
`namespace`strKey prefix used for all stored keys.
async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to stream_id with optimistic concurrency control.

Parameters
ParameterTypeDescription
`stream_id`strUnique stream identifier.
`events`list[Event]Events to append. Empty list is a no-op.
`expected_version`int | NoneIf provided, the current stream version must equal this value or ConcurrencyError is raised.
Returns
TypeDescription
intNew stream version after the append.
Raises
ExceptionDescription
ConcurrencyErrorIf expected_version does not match the actual current version of the stream.
async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from stream_id.

Parameters
ParameterTypeDescription
`stream_id`strStream to read from.
`from_version`intLowest version to include (inclusive, default 0 = all).
`to_version`int | NoneHighest version to include (inclusive).
`limit`int | NoneMaximum number of events to return.
Returns
TypeDescription
list[Event]Ordered list of Event objects.
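The version bounds and limit described in the table can be sketched as a filter over an already ordered list. `select_events` is a hypothetical helper, and events are reduced to dicts with a `version` key for illustration.

```python
def select_events(events, from_version=0, to_version=None, limit=None):
    """Apply inclusive version bounds, then an optional limit."""
    selected = [
        e
        for e in events
        if e["version"] >= from_version
        and (to_version is None or e["version"] <= to_version)
    ]
    return selected if limit is None else selected[:limit]


events = [{"version": v} for v in range(1, 6)]
window = select_events(events, from_version=2, to_version=4)
first_two = select_events(events, from_version=2, limit=2)
```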
async def load(
    stream_id: str,
    after_version: int = 0
) -> list[Event]

Load events after after_version (non-inclusive lower bound).

This alias is used by several base-class helpers.

Parameters
ParameterTypeDescription
`stream_id`strStream to load events from.
`after_version`intEvents with version <= this value are excluded.
Returns
TypeDescription
list[Event]Ordered list of events.
async def get_stream_version(stream_id: str) -> int

Return the current version of stream_id (0 if the stream is empty).

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
Returns
TypeDescription
intCurrent integer version.
async def get_version(stream_id: str) -> int

Public alias for get_stream_version.

Parameters
ParameterTypeDescription
`stream_id`strStream identifier.
Returns
TypeDescription
intCurrent integer version.
async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events across all known streams.

Note: Cross-stream global ordering is not guaranteed by this implementation. Streams are iterated in registration order and events within each stream are returned in version order. For strict global ordering, use the PostgreSQL or MongoDB stores.

Parameters
ParameterTypeDescription
`from_position`intEvents with sequence_number <= this value are skipped.
`batch_size`intUnused; present for interface compatibility.
`partition`int | NoneOptional partition index for sharding.
`total_partitions`int | NoneTotal number of partitions.
Yields
TypeDescription
AsyncIterator[Event]Events in stream-registration order, version order within each stream.
async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for stream_id up to and including up_to_version.

Parameters
ParameterTypeDescription
`stream_id`strStream to compact.
`up_to_version`intEvents at or below this version are removed.
Returns
TypeDescription
intNumber of events deleted.
async def close() -> None

Close the event store.

No-op for this implementation; the underlying StateStoreProtocol manages its own lifecycle.


Aggregate state snapshot for performance optimization.

Configuration for snapshotting behavior.

Manages snapshot creation and retrieval for aggregates.

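This component speeds up aggregate loading: instead of replaying the full event history, which is O(N) in the number of events, it loads the latest snapshot and replays only the events recorded after it, so the load cost depends on the events since the last snapshot rather than on the full stream length.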

Example

manager = SnapshotManager(
    event_store=event_store,
    snapshot_store=snapshot_store,
    policy=EventCountPolicy(100)
)
# Load aggregate with snapshot acceleration
# (load_with_snapshot returns a 3-tuple; the last element is an int)
state, events, version = await manager.load_with_snapshot(
    aggregate_id="order-123",
    aggregate_type="Order"
)
# Auto-snapshot after save if policy triggers
await manager.save_and_maybe_snapshot(
    aggregate_id="order-123",
    aggregate_type="Order",
    events=[...],
    current_state=state,
    expected_version=100
)
def __init__(
    event_store: EventStoreProtocol,
    snapshot_store: SnapshotStoreProtocol,
    policy: SnapshotPolicy | None = None,
    config: SnapshotConfig | None = None
)

Initialize the snapshot manager.

Parameters
ParameterTypeDescription
`event_store`EventStoreProtocolThe event store instance
`snapshot_store`SnapshotStoreProtocolThe snapshot store instance
`policy`SnapshotPolicy | NoneSnapshot creation policy (default: EventCountPolicy(100))
`config`SnapshotConfig | NoneSnapshot configuration
async def load_with_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str
) -> tuple[dict[str, Any] | None, list[Any], int]
async def save_and_maybe_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str,
    events: list[Any],
    current_state: dict[str, Any],
    expected_version: int
) -> bool

Save events and create snapshot if policy triggers.

Returns
TypeDescription
boolTrue if a snapshot was created
async def create_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str,
    version: int,
    state: dict[str, Any]
) -> Snapshot

Create and store a snapshot.

async def force_snapshot(
    aggregate_id: str | UUID,
    aggregate_type: str,
    state_extractor: Callable[[], dict[str, Any]]
) -> Snapshot

Force creation of a snapshot regardless of policy.

async def cleanup_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int | None = None
) -> int

Clean up old snapshots for an aggregate.


SQLite configuration for event stores.

SQLite event store implementation.

This store uses the generic DatabaseProviderProtocol for async database access. Suitable for development, testing, and single-instance deployments.

def __init__(
    config: SqliteConfig | None = None,
    provider: DatabaseProviderProtocol | None = None,
    event_serializer: Any | None = None
) -> None

Initialize SQLite event store.

Parameters
ParameterTypeDescription
`config`SqliteConfig | NoneSQLite configuration.
`provider`DatabaseProviderProtocol | NoneThe generic database provider injected via DI.
`event_serializer`Any | NoneOptional custom event serializer.
def register_event_type(
    event_type: str,
    event_class: type
) -> None

Register an event type for proper deserialization.

async def connect() -> None

Connection is managed by the provider. Tables are created if configured.

async def append(
    stream_id: str,
    events: list[Event],
    expected_version: int | None = None
) -> int

Append events to a stream with optimistic concurrency.

async def read(
    stream_id: str,
    from_version: int = 0,
    to_version: int | None = None,
    limit: int | None = None
) -> list[Event]

Read events from a stream (M-15).

async def get_stream_version(stream_id: str) -> int

Get current stream version.

async def stream_all(
    from_position: int = 0,
    batch_size: int = 100,
    partition: int | None = None,
    total_partitions: int | None = None
) -> AsyncIterator[Event]

Stream all events from a global position.

async def stream_by_type(
    event_types: list[str],
    from_position: int = 0
) -> AsyncIterator[Event]

Stream events filtered by type.

async def find_by_type(
    event_type: str,
    count: int = 100,
    start: int = 0
) -> list[Event]

Get events of a specific type.

async def get_events_count(stream_id: str | None = None) -> int

Get total event count.

async def delete_stream(stream_id: str) -> int

Delete all events for a stream.

async def get_stream_ids() -> list[str]

Get all stream IDs in the store.

async def compact(
    stream_id: str,
    up_to_version: int
) -> int

Purge events for a stream up to (and including) the given version (MF-04).

async def close() -> None

Close the database connection.


SQLite snapshot store implementation.

This store manages aggregate snapshots using a generic DatabaseProviderProtocol.

def __init__(
    config: SqliteConfig | None = None,
    provider: DatabaseProviderProtocol | None = None
) -> None

Initialize SQLite snapshot store.

Parameters
ParameterTypeDescription
`config`SqliteConfig | NoneSQLite configuration.
`provider`DatabaseProviderProtocol | NoneThe generic database provider injected via DI.
async def connect() -> None

Connection is managed by the provider.

async def save_snapshot(snapshot: Snapshot) -> None

Save a snapshot (M-14 standardized API).

async def get_latest(aggregate_id: str) -> Snapshot | None

Get latest snapshot (M-14 standardized API).

async def get_by_version(
    aggregate_id: str | UUID,
    version: int
) -> Snapshot | None

Get a specific snapshot version (M-14).

async def delete_old_snapshots(
    aggregate_id: str | UUID,
    keep_count: int = 3
) -> int

Delete old snapshots, keeping only the most recent ones.

async def close() -> None

Close the database connection.


Information about an event stream.

Base class for value objects.

Value objects:

  • Are immutable (frozen=True)
  • Have no identity
  • Are compared by value (all fields)
  • Can be freely shared
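In plain Python these properties map onto a frozen dataclass. The `Money` class below is a hypothetical example and does not use the library's base class.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class Money:
    amount: int
    currency: str


a = Money(100, "EUR")
b = Money(100, "EUR")

try:
    a.amount = 200  # frozen instances reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Here `a == b` holds even though they are distinct objects, and both hash to the same value, so either can be used as a dict key or shared between aggregates.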

Delivers domain events to registered external webhook endpoints.

Signs each outbound POST with a cryptographic signature via the injected WebhookSignatureVerifierProtocol so that receivers can verify authenticity. Retries transient 5xx failures according to retry_policy. Delivery failures for one endpoint are logged and do not prevent fan-out to remaining endpoints.

Example

dispatcher = WebhookDispatcher(
    http_client=http_client,
    signature_verifier=HMACWebhookVerifier(),
    retry_policy=RetryConfig(max_attempts=3),
)
await dispatcher.dispatch(user_created_event, registered_endpoints)
def __init__(
    http_client: HTTPClientProtocol,
    signature_verifier: WebhookSignatureVerifierProtocol,
    retry_policy: RetryConfig | None = None
) -> None

Initialise the dispatcher.

Parameters
ParameterTypeDescription
`http_client`HTTPClientProtocolHTTP client used for outbound POST requests.
`signature_verifier`WebhookSignatureVerifierProtocolUsed to compute the outbound HMAC signature included in X-Webhook-Signature.
`retry_policy`RetryConfig | NoneRetry configuration; defaults to 3 attempts with exponential backoff when not provided.
async def dispatch(
    event: DomainEvent,
    endpoints: list[WebhookEndpoint]
) -> None

Sign and POST event to each eligible registered endpoint.

Only endpoints whose event_types filter matches the event are delivered to. Each delivery is retried per retry_policy on transient (5xx) failures. Failures are logged but do not abort delivery to remaining endpoints.

Parameters
ParameterTypeDescription
`event`DomainEventDomain event to fan-out.
`endpoints`list[WebhookEndpoint]Registered endpoints to consider for delivery.
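Signing and verifying a webhook body with an HMAC can be sketched using only the standard library. SHA-256 with a hex digest is an assumption here, since the page does not state the algorithm or encoding. `sign_payload` and `verify_signature` are hypothetical helpers.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, body: bytes) -> str:
    """Compute a hex HMAC-SHA256 signature over the raw request body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(sign_payload(secret, body), signature)


body = json.dumps({"type": "UserCreated", "user_id": "u-1"}, sort_keys=True).encode("utf-8")
headers = {"X-Webhook-Signature": sign_payload("s3cret", body)}
```

A receiver would recompute the signature over the exact bytes it received and compare it with the header value before trusting the payload.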

An external endpoint registered to receive domain events.

Attributes:

  • url: HTTP(S) URL to POST events to.
  • secret: Shared secret used to compute the HMAC signature.
  • event_types: Set of event type names this endpoint subscribes to. When None, all events are delivered.

def accepts(event: DomainEvent) -> bool

Return True when this endpoint should receive event.

Parameters
ParameterTypeDescription
`event`DomainEventDomain event to test against the filter.
Returns
TypeDescription
boolTrue when event_types is None (all events) or the event's type name is contained in event_types.

def clear_handler_registry() -> None

Clear all registered handlers from the global handler registry.

Intended for use in tests to reset handler state between test runs. Clears both the HandlerRegistry instances and the global decorator registry.


def command_handler(
    command_type: type[Command],
    *,
    name: str | None = None,
    **metadata: Any
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator to register a function as a command handler.

Parameters
ParameterTypeDescription
`command_type`type[Command]The Command class this handler handles.
`name`str | NoneOptional custom name for the handler.
`**metadata`AnyAdditional metadata to attach to the handler.
Returns
TypeDescription
Callable[[Callable[P, R]], Callable[P, R]]Decorator function.

Example

@command_handler(CreateUserCommand)
async def handle_create_user(command: CreateUserCommand) -> str:
    user = await user_service.create(command.username, command.email)
    return user.user_id

def event_handler(
    event_type: type[Event],
    *,
    name: str | None = None,
    priority: int = 0,
    **metadata: Any
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator to register a function as an event handler.

Parameters
ParameterTypeDescription
`event_type`type[Event]The Event class this handler handles.
`name`str | NoneOptional custom name for the handler.
`priority`intHandler priority (higher = executed first).
`**metadata`AnyAdditional metadata to attach to the handler.
Returns
TypeDescription
Callable[[Callable[P, R]], Callable[P, R]]Decorator function.

Example

@event_handler(UserCreatedEvent, priority=10)
async def send_welcome_email(event: UserCreatedEvent) -> None:
    await email_service.send_welcome(event.email)
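A decorator-based registry with priority ordering can be sketched as follows. This is not the library's `event_handler`. `on_event` and `_registry` are hypothetical names, and keeping registration order for equal priorities is an assumption.

```python
from collections import defaultdict
from typing import Any, Callable

# event type -> list of (priority, handler), highest priority first
_registry: dict[type, list[tuple[int, Callable[..., Any]]]] = defaultdict(list)


def on_event(event_type: type, *, priority: int = 0):
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        _registry[event_type].append((priority, func))
        # Stable sort: equal priorities keep their registration order.
        _registry[event_type].sort(key=lambda item: item[0], reverse=True)
        return func  # the function itself is returned unchanged

    return decorator


class UserCreated:
    pass


@on_event(UserCreated)
def audit(event: UserCreated) -> str:
    return "audit"


@on_event(UserCreated, priority=10)
def welcome(event: UserCreated) -> str:
    return "welcome"


order = [func.__name__ for _, func in _registry[UserCreated]]
```

Because the decorator returns the original function, decorated handlers can still be called directly in unit tests.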

def query_handler(
    query_type: type[Query],
    *,
    name: str | None = None,
    cacheable: bool = False,
    cache_ttl: int | None = None,
    **metadata: Any
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator to register a function as a query handler.

Parameters
ParameterTypeDescription
`query_type`type[Query]The Query class this handler handles.
`name`str | NoneOptional custom name for the handler.
`cacheable`boolWhether results can be cached.
`cache_ttl`int | NoneCache time-to-live in seconds.
`**metadata`AnyAdditional metadata to attach to the handler.
Returns
TypeDescription
Callable[[Callable[P, R]], Callable[P, R]]Decorator function.

Example

@query_handler(GetUserQuery, cacheable=True, cache_ttl=300)
async def handle_get_user(query: GetUserQuery) -> UserDTO:
    return await user_repository.get(query.user_id)

Raised when aggregate is not found.

Command execution error.
def __init__(
    message: str = 'Command execution failed',
    command_type: str | None = None,
    error: str | None = None,
    **kwargs: Any
) -> None

Concurrency/optimistic locking error.

Raised when event version conflicts indicate concurrent modifications.

def __init__(
    message: str = 'Concurrency error',
    expected_version: int | None = None,
    actual_version: int | None = None,
    **kwargs: Any
) -> None

Duplicate handler registration.

Raised when attempting to register a handler for a type that already has one.

def __init__(
    message: str = 'Duplicate handler',
    message_type: str | None = None,
    **kwargs: Any
) -> None

Base events/messaging error.
def __init__(
    message: str = 'Events error',
    **kwargs: Any
) -> None

Raised when event loading fails.
def __init__(
    message: str = 'Event load error',
    **kwargs: Any
) -> None

Raised when event persistence fails.

Raised when event store connection fails.
def __init__(
    message: str = 'Event store connection error',
    **kwargs: Any
) -> None

Raised for event store errors.
def __init__(
    message: str = 'Event store error',
    **kwargs: Any
) -> None

Event/command handler not found.

Raised when no handler is registered for a specific event or command type.

def __init__(
    message: str = 'Handler not found',
    handler_type: str | None = None,
    message_type: str | None = None,
    **kwargs: Any
) -> None

Raised when projection building fails.

Raised when a projection is not found.

Query execution error.
def __init__(
    message: str = 'Query execution failed',
    query_type: str | None = None,
    error: str | None = None,
    **kwargs: Any
) -> None

Raised for schema-related errors.

Raised for security-related errors.

Raised when stream is not found.

Parameters mirror the helper used in tests: (stream_type, stream_id).

def __init__(
    stream_type: str,
    stream_id: str,
    message: str | None = None,
    **kwargs: Any
) -> None

Raised for streaming-related errors.

Raised when an outbound webhook delivery fails after all retry attempts.

Carries the target url and the HTTP status code returned by the remote endpoint so callers can log structured failure context.

def __init__(
    url: str,
    status: int,
    message: str = 'Webhook delivery failed',
    **kwargs: Any
) -> None