
API Reference

Abstract base class for orchestration sagas.

Subclasses declare their saga identity (get_id) and terminal condition (is_completed). Steps are registered via add_step and the full sequence is driven by execute.

Type Parameters:

  • T: The saga state / context type (unused by the base class but carried for subclass typing convenience).

Lifecycle:

  • PENDING → RUNNING → COMPLETED (all steps succeed)
  • PENDING → RUNNING → COMPENSATING → FAILED (a step fails; already-completed steps are unwound)

def __init__(store: SagaStoreProtocol | None = None) -> None
def get_version() -> int

Return the current schema version for this saga class.

Returns:
  • int: The integer version declared on the class.
def is_compatible_with(stored_version: int) -> bool

Check if stored state version is compatible with this class.

Override for custom migration logic (e.g. allow loading V1 in V2).

Parameters:
  • `stored_version` (int): Version number found in the persisted state.

Returns:
  • bool: ``True`` when the stored version matches the class VERSION.
def get_id() -> str

Return the stable identifier for this saga instance.

Returns:
  • str: A string that uniquely identifies this saga process.
def is_completed() -> bool

Return whether the saga has reached a terminal state.

Returns:
  • bool: ``True`` when no further steps remain (success *or* fully compensated failure).
def add_step(step: SagaStep) -> None

Register a step to be executed as part of this saga.

Steps are executed in the order they are added.

Parameters:
  • `step` (SagaStep): The step to append to the execution sequence.
def was_compensated(step_name: str) -> bool

Return whether step_name has been successfully compensated.

Parameters:
  • `step_name` (str): The SagaStep.name to check.

Returns:
  • bool: ``True`` if the step's compensation action ran successfully.
def mark_compensated(step_name: str) -> None

Manually record step_name as compensated.

Useful when compensation is performed externally (e.g., a separate process handles the rollback) and the saga state needs to be updated.

Parameters:
  • `step_name` (str): The SagaStep.name to mark as compensated.
async def execute() -> Result[None, SagaError]

Execute all registered steps in registration order.

On first step failure, compensation is triggered for all previously completed steps (in reverse order) and an Err(SagaError(...)) is returned.

Returns:
  • Result[None, SagaError]: ``Ok(None)`` when all steps complete successfully; ``Err(SagaError)`` when any step fails (compensation already attempted before returning).
async def compensate() -> None

Manually trigger compensation for all completed steps.

Useful when the calling code detects a failure condition outside the normal execute flow.
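
Example (a minimal sketch: ``Saga`` stands in for this abstract base class, constructing ``SagaStep`` with keyword arguments is an assumption, and the stock-reservation callables are hypothetical):

class OrderSaga(Saga):  # "Saga" stands in for the abstract base documented above
    def __init__(self, order_id: str) -> None:
        super().__init__()
        self._order_id = order_id
        self._done = False
        self.add_step(SagaStep(
            name="reserve_stock",
            action=self._reserve,        # must return Result[Any, SagaStepError]
            compensation=self._release,  # invoked only while the saga unwinds
        ))

    def get_id(self) -> str:
        return f"order-saga-{self._order_id}"

    def is_completed(self) -> bool:
        return self._done

    async def _reserve(self) -> Result[Any, SagaStepError]:
        self._done = True  # hypothetical success path
        return Ok(None)

    async def _release(self) -> None:
        self._done = False

result = await OrderSaga("42").execute()  # Ok(None) or Err(SagaError)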


Abstract base class for all workflow nodes.

All built-in and custom nodes inherit from this class. The only required override is execute.

Parameters:
  • `name`: Unique node identifier within the workflow graph.
  • `node_type`: The NodeType label for this node.
def __init__(
    name: str,
    node_type: NodeType = NodeType.CUSTOM
) -> None
property name() -> str

Unique node identifier.

property node_type() -> NodeType

Type label for this node.

async def execute(state: dict[str, Any]) -> dict[str, Any]

Execute this node and return a state-update dict.

Parameters:
  • `state` (dict[str, Any]): Current shared workflow state (read-only reference; mutations are applied via the returned dict).

Returns:
  • dict[str, Any]: Key-value pairs to merge into the shared state. Return an empty dict if no state updates are needed.
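
A custom node therefore only has to override execute. A minimal sketch (the node body is illustrative):

class UppercaseNode(AbstractWorkflowNode):
    def __init__(self) -> None:
        super().__init__("uppercase", NodeType.CUSTOM)

    async def execute(self, state: dict[str, Any]) -> dict[str, Any]:
        # Read from the shared state; return only the keys to merge back.
        return {"output": str(state.get("input", "")).upper()}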

Workflow node that executes a Lexigram agent.
Parameters:
  • `name`: Node identifier.
  • `agent`: Object implementing AgentProtocol.
  • `executor`: Optional AgentExecutorProtocol instance.
  • `input_key`: State key to read the agent input from.
  • `output_key`: State key where the agent response is stored.
def __init__(
    name: str,
    *,
    agent: Any,
    executor: Any | None = None,
    input_key: str = 'input',
    output_key: str = 'output'
) -> None
async def execute(state: dict[str, Any]) -> dict[str, Any]

Run the agent with the value of state[input_key].

Parameters:
  • `state` (dict[str, Any]): Current workflow state.

Returns:
  • dict[str, Any]: Dict with {output_key: agent_response_text}.

Result of processing a single batch.
property duration() -> float

Get the duration of this batch.

property success_count() -> int

Get the number of successful items in this batch.

property error_count() -> int

Get the number of failed items in this batch.

property is_successful() -> bool

Check if the batch completed successfully.


Structured error information for a failed batch.

Attributes:
  • batch_id: The ID of the batch that failed.
  • error: Human-readable error message.
  • error_type: The fully-qualified type name of the exception.
  • retry_count: Number of retry attempts made before giving up.
  • timestamp: Monotonic timestamp of the failure.


Manages bulk operations with batching, concurrency control, and error handling.
def __init__(
    config: BulkOperationConfig | None = None,
    processor: Callable[[list[T]], Awaitable[list[R]]] | None = None,
    circuit_breaker: CircuitBreakerProtocol | None = None,
    checkpoint_store: CacheBackendProtocol | None = None,
    *,
    on_progress: Callable[[int, int, str], Awaitable[None]] | None = None,
    on_complete: Callable[[int, int, str], Awaitable[None]] | None = None,
    on_error: Callable[[int, int, str], Awaitable[None]] | None = None
) -> None
property state() -> BulkOperationState

Get the current operation state.

property metrics() -> BulkOperationMetrics

Get operation metrics.

def add_progress_callback(callback: Callable[[BulkOperationMetrics], Awaitable[None]]) -> None

Add a progress tracking callback.

async def clear_checkpoint(operation_id: str) -> None

Remove a stored checkpoint, allowing the operation to restart from scratch.

Parameters:
  • `operation_id` (str): Unique identifier for the bulk operation run to clear.
async def execute(
    items: list[T] | AsyncIterable[T],
    processor: Callable[[list[T]], Awaitable[list[R]]] | None = None,
    operation_id: str | None = None
) -> AsyncIterator[BulkBatchResult[T, R]]

Execute bulk operation on items.

Parameters:
  • `items` (list[T] | AsyncIterable[T]): Items to process — either a list or an async iterable.
  • `processor` (Callable[[list[T]], Awaitable[list[R]]] | None): Optional processor to override the one set at construction.
  • `operation_id` (str | None): Stable identifier used for checkpoint/resume. If supplied and a ``checkpoint_store`` was provided at construction, completed batches from a previous interrupted run are automatically skipped.
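
Example (a sketch: ``BulkOperationManager`` is an assumed name for this manager class, and ``double_batch`` is a hypothetical processor):

async def double_batch(items: list[int]) -> list[int]:
    return [item * 2 for item in items]

manager = BulkOperationManager(processor=double_batch)  # class name assumed
async for batch in manager.execute(list(range(1000)), operation_id="run-1"):
    print(f"batch ok={batch.success_count} failed={batch.error_count}")
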
async def cancel() -> None

Cancel the bulk operation.

async def shutdown() -> None

Shut down the bulk operation, cancelling any in-progress execution.

Safe to call even when the operation is not running. Ensures the operation reaches a terminal state so resources can be reclaimed.


Configuration for bulk operations and pipeline execution.

Attributes:
  • batch_size: Number of items per batch.
  • max_concurrency: Maximum number of concurrent operations.
  • timeout: Operation timeout in seconds.
  • retry_attempts: Number of retry attempts.
  • retry_delay: Delay between retries in seconds.
  • enable_progress_tracking: Whether to track progress.
  • circuit_breaker_config: Optional circuit breaker configuration.
  • pipeline_timeout: Default pipeline execution timeout in seconds.


Metrics for bulk operations.
def record_start(
    total_items: int,
    total_batches: int
) -> None

Record the start of the operation.

def record_end() -> None

Record the end of the operation.

def record_batch_result(result: BulkBatchResult) -> None

Update metrics with result from a processed batch.

def record_error(error: Exception) -> None

Record an unexpected operation-level error.

property duration() -> float | None

Get the total duration of the operation.

property success_rate() -> float

Get the success rate as a percentage.

property throughput() -> float

Get items processed per second.


States for bulk operations.

Pipeline step that executes conditionally based on a predicate.
def __init__(
    name: str,
    condition: Callable[[PipelineContext], Result[bool, Exception] | Awaitable[Result[bool, Exception]]],
    true_step: PipelineStep,
    false_step: PipelineStep | None = None,
    dependencies: list[str] | None = None,
    timeout: float | None = None
) -> None
async def execute(context: PipelineContext) -> Result[Any, Exception]

Execute the appropriate step based on condition.

async def should_skip(context: PipelineContext) -> Result[bool, Exception]

Conditional steps should not be skipped.


Formats and inspects the execution trace of a completed workflow run.
Parameters:
  • `result`: The completed GraphResult to inspect.
def __init__(result: GraphResult) -> None
def as_dict() -> dict[str, Any]

Return the full execution trace as a JSON-serialisable dict.

Returns:
  • dict[str, Any]: Dict with keys iterations, duration_ms, terminated_at, and nodes.
def as_text() -> str

Return a multi-line human-readable execution trace.

Returns:
  • str: A string with one line per node showing its outcome.
def failed_nodes() -> list[NodeResult]

Return node results that terminated with an error.

Returns:
  • list[NodeResult]: Filtered list of failed NodeResult entries.
def succeeded_nodes() -> list[NodeResult]

Return node results that completed successfully.

Returns:
  • list[NodeResult]: Filtered list of successful NodeResult entries.
def total_node_time_ms() -> float

Return cumulative node execution time in milliseconds.


Pipeline step that executes a function.
def __init__(
    name: str,
    func: Callable[[PipelineContext], Result[Any, Exception] | Awaitable[Result[Any, Exception]]],
    dependencies: list[str] | None = None,
    skip_condition: Callable[[PipelineContext], Result[bool, Exception] | Awaitable[Result[bool, Exception]]] | None = None,
    error_handler: Callable[[PipelineContext, Exception], Result[Any, Exception] | Awaitable[Result[Any, Exception]]] | None = None,
    cleanup_func: Callable[[PipelineContext], Result[None, Exception] | Awaitable[Result[None, Exception]]] | None = None,
    timeout: float | None = None
) -> None
async def execute(context: PipelineContext) -> Result[Any, Exception]

Execute the function, honoring optional per-step timeout.

async def should_skip(context: PipelineContext) -> Result[bool, Exception]

Check skip condition.

async def on_error(
    context: PipelineContext,
    error: Exception
) -> Result[Any, Exception]

Handle errors.

async def cleanup(context: PipelineContext) -> Result[None, Exception]

Cleanup resources.


Routing node that forces the graph to branch on state values.
Parameters:
  • `name`: Node identifier.
  • `routes`: Mapping of target-node-name to a (state) -> bool predicate.
def __init__(
    name: str,
    *,
    routes: dict[str, Callable[[dict[str, Any]], bool]] | None = None
) -> None
async def execute(state: dict[str, Any]) -> dict[str, Any]

Return empty dict — routing is handled by WorkflowEdge conditions.

Parameters:
  • `state` (dict[str, Any]): Current workflow state (not mutated here).

Returns:
  • dict[str, Any]: Empty dict — gate nodes produce no output.
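
Example (a sketch; the route targets and the score key are illustrative):

gate = GateNode(
    "quality_gate",
    routes={
        "publish": lambda state: state.get("score", 0.0) >= 0.8,
        "revise": lambda state: state.get("score", 0.0) < 0.8,
    },
)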

Configuration for the workflow graph engine.

Attributes:
  • enabled: Enable the graph workflow subsystem.
  • max_iterations: Maximum graph traversal steps (anti-infinite-loop guard).
  • node_timeout: Per-node execution timeout in seconds. 0 = no limit.
  • total_timeout: Total workflow timeout in seconds. 0 = no limit.
  • checkpoint_enabled: Persist state checkpoints for resumption after failures.
  • parallel_branches: Execute independent parallel branches via asyncio.gather.
  • max_parallel_branches: Max parallel branches at once. 0 = unlimited.

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check config is safe for the target environment.

Parameters:
  • `env` (Environment | None): Target deployment environment.

Returns:
  • list[ConfigIssue]: List of configuration issues (empty if all OK).

Aggregate result of a complete graph workflow execution.

Attributes:
  • final_state: The workflow state after all nodes have executed.
  • node_results: Ordered list of per-node results.
  • iterations: Total graph traversal steps taken.
  • duration_ms: Total wall-clock execution time.
  • terminated_at: Name of the terminal node that ended execution.

property output() -> Any

Return the output key from final_state, or None.

property succeeded() -> bool

True when all executed nodes succeeded.


Pause the workflow to collect a human response.
Parameters:
  • `name`: Node identifier (doubles as checkpoint_id).
  • `prompt`: Question or instruction to display to the operator. Supports {key} substitution from state.
  • `output_key`: State key where the human response will be stored.
  • `resume_key`: State key the engine populates with the human answer when WorkflowEngine.resume() is called.
def __init__(
    name: str,
    *,
    prompt: str = 'Human input required.',
    output_key: str = 'human_response',
    resume_key: str | None = None
) -> None
async def execute(state: dict[str, Any]) -> dict[str, Any]

Return human response if resumed, otherwise pause execution.

Parameters:
  • `state` (dict[str, Any]): Current workflow state.

Returns:
  • dict[str, Any]: Dict with {output_key: human_response} when resumed.

Raises:
  • HumanInputRequiredError: When no prior human response is in state.

Workflow node that makes a raw LLM call.
Parameters:
  • `name`: Node identifier.
  • `llm`: LLMClientProtocol implementation.
  • `prompt_template`: Prompt string with optional {key} variables.
  • `system_prompt`: Optional system message prepended to the call.
  • `output_key`: State key where the completion is stored.
  • `model`: Optional model override.
  • `temperature`: Sampling temperature override.
  • `max_tokens`: Max output token override.
def __init__(
    name: str,
    *,
    llm: Any,
    prompt_template: str = '{input}',
    system_prompt: str = '',
    output_key: str = 'output',
    model: str | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None
) -> None
async def execute(state: dict[str, Any]) -> dict[str, Any]

Render the prompt from state and call the LLM.

Parameters:
  • `state` (dict[str, Any]): Current workflow state used for prompt substitution.

Returns:
  • dict[str, Any]: Dict with {output_key: completion_text}.

Result produced by a single workflow graph node execution.

Attributes:
  • node_name: Name of the node that produced this result.
  • output: State update dict returned by the node.
  • duration_ms: Wall-clock duration of the node execution.
  • error: Error message if the node raised; None on success.
  • skipped: True when the node was bypassed by conditional routing.

property succeeded() -> bool

True when the node completed without error and was not skipped.


Enumeration of built-in workflow node types.

Pipeline step that executes multiple steps in parallel.
def __init__(
    name: str,
    steps: list[PipelineStep],
    dependencies: list[str] | None = None,
    *,
    fail_fast: bool = True,
    timeout: float | None = None
) -> None
async def execute(context: PipelineContext) -> Result[list[Any], Exception]

Execute all steps in parallel.


Declarative pipeline for composing business workflows.
def __init__(
    name: str,
    steps: list[PipelineStep],
    checkpoint_store: CacheBackendProtocol | None = None
) -> None
def add_step(step: PipelineStep) -> None

Append a step to this pipeline.

Parameters:
  • `step` (PipelineStep): The PipelineStep to append. Validates pipeline structure after addition.

Raises:
  • ValueError: If the new step creates duplicate names or broken deps.
def to_dot() -> str

Return a Graphviz DOT representation of the pipeline execution graph.

Nodes represent pipeline steps; directed edges represent declared dependencies (dep → step). The output can be rendered with dot -Tpng or pasted into an online Graphviz viewer.

Returns:
  • str: A string containing valid Graphviz DOT source for the pipeline.

Example

dot_src = pipeline.to_dot()
# Write dot_src to pipeline.dot, then render it: dot -Tsvg -o pipeline.svg pipeline.dot
async def execute(
    initial_context: PipelineContext | None = None,
    fail_fast: bool = True
) -> Result[PipelineContext, Exception]

Execute the pipeline.
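
Example (a sketch: ``extract_step`` and ``transform_step`` are hypothetical steps built with the step() helper documented below, and the Result accessors shown are assumptions about the Result API):

pipeline = Pipeline("etl", steps=[extract_step, transform_step])
result = await pipeline.execute(fail_fast=True)
if result.is_ok():  # accessor assumed; adapt to your Result implementation
    ctx = result.unwrap()
    print(ctx.get_step_result("transform"))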


Context passed through pipeline execution, accumulating step results and metadata.
def get_step_result(
    step_name: str,
    default: Any = None
) -> Any

Get result from a completed step.

Parameters:
  • `step_name` (str): The name of the step whose result to retrieve.
  • `default` (Any): Value to return if the step has no recorded result.

Returns:
  • Any: The recorded step result, or *default* if not found.
def set_step_result(
    step_name: str,
    result: Any
) -> None

Set result for a step.

Parameters:
  • `step_name` (str): The name of the step.
  • `result` (Any): The result value to record.
def add_metadata(
    key: str,
    value: Any
) -> None

Add metadata to the context.

Parameters:
  • `key` (str): Metadata key.
  • `value` (Any): Metadata value.
def get_metadata(
    key: str,
    default: Any = None
) -> Any

Get metadata from the context.

Parameters:
  • `key` (str): Metadata key to look up.
  • `default` (Any): Value to return if the key is absent.

Returns:
  • Any: The metadata value, or *default* if not found.
property duration() -> float

Total duration of pipeline execution in seconds.

property failed_steps() -> list[StepExecutionResult]

List of failed step results.

property successful_steps() -> list[StepExecutionResult]

List of successfully completed step results.

def has_failed() -> bool

Check if any steps in the pipeline failed.

Returns:
  • bool: ``True`` if at least one step has a FAILED status.

Abstract base class for pipeline steps.

Non-execute helpers are deliberately simple: they return plain values and may raise exceptions. This keeps user implementations lightweight and avoids the noise of wrapping every result in Result. Helpers are responsible for handling their own errors; the pipeline driver will catch and convert them into failed step records if necessary.

Steps may specify an optional timeout (in seconds). When set, the execute invocation will be wrapped with asyncio.wait_for so that a long-running or hung step is interrupted and treated as a failure. The timeout does not apply to should_skip, on_error or cleanup helpers.

def __init__(
    name: str,
    dependencies: list[str] | None = None,
    timeout: float | None = None,
    resilience: ResiliencePipelineProtocol | None = None
) -> None
async def execute(context: PipelineContext) -> Result[Any, Exception]

Execute the pipeline step.

Parameters:
  • `context` (PipelineContext): The shared pipeline context carrying inter-step data.

Returns:
  • Result[Any, Exception]: A ``Result`` wrapping the step's output or a failure.
async def should_skip(context: PipelineContext) -> Result[bool, Exception]

Determine whether to skip this step.

Parameters:
  • `context` (PipelineContext): The current pipeline context.

Returns:
  • Result[bool, Exception]: ``Ok(True)`` to skip, ``Ok(False)`` to execute, or ``Err(e)`` on failure while evaluating the condition.

The default implementation always returns Ok(False).

async def on_error(
    context: PipelineContext,
    error: Exception
) -> Any

Handle errors raised during execute.

Parameters:
  • `context` (PipelineContext): The current pipeline context.
  • `error` (Exception): The exception that caused the step to fail.

The default behaviour is to re-raise the error, which causes the step (and pipeline) to fail. Custom handlers may return a value to recover and continue the pipeline.

async def cleanup(context: PipelineContext) -> Result[None, Exception]

Cleanup resources after step execution.

Parameters:
  • `context` (PipelineContext): The current pipeline context.

Called regardless of success or failure. The default implementation does nothing. Exceptions are logged but not propagated.
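
A concrete step therefore only has to implement execute and wrap its outcome in a Result. A minimal sketch (``fetch_users`` is a hypothetical coroutine):

class LoadUsersStep(PipelineStep):
    def __init__(self) -> None:
        super().__init__(name="load_users", timeout=10.0)

    async def execute(self, context: PipelineContext) -> Result[Any, Exception]:
        try:
            users = await fetch_users()  # hypothetical I/O call
            return Ok(users)
        except Exception as exc:
            return Err(exc)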


Descriptor for a single saga orchestration step.

Shared across lexigram-workflow and lexigram-events via contracts.

The action callable must return Result[Any, SagaStepError]. The compensation callable is called only when the saga is unwinding and may raise exceptions freely (infrastructure failures in compensation should propagate).

Attributes:
  • name: Unique step identifier within the saga.
  • action: Async callable returning Result[Any, SagaStepError].
  • compensation: Async callable to undo the step on failure.
  • max_retries: Maximum retry attempts if action returns Err.
  • retry_delay: Seconds to wait between retry attempts.
  • idempotent: Whether the compensation is safe to call multiple times.


Result of executing a single pipeline step.

Pipeline step execution status.

Embed a nested WorkflowEngine as a single node in a parent workflow.
Parameters:
  • `name`: Node identifier.
  • `workflow`: A WorkflowEngine instance to nest.
  • `input_key`: State key to read as the subworkflow input value.
  • `output_key`: State key where the subworkflow result is stored.
def __init__(
    name: str,
    *,
    workflow: Any,
    input_key: str = 'input',
    output_key: str = 'output'
) -> None
async def execute(state: dict[str, Any]) -> dict[str, Any]

Execute the nested workflow with the current parent state.

Parameters:
  • `state` (dict[str, Any]): Current parent workflow state.

Returns:
  • dict[str, Any]: Dict with {output_key: subworkflow_output} on success or {output_key: "Subworkflow error: ..."} on failure.

Workflow node that executes a Lexigram tool.
Parameters:
  • `name`: Node identifier.
  • `tool`: Object implementing ToolProtocol with async execute().
  • `input_key`: State key to read the tool primary input from.
  • `output_key`: State key where the tool result is stored.
  • `extra_keys`: Additional state keys forwarded to the tool as arguments.
def __init__(
    name: str,
    *,
    tool: Any,
    input_key: str = 'input',
    output_key: str = 'output',
    extra_keys: list[str] | None = None
) -> None
async def execute(state: dict[str, Any]) -> dict[str, Any]

Call the tool with state-derived arguments.

Parameters:
  • `state` (dict[str, Any]): Current workflow state.

Returns:
  • dict[str, Any]: Dict with {output_key: tool_result}.

Composable transformation pipe.

Each step in the pipe receives the output of the previous step and returns the (possibly transformed) value. Steps can be sync or async callables.

The pipe is immutable — calling pipe returns a new TransformPipe instance with the added step.

def __init__(
    steps: list[Callable[..., Any]] | None = None,
    error_handler: Callable[[Exception, Any], Any] | None = None
) -> None
def pipe(step: Callable[[T], U | Awaitable[U]]) -> TransformPipe[U]

Add a transformation step to the pipe.

The supplied step should accept the current value (of type T) and return either a new value of type U or an awaitable producing U. The returned pipe has type TransformPipe[U] so that subsequent calls are type safe.

Parameters:
  • `step` (Callable[[T], U | Awaitable[U]]): A callable (sync or async) that takes a value and returns the transformed value.

Returns:
  • TransformPipe[U]: A new ``TransformPipe`` with the step appended.
def tap(side_effect: Callable[[T], Any]) -> TransformPipe[T]

Add a side-effect step that observes but does not transform the value.

The side-effect callable receives the current value. Its return value is ignored and the original value is forwarded unchanged.

Parameters:
  • `side_effect` (Callable[[T], Any]): A callable (sync or async) for observation/logging.

Returns:
  • TransformPipe[T]: A new ``TransformPipe`` with the tap step appended.
def pipe_if(
    predicate: Callable[[T], bool],
    step: Callable[[T], U | Awaitable[U]]
) -> TransformPipe[U | T]

Add a conditional transformation step.

The step is only applied when predicate(value) returns True. Otherwise the value passes through unchanged.

Parameters:
  • `predicate` (Callable[[T], bool]): A callable that decides whether to apply *step*.
  • `step` (Callable[[T], U | Awaitable[U]]): The transformation to apply when the predicate holds.

Returns:
  • TransformPipe[U | T]: A new ``TransformPipe`` with the conditional step appended.
def catch(handler: Callable[[Exception, T], Any]) -> TransformPipe[T]

Set a raw exception recovery handler for the pipe.

When any step raises and a handler is set, the handler receives (exception, last_value) and its return value becomes the pipe result, swallowing the exception.

Warning: catch() operates outside the Result pattern — it intercepts raw exceptions rather than returning Err(…). Prefer designing pipe steps to return Result[T, E] and handling errors at the call-site via result.match() or result.and_then() (async) / result.and_then_sync() when possible. Use catch() only for coarse-grained last-resort recovery (e.g. logging and substituting a sentinel value) where propagating exceptions would crash the surrounding pipeline.

Parameters:
  • `handler` (Callable[[Exception, T], Any]): A callable ``(exception, value) -> recovery_value``.

Returns:
  • TransformPipe[T]: A new ``TransformPipe`` with the error handler configured.
async def execute(
    value: T,
    **kwargs: Any
) -> T

Execute the pipe by passing the value through all steps.

Parameters:
  • `value` (T): The initial input value.
  • `**kwargs` (Any): Additional keyword arguments forwarded to each step.

Returns:
  • T: The final transformed value.
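
Example chaining the combinators above (a sketch; the step callables are illustrative):

pipe = (
    TransformPipe()
    .pipe(str.strip)                      # "  42  " -> "42"
    .tap(print)                           # observe without transforming
    .pipe_if(lambda s: s.isdigit(), int)  # conditional transform
    .catch(lambda exc, value: value)      # last-resort recovery
)
result = await pipe.execute("  42  ")     # -> 42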

Fluent builder for WorkflowEngine.
Parameters:
  • `name`: Workflow identifier used in logs and checkpoints.
  • `config`: Default runtime configuration.
def __init__(
    name: str,
    config: GraphConfig | None = None,
    version: int = 1
) -> None
def add_node(
    name: str,
    *,
    node: AbstractWorkflowNode | None = None,
    agent: Any | None = None,
    tool: Any | None = None,
    llm: Any | None = None,
    prompt: str = '',
    human_prompt: str = '',
    subworkflow: Any | None = None,
    executor: Any | None = None,
    output_key: str = 'output'
) -> WorkflowBuilder

Register a node in the workflow graph.

Parameters:
  • `name` (str): Unique node identifier.
  • `node` (AbstractWorkflowNode | None): Pre-built AbstractWorkflowNode instance.
  • `agent` (Any | None): AgentProtocol wrapped in AgentNode.
  • `tool` (Any | None): ToolProtocol wrapped in ToolNode.
  • `llm` (Any | None): LLMClientProtocol wrapped in LLMNode.
  • `prompt` (str): Prompt template for llm nodes.
  • `human_prompt` (str): Prompt shown to the human for human nodes.
  • `subworkflow` (Any | None): Nested workflow for SubworkflowNode.
  • `executor` (Any | None): Agent executor required for agent nodes.
  • `output_key` (str): State key to store the node primary output under.

Returns:
  • WorkflowBuilder: self for chaining.

Raises:
  • ValueError: If name is already registered or no node type is given.
def add_gate(
    name: str,
    routes: dict[str, Callable[[dict[str, Any]], bool]]
) -> WorkflowBuilder

Add a GateNode with conditional outgoing edges.

Parameters:
  • `name` (str): Gate node identifier.
  • `routes` (dict[str, Callable[[dict[str, Any]], bool]]): Mapping of target-node-name to condition callable.

Returns:
  • WorkflowBuilder: self for chaining.
def add_edge(
    source: str,
    target: str,
    *,
    condition: Callable[[dict[str, Any]], bool] | None = None,
    label: str = ''
) -> WorkflowBuilder

Add a directed edge from source to target.

Parameters:
  • `source` (str): Source node name.
  • `target` (str): Target node name.
  • `condition` (Callable[[dict[str, Any]], bool] | None): Optional guard callable (state: dict) -> bool.
  • `label` (str): Optional human-readable label.

Returns:
  • WorkflowBuilder: self for chaining.
def set_entry(node: str) -> WorkflowBuilder

Set node as the workflow entry point.

Parameters:
  • `node` (str): Node name to start execution from.

Returns:
  • WorkflowBuilder: self for chaining.
def set_terminal(
    node: str,
    *,
    condition: Callable[[dict[str, Any]], bool] | None = None
) -> WorkflowBuilder

Mark node as a terminal node.

Parameters:
  • `node` (str): Node name to mark as terminal.
  • `condition` (Callable[[dict[str, Any]], bool] | None): Optional guard callable.

Returns:
  • WorkflowBuilder: self for chaining.
def configure(config: GraphConfig) -> WorkflowBuilder

Replace the default GraphConfig.

Parameters:
  • `config` (GraphConfig): New configuration.

Returns:
  • WorkflowBuilder: self for chaining.
def build() -> WorkflowEngine

Validate and build the WorkflowEngine.

Returns:
  • WorkflowEngine: Ready-to-execute workflow engine.

Raises:
  • GraphValidationError: If entry node is not set or graph has structural errors.
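
Putting the builder together (a sketch; ``client`` is a hypothetical LLMClientProtocol implementation):

engine = (
    WorkflowBuilder("summarise-doc")
    .add_node("draft", llm=client, prompt="Summarise: {input}")
    .add_node("review", llm=client, prompt="Critique: {output}", output_key="critique")
    .add_edge("draft", "review")
    .set_entry("draft")
    .set_terminal("review")
    .build()
)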

In-memory checkpoint store for workflow states.
Parameters:
  • `max_size`: Maximum number of checkpoints to keep. Oldest entries are evicted once the limit is reached. 0 means unlimited.
def __init__(max_size: int = 512) -> None
def save(
    checkpoint_id: str,
    state: WorkflowState
) -> None

Persist a snapshot of state under checkpoint_id.

Parameters:
  • `checkpoint_id` (str): Unique identifier for this checkpoint.
  • `state` (WorkflowState): Workflow state to snapshot.
def load(checkpoint_id: str) -> dict[str, object] | None

Retrieve the state snapshot for checkpoint_id.

Parameters:
  • `checkpoint_id` (str): Identifier to look up.

Returns:
  • dict[str, object] | None: The raw state dict if found, otherwise None.
def delete(checkpoint_id: str) -> bool

Remove a checkpoint entry.

Parameters:
  • `checkpoint_id` (str): Identifier to remove.

Returns:
  • bool: True if an entry was removed, False if not found.

Emitted when a workflow instance reaches a terminal success state.

Attributes:
  • workflow_id: Unique identifier of the workflow instance.
  • workflow_name: Human-readable name of the workflow definition.


Payload fired when a workflow instance reaches a terminal state.

Attributes:
  • workflow_id: Unique identifier of the workflow instance.
  • workflow_type: Type or name of the workflow definition.
  • succeeded: True if the workflow completed successfully.


Immutable, versioned descriptor for a workflow.
Parameters:
  • `name`: Unique workflow name.
  • `steps`: Ordered list of step descriptors (opaque; interpreted by the executor). Defaults to an empty list for graph-based workflows where nodes/edges are the canonical step representation.
  • `version`: Definition schema version. Increment this whenever the definition structure changes incompatibly. Defaults to ``1``.

A directed edge in the workflow graph.

Edges connect a source node to a target node. An optional condition callable gates the edge: if provided, the edge is traversed only when condition(state) is truthy. Edges without a condition are always traversed (unconditional/always edges).

For parallel branches, list multiple edges from the same source node; WorkflowEngine will collect all edges whose conditions are satisfied and execute them concurrently when GraphConfig.parallel_branches is True.

Attributes:
  • source: Name of the originating node.
  • target: Name of the destination node.
  • condition: Optional callable (state: dict) -> bool. When None the edge is unconditional.
  • label: Optional human-readable label for documentation/debugging.
  • is_parallel: Hint that this edge should be executed concurrently with sibling edges from the same source. The engine enables parallelism automatically when multiple conditions are satisfied; this flag provides an explicit override.

def is_active(state: dict[str, Any]) -> bool

Return True if this edge should be traversed given state.

Parameters:
  • `state` (dict[str, Any]): Current workflow state dict.

Returns:
  • bool: ``True`` for unconditional edges or when the condition is met.

Async directed-graph workflow executor.

Construct via WorkflowBuilder.

Parameters:
  • `name`: Display name for logging and checkpointing.
  • `nodes`: Dict mapping node name to node instance.
  • `edges`: List of all edges in the graph.
  • `entry_node`: Name of the starting node.
  • `terminal_conditions`: Mapping of terminal node name to optional condition callable (state) -> bool. If None the node is always terminal.
  • `config`: Runtime configuration.
def __init__(
    name: str,
    nodes: dict[str, AbstractWorkflowNode],
    edges: list[WorkflowEdge],
    entry_node: str,
    terminal_conditions: dict[str, Any],
    config: GraphConfig | None = None,
    version: int = 1
) -> None
property name() -> str

Workflow display name used in logs and checkpoints.

property version() -> int

Definition schema version — incremented when the graph structure changes incompatibly.

async def execute(
    input: str,
    *,
    config: GraphConfig | None = None,
    state: dict[str, Any] | None = None
) -> Result[GraphResult, GraphExecutionError]

Execute the workflow from the entry node.

Parameters:
  • `input` (str): User input string injected as state["input"].
  • `config` (GraphConfig | None): Optional per-call config override.
  • `state` (dict[str, Any] | None): Optional pre-populated initial state.

Returns:
  • Result[GraphResult, GraphExecutionError]: Ok(GraphResult) or Err(GraphExecutionError) on failure.
async def resume(
    checkpoint_state: dict[str, Any],
    human_response: str,
    *,
    config: GraphConfig | None = None
) -> Result[GraphResult, GraphExecutionError]

Resume a workflow paused at a human-in-the-loop node.

Parameters:
  • `checkpoint_state` (dict[str, Any]): State dict snapshot from the checkpoint.
  • `human_response` (str): Human operator input.
  • `config` (GraphConfig | None): Optional per-call config override.

Returns:
  • Result[GraphResult, GraphExecutionError]: Ok(GraphResult) or Err(GraphExecutionError) on failure.
def validate() -> None

Validate the workflow graph structure.

Raises:
  • GraphValidationError: If the graph has structural problems.
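
Example (a sketch; ``is_ok``/``unwrap``/``unwrap_err`` are assumptions about the Result API):

result = await engine.execute("Summarise this document")
if result.is_ok():  # accessor assumed
    graph_result = result.unwrap()
    print(graph_result.output, graph_result.duration_ms)
else:
    print("workflow failed:", result.unwrap_err())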

Emitted when a workflow instance terminates due to an unrecoverable error.

Attributes:
  • workflow_id: Unique identifier of the workflow instance.
  • workflow_name: Human-readable name of the workflow definition.
  • error: Human-readable description of the failure reason.


A single running (or paused) execution of a workflow definition.
Parameters:
  • `instance_id`: Unique identifier for this execution.
  • `workflow_name`: Name of the workflow definition this instance belongs to.
  • `definition_version`: The WorkflowDefinition.version recorded at instance-creation time. Used to detect incompatible definition changes on resume.
def create(
    cls,
    definition: WorkflowDefinition
) -> WorkflowInstance

Create a new instance stamped with definition’s version.

Parameters:
  • `definition` (WorkflowDefinition): The workflow definition being started.

Returns:
  • WorkflowInstance: A new WorkflowInstance whose definition_version matches ``definition.version``.
def validate_definition_version(definition: WorkflowDefinition) -> None

Raise if definition’s version differs from the one this instance was started under.

Call this before executing any step when resuming a non-new instance to prevent silently running stale or incompatible steps.

Parameters:
  • `definition` (WorkflowDefinition): The workflow definition being used to resume this instance.

Raises:
  • WorkflowVersionMismatchError: If ``definition.version`` differs from the version recorded at instance creation.
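
Example of the guard in use (a sketch; persistence between creation and resume is elided):

instance = WorkflowInstance.create(definition)
# ... persist the instance, deploy a new definition version, then resume ...
try:
    instance.validate_definition_version(current_definition)
except WorkflowVersionMismatchError:
    # Migrate the instance or restart it under the new definition.
    raise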

Pipeline orchestration, bulk operations, saga state machines, and graph engine.

Call configure to set up the workflow subsystem.

Usage (defaults)

@module(imports=[WorkflowModule.configure()])
class AppModule(Module):
    pass

Usage (configured)

from lexigram.workflow.config import BulkOperationConfig

@module(
    imports=[WorkflowModule.configure(BulkOperationConfig(batch_size=200))]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: Any | None = None,
    saga_store: SagaStoreProtocol | None = None
) -> DynamicModule

Create a WorkflowModule with explicit configuration.

Parameters:
  • `config` (Any | None): BulkOperationConfig or ``None`` for framework defaults.
  • `saga_store` (SagaStoreProtocol | None): Optional durable SagaStoreProtocol implementation. Defaults to in-memory.

Returns:
  • DynamicModule: A DynamicModule descriptor.
def stub(
    cls,
    config: Any = None
) -> DynamicModule

Return an in-memory WorkflowModule for unit testing.

Uses in-memory saga store and default workflow configuration. No external state backends are required.

Returns:
  • DynamicModule: A DynamicModule with in-memory workflow state.

DI provider for the workflow subsystem.

Registers pipeline, bulk-operation, and saga infrastructure into the container. All registrations use singleton scope so that the same configured objects are shared across the application.

Attributes:
  • config: BulkOperationConfig controlling bulk and pipeline defaults.
  • saga_store: Optional SagaStoreProtocol implementation for durable saga state. When omitted, sagas operate in-memory only (suitable for development and single-process deployments).

def __init__(
    config: BulkOperationConfig | None = None,
    saga_store: SagaStoreProtocol | None = None,
    state_machine: StateMachineProtocol | None = None,
    db_provider: DatabaseProviderProtocol | None = None,
    state_table: str = 'workflow_state_transitions'
) -> None

Initialise the WorkflowProvider.

Parameters:
  • `config` (BulkOperationConfig | None): ``BulkOperationConfig`` for bulk and pipeline defaults. Defaults to ``BulkOperationConfig()`` (framework defaults).
  • `saga_store` (SagaStoreProtocol | None): Optional durable store for saga state. When ``None``, sagas run in-memory only.
  • `state_machine` (StateMachineProtocol | None): Optional pre-configured StateMachine instance to register under StateMachineProtocol. When ``None``, no state machine singleton is registered.
  • `db_provider` (DatabaseProviderProtocol | None): Optional database provider for state transition persistence. When supplied, a DatabaseStatePersistence is registered under StatePersistenceProtocol so that ``StateMachine`` instances can be resolved with durable persistence.
  • `state_table` (str): SQL table name used by ``DatabaseStatePersistence``. Defaults to ``"workflow_state_transitions"``.
def from_config(
    cls,
    config: BulkOperationConfig,
    **context: Any
) -> Self

Create provider from config object.

async def register(container: ContainerRegistrarProtocol) -> None

Register workflow services into the DI container.

Bindings registered:

  • WorkflowProvider — self-reference for inspection.
  • BulkOperationConfig — pipeline/bulk defaults.
  • Pipeline / PipelineProtocol — default pipeline factory.
  • SagaStoreProtocol — optional durable saga store.
Parameters:
  • `container` (ContainerRegistrarProtocol): DI registrar provided by the framework.
async def boot(container: ContainerResolverProtocol) -> None

Perform post-registration initialisation.

Resolves BulkOperationConfig and logs the active configuration so that operators can confirm the expected settings on startup.

Parameters:
  • `container` (ContainerResolverProtocol): DI resolver provided by the framework.
async def shutdown() -> None

Shut down the workflow provider and release any held resources.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check provider health.

Parameters:
  • `timeout` (float): Maximum seconds to wait for the health check response.

Returns:
  • HealthCheckResult: HealthCheckResult with status and component details.

Execute a WorkflowEngine with retry and optional checkpointing.
Parameters:
  • `engine`: The WorkflowEngine to run.
  • `max_retries`: Maximum attempts after the first failure.
  • `retry_delay`: Seconds to wait between attempts.
  • `checkpoint`: Optional WorkflowCheckpoint instance.
def __init__(
    engine: WorkflowEngine,
    *,
    max_retries: int = 0,
    retry_delay: float = 1.0,
    checkpoint: Any | None = None
) -> None
async def run(
    input: str,
    *,
    state: dict[str, Any] | WorkflowState | None = None
) -> Result[GraphResult, GraphExecutionError]

Execute the workflow, retrying on transient failures.

Parameters:
  • `input` (str): Workflow input string.
  • `state` (dict[str, Any] | WorkflowState | None): Optional pre-populated WorkflowState.

Returns:
  • Result[GraphResult, GraphExecutionError]: Ok(GraphResult) on success or Err(GraphExecutionError) on permanent failure.

Raises:
  • HumanInputRequiredError: When a HumanNode pauses execution.
async def resume(
    human_response: str,
    *,
    state: WorkflowState,
    response_key: str = 'human_response'
) -> Result[GraphResult, GraphExecutionError]

Resume a paused HITL workflow by injecting the human response.

Parameters:
  • `human_response` (str): The operator response to inject into state.
  • `state` (WorkflowState): The WorkflowState from the paused point.
  • `response_key` (str): Key under which to store human_response.

Returns:
  • Result[GraphResult, GraphExecutionError]: Ok(GraphResult) on success or Err(GraphExecutionError).
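
Example of the pause/resume cycle (a sketch: ``WorkflowRunner`` is an assumed class name for this runner, ``collect_operator_answer`` is a hypothetical hook, and recovering ``paused_state`` from the checkpoint store is elided):

runner = WorkflowRunner(engine, max_retries=2)  # class name assumed
try:
    result = await runner.run("Deploy to production?")
except HumanInputRequiredError:
    answer = await collect_operator_answer()  # hypothetical UI/CLI hook
    result = await runner.resume(answer, state=paused_state)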

Emitted when a workflow instance begins execution.

Attributes:
  • workflow_id: Unique identifier of the workflow instance.
  • workflow_name: Human-readable name of the workflow definition.


Payload fired when a workflow instance begins execution.

Attributes:
  • workflow_id: Unique identifier of the workflow instance.
  • workflow_type: Type or name of the workflow definition.


Shared, mutable state container for a workflow execution.

All nodes read from and write to the same WorkflowState instance. Each node’s execute() method receives the state as a plain dict[str, Any] and returns a partial update dict that is merged back via merge.

Keys set by the engine:

  • "input" — the original user input string
  • "output" — the most recent node output (updated each step)
  • "_iteration" — current traversal step count
  • "_history" — ordered list of (node_name, output_dict) tuples

Example

from lexigram.logging import get_logger
logger = get_logger(__name__)
state = WorkflowState(input="Summarise this document")
state.merge({"summary": "...", "output": "..."})
logger.info("workflow_output", output=state["output"])
def __init__(
    input: str = '',
    *,
    initial: dict[str, Any] | None = None
) -> None
def get(
    key: str,
    default: Any = None
) -> Any

Return the value for key, or default if absent.

def as_dict() -> dict[str, Any]

Return a shallow copy of the underlying state dict.

def merge(update: dict[str, Any]) -> None

Merge update into state, overwriting existing keys.

Internal keys prefixed with _ are skipped from external updates to prevent accidental manipulation of engine metadata.

def record(
    node_name: str,
    output: dict[str, Any]
) -> None

Append a node execution record to the history list.

Parameters:
  • `node_name` (str): Name of the node that produced *output*.
  • `output` (dict[str, Any]): State update dict returned by the node.
property iteration() -> int

Current graph traversal step count.

property history() -> list[tuple[str, dict[str, Any]]]

Ordered list of (node_name, output_dict) tuples.


Payload fired when a workflow instance transitions between states.

Attributes:
  • workflow_id: Unique identifier of the workflow instance.
  • from_state: State name being exited.
  • to_state: State name being entered.


def conditional(
    name: str,
    condition: Callable[[PipelineContext], Awaitable[Result[bool, Exception]]],
    true_step: PipelineStep,
    false_step: PipelineStep | None = None,
    dependencies: list[str] | None = None,
    timeout: float | None = None
) -> ConditionalStep

Create a conditional pipeline step.


def parallel(
    name: str,
    steps: list[PipelineStep],
    dependencies: list[str] | None = None,
    *,
    fail_fast: bool = True,
    timeout: float | None = None
) -> ParallelStep

Create a parallel execution step.


def pipeline_step(dependencies: list[str] | None = None) -> Callable[[Callable[[PipelineContext], Awaitable[Result[Any, Exception]]]], FunctionStep]

Decorator to convert a function into a pipeline step.
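
Example (a sketch; the function body is illustrative):

@pipeline_step(dependencies=["load"])
async def enrich(ctx: PipelineContext) -> Result[Any, Exception]:
    return Ok(ctx.get_step_result("load", []))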


def saga_step(
    name: str | None = None,
    *,
    compensation: Callable[..., Any] | None = None
) -> Callable[[F], F]

Mark an async function as a saga step with optional compensation.

A saga step participates in distributed transaction management. If the step fails and a compensation handler is provided, it will be invoked during rollback.

Parameters:
  • `name` (str | None): Optional step name override. Defaults to the function name.
  • `compensation` (Callable[..., Any] | None): Async callable to invoke for rollback on failure.

Returns:
  • Callable[[F], F]: Decorator that attaches saga metadata to the function.

Example

@saga_step(name="reserve_inventory", compensation=release_inventory)
async def reserve_inventory(ctx: SagaContext) -> None:
    ...


def step(
    name: str,
    func: Callable[[PipelineContext], Awaitable[Result[Any, Exception]]],
    dependencies: list[str] | None = None,
    timeout: float | None = None
) -> FunctionStep

Create a function-based pipeline step.

timeout is an optional per-step timeout in seconds; it is forwarded to FunctionStep.
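
Example wiring two function steps with an explicit dependency (a sketch; the step bodies are illustrative, and reading the upstream value via get_step_result assumes the driver records each step's Ok value):

async def load(ctx: PipelineContext) -> Result[Any, Exception]:
    return Ok([1, 2, 3])

async def total(ctx: PipelineContext) -> Result[Any, Exception]:
    return Ok(sum(ctx.get_step_result("load", [])))

pipeline = Pipeline("stats", steps=[
    step("load", load),
    step("total", total, dependencies=["load"]),
])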


def workflow(
    name: str | None = None,
    *,
    timeout: float | None = None,
    retries: int = 0
) -> Callable[[F], F]

Mark an async function as a workflow definition.

Attaches workflow metadata used by the execution engine for registration, timeout enforcement, and retry policy.

Parameters:
  • `name` (str | None): Optional workflow name override. Defaults to the function name.
  • `timeout` (float | None): Maximum execution time in seconds. None means no limit.
  • `retries` (int): Number of retry attempts on failure.

Returns:
  • Callable[[F], F]: Decorator that attaches workflow metadata to the function.

Example

@workflow(name="onboarding", timeout=300.0, retries=1)
async def onboarding_workflow(ctx: WorkflowContext) -> None:
    ...


Raised when a bulk operation is cancelled.
def __init__(message: str = 'Bulk operation cancelled') -> None

Base exception for bulk operation errors.
def __init__(message: str = 'Bulk operation error') -> None

Raised when a bulk operation times out.
def __init__(message: str = 'Bulk operation timed out') -> None

Graph engine exceeded max_iterations (likely cycle).
Parameters:
  • `iterations`: Number of iterations completed before the limit.
  • `node`: Name of the node that was about to execute again.
def __init__(
    iterations: int,
    *,
    node: str | None = None
) -> None

Base exception for graph workflow engine operations.
Parameters:
  • `message`: Human-readable error description.
  • `node`: Optional name of the node where the error occurred.
def __init__(
    message: str,
    *,
    node: str | None = None
) -> None

Graph workflow execution exceeded total timeout.
Parameters:
  • `timeout`: The configured timeout value in seconds.
def __init__(timeout: float) -> None

Graph structure validation failed.

Raised by HumanNode to pause execution awaiting human input.
Parameters:
  • `prompt`: Text/question to display to the human operator.
  • `node`: Node that triggered the pause.
  • `checkpoint_id`: Identifier for the stored checkpoint.
def __init__(
    prompt: str,
    *,
    node: str | None = None,
    checkpoint_id: str | None = None
) -> None

A graph node's execute() method failed.
Parameters:
  • `message`: Human-readable error description.
  • `node`: Name of the failing node.
  • `cause`: Optional underlying exception.
def __init__(
    message: str,
    *,
    node: str | None = None,
    cause: BaseException | None = None
) -> None

Error raised during pipeline execution.
def __init__(
    step_name: str,
    error: Exception,
    details: dict[str, Any] | None = None,
    **kwargs: Any
) -> None

Error raised by a pipeline step.
def __init__(
    message: str = 'Pipeline step error',
    **kwargs: Any
) -> None

Raised when workflow compensation/rollback fails.

Base exception for workflow orchestration errors.
def __init__(
    message: str = 'Workflow error',
    **kwargs: Any
) -> None

Raised when a workflow definition cannot be found.
def __init__(workflow_id: str) -> None

Raised when a workflow is in an invalid state for the requested operation.

Raised when a workflow step fails to execute.
def __init__(
    step_name: str,
    reason: str
) -> None

Raised when a workflow or step exceeds its timeout.

Raised when resuming a workflow instance with a mismatched definition version.

Resuming an in-flight instance with an incompatible definition version would silently execute stale steps or skip newly-added ones. This error forces callers to either migrate the instance or restart it under the new definition.

Parameters:
  • `workflow_name`: Name of the workflow definition.
  • `expected_version`: Version stored in the running instance (set at creation time).
  • `actual_version`: Version of the definition currently being used to resume the instance.
def __init__(
    workflow_name: str,
    expected_version: int,
    actual_version: int
) -> None