API Reference
Classes
AbstractSaga
Abstract base class for orchestration sagas.
Subclasses declare their saga identity (get_id) and terminal
condition (is_completed). Steps are registered via add_step
and the full sequence is driven by execute.
Type Parameters:
T: The saga state / context type (unused by the base class but carried for subclass typing convenience).
Lifecycle:
PENDING → RUNNING → COMPLETED (all steps succeed)
PENDING → RUNNING → COMPENSATING → FAILED
(a step fails; already-completed steps are unwound)
Return the current schema version for this saga class.
| Type | Description |
|---|---|
| int | The integer version declared on the class. |
Check if stored state version is compatible with this class.
Override for custom migration logic (e.g. allow loading V1 in V2).
| Parameter | Type | Description |
|---|---|---|
| `stored_version` | int | Version number found in the persisted state. |
| Type | Description |
|---|---|
| bool | ``True`` when the stored version matches the class VERSION. |
Return the stable identifier for this saga instance.
| Type | Description |
|---|---|
| str | A string that uniquely identifies this saga process. |
Return whether the saga has reached a terminal state.
| Type | Description |
|---|---|
| bool | ``True`` when no further steps remain (success *or* fully compensated failure). |
def add_step(step: SagaStep) -> None
Register a step to be executed as part of this saga.
Steps are executed in the order they are added.
| Parameter | Type | Description |
|---|---|---|
| `step` | SagaStep | The step to append to the execution sequence. |
Return whether step_name has been successfully compensated.
| Parameter | Type | Description |
|---|---|---|
| `step_name` | str | The SagaStep.name to check. |
| Type | Description |
|---|---|
| bool | ``True`` if the step's compensation action ran successfully. |
Manually record step_name as compensated.
Useful when compensation is performed externally (e.g., a separate process handles the rollback) and the saga state needs to be updated.
| Parameter | Type | Description |
|---|---|---|
| `step_name` | str | The SagaStep.name to mark as compensated. |
Execute all registered steps in registration order.
On first step failure, compensation is triggered for all
previously completed steps (in reverse order) and an
Err(SagaError(...)) is returned.
| Type | Description |
|---|---|
| Result[None, SagaError] | ``Ok(None)`` when all steps complete successfully. ``Err(SagaError)`` when any step fails (compensation already attempted before returning). |
Manually trigger compensation for all completed steps.
Useful when the calling code detects a failure condition outside
the normal execute flow.
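Example
A minimal subclass sketch. The import path, the no-argument super().__init__() call, and the Ok/Err Result helpers are assumptions; get_id, is_completed, add_step, execute, and the SagaStep fields are as documented on this page.
```python
from lexigram.workflow import AbstractSaga, SagaStep  # assumed import path

class OrderSaga(AbstractSaga[dict]):
    def __init__(self, order_id: str) -> None:
        super().__init__()  # assumed no-arg base initialiser
        self.order_id = order_id
        self._finished = False
        # Steps run in registration order; compensations unwind in reverse.
        self.add_step(SagaStep(
            name="reserve_inventory",
            action=self._reserve,        # async, must return Result[Any, SagaStepError]
            compensation=self._release,  # async, called only while unwinding
            max_retries=3,
            retry_delay=0.5,
            idempotent=True,
        ))

    def get_id(self) -> str:
        return f"order-saga:{self.order_id}"

    def is_completed(self) -> bool:
        return self._finished

    async def _reserve(self):
        ...  # return Ok(...) on success, Err(SagaStepError(...)) on failure

    async def _release(self):
        ...  # undo the reservation; may raise freely

# result = await OrderSaga("o-42").execute()
# -> Ok(None) on success; Err(SagaError) after compensation was attempted.
```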
AbstractWorkflowNode
Abstract base class for all workflow nodes.
All built-in and custom nodes inherit from this class. The only required override is execute.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique node identifier within the workflow graph. |
| `node_type` | NodeType | The NodeType label for this node. |
def __init__( name: str, node_type: NodeType = NodeType.CUSTOM ) -> None
Unique node identifier.
property node_type() -> NodeType
Type label for this node.
Execute this node and return a state-update dict.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current shared workflow state (read-only reference; mutations are applied via the returned dict). |
| Type | Description |
|---|---|
| dict[str, Any] | Dict of key-value pairs to merge into the shared state. Return an empty dict if no state updates are needed. |
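Example
A minimal custom node, assuming AbstractWorkflowNode and NodeType are importable from the package root and that execute is declared async (the engine itself is async).
```python
from typing import Any

from lexigram.workflow import AbstractWorkflowNode, NodeType  # assumed import path

class UppercaseNode(AbstractWorkflowNode):
    def __init__(self, name: str) -> None:
        super().__init__(name, node_type=NodeType.CUSTOM)

    async def execute(self, state: dict[str, Any]) -> dict[str, Any]:
        # Do not mutate `state` directly; return only the keys to merge back.
        return {"output": str(state.get("input", "")).upper()}
```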
AgentNode
Workflow node that executes a Lexigram agent.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Node identifier. |
| `agent` | Any | Object implementing AgentProtocol. |
| `executor` | Any | None | Optional AgentExecutorProtocol instance. |
| `input_key` | str | State key to read the agent input from. |
| `output_key` | str | State key where the agent response is stored. |
def __init__( name: str, *, agent: Any, executor: Any | None = None, input_key: str = 'input', output_key: str = 'output' ) -> None
Run the agent with the value of state[input_key].
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current workflow state. |
| Type | Description |
|---|---|
| dict[str, Any] | Dict with {output_key: agent_response_text}. |
BulkBatchResult
Result of processing a single batch.
Get the duration of this batch.
Get the number of successful items in this batch.
Get the number of failed items in this batch.
Check if the batch completed successfully.
BulkItemError
Structured error information for a failed batch.
Attributes:
batch_id: The ID of the batch that failed.
error: Human-readable error message.
error_type: The fully-qualified type name of the exception.
retry_count: Number of retry attempts made before giving up.
timestamp: Monotonic timestamp of the failure.
BulkOperation
Manages bulk operations with batching, concurrency control, and error handling.
def __init__( config: BulkOperationConfig | None = None, processor: Callable[[list[T]], Awaitable[list[R]]] | None = None, circuit_breaker: CircuitBreakerProtocol | None = None, checkpoint_store: CacheBackendProtocol | None = None, *, on_progress: Callable[[int, int, str], Awaitable[None]] | None = None, on_complete: Callable[[int, int, str], Awaitable[None]] | None = None, on_error: Callable[[int, int, str], Awaitable[None]] | None = None ) -> None
property state() -> BulkOperationState
Get the current operation state.
property metrics() -> BulkOperationMetrics
Get operation metrics.
def add_progress_callback(callback: Callable[[BulkOperationMetrics], Awaitable[None]]) -> None
Add a progress tracking callback.
Remove a stored checkpoint, allowing the operation to restart from scratch.
| Parameter | Type | Description |
|---|---|---|
| `operation_id` | str | Unique identifier for the bulk operation run to clear. |
async def execute( items: list[T] | AsyncIterable[T], processor: Callable[[list[T]], Awaitable[list[R]]] | None = None, operation_id: str | None = None ) -> AsyncIterator[BulkBatchResult[T, R]]
Execute bulk operation on items.
| Parameter | Type | Description |
|---|---|---|
| `items` | list[T] | AsyncIterable[T] | Items to process — either a list or an async iterable. |
| `processor` | Callable[[list[T]], Awaitable[list[R]]] | None | Optional processor to override the one set at construction. |
| `operation_id` | str | None | Stable identifier used for checkpoint/resume. If supplied and a ``checkpoint_store`` was provided at construction, completed batches from a previous interrupted run are automatically skipped. |
Cancel the bulk operation.
Shut down the bulk operation, cancelling any in-progress execution.
Safe to call even when the operation is not running. Ensures the operation reaches a terminal state so resources can be reclaimed.
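Example
A usage sketch. The import path and the BulkOperationConfig keyword names (taken from its attribute list below) are assumptions; BulkOperation.execute() and its parameters are as documented above.
```python
import asyncio

from lexigram.workflow import BulkOperation, BulkOperationConfig  # assumed import path

async def double_batch(batch: list[int]) -> list[int]:
    return [n * 2 for n in batch]

async def main() -> None:
    op = BulkOperation(
        config=BulkOperationConfig(batch_size=100, max_concurrency=4),  # kwargs assumed
        processor=double_batch,
    )
    # execute() yields one BulkBatchResult per processed batch; operation_id
    # enables checkpoint/resume when a checkpoint_store was supplied.
    async for batch_result in op.execute(list(range(1000)), operation_id="demo-run"):
        pass  # inspect batch_result here, or op.metrics for running totals

asyncio.run(main())
```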
BulkOperationConfig
Configuration for bulk operations and pipeline execution.
Attributes:
batch_size: Number of items per batch.
max_concurrency: Maximum number of concurrent operations.
timeout: Operation timeout in seconds.
retry_attempts: Number of retry attempts.
retry_delay: Delay between retries in seconds.
enable_progress_tracking: Whether to track progress.
circuit_breaker_config: Optional circuit breaker configuration.
pipeline_timeout: Default pipeline execution timeout in seconds.
BulkOperationMetrics
Metrics for bulk operations.
Record the start of the operation.
Record the end of the operation.
def record_batch_result(result: BulkBatchResult) -> None
Update metrics with result from a processed batch.
Record an unexpected operation-level error.
Get the total duration of the operation.
Get the success rate as a percentage.
Get items processed per second.
BulkOperationState
States for bulk operations.
ConditionalStep
Pipeline step that executes conditionally based on a predicate.
def __init__( name: str, condition: Callable[[PipelineContext], Result[bool, Exception] | Awaitable[Result[bool, Exception]]], true_step: PipelineStep, false_step: PipelineStep | None = None, dependencies: list[str] | None = None, timeout: float | None = None ) -> None
async def execute(context: PipelineContext) -> Result[Any, Exception]
Execute the appropriate step based on condition.
async def should_skip(context: PipelineContext) -> Result[bool, Exception]
Conditional steps should not be skipped.
ExecutionHistory
Formats and inspects the execution trace of a completed workflow run.
| Parameter | Type | Description |
|---|---|---|
| `result` | GraphResult | The completed GraphResult to inspect. |
def __init__(result: GraphResult) -> None
Return the full execution trace as a JSON-serialisable dict.
| Type | Description |
|---|---|
| dict[str, Any] | Dict with keys iterations, duration_ms, terminated_at, and nodes. |
Return a multi-line human-readable execution trace.
| Type | Description |
|---|---|
| str | A string with one line per node showing its outcome. |
def failed_nodes() -> list[NodeResult]
Return node results that terminated with an error.
| Type | Description |
|---|---|
| list[NodeResult] | Filtered list of failed NodeResult entries. |
def succeeded_nodes() -> list[NodeResult]
Return node results that completed successfully.
| Type | Description |
|---|---|
| list[NodeResult] | Filtered list of successful NodeResult entries. |
Return cumulative node execution time in milliseconds.
FunctionStep
Pipeline step that executes a function.
def __init__( name: str, func: Callable[[PipelineContext], Result[Any, Exception] | Awaitable[Result[Any, Exception]]], dependencies: list[str] | None = None, skip_condition: Callable[[PipelineContext], Result[bool, Exception] | Awaitable[Result[bool, Exception]]] | None = None, error_handler: Callable[[PipelineContext, Exception], Result[Any, Exception] | Awaitable[Result[Any, Exception]]] | None = None, cleanup_func: Callable[[PipelineContext], Result[None, Exception] | Awaitable[Result[None, Exception]]] | None = None, timeout: float | None = None ) -> None
async def execute(context: PipelineContext) -> Result[Any, Exception]
Execute the function, honoring optional per-step timeout.
async def should_skip(context: PipelineContext) -> Result[bool, Exception]
Check skip condition.
async def on_error( context: PipelineContext, error: Exception ) -> Result[Any, Exception]
Handle errors.
async def cleanup(context: PipelineContext) -> Result[None, Exception]
Cleanup resources.
GateNode
Routing node that forces the graph to branch on state values.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Node identifier. |
| `routes` | dict[str, Callable[[dict[str, Any]], bool]] | None | Mapping of target-node-name to a (state) -> bool predicate. |
def __init__( name: str, *, routes: dict[str, Callable[[dict[str, Any]], bool]] | None = None ) -> None
Return empty dict — routing is handled by WorkflowEdge conditions.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current workflow state (not mutated here). |
| Type | Description |
|---|---|
| dict[str, Any] | Empty dict — gate nodes produce no output. |
GraphConfig
Configuration for the workflow graph engine.
Attributes:
enabled: Enable the graph workflow subsystem.
max_iterations: Maximum graph traversal steps (anti-infinite-loop guard).
node_timeout: Per-node execution timeout in seconds. 0 = no limit.
total_timeout: Total workflow timeout in seconds. 0 = no limit.
checkpoint_enabled: Persist state checkpoints for resumption after failures.
parallel_branches: Execute independent parallel branches via asyncio.gather.
max_parallel_branches: Max parallel branches at once. 0 = unlimited.
Check that the configuration is safe for the target environment.
| Parameter | Type | Description |
|---|---|---|
| `env` | Environment | None | Target deployment environment. |
| Type | Description |
|---|---|
| list[ConfigIssue] | List of configuration issues (empty if all OK). |
GraphResult
Aggregate result of a complete graph workflow execution.
Attributes:
final_state: The workflow state after all nodes have executed.
node_results: Ordered list of per-node results.
iterations: Total graph traversal steps taken.
duration_ms: Total wall-clock execution time.
terminated_at: Name of the terminal node that ended execution.
Return the output key from final_state, or None.
True when all executed nodes succeeded.
HumanNode
Pause the workflow to collect a human response.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Node identifier (doubles as checkpoint_id). |
| `prompt` | str | Question or instruction to display to the operator. Supports {key} substitution from state. |
| `output_key` | str | State key where the human response will be stored. |
| `resume_key` | str | None | State key the engine populates with the human answer when WorkflowEngine.resume() is called. |
def __init__( name: str, *, prompt: str = 'Human input required.', output_key: str = 'human_response', resume_key: str | None = None ) -> None
Return human response if resumed, otherwise pause execution.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current workflow state. |
| Type | Description |
|---|---|
| dict[str, Any] | Dict with {output_key: human_response} when resumed. |
| Exception | Description |
|---|---|
| HumanInputRequiredError | When no prior human response is in state. |
LLMNode
Workflow node that makes a raw LLM call.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Node identifier. |
| `llm` | Any | LLMClientProtocol implementation. |
| `prompt_template` | str | Prompt string with optional {key} variables. |
| `system_prompt` | str | Optional system message prepended to the call. |
| `output_key` | str | State key where the completion is stored. |
| `model` | str | None | Optional model override. |
| `temperature` | float | None | Sampling temperature override. |
| `max_tokens` | int | None | Max output token override. |
def __init__( name: str, *, llm: Any, prompt_template: str = '{input}', system_prompt: str = '', output_key: str = 'output', model: str | None = None, temperature: float | None = None, max_tokens: int | None = None ) -> None
Render the prompt from state and call the LLM.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current workflow state used for prompt substitution. |
| Type | Description |
|---|---|
| dict[str, Any] | Dict with {output_key: completion_text}. |
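Example
An instantiation sketch using only the documented constructor; my_llm is a placeholder for an LLMClientProtocol implementation.
```python
summarise = LLMNode(
    "summarise",
    llm=my_llm,  # placeholder LLMClientProtocol implementation
    prompt_template="Summarise in one sentence: {text}",
    system_prompt="You are a terse technical writer.",
    output_key="summary",
    temperature=0.2,
)
# At runtime the node renders the prompt from state (here: state["text"])
# and stores the completion under state["summary"].
```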
NodeResult
Result produced by a single workflow graph node execution.
Attributes:
node_name: Name of the node that produced this result.
output: State update dict returned by the node.
duration_ms: Wall-clock duration of the node execution.
error: Error message if the node raised; None on success.
skipped: True when the node was bypassed by conditional routing.
NodeType
Enumeration of built-in workflow node types.
ParallelStep
Pipeline step that executes multiple steps in parallel.
def __init__( name: str, steps: list[PipelineStep], dependencies: list[str] | None = None, *, fail_fast: bool = True, timeout: float | None = None ) -> None
async def execute(context: PipelineContext) -> Result[list[Any], Exception]
Execute all steps in parallel.
Pipeline
Declarative pipeline for composing business workflows.
def __init__( name: str, steps: list[PipelineStep], checkpoint_store: CacheBackendProtocol | None = None ) -> None
def add_step(step: PipelineStep) -> None
Append a step to this pipeline.
| Parameter | Type | Description |
|---|---|---|
| `step` | PipelineStep | The PipelineStep to append. Validates pipeline structure after addition. |
| Exception | Description |
|---|---|
| ValueError | If the new step creates duplicate names or broken deps. |
Return a Graphviz DOT representation of the pipeline execution graph.
Nodes represent pipeline steps; directed edges represent declared
dependencies (dep → step). The output can be rendered with
dot -Tpng or pasted into an online Graphviz viewer.
| Type | Description |
|---|---|
| str | A string containing valid Graphviz DOT source for the pipeline. |
Example
dot_src = pipeline.to_dot()
# dot -Tsvg -o pipeline.svg <<< "$(python -c '...')"
async def execute( initial_context: PipelineContext | None = None, fail_fast: bool = True ) -> Result[PipelineContext, Exception]
Execute the pipeline.
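Example
A composition sketch. step is the helper documented under Functions below; the Ok constructor and both import paths are assumptions.
```python
from typing import Any

from lexigram.workflow import Pipeline, PipelineContext, step  # assumed import path
from lexigram.result import Ok, Result  # assumed Result module

async def fetch(ctx: PipelineContext) -> Result[Any, Exception]:
    return Ok([1, 2, 3])

async def total(ctx: PipelineContext) -> Result[Any, Exception]:
    return Ok(6)  # real code would read the "fetch" result off ctx

pipeline = Pipeline("etl", steps=[
    step("fetch", fetch),
    step("total", total, dependencies=["fetch"], timeout=5.0),
])

# outcome = await pipeline.execute()
# -> Ok(PipelineContext) when every step succeeds, Err(...) otherwise.
```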
PipelineContext
Context passed through pipeline execution, accumulating step results and metadata.
Get result from a completed step.
| Parameter | Type | Description |
|---|---|---|
| `step_name` | str | The name of the step whose result to retrieve. |
| `default` | Any | Value to return if the step has no recorded result. |
| Type | Description |
|---|---|
| Any | The recorded step result, or *default* if not found. |
Set result for a step.
| Parameter | Type | Description |
|---|---|---|
| `step_name` | str | The name of the step. |
| `result` | Any | The result value to record. |
Add metadata to the context.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Metadata key. |
| `value` | Any | Metadata value. |
Get metadata from the context.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Metadata key to look up. |
| `default` | Any | Value to return if the key is absent. |
| Type | Description |
|---|---|
| Any | The metadata value, or *default* if not found. |
Total duration of pipeline execution in seconds.
property failed_steps() -> list[StepExecutionResult]
List of failed step results.
property successful_steps() -> list[StepExecutionResult]
List of successfully completed step results.
Check if any steps in the pipeline failed.
| Type | Description |
|---|---|
| bool | ``True`` if at least one step has a FAILED status. |
PipelineStep
Abstract base class for pipeline steps.
Non-execute helpers are deliberately simple: they return plain
values and may raise exceptions. This keeps user implementations
lightweight and avoids the noise of wrapping every result in
Result. Helpers are responsible for handling their own errors;
the pipeline driver will catch and convert them into failed step
records if necessary.
Steps may specify an optional timeout (in seconds). When set, the
execute invocation will be wrapped with asyncio.wait_for
so that a long-running or hung step is interrupted and treated as a
failure. The timeout does not apply to should_skip,
on_error or cleanup helpers.
def __init__( name: str, dependencies: list[str] | None = None, timeout: float | None = None, resilience: ResiliencePipelineProtocol | None = None ) -> None
async def execute(context: PipelineContext) -> Result[Any, Exception]
Execute the pipeline step.
| Parameter | Type | Description |
|---|---|---|
| `context` | PipelineContext | The shared pipeline context carrying inter-step data. |
| Type | Description |
|---|---|
| Result[Any, Exception] | A ``Result`` wrapping the step's output or a failure. |
async def should_skip(context: PipelineContext) -> Result[bool, Exception]
Determine whether to skip this step.
| Parameter | Type | Description |
|---|---|---|
| `context` | PipelineContext | The current pipeline context. |
| Type | Description |
|---|---|
| Result[bool, Exception] | ``Ok(True)`` to skip, ``Ok(False)`` to execute, or ``Err(e)`` on failure while evaluating the condition. |
The default implementation always returns Ok(False).
async def on_error( context: PipelineContext, error: Exception ) -> Any
Handle errors raised during execute.
| Parameter | Type | Description |
|---|---|---|
| `context` | PipelineContext | The current pipeline context. |
| `error` | Exception | The exception that caused the step to fail. |
The default behaviour is to re-raise the error, which causes the step (and pipeline) to fail. Custom handlers may return a value to recover and continue the pipeline.
async def cleanup(context: PipelineContext) -> Result[None, Exception]
Cleanup resources after step execution.
| Parameter | Type | Description |
|---|---|---|
| `context` | PipelineContext | The current pipeline context. |
Called regardless of success or failure. The default implementation does nothing. Exceptions are logged but not propagated.
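Example
A subclass sketch exercising the optional hooks with the documented signatures; the import paths and the Ok constructor are assumptions.
```python
from typing import Any

from lexigram.workflow import PipelineContext, PipelineStep  # assumed import path
from lexigram.result import Ok, Result  # assumed Result module

class LoadUsersStep(PipelineStep):
    def __init__(self) -> None:
        # The timeout applies to execute() only, per the notes above.
        super().__init__("load_users", timeout=10.0)

    async def execute(self, context: PipelineContext) -> Result[Any, Exception]:
        return Ok(["ada", "grace"])

    async def should_skip(self, context: PipelineContext) -> Result[bool, Exception]:
        return Ok(False)  # Ok(True) would skip this step

    async def cleanup(self, context: PipelineContext) -> Result[None, Exception]:
        # Runs on success and failure alike; exceptions are logged, not raised.
        return Ok(None)
```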
SagaStep
Descriptor for a single saga orchestration step.
Shared across lexigram-workflow and lexigram-events via contracts.
The action callable must return Result[Any, SagaStepError].
The compensation callable is called only when the saga is unwinding
and may raise exceptions freely (infrastructure failures in compensation
should propagate).
Attributes:
name: Unique step identifier within the saga.
action: Async callable returning Result[Any, SagaStepError].
compensation: Async callable to undo the step on failure.
max_retries: Maximum retry attempts if action returns Err.
retry_delay: Seconds to wait between retry attempts.
idempotent: Whether the compensation is safe to call multiple times.
StepExecutionResult
Result of executing a single pipeline step.
StepStatus
Pipeline step execution status.
SubworkflowNode
Embed a nested WorkflowEngine as a single node in a parent workflow.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Node identifier. |
| `workflow` | Any | A WorkflowEngine instance to nest. |
| `input_key` | str | State key to read as the subworkflow input value. |
| `output_key` | str | State key where the subworkflow result is stored. |
def __init__( name: str, *, workflow: Any, input_key: str = 'input', output_key: str = 'output' ) -> None
Execute the nested workflow with the current parent state.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current parent workflow state. |
| Type | Description |
|---|---|
| dict[str, Any] | Dict with {output_key: subworkflow_output} on success or {output_key: "Subworkflow error: ..."} on failure. |
ToolNode
Workflow node that executes a Lexigram tool.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Node identifier. |
| `tool` | Any | Object implementing ToolProtocol with async execute(). |
| `input_key` | str | State key to read the tool primary input from. |
| `output_key` | str | State key where the tool result is stored. |
| `extra_keys` | list[str] | None | Additional state keys forwarded to the tool as arguments. |
def __init__( name: str, *, tool: Any, input_key: str = 'input', output_key: str = 'output', extra_keys: list[str] | None = None ) -> None
Call the tool with state-derived arguments.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current workflow state. |
| Type | Description |
|---|---|
| dict[str, Any] | Dict with {output_key: tool_result}. |
TransformPipe
Composable transformation pipe.
Each step in the pipe receives the output of the previous step and returns the (possibly transformed) value. Steps can be sync or async callables.
The pipe is immutable — calling pipe returns a new
TransformPipe instance with the added step.
def __init__( steps: list[Callable[Ellipsis, Any]] | None = None, error_handler: Callable[[Exception, Any], Any] | None = None ) -> None
def pipe(step: Callable[[T], U | Awaitable[U]]) -> TransformPipe[U]
Add a transformation step to the pipe.
The supplied step should accept the current value (of type
T) and return either a new value of type U or an awaitable
producing U. The returned pipe has type TransformPipe[U] so
that subsequent calls are type safe.
| Parameter | Type | Description |
|---|---|---|
| `step` | Callable[[T], U | Awaitable[U]] | A callable (sync or async) that takes a value and returns the transformed value. |
| Type | Description |
|---|---|
| TransformPipe[U] | A new ``TransformPipe`` with the step appended. |
def tap(side_effect: Callable[[T], Any]) -> TransformPipe[T]
Add a side-effect step that observes but does not transform the value.
The side-effect callable receives the current value. Its return value is ignored and the original value is forwarded unchanged.
| Parameter | Type | Description |
|---|---|---|
| `side_effect` | Callable[[T], Any] | A callable (sync or async) for observation/logging. |
| Type | Description |
|---|---|
| TransformPipe[T] | A new ``TransformPipe`` with the tap step appended. |
def pipe_if( predicate: Callable[[T], bool], step: Callable[[T], U | Awaitable[U]] ) -> TransformPipe[U | T]
Add a conditional transformation step.
The step is only applied when predicate(value) returns True.
Otherwise the value passes through unchanged.
| Parameter | Type | Description |
|---|---|---|
| `predicate` | Callable[[T], bool] | A callable that decides whether to apply *step*. |
| `step` | Callable[[T], U | Awaitable[U]] | The transformation to apply when the predicate holds. |
| Type | Description |
|---|---|
| TransformPipe[U | T] | A new ``TransformPipe`` with the conditional step appended. |
def catch(handler: Callable[[Exception, T], Any]) -> TransformPipe[T]
Set a raw exception recovery handler for the pipe.
When any step raises and a handler is set, the handler receives
(exception, last_value) and its return value becomes the pipe
result, swallowing the exception.
Warning
`catch()` operates outside the `Result` pattern — it intercepts raw exceptions rather than returning `Err(…)`. Prefer designing pipe steps to return `Result[T, E]` and handling errors at the call-site via `result.match()` or `result.and_then()` (async) / `result.and_then_sync()` when possible. Use `catch()` only for coarse-grained last-resort recovery (e.g. logging and substituting a sentinel value) where propagating exceptions would crash the surrounding pipeline.
| Parameter | Type | Description |
|---|---|---|
| `handler` | Callable[[Exception, T], Any] | A callable ``(exception, value) -> recovery_value``. |
| Type | Description |
|---|---|
| TransformPipe[T] | A new ``TransformPipe`` with the error handler configured. |
Execute the pipe by passing the value through all steps.
| Parameter | Type | Description |
|---|---|---|
| `value` | T | The initial input value. |
| `**kwargs` | Any | Additional keyword arguments forwarded to each step. |
| Type | Description |
|---|---|
| T | The final transformed value. |
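Example
A chaining sketch. Each call returns a new immutable pipe; the name of the final execution method is not shown above, so execute here is an assumption.
```python
pipe = (
    TransformPipe()
    .pipe(str.strip)                                  # sync step
    .tap(lambda s: print(f"after strip: {s!r}"))      # observe; value passes through
    .pipe_if(lambda s: s == "", lambda s: "<empty>")  # applied only when predicate holds
    .catch(lambda exc, value: "<error>")              # last-resort recovery only
)

# final = await pipe.execute("  hello  ")  # 'execute' name assumed -> "hello"
```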
WorkflowBuilder
Fluent builder for WorkflowEngine.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Workflow identifier used in logs and checkpoints. |
| `config` | GraphConfig | Default runtime configuration. |
def add_node( name: str, *, node: AbstractWorkflowNode | None = None, agent: Any | None = None, tool: Any | None = None, llm: Any | None = None, prompt: str = '', human_prompt: str = '', subworkflow: Any | None = None, executor: Any | None = None, output_key: str = 'output' ) -> WorkflowBuilder
Register a node in the workflow graph.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique node identifier. |
| `node` | AbstractWorkflowNode | None | Pre-built AbstractWorkflowNode instance. |
| `agent` | Any | None | AgentProtocol wrapped in AgentNode. |
| `tool` | Any | None | ToolProtocol wrapped in ToolNode. |
| `llm` | Any | None | LLMClientProtocol wrapped in LLMNode. |
| `prompt` | str | Prompt template for llm nodes. |
| `human_prompt` | str | Prompt shown to the human for human nodes. |
| `subworkflow` | Any | None | Nested workflow for SubworkflowNode. |
| `executor` | Any | None | Agent executor required for agent nodes. |
| `output_key` | str | State key to store the node primary output under. |
| Type | Description |
|---|---|
| WorkflowBuilder | self for chaining. |
| Exception | Description |
|---|---|
| ValueError | If name is already registered or no node type is given. |
def add_gate( name: str, routes: dict[str, Callable[[dict[str, Any]], bool]] ) -> WorkflowBuilder
Add a GateNode with conditional outgoing edges.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Gate node identifier. |
| `routes` | dict[str, Callable[[dict[str, Any]], bool]] | Mapping of target-node-name to condition callable. |
| Type | Description |
|---|---|
| WorkflowBuilder | self for chaining. |
def add_edge( source: str, target: str, *, condition: Callable[[dict[str, Any]], bool] | None = None, label: str = '' ) -> WorkflowBuilder
Add a directed edge from source to target.
| Parameter | Type | Description |
|---|---|---|
| `source` | str | Source node name. |
| `target` | str | Target node name. |
| `condition` | Callable[[dict[str, Any]], bool] | None | Optional guard callable (state: dict) -> bool. |
| `label` | str | Optional human-readable label. |
| Type | Description |
|---|---|
| WorkflowBuilder | self for chaining. |
def set_entry(node: str) -> WorkflowBuilder
Set node as the workflow entry point.
| Parameter | Type | Description |
|---|---|---|
| `node` | str | Node name to start execution from. |
| Type | Description |
|---|---|
| WorkflowBuilder | self for chaining. |
def set_terminal( node: str, *, condition: Callable[[dict[str, Any]], bool] | None = None ) -> WorkflowBuilder
Mark node as a terminal node.
| Parameter | Type | Description |
|---|---|---|
| `node` | str | Node name to mark as terminal. |
| `condition` | Callable[[dict[str, Any]], bool] | None | Optional guard callable. |
| Type | Description |
|---|---|
| WorkflowBuilder | self for chaining. |
def configure(config: GraphConfig) -> WorkflowBuilder
Replace the default GraphConfig.
| Parameter | Type | Description |
|---|---|---|
| `config` | GraphConfig | New configuration. |
| Type | Description |
|---|---|
| WorkflowBuilder | self for chaining. |
def build() -> WorkflowEngine
Validate and build the WorkflowEngine.
| Type | Description |
|---|---|
| WorkflowEngine | Ready-to-execute workflow engine. |
| Exception | Description |
|---|---|
| GraphValidationError | If entry node is not set or graph has structural errors. |
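Example
An end-to-end build sketch using only the methods documented above; my_llm, pager_tool, and archive_tool are placeholder protocol implementations.
```python
engine = (
    WorkflowBuilder("triage")  # constructor parameters per the table above
    .add_node("classify", llm=my_llm, prompt="Classify severity: {input}", output_key="label")
    .add_gate("route", routes={
        "escalate": lambda state: state.get("label") == "urgent",
        "archive": lambda state: state.get("label") != "urgent",
    })
    .add_node("escalate", tool=pager_tool)
    .add_node("archive", tool=archive_tool)
    .add_edge("classify", "route")
    .set_entry("classify")
    .set_terminal("escalate")
    .set_terminal("archive")
    .build()  # raises GraphValidationError on structural problems
)

# result = await engine.execute("Database is down")
# -> Ok(GraphResult) | Err(GraphExecutionError)
```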
WorkflowCheckpoint
In-memory checkpoint store for workflow states.
| Parameter | Type | Description |
|---|---|---|
| `max_size` | int | Maximum number of checkpoints to keep. Oldest entries are evicted once the limit is reached. 0 means unlimited. |
def save( checkpoint_id: str, state: WorkflowState ) -> None
Persist a snapshot of state under checkpoint_id.
| Parameter | Type | Description |
|---|---|---|
| `checkpoint_id` | str | Unique identifier for this checkpoint. |
| `state` | WorkflowState | Workflow state to snapshot. |
Retrieve the state snapshot for checkpoint_id.
| Parameter | Type | Description |
|---|---|---|
| `checkpoint_id` | str | Identifier to look up. |
| Type | Description |
|---|---|
| dict[str, object] | None | The raw state dict if found, otherwise None. |
Remove a checkpoint entry.
| Parameter | Type | Description |
|---|---|---|
| `checkpoint_id` | str | Identifier to remove. |
| Type | Description |
|---|---|
| bool | True if an entry was removed, False if not found. |
WorkflowCompletedEvent
Emitted when a workflow instance reaches a terminal success state.
Attributes:
workflow_id: Unique identifier of the workflow instance.
workflow_name: Human-readable name of the workflow definition.
WorkflowCompletedHook
Payload fired when a workflow instance reaches a terminal state.
Attributes:
workflow_id: Unique identifier of the workflow instance.
workflow_type: Type or name of the workflow definition.
succeeded: True if the workflow completed successfully.
WorkflowDefinition
Immutable, versioned descriptor for a workflow.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique workflow name. |
| `steps` | list | Ordered list of step descriptors (opaque; interpreted by the executor). Defaults to an empty list for graph-based workflows where nodes/edges are the canonical step representation. |
| `version` | int | Definition schema version. Increment this whenever the definition structure changes incompatibly. Defaults to ``1``. |
WorkflowEdge
A directed edge in the workflow graph.
Edges connect a source node to a target node. An optional
condition callable gates the edge: if provided, the edge is
traversed only when condition(state) is truthy. Edges without
a condition are always traversed (unconditional/always edges).
For parallel branches, list multiple edges from the same source node;
WorkflowEngine will collect all edges whose conditions are satisfied and
execute them concurrently when GraphConfig.parallel_branches is True.
Attributes:
source: Name of the originating node.
target: Name of the destination node.
condition: Optional callable (state: dict) -> bool.
When None the edge is unconditional.
label: Optional human-readable label for documentation/debugging.
is_parallel: Hint that this edge should be executed concurrently
with sibling edges from the same source. The engine enables
parallelism automatically when multiple conditions are satisfied;
this flag provides an explicit override.
Return True if this edge should be traversed given state.
| Parameter | Type | Description |
|---|---|---|
| `state` | dict[str, Any] | Current workflow state dict. |
| Type | Description |
|---|---|
| bool | ``True`` for unconditional edges or when the condition is met. |
WorkflowEngine
Async directed-graph workflow executor.
Construct via WorkflowBuilder.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Display name for logging and checkpointing. |
| `nodes` | dict[str, AbstractWorkflowNode] | Dict mapping node name to node instance. |
| `edges` | list[WorkflowEdge] | List of all edges in the graph. |
| `entry_node` | str | Name of the starting node. |
| `terminal_conditions` | dict[str, Any] | Mapping of terminal node name to optional condition callable (state) -> bool. If None the node is always terminal. |
| `config` | GraphConfig | None | Runtime configuration. |
def __init__( name: str, nodes: dict[str, AbstractWorkflowNode], edges: list[WorkflowEdge], entry_node: str, terminal_conditions: dict[str, Any], config: GraphConfig | None = None, version: int = 1 ) -> None
Workflow display name used in logs and checkpoints.
Definition schema version — incremented when the graph structure changes incompatibly.
async def execute( input: str, *, config: GraphConfig | None = None, state: dict[str, Any] | None = None ) -> Result[GraphResult, GraphExecutionError]
Execute the workflow from the entry node.
| Parameter | Type | Description |
|---|---|---|
| `input` | str | User input string injected as state["input"]. |
| `config` | GraphConfig | None | Optional per-call config override. |
| `state` | dict[str, Any] | None | Optional pre-populated initial state. |
| Type | Description |
|---|---|
| Result[GraphResult, GraphExecutionError] | Ok(GraphResult) or Err(GraphExecutionError) on failure. |
async def resume( checkpoint_state: dict[str, Any], human_response: str, *, config: GraphConfig | None = None ) -> Result[GraphResult, GraphExecutionError]
Resume a workflow paused at a human-in-the-loop node.
| Parameter | Type | Description |
|---|---|---|
| `checkpoint_state` | dict[str, Any] | State dict snapshot from the checkpoint. |
| `human_response` | str | Human operator input. |
| `config` | GraphConfig | None | Optional per-call config override. |
| Type | Description |
|---|---|
| Result[GraphResult, GraphExecutionError] | Ok(GraphResult) or Err(GraphExecutionError) on failure. |
Validate the workflow graph structure.
| Exception | Description |
|---|---|
| GraphValidationError | If the graph has structural problems. |
WorkflowFailedEvent
Emitted when a workflow instance terminates due to an unrecoverable error.
Attributes:
workflow_id: Unique identifier of the workflow instance.
workflow_name: Human-readable name of the workflow definition.
error: Human-readable description of the failure reason.
WorkflowInstance
A single running (or paused) execution of a workflow definition.
| Parameter | Type | Description |
|---|---|---|
| `instance_id` | str | Unique identifier for this execution. |
| `workflow_name` | str | Name of the workflow definition this instance belongs to. |
| `definition_version` | int | The WorkflowDefinition.version recorded at instance-creation time. Used to detect incompatible definition changes on resume. |
def create( cls, definition: WorkflowDefinition ) -> WorkflowInstance
Create a new instance stamped with definition’s version.
| Parameter | Type | Description |
|---|---|---|
| `definition` | WorkflowDefinition | The workflow definition being started. |
| Type | Description |
|---|---|
| WorkflowInstance | A new WorkflowInstance whose definition_version matches ``definition.version``. |
def validate_definition_version(definition: WorkflowDefinition) -> None
Raise if definition’s version differs from the one this instance was started under.
Call this before executing any step when resuming a non-new instance to prevent silently running stale or incompatible steps.
| Parameter | Type | Description |
|---|---|---|
| `definition` | WorkflowDefinition | The workflow definition being used to resume this instance. |
| Exception | Description |
|---|---|
| WorkflowVersionMismatchError | If ``definition.version`` differs from the version recorded at instance creation. |
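Example
The resume-time guard in practice, using only the documented API.
```python
instance = WorkflowInstance.create(definition)  # stamps definition.version

# ... later, when resuming with a possibly-updated definition:
try:
    instance.validate_definition_version(current_definition)
except WorkflowVersionMismatchError:
    # Migrate the instance, or restart it under the new definition.
    raise
```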
WorkflowModule
Pipeline orchestration, bulk operations, saga state machines, and graph engine.
Call configure() to set up the workflow subsystem.
Usage (defaults)
@module(imports=[WorkflowModule.configure()])
class AppModule(Module):
    pass
Usage (configured)
from lexigram.workflow.config import BulkOperationConfig

@module(imports=[WorkflowModule.configure(BulkOperationConfig(batch_size=200))])
class AppModule(Module):
    pass
def configure( cls, config: Any | None = None, saga_store: SagaStoreProtocol | None = None ) -> DynamicModule
Create a WorkflowModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | Any | None | BulkOperationConfig or ``None`` for framework defaults. |
| `saga_store` | SagaStoreProtocol | None | Optional durable SagaStoreProtocol implementation. Defaults to in-memory. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def stub( cls, config: Any = None ) -> DynamicModule
Return an in-memory WorkflowModule for unit testing.
Uses in-memory saga store and default workflow configuration. No external state backends are required.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule with in-memory workflow state. |
WorkflowProvider
DI provider for the workflow subsystem.
Registers pipeline, bulk-operation, and saga infrastructure into
the container. All registrations use singleton scope so that
the same configured objects are shared across the application.
Attributes:
config: BulkOperationConfig controlling bulk and pipeline defaults.
saga_store: Optional SagaStoreProtocol implementation for
durable saga state. When omitted, sagas operate in-memory
only (suitable for development and single-process deployments).
def __init__( config: BulkOperationConfig | None = None, saga_store: SagaStoreProtocol | None = None, state_machine: StateMachineProtocol | None = None, db_provider: DatabaseProviderProtocol | None = None, state_table: str = 'workflow_state_transitions' ) -> None
Initialise the WorkflowProvider.
| Parameter | Type | Description |
|---|---|---|
| `config` | BulkOperationConfig | None | ``BulkOperationConfig`` for bulk and pipeline defaults. Defaults to ``BulkOperationConfig()`` (framework defaults). |
| `saga_store` | SagaStoreProtocol | None | Optional durable store for saga state. When ``None``, sagas run in-memory only. |
| `state_machine` | StateMachineProtocol | None | Optional pre-configured StateMachine instance to register under StateMachineProtocol. When ``None``, no state machine singleton is registered. |
| `db_provider` | DatabaseProviderProtocol | None | Optional database provider for state transition persistence. When supplied, a DatabaseStatePersistence is registered under StatePersistenceProtocol so that ``StateMachine`` instances can be resolved with durable persistence. |
| `state_table` | str | SQL table name used by ``DatabaseStatePersistence``. Defaults to ``"workflow_state_transitions"``. |
def from_config( cls, config: BulkOperationConfig, **context: Any ) -> Self
Create provider from config object.
async def register(container: ContainerRegistrarProtocol) -> None
Register workflow services into the DI container.
Bindings registered:
WorkflowProvider — self-reference for inspection.
BulkOperationConfig — pipeline/bulk defaults.
Pipeline / PipelineProtocol — default pipeline factory.
SagaStoreProtocol — optional durable saga store.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | DI registrar provided by the framework. |
async def boot(container: ContainerResolverProtocol) -> None
Perform post-registration initialisation.
Resolves BulkOperationConfig and logs the active configuration so
that operators can confirm the expected settings on startup.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerResolverProtocol | DI resolver provided by the framework. |
Shut down the workflow provider and release any held resources.
Check provider health.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Maximum seconds to wait for health check response. |
| Type | Description |
|---|---|
| HealthCheckResult | HealthCheckResult with status and component details. |
WorkflowRunner
Execute a WorkflowEngine with retry and optional checkpointing.
| Parameter | Type | Description |
|---|---|---|
| `engine` | WorkflowEngine | The WorkflowEngine to run. |
| `max_retries` | int | Maximum attempts after the first failure. |
| `retry_delay` | float | Seconds to wait between attempts. |
| `checkpoint` | Any | None | Optional WorkflowCheckpoint instance. |
def __init__( engine: WorkflowEngine, *, max_retries: int = 0, retry_delay: float = 1.0, checkpoint: Any | None = None ) -> None
async def run( input: str, *, state: dict[str, Any] | WorkflowState | None = None ) -> Result[GraphResult, GraphExecutionError]
Execute the workflow, retrying on transient failures.
| Parameter | Type | Description |
|---|---|---|
| `input` | str | Workflow input string. |
| `state` | dict[str, Any] | WorkflowState | None | Optional pre-populated WorkflowState. |
| Type | Description |
|---|---|
| Result[GraphResult, GraphExecutionError] | Ok(GraphResult) on success or Err(GraphExecutionError) on permanent failure. |
| Exception | Description |
|---|---|
| HumanInputRequiredError | When a HumanNode pauses execution. |
async def resume( human_response: str, *, state: WorkflowState, response_key: str = 'human_response' ) -> Result[GraphResult, GraphExecutionError]
Resume a paused HITL workflow by injecting the human response.
| Parameter | Type | Description |
|---|---|---|
| `human_response` | str | The operator response to inject into state. |
| `state` | WorkflowState | The WorkflowState from the paused point. |
| `response_key` | str | Key under which to store human_response. |
| Type | Description |
|---|---|
| Result[GraphResult, GraphExecutionError] | Ok(GraphResult) on success or Err(GraphExecutionError). |
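Example
A pause/resume sketch for a human-in-the-loop graph. engine is a WorkflowEngine containing a HumanNode; the pause.prompt attribute is assumed to mirror the constructor parameter, ask_operator is a hypothetical helper, and how the paused WorkflowState is captured (e.g. from the configured checkpoint) is deployment-specific and elided.
```python
runner = WorkflowRunner(engine, max_retries=2, retry_delay=2.0)

async def run_with_approval():
    try:
        return await runner.run("Refund order 1234?")
    except HumanInputRequiredError as pause:
        answer = await ask_operator(pause.prompt)  # hypothetical operator UI
        paused_state = ...  # WorkflowState captured at the checkpoint
        return await runner.resume(answer, state=paused_state)
```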
WorkflowStartedEvent
Emitted when a workflow instance begins execution.
Attributes:
workflow_id: Unique identifier of the workflow instance.
workflow_name: Human-readable name of the workflow definition.
WorkflowStartedHook
Payload fired when a workflow instance begins execution.
Attributes:
workflow_id: Unique identifier of the workflow instance.
workflow_type: Type or name of the workflow definition.
WorkflowState
Shared, mutable state container for a workflow execution.
All nodes read from and write to the same WorkflowState instance.
Each node’s execute() method receives the state as a plain
dict[str, Any] and returns a partial update dict that is merged
back via merge.
Keys set by the engine:
"input"— the original user input string"output"— the most recent node output (updated each step)"_iteration"— current traversal step count"_history"— ordered list of(node_name, output_dict)tuples
Example
from lexigram.logging import get_logger
logger = get_logger(__name__)
state = WorkflowState(input="Summarise this document")
state.merge({"summary": "...", "output": "..."})
logger.info("workflow_output", output=state["output"])
Return the value for key, or default if absent.
Return a shallow copy of the underlying state dict.
Merge update into state, overwriting existing keys.
Internal keys prefixed with _ are skipped from external
updates to prevent accidental manipulation of engine metadata.
Append a node execution record to the history list.
| Parameter | Type | Description |
|---|---|---|
| `node_name` | str | Name of the node that produced *output*. |
| `output` | dict[str, Any] | State update dict returned by the node. |
Current graph traversal step count.
Ordered list of (node_name, output_dict) tuples.
WorkflowStateTransitionedHook
Payload fired when a workflow instance transitions between states.
Attributes:
workflow_id: Unique identifier of the workflow instance.
from_state: State name being exited.
to_state: State name being entered.
Functions
conditional
def conditional( name: str, condition: Callable[[PipelineContext], Awaitable[Result[bool, Exception]]], true_step: PipelineStep, false_step: PipelineStep | None = None, dependencies: list[str] | None = None, timeout: float | None = None ) -> ConditionalStep
Create a conditional pipeline step.
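Example
A branching sketch built from the factories on this page; is_vip and send_vip_email are illustrative stand-ins.
```python
check_vip = conditional(
    "maybe_notify",
    condition=is_vip,  # async (ctx) -> Result[bool, Exception]
    true_step=step("notify", send_vip_email),
    false_step=None,   # no-op when the predicate is false
)
```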
parallel
def parallel( name: str, steps: list[PipelineStep], dependencies: list[str] | None = None, *, fail_fast: bool = True, timeout: float | None = None ) -> ParallelStep
Create a parallel execution step.
pipeline_step
def pipeline_step(dependencies: list[str] | None = None) -> Callable[[Callable[[PipelineContext], Awaitable[Result[Any, Exception]]]], FunctionStep]
Decorator to convert a function into a pipeline step.
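Example
Decorator usage per the documented signature; the Ok constructor and its import are assumptions.
```python
@pipeline_step(dependencies=["fetch"])
async def enrich(context: PipelineContext) -> Result[Any, Exception]:
    return Ok({"enriched": True})

# `enrich` is now a FunctionStep and can be passed directly to Pipeline(...).
```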
saga_step
def saga_step( name: str | None = None, *, compensation: Callable[Ellipsis, Any] | None = None ) -> Callable[[F], F]
Mark an async function as a saga step with optional compensation.
A saga step participates in distributed transaction management. If the step fails and a compensation handler is provided, it will be invoked during rollback.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | None | Optional step name override. Defaults to the function name. |
| `compensation` | Callable[Ellipsis, Any] | None | Async callable to invoke for rollback on failure. |
Example
@saga_step(name="reserve_inventory", compensation=release_inventory)
async def reserve_inventory(ctx: SagaContext) -> None: …
step
def step( name: str, func: Callable[[PipelineContext], Awaitable[Result[Any, Exception]]], dependencies: list[str] | None = None, timeout: float | None = None ) -> FunctionStep
Create a function-based pipeline step.
timeout is an optional per-step timeout in seconds; it is forwarded
to FunctionStep.
workflow
def workflow( name: str | None = None, *, timeout: float | None = None, retries: int = 0 ) -> Callable[[F], F]
Mark an async function as a workflow definition.
Attaches workflow metadata used by the execution engine for registration, timeout enforcement, and retry policy.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | None | Optional workflow name override. Defaults to the function name. |
| `timeout` | float | None | Maximum execution time in seconds. None means no limit. |
| `retries` | int | Number of retry attempts on failure. |
Example
@workflow(name="onboarding", timeout=300.0, retries=1)
async def onboarding_workflow(ctx: WorkflowContext) -> None: …
Exceptions
BulkOperationCancelledError
Raised when a bulk operation is cancelled.
BulkOperationError
Base exception for bulk operation errors.
BulkOperationTimeoutError
Raised when a bulk operation times out.
CycleDetectedError
Graph engine exceeded max_iterations (likely cycle).
| Parameter | Type | Description |
|---|---|---|
| `iterations` | int | Number of iterations completed before the limit. |
| `node` | str | Name of the node that was about to execute again. |
GraphExecutionError
Base exception for graph workflow engine operations.
| Parameter | Type | Description |
|---|---|---|
| `message` | str | Human-readable error description. |
| `node` | str | None | Optional name of the node where the error occurred. |
GraphTimeoutError
Graph workflow execution exceeded total timeout.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | The configured timeout value in seconds. |
GraphValidationError
Graph structure validation failed.
HumanInputRequiredError
Raised by HumanNode to pause execution awaiting human input.
| Parameter | Type | Description |
|---|---|---|
| `prompt` | str | Text/question to display to the human operator. |
| `node` | str | Node that triggered the pause. |
| `checkpoint_id` | str | Identifier for the stored checkpoint. |
NodeExecutionError
A graph node's execute() method failed.
| Parameter | Type | Description |
|---|---|---|
| `message` | str | Human-readable error description. |
| `node` | str | Name of the failing node. |
| `cause` | Exception | None | Optional underlying exception. |
PipelineExecutionError
Error raised during pipeline execution.
PipelineStepError
Error raised by a pipeline step.
WorkflowCompensationError
Raised when workflow compensation/rollback fails.
WorkflowError
Base exception for workflow orchestration errors.
WorkflowNotFoundError
Raised when a workflow definition cannot be found.
WorkflowStateError
Raised when a workflow is in an invalid state for the requested operation.
WorkflowStepError
Raised when a workflow step fails to execute.
WorkflowTimeoutError
Raised when a workflow or step exceeds its timeout.
WorkflowVersionMismatchError
Raised when resuming a workflow instance with a mismatched definition version.
Resuming an in-flight instance with an incompatible definition version would silently execute stale steps or skip newly-added ones. This error forces callers to either migrate the instance or restart it under the new definition.
| Parameter | Type | Description |
|---|---|---|
| `workflow_name` | str | Name of the workflow definition. |
| `expected_version` | int | Version stored in the running instance (set at creation time). |
| `actual_version` | int | Version of the definition currently being used to resume the instance. |