Skip to content
GitHub

Workflows & Sagas

lexigram-workflow provides three orchestration primitives: pipelines for sequential step execution, sagas for long-running processes with compensation, and graph workflows for DAG-based execution with branching and gates.

For the full configuration reference, workflow graph DSL, and advanced saga patterns, see the lexigram-workflow package docs.


A Pipeline executes steps in registration order. Each step receives a shared PipelineContext and produces a Result:

from lexigram.result import Result, Ok, Err
from lexigram.workflow import Pipeline, FunctionStep, PipelineContext
class OrderPipeline:
def __init__(self) -> None:
self._pipeline = Pipeline()
def build(self) -> None:
self._pipeline.add_step(FunctionStep(
name="validate_order", func=self.validate, timeout=10.0,
))
self._pipeline.add_step(FunctionStep(
name="charge_payment", func=self.charge,
dependencies=["validate_order"], timeout=30.0,
))
self._pipeline.add_step(FunctionStep(
name="send_confirmation", func=self.confirm,
dependencies=["charge_payment"], timeout=5.0,
))
async def validate(self, ctx: PipelineContext) -> Result[str, str]:
return Ok("validated") if ctx.metadata.get("order") else Err("No order")
async def charge(self, ctx: PipelineContext) -> Result[str, str]:
return Ok("charged")
async def confirm(self, ctx: PipelineContext) -> Result[str, str]:
return Ok("confirmed")
from lexigram.workflow import ConditionalStep, ParallelStep
conditional = ConditionalStep(
name="check_risk",
condition=lambda ctx: ctx.metadata.get("amount", 0) > 10000,
true_step=FunctionStep(name="require_review", func=review),
false_step=FunctionStep(name="auto_approve", func=approve),
)
parallel = ParallelStep(
name="notify_all", fail_fast=False, timeout=15.0,
steps=[
FunctionStep(name="email", func=send_email),
FunctionStep(name="sms", func=send_sms),
],
)

A saga coordinates a multi-step process with compensating actions. Extend AbstractSaga and add steps:

from lexigram.result import Result, Ok, Err
from lexigram.workflow import AbstractSaga, SagaStep
class BookingSaga(AbstractSaga):
def __init__(self) -> None:
super().__init__()
self.add_step(SagaStep(
name="reserve_flight",
action=self.reserve_flight,
compensation=self.cancel_flight,
max_retries=3, idempotent=True,
))
self.add_step(SagaStep(
name="book_hotel",
action=self.book_hotel,
compensation=self.cancel_hotel,
))
async def reserve_flight(self) -> Result[str, str]:
return Ok("FL-123")
async def cancel_flight(self) -> None:
await self._flight_api.cancel("FL-123")
async def book_hotel(self) -> Result[str, str]:
return Err("no rooms available")
async def cancel_hotel(self) -> None:
pass
result = await saga.execute()
if result.is_err():
print(f"Compensated: {saga.was_compensated()}")

Sagas transition through SagaState: PENDING → RUNNING → COMPLETED or PENDING → RUNNING → COMPENSATING → FAILED. The state is persisted via SagaStoreProtocol:

from lexigram.contracts.workflow.protocols import SagaStoreProtocol, SagaState
class SagaPersistence:
def __init__(self, store: SagaStoreProtocol) -> None:
self._store = store
async def save_checkpoint(self, saga_id: str, state: SagaState, data: dict) -> None:
await self._store.save(saga_id, state, data)
async def resume(self, saga_id: str) -> SagaState | None:
result = await self._store.load(saga_id)
return result[0] if result else None

For DAG-based workflows, use WorkflowBuilder and WorkflowEngine. Build a directed graph of nodes with gates and edges:

from lexigram.workflow import WorkflowBuilder, WorkflowEngine
class DocumentApprovalFlow:
def __init__(self) -> None:
self._builder = WorkflowBuilder()
def build(self) -> WorkflowEngine:
b = self._builder
b.add_node("start", entry_point=True)
b.add_node("validate")
b.add_node("review")
b.add_node("approve")
b.add_node("reject")
b.add_node("publish")
b.add_gate("is_valid", condition=lambda s: s.get("valid", False))
b.add_edge("start", "validate")
b.add_edge("validate", "is_valid")
b.add_edge("is_valid", "review")
b.add_edge("is_valid", "reject")
b.add_edge("review", "approve")
b.add_edge("approve", "publish")
b.set_entry("start")
b.set_terminal("notify")
b.set_terminal("reject")
return b.build()
async def run(self, document: dict) -> Result[str, str]:
return await self.build().execute(document)

The engine supports checkpointing:

engine = WorkflowEngine(
workflow=my_workflow,
checkpoint_enabled=True,
max_iterations=50,
node_timeout=120.0,
)
result = await engine.execute({"doc_id": "DOC-001"})
result = await engine.resume(last_checkpoint, {"approved": True})

StateMachineProtocol provides finite-state-machine orchestration within workflows:

from lexigram.contracts.workflow.protocols import StateMachineProtocol
class OrderStateMachine:
def __init__(self, machine: StateMachineProtocol) -> None:
self._machine = machine
async def process_event(self, event: str) -> str:
if not self._machine.can_transition(event):
raise ValueError(f"Cannot transition from {self._machine.current_state}")
return await self._machine.transition(event)
@property
def state(self) -> str:
return self._machine.current_state

Process items in batches with retries and progress tracking:

from lexigram.workflow import BulkOperationConfig, BulkOperation
config = BulkOperationConfig(
batch_size=10, max_concurrency=5,
timeout=300.0, retry_attempts=3,
)
operation = BulkOperation(config=config)
result = await operation.execute(items)

from lexigram import Application
from lexigram.workflow import WorkflowModule, BulkOperationConfig
app = Application(name="my-app")
app.add_module(WorkflowModule.configure(
config=BulkOperationConfig(batch_size=50, max_concurrency=10),
))
application.yaml
workflow:
batch_size: 10
max_concurrency: 5
timeout: 300.0
retry_attempts: 3
pipeline_timeout: 600.0
graph:
enabled: true
max_iterations: 25
node_timeout: 120.0
checkpoint_enabled: true
parallel_branches: true

WorkflowModule.stub() uses in-memory stores with no external dependencies:

from lexigram import Application
from lexigram.workflow import WorkflowModule, Pipeline, FunctionStep
async def test_pipeline_executes_steps() -> None:
async with Application.boot(modules=[WorkflowModule.stub()]) as app:
pipeline = Pipeline()
pipeline.add_step(FunctionStep(name="step1", func=lambda ctx: "done"))
result = await pipeline.execute({})
assert result.is_ok()