API Reference
Protocols
### `CommandBusProtocol`

Protocol for command bus implementations.

**Example**

```python
class CommandBusProtocol:
    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers[type(command)]
        return await handler.handle(command)
```

Dispatch a command to its handler.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `command` | Any | Command to dispatch. |

**Returns**

| Type | Description |
|---|---|
| Any | Result from the command handler. |
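
A minimal sketch of how a bus satisfying this protocol might be wired up and called. Only `dispatch` comes from the reference above; `InMemoryCommandBus`, `register`, `CreateUser`, and `CreateUserHandler` are hypothetical names used for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class CreateUser:
    """Hypothetical command, used only in this sketch."""
    name: str


class CreateUserHandler:
    """Hypothetical handler exposing an async handle() method."""

    async def handle(self, command: CreateUser) -> str:
        return f"created {command.name}"


class InMemoryCommandBus:
    """Routes each command to the single handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: dict[type, Any] = {}

    def register(self, command_type: type, handler: Any) -> None:
        # register() is an assumption; the protocol only documents dispatch().
        self._handlers[command_type] = handler

    async def dispatch(self, command: Any) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise LookupError(f"no handler registered for {type(command).__name__}")
        return await handler.handle(command)


bus = InMemoryCommandBus()
bus.register(CreateUser, CreateUserHandler())
result = asyncio.run(bus.dispatch(CreateUser(name="ada")))
```

Raising on an unregistered command type is one possible design; a real implementation may prefer to return an error value instead.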

### `DomainEventPublisherProtocol`

Protocol for publishing domain events.

Publish a domain event.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `event` | Any | DomainEvent instance. |
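
As an illustration, a publisher that only records events can serve as a test double. The method name `publish` and its being `async` are assumptions, since the reference above lists only the `event` parameter; `RecordingPublisher` and `OrderPlaced` are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class OrderPlaced:
    """Hypothetical domain event."""
    order_id: str


class RecordingPublisher:
    """Collects published events in memory instead of sending them anywhere."""

    def __init__(self) -> None:
        self.published: list[Any] = []

    async def publish(self, event: Any) -> None:
        # Assumed method name and signature; adjust to the real protocol.
        self.published.append(event)


publisher = RecordingPublisher()
asyncio.run(publisher.publish(OrderPlaced(order_id="o-1")))
```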

### `EventBusProtocol`

Protocol for event bus implementations.

The event bus manages event publication and subscription.

**Example**

```python
class InMemoryEventBus:
    async def publish(self, event: DomainEvent) -> "Result[None, EventError]":
        for handler in self._handlers.get(type(event), []):
            await handler.handle(event)
        return Ok(None)

    def subscribe(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)
```

<div style='padding-left: 1rem; border-left: 1px solid var(--sl-color-gray-5); margin-top: 2rem; margin-bottom: 2rem;'><div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>async </span><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>publish</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>Any</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit; text-decoration:underline; text-decoration-color:rgba(128,128,128,0.3); text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L97' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'"
onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Publish an event to all subscribers.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Any</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>DomainEvent to publish.</td></tr></tbody></table></div>
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Returns</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:45%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>Result<span style='color: var(--lex-color-colon)'>[</span><span style='color: var(--lex-color-default) !important'>None</span><span style='color: var(--lex-color-colon)'>,</span> <a href='/packages/events/lexigram-events/api/#eventerror' style='color:inherit; text-decoration:underline; text-decoration-color:rgba(128,128,128,0.3); text-underline-offset:2px;'>EventError</a><span style='color: var(--lex-color-colon)'>]</span></td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Ok(None) when the event is successfully enqueued for dispatch. Err(EventError) when the event cannot be accepted (e.g., no handlers registered and the bus requires at least one).</td></tr></tbody></table></div>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>subscribe</span><span style='color: var(--lex-color-colon)'>(</span> <span style='color: var(--lex-color-name)'>event_type</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>type</span><span style='color: var(--lex-color-colon)'>,</span> <span style='color: var(--lex-color-name)'>handler</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>EventHandlerProtocol</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-default) !important'>None</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L110' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 
0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Subscribe a handler to an event type.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event_type`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>type</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Type of event to subscribe to.</td></tr><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`handler`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) 
!important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>EventHandlerProtocol</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Handler to call when event is published.</td></tr></tbody></table></div>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>unsubscribe</span><span style='color: var(--lex-color-colon)'>(</span> <span style='color: var(--lex-color-name)'>event_type</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>type</span><span style='color: var(--lex-color-colon)'>,</span> <span style='color: var(--lex-color-name)'>handler</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>EventHandlerProtocol</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-default) !important'>None</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L119' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 
0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
Remove a handler subscription for an event type.
<div style='margin:0;line-height:1.4;'><span style='display:block;font-size:0.7em;font-weight:700;letter-spacing:0.07em;text-transform:uppercase;color:var(--color-brand);margin-top:1rem;margin-bottom:0.4rem;'>Parameters</span><table style='border-collapse:collapse;width:100%;font-size:0.85em;margin:0;margin-bottom:1rem;table-layout:fixed;'><thead><tr><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:20%;'>Parameter</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);width:25%;'>Type</th><th style='text-align:left;padding:0.4rem 0.5rem;color:var(--color-text-strong);font-weight:600;font-size:0.82em;border-bottom:1px solid var(--color-border-weak);padding-left:1.2rem;border-left:1px solid var(--color-border-weak);width:55%;'>Description</th></tr></thead><tbody><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`event_type`</td><td style='padding:0.6rem 0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>type</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Type of event to unsubscribe from.</td></tr><tr><td style='padding:0.6rem 0.5rem;vertical-align:top;white-space:nowrap;font-family:var(--sl-font-mono);font-size:0.85em;color:var(--lex-color-name);border-bottom:1px solid var(--color-border-weak);'>`handler`</td><td style='padding:0.6rem 
0.5rem;vertical-align:top;color:var(--lex-color-type) !important;font-family:var(--sl-font-mono);font-size:0.82em;border-bottom:1px solid var(--color-border-weak);'>EventHandlerProtocol</td><td style='padding:0.6rem 0.5rem 0.6rem 1.2rem;vertical-align:top;font-size:0.9em;font-family:var(--sl-font-mono);color:var(--color-text-weak);border-left:1px solid var(--color-border-weak);border-bottom:1px solid var(--color-border-weak);'>Handler to remove.</td></tr></tbody></table></div>
</div>
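
The three methods above can be exercised together as follows. This is a sketch under stated assumptions: `Ok` is a stand-in for the library's result type (whose real API may differ), and `SketchEventBus`, `UserRegistered`, and `WelcomeEmailHandler` are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Stand-in for the library's success result; not the real type."""
    value: Any = None


@dataclass(frozen=True)
class UserRegistered:
    """Hypothetical domain event."""
    email: str


class WelcomeEmailHandler:
    """Hypothetical handler that records which addresses it was called for."""

    def __init__(self) -> None:
        self.sent_to: list[str] = []

    async def handle(self, event: UserRegistered) -> Ok:
        self.sent_to.append(event.email)
        return Ok()


class SketchEventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[Any]] = {}

    async def publish(self, event: Any) -> Ok:
        # Iterate over a copy so a handler may unsubscribe during dispatch.
        for handler in list(self._handlers.get(type(event), [])):
            await handler.handle(event)
        return Ok()

    def subscribe(self, event_type: type, handler: Any) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: type, handler: Any) -> None:
        handlers = self._handlers.get(event_type, [])
        if handler in handlers:
            handlers.remove(handler)


async def demo() -> list[str]:
    bus = SketchEventBus()
    handler = WelcomeEmailHandler()
    bus.subscribe(UserRegistered, handler)
    await bus.publish(UserRegistered(email="a@example.com"))
    bus.unsubscribe(UserRegistered, handler)
    await bus.publish(UserRegistered(email="b@example.com"))  # no subscriber left
    return handler.sent_to


sent = asyncio.run(demo())
```

After `unsubscribe`, the second event reaches no handler, so only the first address is recorded.
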
<hr style='border:none;border-top:2px solid rgba(99,102,241,0.45);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventFilterProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display: flex; justify-content: flex-end; margin-top: -1rem; margin-bottom: 1rem;'><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-events/src/lexigram/events/protocols.py#L22' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>Predicate that determines whether an event should be dispatched.
<div style='padding-left: 1rem; border-left: 1px solid var(--sl-color-gray-5); margin-top: 2rem; margin-bottom: 2rem;'><div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>matches</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>DomainEvent</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>bool</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-events/src/lexigram/events/protocols.py#L25' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 
2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
</div>
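
A filter is just an object with a `matches` method returning `bool`. The sketch below uses a hypothetical `PaymentReceived` event and `MinimumAmountFilter` class to show how a dispatcher could consult it before delivering events.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PaymentReceived:
    """Hypothetical domain event."""
    amount_cents: int


class MinimumAmountFilter:
    """Matches payment events at or above a threshold."""

    def __init__(self, minimum_cents: int) -> None:
        self._minimum_cents = minimum_cents

    def matches(self, event: object) -> bool:
        # Events of other types never match.
        return (
            isinstance(event, PaymentReceived)
            and event.amount_cents >= self._minimum_cents
        )


large_only = MinimumAmountFilter(minimum_cents=10_000)
events = [PaymentReceived(500), PaymentReceived(25_000)]
dispatched = [e for e in events if large_only.matches(e)]
```
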
<hr style='border:none;border-top:2px solid rgba(99,102,241,0.45);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventSerializerProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display: flex; justify-content: flex-end; margin-top: -1rem; margin-bottom: 1rem;'><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-events/src/lexigram/events/protocols.py#L14' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>Serializes and deserializes domain events for transport.
<div style='padding-left: 1rem; border-left: 1px solid var(--sl-color-gray-5); margin-top: 2rem; margin-bottom: 2rem;'><div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>serialize</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>event</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>DomainEvent</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>bytes</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-events/src/lexigram/events/protocols.py#L17' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 
3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
<div style='background: var(--color-background-weak); padding: 0.6rem 1rem; border-radius: 4px; border-left: 2px solid var(--color-border-weak); margin-bottom: 1rem; display: flex; justify-content: space-between; align-items: flex-start;'><pre style='margin: 0; font-family: var(--sl-font-mono); font-size: 0.875em; line-height: 1.6; white-space: pre-wrap; word-break: break-all; flex: 1;'><span style='color: var(--lex-color-keyword)'>def </span><span style='color: var(--lex-color-fname); font-weight: 600'>deserialize</span><span style='color: var(--lex-color-colon)'>(</span><span style='color: var(--lex-color-name)'>data</span><span style='color: var(--lex-color-colon)'>: </span><span style='color: var(--lex-color-type)'>bytes</span><span style='color: var(--lex-color-colon)'>)</span><span style='color: var(--lex-color-keyword)'> -> </span><span style='color: var(--lex-color-return)'>DomainEvent</span></pre><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-events/src/lexigram/events/protocols.py#L18' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 
5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>
</div>
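
One possible implementation encodes events as JSON with a type tag so that `deserialize` can rebuild the right class. The wire format, the `JsonEventSerializer` class, and the `ItemShipped` event are assumptions made for this sketch; the protocol itself only fixes the `serialize` and `deserialize` signatures.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ItemShipped:
    """Hypothetical domain event."""
    order_id: str
    quantity: int


class JsonEventSerializer:
    """Encodes dataclass events as UTF-8 JSON tagged with the class name."""

    def __init__(self, event_types: list[type]) -> None:
        # Registry of classes that deserialize() is allowed to rebuild.
        self._types = {t.__name__: t for t in event_types}

    def serialize(self, event: object) -> bytes:
        payload = {"type": type(event).__name__, "data": asdict(event)}
        return json.dumps(payload).encode("utf-8")

    def deserialize(self, data: bytes) -> object:
        payload = json.loads(data.decode("utf-8"))
        event_cls = self._types[payload["type"]]  # KeyError for unknown types
        return event_cls(**payload["data"])


serializer = JsonEventSerializer([ItemShipped])
original = ItemShipped(order_id="o-7", quantity=3)
round_tripped = serializer.deserialize(serializer.serialize(original))
```

Restricting decoding to an explicit registry avoids instantiating arbitrary classes named in untrusted payloads.
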
<hr style='border:none;border-top:2px solid rgba(99,102,241,0.45);margin:1.75rem 0 0 0;' />
<div data-pagefind-weight='10'>
### `EventStoreProtocol`
</div>
<span data-api-type='Protocols' style='display:none;'></span>
<div style='display: flex; justify-content: flex-end; margin-top: -1rem; margin-bottom: 1rem;'><a href='https://github.com/dbtinoy-/lexigram/blob/main/lexigram/lexigram-contracts/src/lexigram/contracts/events/protocols.py#L253' target='_blank' title='View Source' style='color: var(--sl-color-gray-3); opacity: 0.6; flex-shrink: 0; margin-left: 0.75rem; display: inline-flex; align-items: center; text-decoration: none;' onmouseover="this.style.opacity='1'" onmouseout="this.style.opacity='0.6'"><svg xmlns='http://www.w3.org/2000/svg' width='15' height='15' viewBox='0 0 24 24' fill='currentColor'><path d='M12 0C5.374 0 0 5.373 0 12c0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23A11.509 11.509 0 0 1 12 5.803c1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 C20.566 21.797 24 17.3 24 12c0-6.627-5.373-12-12-12z'/></svg></a></div>Protocol for event store implementations.
The event store persists and retrieves domain events.
**Example**
```python
class PostgresEventStore:
    async def append(self, stream_id, events, expected_version=None):
        async with self.db.transaction():
            for event in events:
                await self.db.insert("events", event.to_dict())
```

Append events to a stream.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Unique stream identifier. |
| `events` | list[Any] | List of events to append. |
| `expected_version` | int \| None | Expected stream version for optimistic concurrency. |

**Returns**

| Type | Description |
|---|---|
| int | New stream version. |

**Raises**

| Exception | Description |
|---|---|
| ConcurrencyError | If expected version doesn't match. |
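
The optimistic-concurrency check on `append` can be sketched with an in-memory store. This assumes a stream's version equals the number of events stored in it (so an empty stream has version 0); the reference does not state how versions are numbered. `ConcurrencyError` here is a local stand-in and `InMemoryEventStore` is hypothetical.

```python
import asyncio
from typing import Any, Optional


class ConcurrencyError(Exception):
    """Local stand-in for the library's ConcurrencyError."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[Any]] = {}

    async def append(
        self,
        stream_id: str,
        events: list[Any],
        expected_version: Optional[int] = None,
    ) -> int:
        stream = self._streams.setdefault(stream_id, [])
        current_version = len(stream)  # assumption: version == event count
        if expected_version is not None and expected_version != current_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {current_version}"
            )
        stream.extend(events)
        return len(stream)


async def demo() -> tuple:
    store = InMemoryEventStore()
    v1 = await store.append("order-1", ["created"], expected_version=0)
    v2 = await store.append("order-1", ["paid", "shipped"], expected_version=v1)
    try:
        # A writer still holding version 1 is now stale and must be rejected.
        await store.append("order-1", ["cancelled"], expected_version=1)
        conflict = False
    except ConcurrencyError:
        conflict = True
    return v1, v2, conflict


versions = asyncio.run(demo())
```

Passing `expected_version=None` skips the check entirely, which matches the `int | None` type in the parameter table.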
Read events from a stream.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Unique stream identifier. |
| `start` | int | Starting position. |
| `count` | int \| None | Maximum events to read. |

**Returns**

| Type | Description |
|---|---|
| list[Any] | List of events. |

Read events from all streams.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `position` | int | Starting global position. |
| `count` | int \| None | Maximum events to read. |

**Returns**

| Type | Description |
|---|---|
| list[Any] | List of events. |

### `MultiEventHandlerProtocol`

Protocol for handlers that handle multiple event types.

Get list of event types handled.

**Returns**

| Type | Description |
|---|---|
| list[type] | List of event classes. |

`async def handle(event: Any) -> Result[None, EventError]`

Handle an event.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `event` | Any | DomainEvent to handle. |

**Returns**

| Type | Description |
|---|---|
| Result[None, EventError] | `Ok(None)` on success, `Err(EventError)` if handling fails in an expected, recoverable way. |
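
A sketch of a handler that serves two event types and reports unsupported input through an error result rather than an exception. The accessor name `get_event_types` is an assumption (the reference does not give the method's name), `Ok` and `Err` are stand-ins for the library's result types, and the event and handler classes are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    """Stand-in for the library's success result."""
    value: Any = None


@dataclass(frozen=True)
class Err:
    """Stand-in for the library's error result."""
    error: Any


@dataclass(frozen=True)
class UserCreated:
    user_id: str


@dataclass(frozen=True)
class UserDeleted:
    user_id: str


class AuditLogHandler:
    """Hypothetical handler that logs both user lifecycle events."""

    def __init__(self) -> None:
        self.entries: list[str] = []

    def get_event_types(self) -> list[type]:
        # Assumed method name for "Get list of event types handled".
        return [UserCreated, UserDeleted]

    async def handle(self, event: Any) -> Union[Ok, Err]:
        if not isinstance(event, tuple(self.get_event_types())):
            return Err(f"unsupported event: {type(event).__name__}")
        self.entries.append(f"{type(event).__name__}:{event.user_id}")
        return Ok()


async def demo() -> tuple:
    handler = AuditLogHandler()
    inputs = (UserCreated("u1"), UserDeleted("u1"), "bogus")
    results = [await handler.handle(e) for e in inputs]
    return handler.entries, results


entries, results = asyncio.run(demo())
```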

### `ProjectionProtocol`

Protocol for building read models from events.

Apply an event to the projection state.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `event` | Any | Domain event to apply. |
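
To illustrate, a projection can fold a sequence of events into a queryable read model. The method name `apply` and its being synchronous are assumptions, as the reference gives only the description and the `event` parameter; `BalanceProjection`, `Deposited`, and `Withdrawn` are hypothetical.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    account_id: str
    amount: int


class BalanceProjection:
    """Read model mapping account IDs to their current balance."""

    def __init__(self) -> None:
        self.balances: dict[str, int] = {}

    def apply(self, event: Any) -> None:
        # Assumed method name; unknown event types are ignored.
        if isinstance(event, Deposited):
            current = self.balances.get(event.account_id, 0)
            self.balances[event.account_id] = current + event.amount
        elif isinstance(event, Withdrawn):
            current = self.balances.get(event.account_id, 0)
            self.balances[event.account_id] = current - event.amount


projection = BalanceProjection()
history = [Deposited("acc-1", 100), Withdrawn("acc-1", 30), Deposited("acc-2", 5)]
for event in history:
    projection.apply(event)
```

Because the state is derived only from events, replaying the same history into a fresh projection rebuilds the same read model.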

### `QueryBusProtocol`

Protocol for query bus implementations.

**Example**

```python
class QueryBusProtocol:
    async def execute(self, query: Query) -> Any:
        handler = self._handlers[type(query)]
        return await handler.handle(query)
```

Execute a query through its handler.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `query` | Any | Query to execute. |

**Returns**

| Type | Description |
|---|---|
| Any | Query result. |
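
A sketch of the query side, mirroring the example above. Only `execute` comes from the reference; `InMemoryQueryBus`, `register`, `GetUserName`, and `GetUserNameHandler` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class GetUserName:
    """Hypothetical query object."""
    user_id: str


class GetUserNameHandler:
    """Hypothetical handler answering from an in-memory lookup table."""

    def __init__(self, names: dict[str, str]) -> None:
        self._names = names

    async def handle(self, query: GetUserName) -> Optional[str]:
        return self._names.get(query.user_id)


class InMemoryQueryBus:
    def __init__(self) -> None:
        self._handlers: dict[type, Any] = {}

    def register(self, query_type: type, handler: Any) -> None:
        # register() is an assumption; the protocol only documents execute().
        self._handlers[query_type] = handler

    async def execute(self, query: Any) -> Any:
        handler = self._handlers[type(query)]  # KeyError if nothing is registered
        return await handler.handle(query)


query_bus = InMemoryQueryBus()
query_bus.register(GetUserName, GetUserNameHandler({"u1": "Ada"}))
name = asyncio.run(query_bus.execute(GetUserName("u1")))
missing = asyncio.run(query_bus.execute(GetUserName("nobody")))
```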

### `ReadOnlyRepositoryProtocol`

Protocol for read-only repository operations.

Use this for query-only access patterns (CQRS query side).

**Example**

```python
class UserQueryRepository:
    async def get(self, id: str) -> User | None:
        return await self.db.query("users").where(id=id).first()

    async def list(self, skip: int = 0, limit: int = 100) -> list[User]:
        return await self.db.query("users").offset(skip).limit(limit).all()
```

Get entity by ID.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `item_id` | str | Entity identifier. |

**Returns**

| Type | Description |
|---|---|
| T \| None | Entity if found, None otherwise. |

List entities with pagination.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `skip` | int | Number of records to skip. |
| `limit` | int | Maximum records to return. |
| `**filters` | | Optional filter criteria. |

**Returns**

| Type | Description |
|---|---|
| list[T] | List of entities. |
async def find_by_spec(spec: SpecificationProtocol[T]) -> list[T]
Find entities matching a complex specification.
| Parameter | Type | Description |
|---|---|---|
| `spec` | SpecificationProtocol[T] | The DDD specification to evaluate. |
| Type | Description |
|---|---|
| list[T] | List of matching entities. |
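One common way to satisfy `find_by_spec` is to give each specification an `is_satisfied_by` predicate and filter with it. The following is a minimal in-memory sketch; `is_satisfied_by`, `AndSpec`, `User`, and the repository class are hypothetical names and are not taken from `SpecificationProtocol`.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class User:
    id: str
    age: int
    active: bool


class IsActive:
    def is_satisfied_by(self, user: User) -> bool:
        return user.active


@dataclass(frozen=True)
class OlderThan:
    years: int

    def is_satisfied_by(self, user: User) -> bool:
        return user.age > self.years


@dataclass(frozen=True)
class AndSpec:
    """Composite specification: both sides must hold."""

    left: Any
    right: Any

    def is_satisfied_by(self, user: User) -> bool:
        return self.left.is_satisfied_by(user) and self.right.is_satisfied_by(user)


class InMemoryUserQueryRepository:
    def __init__(self, users: list[User]) -> None:
        self._users = list(users)

    async def find_by_spec(self, spec: Any) -> list[User]:
        # A database-backed repository would translate the spec into a query;
        # here it is simply evaluated against every stored entity.
        return [user for user in self._users if spec.is_satisfied_by(user)]
```

Composing small specifications keeps query rules reusable and testable without touching the repository.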
Count entities matching filters.
| Type | Description |
|---|---|
| int | Total count of matching entities. |
RepositoryProtocol
Protocol for full repository operations.
Extends ReadOnlyRepositoryProtocol with write operations.
Example
```python
class UserRepository:
    async def save(self, entity: User) -> User:
        if entity.id:
            # Existing entity: update in place
            await self.db.update("users", entity.dict()).where(id=entity.id)
        else:
            # New entity: insert and keep the generated id
            entity.id = await self.db.insert("users", entity.dict())
        return entity

    # Parameter is named item_id to match the documented signature
    async def delete(self, item_id: str) -> bool:
        result = await self.db.delete("users").where(id=item_id)
        return result.affected_rows > 0
```
Save (create or update) an entity.
| Parameter | Type | Description |
|---|---|---|
| `entity` | T | Entity to save. |
| Type | Description |
|---|---|
| T | Saved entity with any generated fields populated. |
Delete entity by ID.
| Parameter | Type | Description |
|---|---|---|
| `item_id` | str | Entity identifier. |
| Type | Description |
|---|---|
| bool | True if deleted, False if not found. |
Save (create or update) multiple entities in a single operation.
Implementations SHOULD execute this as a batch for efficiency.
| Parameter | Type | Description |
|---|---|---|
| `entities` | list[T] | Entities to save. |
| Type | Description |
|---|---|
| list[T] | Saved entities with any generated fields populated. |
Delete multiple entities by ID.
Implementations SHOULD execute this as a batch for efficiency.
| Parameter | Type | Description |
|---|---|---|
| `item_ids` | list[str] | Entity identifiers to delete. |
| Type | Description |
|---|---|
| int | Number of entities actually deleted. |
SagaManagerProtocol
Protocol for saga lifecycle management.
The manager routes incoming events to all registered sagas and coordinates their execution.
Process an event through all relevant sagas.
| Parameter | Type | Description |
|---|---|---|
| `event` | Any | Domain event to route and process. |
SagaProtocol
Protocol for saga / process-manager implementations.
Sagas coordinate long-running business processes that span multiple aggregates by reacting to domain events and dispatching commands.
Handle a domain event and produce commands.
| Parameter | Type | Description |
|---|---|---|
| `event` | Any | Domain event to handle. |
| Type | Description |
|---|---|
| list[Any] | List of commands to dispatch to the command bus. |
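To make the event-in, commands-out contract concrete, here is a small sketch of a saga. The method name `handle` and all event and command classes are hypothetical; only the shape (one event in, a list of commands out) comes from the description above.

```python
from dataclasses import dataclass
from typing import Any


# Hypothetical events and commands for an order-fulfilment process.
@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    amount: int


@dataclass(frozen=True)
class PaymentReceived:
    order_id: str


@dataclass(frozen=True)
class ChargeCustomer:
    order_id: str
    amount: int


@dataclass(frozen=True)
class ShipOrder:
    order_id: str


class OrderFulfilmentSaga:
    async def handle(self, event: Any) -> list[Any]:  # method name is an assumption
        if isinstance(event, OrderPlaced):
            return [ChargeCustomer(event.order_id, event.amount)]
        if isinstance(event, PaymentReceived):
            return [ShipOrder(event.order_id)]
        # Events the saga does not care about produce no commands.
        return []
```

The saga itself performs no side effects; the caller is expected to send the returned commands to the command bus.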
SnapshotStoreProtocol
Protocol for aggregate snapshot storage.
Save an aggregate snapshot.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str | Aggregate identifier. |
| `snapshot` | Any | Aggregate state snapshot. |
| `version` | int | Aggregate version at snapshot. |
Load the latest snapshot.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str | Aggregate identifier. |
| Type | Description |
|---|---|
| tuple[Any, int] | None | Tuple of (snapshot, version) or None if not found. |
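A minimal in-memory sketch of this protocol follows. The method names `save` and `load` are assumptions (the reference lists the parameters but not the method names), and ignoring stale writes is a design choice of this example rather than documented behaviour.

```python
from typing import Any, Optional


class InMemorySnapshotStore:
    def __init__(self) -> None:
        # aggregate_id -> (snapshot, version)
        self._snapshots: dict[str, tuple[Any, int]] = {}

    async def save(self, aggregate_id: str, snapshot: Any, version: int) -> None:
        current = self._snapshots.get(aggregate_id)
        # Keep only the newest snapshot; a write with an older version is ignored.
        if current is None or version >= current[1]:
            self._snapshots[aggregate_id] = (snapshot, version)

    async def load(self, aggregate_id: str) -> Optional[tuple[Any, int]]:
        return self._snapshots.get(aggregate_id)
```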
Classes
AbstractEventStore
Abstract base class for event storage.
Inherits stream/cursor/replay helpers from EventStreamMixin. Snapshot storage is provided by AbstractSnapshotStore.
Example
```python
# Append events
await store.append(
    stream_id="order-123",
    events=[OrderCreated(...), ItemAdded(...)],
    expected_version=0,
)

# Read events
events = await store.read("order-123")
```
Initialise event store, optionally with schema evolution (M-03).
async def append( stream_id: str, events: list[Event], expected_version: int | None = None ) -> int
Append events to a stream with optimistic concurrency.
| Type | Description |
|---|---|
| int | New stream version. |
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Unique identifier for the event stream |
| `events` | list[Event] | List of events to save |
| `expected_version` | int | None | Expected current version for concurrency control |
| Exception | Description |
|---|---|
| ConcurrencyError | If version conflict occurs |
| EventPersistenceError | If save operation fails |
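The optimistic concurrency check can be sketched as a comparison between `expected_version` and the stream's current version before anything is written. This toy store assumes the version equals the number of events in the stream and defines its own `ConcurrencyError` stand-in; the real store may track versions differently.

```python
from typing import Any, Optional


class ConcurrencyError(Exception):
    """Stand-in for the library's ConcurrencyError (defined here for the sketch)."""


class TinyEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[Any]] = {}

    async def append(
        self,
        stream_id: str,
        events: list[Any],
        expected_version: Optional[int] = None,
    ) -> int:
        stream = self._streams.get(stream_id, [])
        current_version = len(stream)  # assumption: version == event count
        # None means "append regardless of the current version".
        if expected_version is not None and expected_version != current_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, "
                f"found {current_version}"
            )
        self._streams[stream_id] = stream + list(events)
        return current_version + len(events)
```

A writer that loses the race gets `ConcurrencyError` and can reload the aggregate, reapply its change, and retry.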
async def read( stream_id: str, from_version: int = 0, to_version: int | None = None, limit: int | None = None ) -> list[Event]
Read events from a stream (M-15).
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to load events from |
| `from_version` | int | Starting version (inclusive) |
| `to_version` | int | None | Ending version (inclusive) |
| `limit` | int | None | Max number of events to read |
| Type | Description |
|---|---|
| list[Event] | List of events in chronological order |
| Exception | Description |
|---|---|
| EventLoadError | If load operation fails |
Get the current version of a stream.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to check |
| Type | Description |
|---|---|
| int | Current version (0 if stream doesn't exist) |
Check if a stream exists.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to check |
| Type | Description |
|---|---|
| bool | True if stream has events |
async def get_stream_info(stream_id: str) -> StreamInfo | None
Get information about a stream.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to get info for |
| Type | Description |
|---|---|
| StreamInfo | None | StreamInfo or None if stream doesn't exist |
async def stream_all( from_position: int = 0, batch_size: int = 100, partition: int | None = None, total_partitions: int | None = None ) -> AsyncIterator[Event]
Stream all events from the store in global order.
| Parameter | Type | Description |
|---|---|---|
| `from_position` | int | Starting global sequence number. |
| `batch_size` | int | Number of events to fetch per batch. |
| `partition` | int | None | Optional partition index (0 to total_partitions - 1). |
| `total_partitions` | int | None | Total number of partitions for sharding. |
Purge events for a stream up to (and including) the given version (MF-04).
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream identifier. |
| `up_to_version` | int | Maximum version to delete. |
| Type | Description |
|---|---|
| int | Number of events purged. |
async def find_by_aggregate(aggregate_id: str) -> list[Event]
Get all events for an aggregate (for reconstitution).
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str | Aggregate identifier (same as stream_id). |
| Type | Description |
|---|---|
| list[Event] | All events for the aggregate in chronological order. |
Close the event store and release resources.
AbstractMiddleware
Abstract base for middleware components.
Middleware wraps around handlers to add cross-cutting concerns like logging, validation, transactions, etc.
Example
```python
import logging
import time

logger = logging.getLogger(__name__)

class TimingMiddleware(AbstractMiddleware[Message, Any]):
    async def __call__(
        self, message: Message, next_handler: NextHandler
    ) -> Any:
        # perf_counter is monotonic, so it suits measuring elapsed time
        start = time.perf_counter()
        result = await next_handler(message)
        duration = time.perf_counter() - start
        logger.info("Handler took %.3fs", duration)
        return result
```
AbstractReadOnlyRepository
Read-only repository interface.
Useful for query-side repositories in CQRS.
Get an aggregate by ID.
Check if an aggregate exists.
Get all aggregates with pagination.
AbstractRepository
Abstract base repository for aggregate persistence.
Repositories provide collection-like access to aggregates, abstracting away the persistence mechanism.
Example
```python
class OrderRepository(AbstractRepository[Order]):
    # Parameter is named aggregate_id to match the documented signature
    async def get(self, aggregate_id: UUID) -> Order | None:
        ...

    async def save(self, aggregate: Order) -> None:
        ...
```
Get an aggregate by ID.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | UUID | str | The aggregate identifier |
| Type | Description |
|---|---|
| TAggregate | None | The aggregate if found, None otherwise |
Save an aggregate.
This persists any pending changes and clears the pending events.
| Parameter | Type | Description |
|---|---|---|
| `aggregate` | TAggregate | The aggregate to save |
Check if an aggregate exists.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | UUID | str | The aggregate identifier |
| Type | Description |
|---|---|
| bool | True if the aggregate exists |
AbstractSnapshotStore
Abstract base class for snapshot storage.
Provides optimised aggregate loading by storing periodic snapshots. Avoids replaying the entire event history for aggregates with many events.
Strategy:
1. Load the latest snapshot (if any).
2. Load events after the snapshot version.
3. Apply those events to the snapshot state.
This reduces loading from O(N) to O(1) + O(events since snapshot).
Example (M-14: uses Snapshot object throughout)
```python
from lexigram.events.types import Snapshot

# Save a snapshot
await store.save_snapshot(Snapshot(
    aggregate_id=order.id,
    aggregate_type="Order",
    version=order.version,
    state=order.to_dict()
))

# Load the latest snapshot
snapshot = await store.get_latest("order-123")
if snapshot:
    order = Order.from_snapshot(snapshot.state)
    # from_version is inclusive, so resume at the version after the snapshot
    events = await event_store.read("order-123", from_version=snapshot.version + 1)
```
async def save_snapshot(snapshot: Snapshot) -> None
Save an aggregate snapshot.
| Parameter | Type | Description |
|---|---|---|
| `snapshot` | Snapshot | Snapshot object containing aggregate_id, version, and state. |
async def get_latest(aggregate_id: str) -> Snapshot | None
Load the latest snapshot for an aggregate.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str | Aggregate to load snapshot for. |
| Type | Description |
|---|---|
| Snapshot | None | Snapshot object or None if no snapshot exists. |
async def get_by_version( aggregate_id: str | UUID, version: int ) -> Snapshot | None
Get a specific snapshot by version.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str | UUID | Aggregate identifier. |
| `version` | int | Exact version to retrieve. |
| Type | Description |
|---|---|
| Snapshot | None | Snapshot at that version, or None. |
Delete old snapshots, keeping only the most recent ones.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str | UUID | ID of the aggregate. |
| `keep_count` | int | Number of snapshots to retain (default: 3). |
| Type | Description |
|---|---|
| int | Number of snapshots deleted. |
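The retention rule can be illustrated on a plain list of snapshot versions. This is a sketch of the idea only: `prune_snapshots` is a hypothetical helper, and it assumes "most recent" means "highest version".

```python
def prune_snapshots(versions: list[int], keep_count: int = 3) -> tuple[list[int], int]:
    """Return (versions kept, number deleted), keeping the highest versions."""
    ordered = sorted(versions)
    # Guard keep_count <= 0 explicitly: ordered[-0:] would keep everything.
    kept = ordered[-keep_count:] if keep_count > 0 else []
    return kept, len(ordered) - len(kept)
```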
Close the snapshot store and release resources.
AggregateFactoryProtocol
Factory for creating aggregates.
Create a new aggregate instance.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | UUID | str | Aggregate ID. |
| Type | Description |
|---|---|
| TAggregate_co | New aggregate instance. |
AggregateRoot
Base class for event-sourced aggregates.
Aggregates:
- Maintain consistency boundaries
- Generate domain events on state changes
- Can be reconstructed from events (event sourcing)
- Support snapshotting for performance
Example
```python
class Order(AggregateRoot):
    status: str = "draft"
    items: list[OrderItem] = Field(default_factory=list)
    total: Decimal = Decimal("0.00")

    def add_item(self, product_id: str, quantity: int, price: Decimal):
        if self.status != "draft":
            raise InvalidOperationError("Cannot modify non-draft order")

        self._apply(ItemAddedEvent(
            order_id=self.id,
            product_id=product_id,
            quantity=quantity,
            price=price
        ))

    def _handle_item_added(self, event: ItemAddedEvent):
        self.items.append(OrderItem(
            product_id=event.product_id,
            quantity=event.quantity,
            price=event.price
        ))
        self.total += event.price * event.quantity
```
Initialise the aggregate, auto-generating a UUID id if none is given.
Buffer a domain event for persistence, calling invariant checks first.
Return a copy of pending events without clearing them.
Discard all buffered events.
True when there are buffered events awaiting persistence.
def get_pending_events() -> list[Event]
Get events pending persistence.
| Type | Description |
|---|---|
| list[Event] | List of uncommitted events |
Clear pending events after persistence.
def load_from_history(events: list[Event]) -> None
Reconstruct aggregate state from event history.
This is used during aggregate loading to replay all events and rebuild the current state.
| Parameter | Type | Description |
|---|---|---|
| `events` | list[Event] | Historical events to replay |
def load_from_snapshot(snapshot: Snapshot) -> None
Restore aggregate state from a snapshot.
| Parameter | Type | Description |
|---|---|---|
| `snapshot` | Snapshot | The snapshot to restore from |
def create_snapshot() -> Snapshot
Create a snapshot of current aggregate state.
| Type | Description |
|---|---|
| Snapshot | Snapshot instance |
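The interplay between applying events, buffering them, and replaying history can be sketched without the library. `MiniAggregate` below is a hypothetical stand-in, not `AggregateRoot`; in particular `_mutate` and `clear_pending_events` are names chosen for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Deposited:
    amount: int


class MiniAggregate:
    """Hypothetical stand-in showing the pending-event mechanics."""

    def __init__(self) -> None:
        self.version = 0
        self._pending: list[object] = []

    def _apply(self, event: object) -> None:
        # New event: change state and remember it for persistence.
        self._mutate(event)
        self._pending.append(event)

    def _mutate(self, event: object) -> None:
        raise NotImplementedError

    def get_pending_events(self) -> list[object]:
        return list(self._pending)

    def clear_pending_events(self) -> None:
        self._pending.clear()

    def load_from_history(self, events: list[object]) -> None:
        # Historical events change state but are not buffered again.
        for event in events:
            self._mutate(event)


class Account(MiniAggregate):
    def __init__(self) -> None:
        super().__init__()
        self.balance = 0

    def deposit(self, amount: int) -> None:
        if amount <= 0:
            raise ValueError("amount must be positive")
        self._apply(Deposited(amount))

    def _mutate(self, event: object) -> None:
        if isinstance(event, Deposited):
            self.balance += event.amount
        self.version += 1
```

Routing both new and historical events through the same state-changing method is what lets replay reproduce exactly the state the original commands created.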
Checkpoint
Checkpoint for tracking projection progress.
Command
Command extended with aggregate-targeting helpers.
def for_aggregate( aggregate_id: UUID, version: int | None = None ) -> Command
Target a specific aggregate.
CommandBusConfig
Configuration for command bus.
CommandBusImpl
Command bus for dispatching commands.
Handlers are registered explicitly via register() or through HandlerRegistry.register_with_buses() during provider registration.
Example
```python
# Commands are automatically routed to handlers
result = await bus.dispatch(CreateOrderCommand(
    customer_id="cust-123",
    items=[...]
))
```
Initialize the command bus.
async def dispatch(command: Command) -> Any
Dispatch a command for execution.
| Parameter | Type | Description |
|---|---|---|
| `command` | Command | The command to execute. |
| Type | Description |
|---|---|
| Any | Command execution result. |
| Exception | Description |
|---|---|
| HandlerNotFoundError | If no handler is registered. |
| CommandExecutionError | If command execution fails. |
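The dispatch rule (exactly one handler per command type, an error when none is registered) can be sketched as a dictionary lookup. `MiniCommandBus`, the `register` signature, and the local `HandlerNotFoundError` are stand-ins written for this example, not the `CommandBusImpl` implementation.

```python
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


class HandlerNotFoundError(LookupError):
    """Stand-in for the library's HandlerNotFoundError."""


class MiniCommandBus:
    def __init__(self) -> None:
        self._handlers: dict[type, Callable[[Any], Awaitable[Any]]] = {}

    def register(
        self, command_type: type, handler: Callable[[Any], Awaitable[Any]]
    ) -> None:
        # A command type maps to exactly one handler.
        if command_type in self._handlers:
            raise ValueError(f"handler already registered for {command_type.__name__}")
        self._handlers[command_type] = handler

    async def dispatch(self, command: Any) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise HandlerNotFoundError(type(command).__name__)
        return await handler(command)


@dataclass(frozen=True)
class CreateOrder:
    customer_id: str


async def create_order(command: CreateOrder) -> str:
    return f"order-for-{command.customer_id}"
```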
CommandHandlerProtocol
Base class for command handlers.
Command handlers process commands and optionally return results. Each command type should have exactly one handler.
Example
```python
class CreateOrderHandler(CommandHandlerProtocol[CreateOrderCommand, OrderId]):
    def __init__(self, repository: OrderRepository):
        self.repository = repository

    async def handle(self, command: CreateOrderCommand) -> OrderId:
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items
        )
        await self.repository.save(order)
        return order.id
```
Handle a command.
| Parameter | Type | Description |
|---|---|---|
| `command` | TCommand | The command to handle |
| Type | Description |
|---|---|
| TResult | Command execution result |
CommandResult
Result of command execution.
DispatchResult
Result of an event dispatch operation.
Attributes:
- success: True if all handlers completed successfully.
- handler_results: List of return values from each handler.
- errors: List of (exception_type, exception_instance) tuples.
Raise if any handler failed. Call explicitly when needed.
Return True if any handler failed.
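A result object of this shape might look like the sketch below. The attribute names follow the list above, while the method names `has_errors` and `raise_if_failed`, and the choice to re-raise the first recorded exception, are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniDispatchResult:
    handler_results: list[Any] = field(default_factory=list)
    # Each entry is (exception_type, exception_instance).
    errors: list[tuple[type, BaseException]] = field(default_factory=list)

    @property
    def success(self) -> bool:
        return not self.errors

    def has_errors(self) -> bool:
        return bool(self.errors)

    def raise_if_failed(self) -> None:
        # Failures are collected silently; callers opt in to raising.
        if self.errors:
            _, first_error = self.errors[0]
            raise first_error
```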
DomainEvent
Base class for domain events.
Initialize the event setting base fields and any extra subclass fields.
Declared dataclass fields receive their default values when not
supplied. Extra keyword arguments (e.g. fields declared on plain
subclasses without @dataclass) are set directly on the instance
via object.__setattr__, which bypasses the frozen=True
guard safely during construction.
Return a JSON-serializable dictionary representation.
Entity
Base class for entities within aggregates.
Entities:
- Have identity (id field)
- Are mutable
- Belong to an aggregate root
- Don’t have their own event streams
Event
Represents a fact that happened in the past.
We override __init__ to accept arbitrary extra keyword arguments,
which are attached as attributes on the instance. This preserves the
historical ability to create Event(data=...) without declaring a
new subclass. event_type handling is performed by the contract
base class (which now includes actor_id).
The event version is an aggregate-specific concept and therefore
is declared on this subclass rather than the canonical DomainEvent
base class. with_version and related helpers operate on this field.
def with_version(version: int) -> Event
Create a copy with a specific version (aggregate version).
def for_aggregate( aggregate_id: UUID, aggregate_type: str ) -> Event
Create a copy for a specific aggregate.
EventBusConfig
Configuration for event bus.
EventBusImpl
Event bus for publishing domain events.
Orchestrates subscription management, per-event-type BoundedChannel backpressure (MAJ-11), and background drain tasks. All handler dispatch logic (lookup, parallel/sequential dispatch, retry) is provided by HandlerDispatchMixin.
Attributes:
- DEFAULT_MAX_CONCURRENT_HANDLERS: Maximum concurrent handler executions.
- DEFAULT_HANDLER_TIMEOUT_SECONDS: Timeout for each handler execution.
- DEFAULT_RETRY_FAILED_HANDLERS: Whether to retry failed handlers.
- DEFAULT_MAX_HANDLER_RETRIES: Maximum retry attempts per handler.
- DEFAULT_ENABLE_DEAD_LETTER: Whether to enable dead letter queue.
- DEFAULT_ALLOW_NO_HANDLERS: Whether to allow events with no handlers.
- DEFAULT_PARALLEL_DISPATCH: Whether to dispatch handlers in parallel.
- DEFAULT_CONTINUE_ON_ERROR: Whether to continue on handler errors.
def __init__( middlewares: list[MiddlewareFunc] | None = None, config: EventBusConfig | None = None, parallel: ParallelProtocol | None = None, tracer: TracerProtocol | None = None, hooks: HookRegistryProtocol | None = None ) -> None
Initialize the event bus.
| Parameter | Type | Description |
|---|---|---|
| `middlewares` | list[MiddlewareFunc] | None | List of middleware functions. |
| `config` | EventBusConfig | None | Event bus configuration. |
| `parallel` | ParallelProtocol | None | Parallel execution protocol for concurrent handler execution. |
| `tracer` | TracerProtocol | None | Optional tracer for distributed tracing. |
def subscribe( event_type: type[Event], handler: Any ) -> None
Subscribe a handler to an event type.
| Parameter | Type | Description |
|---|---|---|
| `event_type` | type[Event] | The event class to subscribe to. |
| `handler` | Any | Handler to call when event is published. |
Attach an optional tracer after provider boot wiring.
| Parameter | Type | Description |
|---|---|---|
| `tracer` | TracerProtocol | None | Tracer to attach, or None to clear. |
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None
Attach an optional hook registry after provider boot wiring.
def handler(event_type: type[Event]) -> Callable[[Callable], Callable]
Decorator to subscribe a handler to an event type.
Subscribe a handler to all events.
| Parameter | Type | Description |
|---|---|---|
| `handler` | Any | Handler to call for every published event. |
def unsubscribe( event_type: type[Event], handler: Any ) -> bool
Unsubscribe a handler from an event type.
| Parameter | Type | Description |
|---|---|---|
| `event_type` | type[Event] | The event class. |
| `handler` | Any | Handler to remove. |
| Type | Description |
|---|---|
| bool | True if handler was found and removed. |
async def publish(event: Event) -> Result[None, EventError]
Publish an event to all subscribers via a bounded channel.
Enqueues the event into a per-event-type BoundedChannel; a background drain task dequeues and dispatches it to all handlers. Blocks when the channel is at capacity providing natural backpressure (MAJ-11).
| Parameter | Type | Description |
|---|---|---|
| `event` | Event | The event to publish. |
| Type | Description |
|---|---|
| Result[None, EventError] | Ok(None) on successful enqueue. Err(EventHandlerError) when no handlers are registered and allow_no_handlers is False. |
Wait until all pending events in every channel have been dispatched.
Yields control repeatedly until every BoundedChannel is empty and all in-progress handler dispatches have completed. Useful in tests.
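The enqueue, drain, and backpressure behaviour described above can be approximated with a bounded `asyncio.Queue` per event type. This is a sketch under that assumption: `MiniEventBus`, `drain`, and `close` are hypothetical names, and the real `BoundedChannel` may behave differently in detail.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class MiniEventBus:
    def __init__(self, capacity: int = 2) -> None:
        self._capacity = capacity
        self._handlers: dict[type, list[Handler]] = {}
        self._queues: dict[type, asyncio.Queue] = {}
        self._tasks: list[asyncio.Task] = []

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Any) -> None:
        event_type = type(event)
        if event_type not in self._queues:
            # One bounded queue and one background drain task per event type.
            self._queues[event_type] = asyncio.Queue(maxsize=self._capacity)
            self._tasks.append(asyncio.create_task(self._drain(event_type)))
        # put() suspends the publisher while the queue is full: backpressure.
        await self._queues[event_type].put(event)

    async def _drain(self, event_type: type) -> None:
        queue = self._queues[event_type]
        while True:
            event = await queue.get()
            try:
                for handler in self._handlers.get(event_type, []):
                    await handler(event)
            finally:
                queue.task_done()

    async def drain(self) -> None:
        # Wait until every queued event has been fully handled.
        for queue in self._queues.values():
            await queue.join()

    async def close(self) -> None:
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
```

In tests, awaiting `drain()` after publishing gives a deterministic point at which all handlers have run.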
EventEnvelope
Wrapper for events with metadata for storage and transmission.
EventHandledHook
Payload fired after an event handler successfully processes an event.
Attributes:
- event_type: Fully-qualified type name of the handled event.
- handler: Qualified name of the handler class or function.
EventHandlerProtocol
Base class for event handlers.
Event handlers process events and perform side effects. Unlike command handlers, multiple handlers can subscribe to the same event type.
Example
```python
class SendOrderConfirmationHandler(EventHandlerProtocol[OrderCreatedEvent]):
    def __init__(self, email_service: EmailService):
        self.email_service = email_service

    async def handle(self, event: OrderCreatedEvent) -> Result[None, EventError]:
        await self.email_service.send_confirmation(
            order_id=event.order_id,
            customer_email=event.customer_email
        )
        # Matches the documented return type of handle()
        return Ok(None)
```
async def handle(event: TEvent) -> Result[None, EventError]
Handle an event.
| Parameter | Type | Description |
|---|---|---|
| `event` | TEvent | The event to handle. |
| Type | Description |
|---|---|
| Result[None, EventError] | `Ok(None)` on success, `Err(EventError)` if handling fails in an expected, recoverable way. |
EventPublishedEvent
Emitted when the event bus successfully publishes a domain event.
Attributes:
- source_event_type: Fully-qualified type name of the published event.
- handler_count: Number of registered handlers that were notified.
EventPublishedHook
Payload fired when an event is published onto the event bus.
Attributes:
- event_type: Fully-qualified type name of the published event.
- aggregate_id: Identifier of the aggregate that emitted the event, if known.
EventSourcingRepository
AbstractRepository using event sourcing with snapshot acceleration.
This repository:
- Loads aggregates using snapshots when available (O(1) vs O(N))
- Persists changes as events
- Automatically creates snapshots based on policy
- Publishes events to the event bus
Example
```python
# Create repository with snapshot support
repository = EventSourcingRepository(
    aggregate_type=Order,
    event_store=event_store,
    snapshot_store=snapshot_store,
    event_bus=event_bus,
    snapshot_policy=EventCountPolicy(100),
)

# Load aggregate - uses snapshot if available
order = await repository.get(order_id)

# Modify aggregate
order.add_item(product_id, quantity, price)

# Save - stores events, maybe creates snapshot, publishes events
await repository.save(order)
```
Performance comparison:
- Without snapshots (1000 events): ~100ms to load
- With snapshots (1000 events): ~1ms to load (snapshot + ~0 events)
def __init__( aggregate_type: type[TAggregate], event_store: AbstractEventStore, snapshot_store: AbstractSnapshotStore | None = None, event_bus: EventBusImpl | None = None, snapshot_policy: SnapshotPolicy | None = None, snapshot_every: int = 100 )
Initialize the event sourcing repository.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_type` | type[TAggregate] | The aggregate class to manage |
| `event_store` | AbstractEventStore | Event store for event persistence |
| `snapshot_store` | AbstractSnapshotStore | None | Snapshot store (optional, enables O(1) loading) |
| `event_bus` | EventBusImpl | None | Event bus for publishing events (optional) |
| `snapshot_policy` | SnapshotPolicy | None | Custom snapshot policy (default: every N events) |
| `snapshot_every` | int | Events between snapshots (default: 100) |
Load an aggregate by ID using snapshot acceleration.
This method:
- Tries to load the latest snapshot
- Loads only events after the snapshot version
- Reconstructs the aggregate
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | UUID | str | The aggregate identifier |
| Type | Description |
|---|---|
| TAggregate | None | The aggregate if found, None otherwise |
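The loading steps above can be checked on a toy aggregate: start from the snapshot when one exists, then replay only the later events. `Counter` and `load_counter` are hypothetical, and the sketch assumes the event at list index `i` produces version `i + 1`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Counter:
    value: int = 0
    version: int = 0

    def apply(self, delta: int) -> None:
        self.value += delta
        self.version += 1


def load_counter(events: list[int], snapshot: Optional[tuple[int, int]]) -> Counter:
    """Rebuild a Counter; snapshot is (value, version) or None."""
    counter = Counter()
    if snapshot is not None:
        # Step 1: start from the snapshot state instead of from zero.
        counter.value, counter.version = snapshot
    # Steps 2 and 3: replay only the events recorded after the snapshot version.
    for delta in events[counter.version:]:
        counter.apply(delta)
    return counter
```

Both paths must produce the same state; the snapshot only changes how many events are replayed.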
Save an aggregate’s pending events.
This method:
- Persists pending events to the event store
- Creates a snapshot if policy triggers
- Publishes events to the event bus
- Clears pending events from the aggregate
| Parameter | Type | Description |
|---|---|---|
| `aggregate` | TAggregate | The aggregate to save |
| Exception | Description |
|---|---|
| ConcurrencyError | If version conflict detected |
Check if an aggregate exists.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | UUID | str | The aggregate identifier |
| Type | Description |
|---|---|
| bool | True if the aggregate has any events |
Get an aggregate or raise an error if not found.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | UUID | str | The aggregate identifier |
| Type | Description |
|---|---|
| TAggregate | The aggregate |
| Exception | Description |
|---|---|
| AggregateNotFoundError | If aggregate not found |
Force creation of a snapshot for an aggregate.
Useful for bulk operations or maintenance.
| Parameter | Type | Description |
|---|---|---|
| `aggregate` | TAggregate | The aggregate to snapshot |
EventStoredHook
Payload fired when an event is persisted to the event store.
Attributes:
- event_type: Fully-qualified type name of the stored event.
- stream_id: Identifier of the event stream the event was written to.
EventsConfig
Top-level Events configuration.
This consolidated class combines all event system configuration. It needs no TYPE_CHECKING guards or cast() wrappers: all classes are defined in this module, so runtime type resolution and dict-to-model coercion work.
Validate that the selected backend has a matching config.
Returns a list of validation error messages. Empty list means valid.
EventsModule
CQRS + Event Sourcing: CommandBusProtocol, QueryBusProtocol, EventBusProtocol, EventStoreProtocol, sagas.
Call `configure()` to set up the events subsystem.
Usage
```python
from lexigram.events.config import EventsConfig
from lexigram.events.types import EventStoreBackend

@module(
    imports=[
        EventsModule.configure(
            EventsConfig(event_store_backend=EventStoreBackend.MEMORY)
        )
    ]
)
class AppModule(Module):
    pass
```
def configure( cls, config: Any | None = None, handler_modules: list[str] | None = None ) -> DynamicModule
Create an EventsModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | Any | None | EventsConfig or `None` for framework defaults (in-memory event store). |
| `handler_modules` | list[str] | None | Python module paths to auto-discover event/command handlers from (e.g. `["myapp.handlers"]`). |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def scope( cls, *handlers: type ) -> DynamicModule
Scope event/command/query handler classes into a feature module.
Registers the given handler classes as providers so they are resolved by the bus implementations. The parent module graph must already include EventsModule.configure — this does not create a new event store or bus.
Uses the anonymous token pattern so both configure() and
scope() can coexist in the same compiled graph without a
ModuleDuplicateError.
Example
```python
@module(
    imports=[
        EventsModule.configure(config),
        EventsModule.scope(
            CreateOrderHandler,
            GetOrderHandler,
            OrderShippedHandler,
        ),
    ]
)
class OrderFeatureModule(Module):
    pass
```
| Parameter | Type | Description |
|---|---|---|
| `*handlers` | type | Handler classes to scope into the feature module. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule scoped to this feature. |
def stub(cls) -> DynamicModule
Return an in-memory EventsModule for unit testing.
Registers an in-memory event store with no external broker connections. Suitable for testing event handlers and CQRS logic.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule backed by in-memory event storage. |
EventsProvider
Thin orchestrator that coordinates Events sub-providers (M-01).
Delegates all setup, lifecycle, and container-registration work to:
- StoreSubProvider
- BusSubProvider
- HandlerSubProvider
- ManagerSubProvider
Public API is unchanged from the monolithic version.
def __init__( config: EventsConfig | dict[str, Any] | None = None, handler_modules: list[str] | None = None ) -> None
def from_config( cls, config: EventsConfig, **context: Any ) -> EventsProvider
Create EventsProvider from config.
property config() -> EventsConfig
def config(value: EventsConfig) -> None
Allow orchestrator to auto-inject config from LexigramConfig.
async def register(container: ContainerRegistrarProtocol) -> None
Register all events components with the DI container.
async def boot(container: ContainerResolverProtocol) -> None
Start buses, managers, and wire optional tracer.
Shutdown all components.
Aggregate health across stores and buses.
HandlerInfo
Information about a registered handler.
HandlerRegistry
Registry for handler discovery and registration.
The registry provides:
- Auto-discovery of handlers from modules
- Registration with command, query, and event buses
- Tracking of handler-message relationships
Example
```python
# Create registry
registry = HandlerRegistry()

# Discover handlers from modules
registry.discover("app.handlers")

# Register with buses
registry.register_with_buses(
    command_bus=command_bus,
    query_bus=query_bus,
    event_bus=event_bus,
)

# Or use DI container for handler instantiation
registry.register_with_container(container)
```
Initialize the handler registry.
Load handlers from the global decorator registry (M-17).
Call this explicitly to register handlers that were annotated with @command_handler, @event_handler, etc.
Register a command handler.
| Parameter | Type | Description |
|---|---|---|
| `command_type` | type | The command class |
| `handler_type` | Any | The handler class |
Register a query handler.
| Parameter | Type | Description |
|---|---|---|
| `query_type` | type | The query class |
| `handler_type` | Any | The handler class |
Register an event handler.
| Parameter | Type | Description |
|---|---|---|
| `event_type` | type | The event class |
| `handler_type` | Any | The handler class |
Discover handlers from a module or package.
Scans the module for classes that inherit from handler base classes and registers them automatically.
| Parameter | Type | Description |
|---|---|---|
| `module_path` | str | Python module path (e.g., "app.handlers") |
| `recursive` | bool | Whether to scan submodules |
| Type | Description |
|---|---|
| int | Number of handlers discovered |
def register_with_buses( command_bus: CommandBusProtocol | None = None, query_bus: QueryBusProtocol | None = None, event_bus: EventBusProtocol | None = None, handler_factory: Callable[[type], Any] | None = None ) -> None
Register all discovered handlers with buses.
| Parameter | Type | Description |
|---|---|---|
| `command_bus` | CommandBusProtocol | None | Command bus to register with |
| `query_bus` | QueryBusProtocol | None | Query bus to register with |
| `event_bus` | EventBusProtocol | None | Event bus to register with |
| `handler_factory` | Callable[[type], Any] | None | Factory function to instantiate handlers |
def get_command_handlers() -> dict[type, type[CommandHandlerProtocol[Any, Any]]]
Get all registered command handlers.
def get_query_handlers() -> dict[type, type[QueryHandlerProtocol[Any, Any]]]
Get all registered query handlers.
Get all registered event handlers.
Create a registry with no pre-registered handlers.
Returns a fresh instance ready for handler registration via the @event_handler decorator or register() method.
| Type | Description |
|---|---|
| HandlerRegistry | A new HandlerRegistry instance. |
IdempotentCommand
Idempotent command extended with aggregate-targeting helpers.
InMemoryEventStore
In-memory event store for testing and development.
M-19: Uses a global log and a per-type index, so streaming all events is O(n) from the starting position and streaming by type is O(matching events).
Example
```python
store = InMemoryEventStore()

# Save events
await store.append(
    stream_id="order-123",
    events=[OrderCreated(...)],
    expected_version=0,
)

# Load events
events = await store.read("order-123")
```
def __init__( schema_evolution: SchemaEvolution | None = None, max_events_per_stream: int = 10000 ) -> None
Initialize the in-memory store.
| Parameter | Type | Description |
|---|---|---|
| `schema_evolution` | SchemaEvolution \| None | Optional schema evolution handler. |
| `max_events_per_stream` | int | Maximum events retained per stream. Oldest events are evicted when the limit is exceeded. Defaults to 10,000; set to 0 for unbounded. |
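The retention rule for `max_events_per_stream` can be illustrated with a bounded deque. This is only a sketch of the eviction behaviour, not the store's actual data structure; `make_stream_buffer` is a hypothetical helper.

```python
from collections import deque


def make_stream_buffer(max_events_per_stream: int = 10_000) -> deque:
    """Return a per-stream buffer; 0 means unbounded (hypothetical helper)."""
    # deque(maxlen=None) never evicts; a positive maxlen drops the oldest item
    # whenever a new one is appended to a full buffer.
    return deque(maxlen=max_events_per_stream or None)


bounded = make_stream_buffer(3)
bounded.extend(["e1", "e2", "e3", "e4", "e5"])  # e1 and e2 are evicted

unbounded = make_stream_buffer(0)
unbounded.extend(["e1", "e2", "e3", "e4", "e5"])  # nothing is evicted
```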
async def append( stream_id: str, events: list[Event], expected_version: int | None = None ) -> int
Append events to an in-memory stream with optimistic concurrency.
| Type | Description |
|---|---|
| int | New stream version. |
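Optimistic concurrency on append can be sketched as follows. The sketch assumes that a stream's version equals the number of events it holds (0 for a new stream); `TinyStore` and the local `ConcurrencyError` are illustrative stand-ins, not the library's classes.

```python
import asyncio


class ConcurrencyError(Exception):
    """Stand-in for the library's ConcurrencyError."""


class TinyStore:
    """Illustrative in-memory store; version == number of events (assumption)."""

    def __init__(self) -> None:
        self._streams: dict[str, list[object]] = {}

    async def append(self, stream_id, events, expected_version=None) -> int:
        stream = self._streams.setdefault(stream_id, [])
        current = len(stream)
        # Reject the write if another writer advanced the stream first.
        if expected_version is not None and expected_version != current:
            raise ConcurrencyError(f"expected {expected_version}, found {current}")
        stream.extend(events)
        return len(stream)


async def demo() -> int:
    store = TinyStore()
    return await store.append("order-123", ["OrderCreated"], expected_version=0)


new_version = asyncio.run(demo())
```

A second writer that still passes `expected_version=0` for the same stream would get `ConcurrencyError` and could reload the stream before retrying.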
async def read( stream_id: str, from_version: int = 0, to_version: int | None = None, limit: int | None = None ) -> list[Event]
Read events from a stream (M-15).
Get the current version of a stream.
Get all stream IDs (for testing/debugging).
Purge events for a stream up to (and including) the given version (MF-04).
async def stream_all( from_position: int = 0, batch_size: int = 100, partition: int | None = None, total_partitions: int | None = None ) -> AsyncIterator[Event]
Stream all events across all streams.
M-19: O(n) from position using _global_log — no merging required.
async def stream_by_type( event_types: list[str], from_position: int = 0 ) -> AsyncIterator[Event]
Stream events filtered by type.
M-19: O(matching_events) not O(total_events) using type index.
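The global log and type index mentioned in the M-19 notes can be pictured like this. It is a simplified, synchronous sketch: `IndexedLog` and `Ev` are hypothetical names, and only `_global_log` is taken from the text above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Ev:
    type: str
    payload: str


class IndexedLog:
    """Append-only log with a per-type position index (illustrative only)."""

    def __init__(self) -> None:
        self._global_log: list[Ev] = []
        self._type_index: dict[str, list[int]] = defaultdict(list)

    def append(self, event: Ev) -> int:
        position = len(self._global_log)
        self._global_log.append(event)
        self._type_index[event.type].append(position)
        return position

    def stream_all(self, from_position: int = 0):
        # Events are already in global order, so no merging is needed.
        yield from self._global_log[from_position:]

    def stream_by_type(self, event_types: list[str], from_position: int = 0):
        # Only positions recorded for the requested types are visited.
        positions = sorted(
            p
            for t in event_types
            for p in self._type_index.get(t, [])
            if p >= from_position
        )
        for p in positions:
            yield self._global_log[p]


log = IndexedLog()
for ev in (Ev("A", "1"), Ev("B", "2"), Ev("A", "3")):
    log.append(ev)
```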
Get all stream IDs (for testing/debugging).
Get current global position.
Clear all events (for testing).
Close the in-memory event store (no-op).
InMemoryEventStoreConfig
Configuration for in-memory event store.
InMemorySnapshotStore
In-memory snapshot store for testing and development.
M-14: Uses Snapshot objects throughout (save_snapshot / get_latest / get_by_version). The legacy tuple-based save/load methods are provided by the AbstractSnapshotStore base class via backward-compat shims.
Example
```python
from lexigram.events.types import Snapshot

store = InMemorySnapshotStore()

# Save snapshot
await store.save_snapshot(
    Snapshot(
        aggregate_id="order-123",
        aggregate_type="Order",
        version=100,
        state={"status": "completed", "total": 150.00},
    )
)

# Load latest snapshot
snapshot = await store.get_latest("order-123")
```
Initialize the in-memory snapshot store.
| Parameter | Type | Description |
|---|---|---|
| `max_snapshots_per_aggregate` | int | Maximum snapshots to keep per aggregate |
async def save_snapshot(snapshot: Snapshot) -> None
Save a snapshot (M-14: accepts Snapshot object).
async def get_latest(aggregate_id: str) -> Snapshot | None
Load the latest snapshot for an aggregate.
async def get_by_version( aggregate_id: str | UUID, version: int ) -> Snapshot | None
Get the snapshot for a specific version.
Delete old snapshots, keeping only the most recent ones.
Clear all snapshots (for testing).
Close the in-memory snapshot store (no-op).
IntegrationEvent
Event for cross-service communication.
Message
Base class for all messages in the Events system.
def with_metadata(metadata: MessageMetadata) -> Message
Create a copy of the message with new metadata.
def with_correlation_id(correlation_id: UUID) -> Message
Create a copy with a correlation ID.
def with_causation_id(causation_id: UUID) -> Message
Create a copy with a causation ID.
Get correlation ID from metadata.
Get causation ID from metadata.
Convert message to dictionary.
def from_dict( cls, data: dict[str, Any] ) -> Message
Create message from dictionary.
MessageMetadata
Metadata attached to messages for tracking and correlation.
def with_correlation(correlation_id: UUID) -> MessageMetadata
Create new metadata with correlation ID.
def with_causation(causation_id: UUID) -> MessageMetadata
Create new metadata with causation ID.
def with_user(user_id: str) -> MessageMetadata
Create new metadata with user ID.
def with_tenant(tenant_id: str) -> MessageMetadata
Create new metadata with tenant ID.
def with_custom( key: str, value: Any ) -> MessageMetadata
Create new metadata with additional custom field.
MiddlewareInfo
Information about registered middleware.
MongoDBConfig
MongoDB connection configuration for store implementations.
Used directly by MongoDBSnapshotStore for motor connection lifecycle management.
MongoDBEventStore
MongoDB event store implementation using ``DocumentStoreProtocol``.
Uses the lexigram-nosql collection abstraction for standard CRUD
while keeping direct motor access for change streams and transactions.
Features:
- Optimistic concurrency with version checking
- Global event ordering with sequence counters
- Efficient event streaming with cursors
- Automatic index creation
- Change streams support for real-time events
Example
```python
store = MongoDBEventStore(document_store, config)
await store.connect()

try:
    # append() is the write method documented for this store
    await store.append("order-123", [event], expected_version=0)
finally:
    await store.close()
```
def __init__( document_store: DocumentStoreProtocol, config: Any, event_serializer: Any | None = None ) -> None
Initialize MongoDB event store.
| Parameter | Type | Description |
|---|---|---|
| `document_store` | DocumentStoreProtocol | A connected ``DocumentStoreProtocol`` (e.g. MongoDBDocumentStore). |
| `config` | Any | Event store configuration with collection names. |
| `event_serializer` | Any \| None | Optional custom event serializer. |
Events collection.
Counters collection.
Connect to MongoDB and set up collections.
async def append( stream_id: str, events: list[Event], expected_version: int | None = None ) -> int
Append events to a stream with optimistic concurrency.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream identifier. |
| `events` | list[Event] | Events to save. |
| `expected_version` | int \| None | Expected current version. |
| Type | Description |
|---|---|
| int | New stream version. |
| Exception | Description |
|---|---|
| ConcurrencyError | If version conflict occurs. |
async def read( stream_id: str, from_version: int = 0, to_version: int | None = None, limit: int | None = None ) -> list[Event]
Read events from a stream.
Get current stream version.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to check. |
| Type | Description |
|---|---|
| int | Current version (0 if stream doesn't exist). |
async def stream_all( from_position: int = 0, batch_size: int = 100, partition: int | None = None, total_partitions: int | None = None ) -> AsyncIterator[Event]
Stream all events from a global position.
async def stream_by_type( event_types: list[str], from_position: int = 0 ) -> AsyncIterator[Event]
Stream events filtered by type.
async def watch_events(event_types: list[str] | None = None) -> AsyncIterator[Event]
Watch for new events using MongoDB change streams.
Note: This requires direct motor access for change stream support.
Falls back to polling if the underlying store doesn’t support
watch().
| Parameter | Type | Description |
|---|---|---|
| `event_types` | list[str] \| None | Optional list of event types to filter. |
| Type | Description |
|---|---|
| AsyncIterator[Event] | New events as they are inserted. |
async def find_by_type( event_type: str, limit: int = 100, offset: int = 0 ) -> list[Event]
Get events of a specific type.
| Parameter | Type | Description |
|---|---|---|
| `event_type` | str | Event type name. |
| `limit` | int | Maximum events to return. |
| `offset` | int | Number of events to skip. |
| Type | Description |
|---|---|
| list[Event] | List of events. |
Get total event count.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str \| None | Optional stream to count (None for all). |
| Type | Description |
|---|---|
| int | Event count. |
Delete all events for a stream.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to delete. |
| Type | Description |
|---|---|
| int | Number of deleted events. |
Close the MongoDB connection.
MongoDBEventStoreConfig
Configuration for MongoDB event store (top-level user config).
Used by `EventsConfig.mongodb` (`lexigram.events.config.EventsConfig.mongodb`). For the store-level connection config used by MongoDBSnapshotStore, see `lexigram.events.stores.mongodb.MongoDBConfig`.
Validate and coerce MongoDB connection string to SecretStr.
MongoDBSnapshotStore
MongoDB snapshot store implementation.
This store manages aggregate snapshots for optimized loading.
Example
```python
config = MongoDBConfig(uri="mongodb://localhost:27017")
store = MongoDBSnapshotStore(config)
await store.connect()

# Save snapshot (save_snapshot is the documented Snapshot-based method)
await store.save_snapshot(
    Snapshot(
        aggregate_id="order-123",
        aggregate_type="Order",
        version=100,
        state=order.to_dict(),
    )
)

# Load latest
snapshot = await store.get_latest("order-123")
```
Initialize MongoDB snapshot store.
| Parameter | Type | Description |
|---|---|---|
| `config` | MongoDBConfig | MongoDB configuration. |
Connect to MongoDB.
async def save_snapshot(snapshot: Snapshot) -> None
Save a snapshot (M-14 standardized API).
async def get_latest(aggregate_id: str) -> Snapshot | None
Get latest snapshot (M-14 standardized API).
async def get_by_version( aggregate_id: str | UUID, version: int ) -> Snapshot | None
Get a specific snapshot version (M-14).
Delete old snapshots, keeping only the most recent ones.
| Parameter | Type | Description |
|---|---|---|
| `aggregate_id` | str \| UUID | Aggregate identifier. |
| `keep_count` | int | Number of snapshots to keep. |
| Type | Description |
|---|---|
| int | Number of deleted snapshots. |
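The "keep the most recent `keep_count`" rule amounts to sorting by version and cutting the tail. A small sketch over plain version numbers, with `prune_snapshots` as a hypothetical helper rather than the store's method:

```python
def prune_snapshots(versions: list[int], keep_count: int) -> tuple[list[int], int]:
    """Return (kept versions, number deleted); hypothetical helper."""
    ordered = sorted(versions, reverse=True)  # newest first
    kept = ordered[: max(keep_count, 0)]
    deleted = len(ordered) - len(kept)
    return kept, deleted


kept, deleted = prune_snapshots([10, 30, 20, 40], keep_count=2)
```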
Close the MongoDB connection.
PagedResult
Container for paginated results.
Attributes:
- `items`: The items in the current page.
- `total`: Total number of items across all pages.
- `page`: Current page number.
- `page_size`: Number of items per page.
Example
```python
result = PagedResult(
    items=[...],
    total=150,
    page=2,
    page_size=20,
)
logger.info(result.total_pages)  # 8
logger.info(result.has_next)  # True
```
Initialize paged result.
| Parameter | Type | Description |
|---|---|---|
| `items` | list[TResult] | Items in the current page. |
| `total` | int | Total item count. |
| `page` | int | Current page number. |
| `page_size` | int | Items per page. |
Get total number of pages.
Check if there is a next page.
| Type | Description |
|---|---|
| bool | True if there are more pages after current. |
Check if there is a previous page.
| Type | Description |
|---|---|
| bool | True if there are pages before current. |
Convert to dictionary.
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of paged result. |
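The derived properties follow from ceiling division. The sketch below assumes 1-based page numbers, which matches the example values (150 items at 20 per page gives 8 pages); `Page` is an illustrative stand-in for `PagedResult`.

```python
import math
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class Page(Generic[T]):
    """Illustrative stand-in for PagedResult (1-based pages assumed)."""

    items: list[T]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        if self.page_size <= 0:
            return 0
        return math.ceil(self.total / self.page_size)

    @property
    def has_next(self) -> bool:
        return self.page < self.total_pages

    @property
    def has_previous(self) -> bool:
        return self.page > 1


second = Page(items=[], total=150, page=2, page_size=20)
last = Page(items=[], total=150, page=8, page_size=20)
```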
PaginatedQuery
Query with built-in pagination support.
PostgresEventStoreConfig
PostgreSQL connection configuration.
Attributes:
- `dsn`: Database connection string (treated as a secret).
- `pool_min_size`: Minimum connection pool size.
- `pool_max_size`: Maximum connection pool size.
- `command_timeout`: Command timeout in seconds.
- `events_table`: Name of the events table.
- `snapshots_table`: Name of the snapshots table.
- `auto_create_tables`: Whether to create tables automatically.
Validate and normalize the DSN. Returns None when not provided.
ProjectionUpdatedEvent
Emitted when a read-model projection is rebuilt or updated.
Attributes:
- `projection_name`: Canonical name of the projection that was updated.
- `events_processed`: Number of domain events processed in this update.
Query
Represents a request for data.
QueryBusConfig
Configuration for query bus.
QueryBusImpl
Query bus for dispatching queries.
Handlers are registered explicitly via register() or through
HandlerRegistry.register_with_buses() during provider registration.
Example
```python
# Queries are automatically routed to handlers
orders = await bus.execute(
    GetOrdersByCustomerQuery(
        customer_id="cust-123",
        status="pending",
    )
)
```
Initialize the query bus.
async def execute(query: Query[TResult]) -> TResult
Execute a query.
| Parameter | Type | Description |
|---|---|---|
| `query` | Query[TResult] | The query to execute. |
| Type | Description |
|---|---|
| TResult | Query result. |
| Exception | Description |
|---|---|
| HandlerNotFoundError | If no handler is registered. |
| QueryExecutionError | If query execution fails. |
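Routing by query type can be sketched with a dictionary keyed on the query class. `TinyQueryBus`, its `register` signature, and the local `HandlerNotFoundError` are assumptions made for illustration; the real bus may differ.

```python
import asyncio
from dataclasses import dataclass


class HandlerNotFoundError(Exception):
    """Stand-in for the library's HandlerNotFoundError."""


class TinyQueryBus:
    """Illustrative bus: one handler per query type (assumed register signature)."""

    def __init__(self) -> None:
        self._handlers: dict[type, object] = {}

    def register(self, query_type: type, handler: object) -> None:
        self._handlers[query_type] = handler

    async def execute(self, query: object):
        handler = self._handlers.get(type(query))
        if handler is None:
            raise HandlerNotFoundError(type(query).__name__)
        return await handler.handle(query)


@dataclass
class GetGreeting:
    name: str


class GreetingHandler:
    async def handle(self, query: GetGreeting) -> str:
        return f"hello {query.name}"


bus = TinyQueryBus()
bus.register(GetGreeting, GreetingHandler())
greeting = asyncio.run(bus.execute(GetGreeting("ada")))
```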
QueryHandlerProtocol
Base class for query handlers.
Query handlers process queries and return results. Each query type should have exactly one handler. Queries should not modify state.
Example
```python
class GetOrdersHandler(QueryHandlerProtocol[GetOrdersQuery, list[OrderDTO]]):
    def __init__(self, read_model: OrderReadModel):
        self.read_model = read_model

    async def handle(self, query: GetOrdersQuery) -> list[OrderDTO]:
        return await self.read_model.get_orders(
            customer_id=query.customer_id,
            status=query.status,
        )
```
Handle a query.
| Parameter | Type | Description |
|---|---|---|
| `query` | TQuery | The query to handle |
| Type | Description |
|---|---|
| TResult | Query result |
QueryResult
Result of query execution.
RedisEventStore
Redis-backed AbstractEventStore for lightweight deployments.
Uses the StateStoreProtocol protocol for storage, so no direct Redis dependency is introduced into this package. Suitable for single-node or low-throughput deployments where a full relational or document store is not required.
Key layout
- `{namespace}:{stream_id}:events`: JSON array of serialised event dicts
- `{namespace}:{stream_id}:version`: current integer version for the stream
- `{namespace}:_streams`: JSON array of all known stream IDs
Example
```python
from lexigram.events.stores.redis import RedisEventStore

store = RedisEventStore(state_store, namespace="myapp:events")
await store.append("order-123", [OrderCreated(...)])
events = await store.read("order-123")
```
def __init__( store: StateStoreProtocol, namespace: str = 'lexigram:events' ) -> None
Initialise the Redis-backed event store.
| Parameter | Type | Description |
|---|---|---|
| `store` | StateStoreProtocol | A StateStoreProtocol instance (e.g. a Redis-backed implementation). |
| `namespace` | str | Key prefix used for all stored keys. |
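To make the key layout concrete, the helper below builds the three keys for a given namespace and stream. `stream_keys` is a hypothetical function written for this illustration; only the key patterns come from the layout described above.

```python
def stream_keys(namespace: str, stream_id: str) -> dict[str, str]:
    """Build the storage keys for one stream (hypothetical helper)."""
    return {
        "events": f"{namespace}:{stream_id}:events",
        "version": f"{namespace}:{stream_id}:version",
        "streams": f"{namespace}:_streams",  # shared across all streams
    }


keys = stream_keys("myapp:events", "order-123")
```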
async def append( stream_id: str, events: list[Event], expected_version: int | None = None ) -> int
Append events to stream_id with optimistic concurrency control.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Unique stream identifier. |
| `events` | list[Event] | Events to append. Empty list is a no-op. |
| `expected_version` | int \| None | If provided, the current stream version must equal this value or ConcurrencyError is raised. |
| Type | Description |
|---|---|
| int | New stream version after the append. |
| Exception | Description |
|---|---|
| ConcurrencyError | If *expected_version* does not match the actual current version of the stream. |
async def read( stream_id: str, from_version: int = 0, to_version: int | None = None, limit: int | None = None ) -> list[Event]
Read events from stream_id.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to read from. |
| `from_version` | int | Lowest version to include (inclusive, default 0 = all). |
| `to_version` | int \| None | Highest version to include (inclusive). |
| `limit` | int \| None | Maximum number of events to return. |
async def load( stream_id: str, after_version: int = 0 ) -> list[Event]
Load events after after_version (non-inclusive lower bound).
This alias is used by several base-class helpers.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to load events from. |
| `after_version` | int | Events with version <= this value are excluded. |
| Type | Description |
|---|---|
| list[Event] | Ordered list of events. |
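The difference between the two lower bounds is easy to miss: `read` includes `from_version`, while `load` excludes `after_version`. A sketch over plain version numbers, assuming versions start at 1; both helper names are hypothetical.

```python
def read_range(versions, from_version=0, to_version=None, limit=None):
    """Inclusive bounds on both ends, then an optional cap (illustrative)."""
    selected = [
        v
        for v in versions
        if v >= from_version and (to_version is None or v <= to_version)
    ]
    return selected if limit is None else selected[:limit]


def load_after(versions, after_version=0):
    """Exclusive lower bound: versions <= after_version are dropped."""
    return [v for v in versions if v > after_version]


stream = [1, 2, 3, 4]
inclusive = read_range(stream, from_version=2, to_version=3)
exclusive = load_after(stream, after_version=2)
```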
Return the current version of stream_id (0 if the stream is empty).
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream identifier. |
| Type | Description |
|---|---|
| int | Current integer version. |
Public alias for get_stream_version.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream identifier. |
| Type | Description |
|---|---|
| int | Current integer version. |
async def stream_all( from_position: int = 0, batch_size: int = 100, partition: int | None = None, total_partitions: int | None = None ) -> AsyncIterator[Event]
Stream all events across all known streams.
Note: Cross-stream global ordering is not guaranteed by this implementation. Streams are iterated in registration order and events within each stream are returned in version order. For strict global ordering, use the PostgreSQL or MongoDB stores.
| Parameter | Type | Description |
|---|---|---|
| `from_position` | int | Events with ``sequence_number`` <= this value are skipped. |
| `batch_size` | int | Unused; present for interface compatibility. |
| `partition` | int \| None | Optional partition index for sharding. |
| `total_partitions` | int \| None | Total number of partitions. |
| Type | Description |
|---|---|
| AsyncIterator[Event] | Events in stream-registration order, version order within each stream. |
Purge events for stream_id up to and including up_to_version.
| Parameter | Type | Description |
|---|---|---|
| `stream_id` | str | Stream to compact. |
| `up_to_version` | int | Events at or below this version are removed. |
| Type | Description |
|---|---|
| int | Number of events deleted. |
Close the event store.
No-op for this implementation; the underlying StateStoreProtocol manages its own lifecycle.
Snapshot
Aggregate state snapshot for performance optimization.
SnapshotConfig
Configuration for snapshotting behavior.
SnapshotManager
Manages snapshot creation and retrieval for aggregates.
This is the critical component that reduces aggregate loading from replaying all N events to loading the latest snapshot and replaying only the events recorded after it.
Example
```python
manager = SnapshotManager(
    event_store=event_store,
    snapshot_store=snapshot_store,
    policy=EventCountPolicy(100),
)

# Load aggregate with snapshot acceleration.
# The signature returns a 3-tuple; the trailing int is assumed to be the version.
state, events, version = await manager.load_with_snapshot(
    aggregate_id="order-123",
    aggregate_type="Order",
)

# Auto-snapshot after save if policy triggers
await manager.save_and_maybe_snapshot(
    aggregate_id="order-123",
    aggregate_type="Order",
    events=[...],
    current_state=state,
    expected_version=100,
)
```
def __init__( event_store: EventStoreProtocol, snapshot_store: SnapshotStoreProtocol, policy: SnapshotPolicy | None = None, config: SnapshotConfig | None = None )
Initialize the snapshot manager.
| Parameter | Type | Description |
|---|---|---|
| `event_store` | EventStoreProtocol | The event store instance |
| `snapshot_store` | SnapshotStoreProtocol | The snapshot store instance |
| `policy` | SnapshotPolicy \| None | Snapshot creation policy (default: EventCountPolicy(100)) |
| `config` | SnapshotConfig \| None | Snapshot configuration |
async def load_with_snapshot( aggregate_id: str | UUID, aggregate_type: str ) -> tuple[dict[str, Any] | None, list[Any], int]
async def save_and_maybe_snapshot( aggregate_id: str | UUID, aggregate_type: str, events: list[Any], current_state: dict[str, Any], expected_version: int ) -> bool
Save events and create snapshot if policy triggers.
| Type | Description |
|---|---|
| bool | True if a snapshot was created |
async def create_snapshot( aggregate_id: str | UUID, aggregate_type: str, version: int, state: dict[str, Any] ) -> Snapshot
Create and store a snapshot.
async def force_snapshot( aggregate_id: str | UUID, aggregate_type: str, state_extractor: Callable[[], dict[str, Any]] ) -> Snapshot
Force creation of a snapshot regardless of policy.
Clean up old snapshots for an aggregate.
SqliteConfig
SQLite configuration for event stores.
SqliteEventStore
SQLite event store implementation.
This store uses the generic DatabaseProviderProtocol for async database access. Suitable for development, testing, and single-instance deployments.
def __init__( config: SqliteConfig | None = None, provider: DatabaseProviderProtocol | None = None, event_serializer: Any | None = None ) -> None
Initialize SQLite event store.
| Parameter | Type | Description |
|---|---|---|
| `config` | SqliteConfig \| None | SQLite configuration. |
| `provider` | DatabaseProviderProtocol \| None | The generic database provider injected via DI. |
| `event_serializer` | Any \| None | Optional custom event serializer. |
Register an event type for proper deserialization.
Connection is managed by the provider. Tables are created if configured.
async def append( stream_id: str, events: list[Event], expected_version: int | None = None ) -> int
Append events to a stream with optimistic concurrency.
async def read( stream_id: str, from_version: int = 0, to_version: int | None = None, limit: int | None = None ) -> list[Event]
Read events from a stream (M-15).
Get current stream version.
async def stream_all( from_position: int = 0, batch_size: int = 100, partition: int | None = None, total_partitions: int | None = None ) -> AsyncIterator[Event]
Stream all events from a global position.
async def stream_by_type( event_types: list[str], from_position: int = 0 ) -> AsyncIterator[Event]
Stream events filtered by type.
async def find_by_type( event_type: str, count: int = 100, start: int = 0 ) -> list[Event]
Get events of a specific type.
Get total event count.
Delete all events for a stream.
Get all stream IDs in the store.
Purge events for a stream up to (and including) the given version (MF-04).
Close the database connection.
SqliteSnapshotStore
SQLite snapshot store implementation.
This store manages aggregate snapshots using a generic DatabaseProviderProtocol.
def __init__( config: SqliteConfig | None = None, provider: DatabaseProviderProtocol | None = None ) -> None
Initialize SQLite snapshot store.
| Parameter | Type | Description |
|---|---|---|
| `config` | SqliteConfig \| None | SQLite configuration. |
| `provider` | DatabaseProviderProtocol \| None | The generic database provider injected via DI. |
Connection is managed by the provider.
async def save_snapshot(snapshot: Snapshot) -> None
Save a snapshot (M-14 standardized API).
async def get_latest(aggregate_id: str) -> Snapshot | None
Get latest snapshot (M-14 standardized API).
async def get_by_version( aggregate_id: str | UUID, version: int ) -> Snapshot | None
Get a specific snapshot version (M-14).
Delete old snapshots, keeping only the most recent ones.
Close the database connection.
StreamInfo
Information about an event stream.
ValueObject
Base class for value objects.
Value objects:
- Are immutable (frozen=True)
- Have no identity
- Are compared by value (all fields)
- Can be freely shared
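These properties map directly onto a frozen dataclass. The sketch uses a plain `@dataclass(frozen=True)` instead of the library's `ValueObject` base, and `Money` is a hypothetical example type.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class Money:
    """Hypothetical value object: equality and hashing use all fields."""

    amount: int
    currency: str


a = Money(500, "EUR")
b = Money(500, "EUR")

same_value = a == b  # compared by value, not identity
same_object = a is b

try:
    a.amount = 600  # frozen instances reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```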
WebhookDispatcher
Delivers domain events to registered external webhook endpoints.
Signs each outbound POST with a cryptographic signature via the injected
WebhookSignatureVerifierProtocol so that receivers can verify
authenticity. Retries transient 5xx failures according to retry_policy.
Delivery failures for one endpoint are logged and do not prevent fan-out
to remaining endpoints.
Example
```python
dispatcher = WebhookDispatcher(
    http_client=http_client,
    signature_verifier=HMACWebhookVerifier(),
    retry_policy=RetryConfig(max_attempts=3),
)
await dispatcher.dispatch(user_created_event, registered_endpoints)
```
def __init__( http_client: HTTPClientProtocol, signature_verifier: WebhookSignatureVerifierProtocol, retry_policy: RetryConfig | None = None ) -> None
Initialise the dispatcher.
| Parameter | Type | Description |
|---|---|---|
| `http_client` | HTTPClientProtocol | HTTP client used for outbound POST requests. |
| `signature_verifier` | WebhookSignatureVerifierProtocol | Used to compute the outbound HMAC signature included in ``X-Webhook-Signature``. |
| `retry_policy` | RetryConfig \| None | Retry configuration; defaults to 3 attempts with exponential backoff when not provided. |
async def dispatch( event: DomainEvent, endpoints: list[WebhookEndpoint] ) -> None
Sign and POST event to each eligible registered endpoint.
Only endpoints whose event_types filter matches the event are
delivered to. Each delivery is retried per retry_policy on
transient (5xx) failures. Failures are logged but do not abort
delivery to remaining endpoints.
| Parameter | Type | Description |
|---|---|---|
| `event` | DomainEvent | Domain event to fan-out. |
| `endpoints` | list[WebhookEndpoint] | Registered endpoints to consider for delivery. |
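The signing step can be sketched with the standard `hmac` module. The sketch assumes HMAC-SHA256 with a hex digest and a canonical JSON body; the dispatcher's real algorithm and payload encoding are not stated here, and `sign_payload`, `build_request`, and `verify` are hypothetical helpers. Only the `X-Webhook-Signature` header name comes from the parameter description above.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, body: bytes) -> str:
    """HMAC-SHA256 hex digest of the body (algorithm is an assumption)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def build_request(secret: str, event: dict) -> tuple[bytes, dict[str, str]]:
    """Serialise the event and attach the signature header."""
    body = json.dumps(event, sort_keys=True, separators=(",", ":")).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": sign_payload(secret, body),
    }
    return body, headers


def verify(secret: str, body: bytes, signature: str) -> bool:
    """Receiver side: recompute and compare in constant time."""
    return hmac.compare_digest(sign_payload(secret, body), signature)


body, headers = build_request("shared-secret", {"type": "UserCreated", "id": 1})
```

The receiver recomputes the digest over the raw body it received, so any change to the payload or a wrong secret makes `verify` return `False`.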
WebhookEndpoint
An external endpoint registered to receive domain events.
Attributes:
url: HTTP(S) URL to POST events to.
secret: Shared secret used to compute the HMAC signature.
event_types: Set of event type names this endpoint subscribes to.
When None, all events are delivered.
Return True when this endpoint should receive event.
| Parameter | Type | Description |
|---|---|---|
| `event` | DomainEvent | Domain event to test against the filter. |
| Type | Description |
|---|---|
| bool | ``True`` when ``event_types`` is ``None`` (all events) or the event's type name is contained in ``event_types``. |
Functions
clear_handler_registry
Clear all registered handlers from the global handler registry.
Intended for use in tests to reset handler state between test runs. Clears both the HandlerRegistry instances and the global decorator registry.
command_handler
def command_handler( command_type: type[Command], *, name: str | None = None, **metadata: Any ) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to register a function as a command handler.
| Parameter | Type | Description |
|---|---|---|
| `command_type` | type[Command] | The Command class this handler handles. |
| `name` | str \| None | Optional custom name for the handler. |
| `**metadata` | Any | Additional metadata to attach to the handler. |
| Type | Description |
|---|---|
| Callable[[Callable[P, R]], Callable[P, R]] | Decorator function. |
Example
```python
@command_handler(CreateUserCommand)
async def handle_create_user(command: CreateUserCommand) -> str:
    user = await user_service.create(command.username, command.email)
    return user.user_id
```
event_handler
def event_handler( event_type: type[Event], *, name: str | None = None, priority: int = 0, **metadata: Any ) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to register a function as an event handler.
| Parameter | Type | Description |
|---|---|---|
| `event_type` | type[Event] | The Event class this handler handles. |
| `name` | str \| None | Optional custom name for the handler. |
| `priority` | int | Handler priority (higher = executed first). |
| `**metadata` | Any | Additional metadata to attach to the handler. |
| Type | Description |
|---|---|
| Callable[[Callable[P, R]], Callable[P, R]] | Decorator function. |
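How a registering decorator can honour `priority` is sketched below. `toy_event_handler`, `handlers_for`, and the module-level registry are hypothetical and only mimic the documented "higher = executed first" ordering.

```python
from collections import defaultdict
from typing import Callable

# event type -> list of (priority, handler); illustrative global registry
_REGISTRY: dict[type, list[tuple[int, Callable]]] = defaultdict(list)


def toy_event_handler(event_type: type, *, priority: int = 0):
    """Register the decorated function and return it unchanged."""

    def decorator(func: Callable) -> Callable:
        entries = _REGISTRY[event_type]
        entries.append((priority, func))
        # Stable sort: equal priorities keep their registration order.
        entries.sort(key=lambda entry: entry[0], reverse=True)
        return func

    return decorator


def handlers_for(event_type: type) -> list[Callable]:
    return [func for _, func in _REGISTRY[event_type]]


class UserCreated:
    pass


@toy_event_handler(UserCreated)
def audit(event):
    return "audit"


@toy_event_handler(UserCreated, priority=10)
def welcome(event):
    return "welcome"
```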
Example
```python
@event_handler(UserCreatedEvent, priority=10)
async def send_welcome_email(event: UserCreatedEvent) -> None:
    await email_service.send_welcome(event.email)
```
query_handler
def query_handler( query_type: type[Query], *, name: str | None = None, cacheable: bool = False, cache_ttl: int | None = None, **metadata: Any ) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to register a function as a query handler.
| Parameter | Type | Description |
|---|---|---|
| `query_type` | type[Query] | The Query class this handler handles. |
| `name` | str \| None | Optional custom name for the handler. |
| `cacheable` | bool | Whether results can be cached. |
| `cache_ttl` | int \| None | Cache time-to-live in seconds. |
| `**metadata` | Any | Additional metadata to attach to the handler. |
| Type | Description |
|---|---|
| Callable[[Callable[P, R]], Callable[P, R]] | Decorator function. |
Example
```python
@query_handler(GetUserQuery, cacheable=True, cache_ttl=300)
async def handle_get_user(query: GetUserQuery) -> UserDTO:
    return await user_repository.get(query.user_id)
```
Exceptions
AggregateNotFoundError
Raised when aggregate is not found.
CommandExecutionError
Command execution error.
ConcurrencyError
Concurrency/optimistic locking error.
Raised when event version conflicts indicate concurrent modifications.
DuplicateHandlerError
Duplicate handler registration.
Raised when attempting to register a handler for a type that already has one.
EventError
Base events/messaging error.
EventLoadError
Raised when event loading fails.
EventPersistenceError
Raised when event persistence fails.
EventStoreConnectionError
Raised when event store connection fails.
EventStoreError
Raised for event store errors.
HandlerNotFoundError
Event/command handler not found.
Raised when no handler is registered for a specific event or command type.
ProjectionBuildError
Raised when projection building fails.
ProjectionNotFoundError
Raised when a projection is not found.
QueryExecutionError
Query execution error.
SchemaError
Raised for schema-related errors.
SecurityError
Raised for security-related errors.
StreamNotFoundError
Raised when stream is not found.
Parameters mirror the helper used in tests: (stream_type, stream_id).
StreamingError
Raised for streaming-related errors.
WebhookDeliveryError
Raised when an outbound webhook delivery fails after all retry attempts.
Carries the target url and the HTTP status code returned by the remote endpoint so callers can log structured failure context.