Architecture

Internal design of the lexigram-workflow package.


flowchart LR
    EV[Events<br/>lexigram-events] -->|trigger| WF[Workflow<br/>lexigram-workflow]
    TK[Tasks<br/>lexigram-tasks] -->|embed| WF
    WF -->|events| EV
    UI[Web UI<br/>lexigram-ui] -->|start| WF
    WF --> LG[lexigram<br/>DI · Config · Primitives]

lexigram-workflow provides the orchestration layer for multi-step business processes: pipeline execution (sequential/parallel step chains), directed-graph workflow engines (node-based DAG traversal with conditional routing), saga orchestration (compensating transactions), state machines, and bulk batch processing. Controllers and event handlers invoke workflows, which orchestrate domain services.


flowchart BT
    subgraph Core[Core Primitives]
        DEF[WorkflowDefinition · versioned]
        INST[WorkflowInstance · runtime]
        PIPE[TransformPipe · fluent]
    end
    subgraph Graph[Graph Engine]
        GB[WorkflowBuilder]
        GE[WorkflowEngine · DAG executor]
        GN[AbstractWorkflowNode · 6 types]
        GE_[WorkflowEdge · conditional]
        GS[WorkflowState · shared]
    end
    subgraph Pipeline[Pipeline]
        PL[Pipeline · step orchestrator]
        PS[FunctionStep · ConditionalStep · ParallelStep]
        PC[PipelineContext]
    end
    subgraph Saga[Saga]
        AS[AbstractSaga]
        SS[SagaStep · action+compensation]
    end
    subgraph Bulk[Bulk Operations]
        BO[BulkOperation · batch+concurrency+retry]
        BM[BulkOperationMetrics]
    end
    subgraph Exec[Execution]
        WR[WorkflowRunner · retry+resume]
        WC[WorkflowCheckpoint]
    end
    subgraph State[State Machine]
        SM[StateMachine · FSM]
        SP[DatabaseStatePersistence]
    end

    Core --> Graph & Pipeline
    Graph & Pipeline --> Exec
    Exec --> Saga & Bulk
    Graph --> State

Two orchestration models coexist:

  1. Pipeline — linear/conditional/parallel steps with explicit dependency ordering. Best for deterministic business processes (validation → enrichment → persistence).
  2. Graph — directed-graph workflow with conditional edge routing and state merging. Best for AI agent workflows, human-in-the-loop, and non-deterministic branching.

Both share WorkflowRunner and checkpointing, and can be composed together.


sequenceDiagram
    actor User
    participant Inst as WorkflowInstance
    participant Eng as WorkflowEngine
    participant Node as Node
    participant State as WorkflowState

    User->>Inst: WorkflowInstance.create(definition)
    Inst->>Inst: Stamp definition_version
    User->>Eng: execute(input)
    Eng->>State: WorkflowState(input)
    loop each step
        Eng->>Eng: Resolve next nodes via edges
        Eng->>Node: execute(state)
        Node-->>Eng: output dict
        Eng->>State: merge(output)
        alt human_in_loop
            Eng-->>User: HumanInputRequiredError
            User->>Eng: resume(response)
        end
        Eng->>Eng: Check terminal condition
    end
    Eng-->>User: Ok(GraphResult) or Err

Lifecycle: Definition (name + version) → Instance (stamps version) → Build (nodes, edges, entry/terminal) → Execute (graph traversal) → Pause/Resume (HumanNode raises HumanInputRequiredError) → Complete (terminal node reached). On resume, WorkflowInstance.validate_definition_version() detects incompatible definition changes.
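The version stamp and the check on resume can be illustrated with a minimal sketch. The classes below are hypothetical stand-ins, not the package's actual WorkflowDefinition and WorkflowInstance, and the compatibility rule (any name or version difference is rejected) is an assumption made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DefinitionSketch:
    """Hypothetical stand-in for WorkflowDefinition (name + version)."""
    name: str
    version: int


class IncompatibleDefinitionError(Exception):
    """Hypothetical error: the definition changed since the instance was created."""


@dataclass
class InstanceSketch:
    """Hypothetical stand-in for WorkflowInstance."""
    definition_name: str
    definition_version: int

    @classmethod
    def create(cls, definition: DefinitionSketch) -> "InstanceSketch":
        # Stamp the version at creation time so it survives a pause/resume cycle.
        return cls(definition.name, definition.version)

    def validate_definition_version(self, current: DefinitionSketch) -> None:
        # Assumption: any difference in name or version counts as incompatible.
        if (current.name, current.version) != (self.definition_name, self.definition_version):
            raise IncompatibleDefinitionError(
                f"instance stamped {self.definition_name} v{self.definition_version}, "
                f"got {current.name} v{current.version}"
            )
```

A resume path would call validate_definition_version() with the currently registered definition before continuing execution.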


Pipeline orchestrates steps with dependency ordering:

| Step | Purpose | Execution |
|---|---|---|
| FunctionStep | User-provided callable | Dependency-ordered sequential |
| ConditionalStep | Branch on predicate, sub-step | Sequential within branch |
| ParallelStep | Multiple sub-steps concurrently | asyncio.gather, fail-fast |

  1. Topological sort resolves execution order from declared dependencies
  2. Each step checks should_skip(), executes with optional timeout/resilience
  3. On failure: on_error() may recover; fail_fast aborts on first failure
  4. Results stored in PipelineContext; checkpointed to optional CacheBackendProtocol
  5. cleanup() runs after each step

    pipeline = Pipeline("onboarding", steps=[
        FunctionStep("validate", validate_input),
        ConditionalStep("route", is_premium,
            FunctionStep("premium", handle_premium),
            FunctionStep("standard", handle_standard),
        ),
        ParallelStep("notify", [
            FunctionStep("email", send_email),
            FunctionStep("sms", send_sms),
        ]),
    ])
    result = await pipeline.execute(context)
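The ordering and failure handling described in the numbered steps above can be sketched with the standard library's graphlib module (Python 3.9+). This is an illustration of the technique, not the Pipeline implementation: run_in_dependency_order, the step signature, and the returned (results, errors) pair are all hypothetical.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable

# Hypothetical step signature: each step receives the results gathered so far.
StepFn = Callable[[dict[str, Any]], Awaitable[Any]]


async def run_in_dependency_order(
    steps: dict[str, StepFn],
    depends_on: dict[str, set[str]],
    fail_fast: bool = True,
) -> tuple[dict[str, Any], dict[str, Exception]]:
    """Run every step after its declared dependencies.

    Assumes every dependency named in depends_on is also a key of steps.
    """
    graph = {name: depends_on.get(name, set()) for name in steps}
    results: dict[str, Any] = {}
    errors: dict[str, Exception] = {}
    # static_order() yields each node only after all of its predecessors.
    for name in TopologicalSorter(graph).static_order():
        try:
            results[name] = await steps[name](results)
        except Exception as exc:
            if fail_fast:
                raise  # abort the whole run on the first failure
            errors[name] = exc  # otherwise record the error and keep going
    return results, errors
```

With fail_fast disabled, a step whose dependency failed will usually fail as well (its input is missing), so both end up in errors.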

A lightweight, immutable, generically-typed pipe for inline data transformation (separate from Pipeline):

    result = await (
        TransformPipe[Request]()
        .pipe(validate)
        .pipe_if(lambda r: r.is_authenticated, enrich)
        .tap(logger.info)
        .catch(error_handler)
        .execute(request)
    )
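One way such an immutable fluent pipe could be built is sketched below. PipeSketch is a hypothetical stand-in that mirrors the method names in the chain above; it is untyped for brevity and says nothing about how TransformPipe is actually implemented. Each method returns a new pipe rather than mutating the receiver.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class PipeSketch:
    """Hypothetical immutable pipe: stages live in a tuple, never mutated."""
    stages: tuple = ()
    handler: Optional[Callable[[Exception], Any]] = None

    def pipe(self, fn: Callable[[Any], Any]) -> "PipeSketch":
        # Return a new pipe; the original keeps its own stages.
        return PipeSketch(self.stages + (fn,), self.handler)

    def pipe_if(self, predicate: Callable[[Any], bool], fn: Callable[[Any], Any]) -> "PipeSketch":
        # Apply fn only when the predicate holds; otherwise pass the value through.
        return self.pipe(lambda value: fn(value) if predicate(value) else value)

    def tap(self, fn: Callable[[Any], Any]) -> "PipeSketch":
        # Run a synchronous side effect and forward the value unchanged.
        def stage(value: Any) -> Any:
            fn(value)
            return value
        return self.pipe(stage)

    def catch(self, handler: Callable[[Exception], Any]) -> "PipeSketch":
        return PipeSketch(self.stages, handler)

    async def execute(self, value: Any) -> Any:
        try:
            for stage in self.stages:
                value = stage(value)
                if inspect.isawaitable(value):  # allow sync and async stages
                    value = await value
            return value
        except Exception as exc:
            if self.handler is None:
                raise
            return self.handler(exc)
```

Because every call returns a fresh object, a partially built pipe can be shared and extended in different directions without one caller affecting another.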

| Node | NodeType | Purpose | Input | Output |
|---|---|---|---|---|
| AgentNode | AGENT | Execute a Lexigram agent | state[input_key] | state[output_key] |
| GateNode | GATE | No-op routing node | None | {} |
| HumanNode | HUMAN | Pause for human input | state[input_key] | state[output_key] |
| LLMNode | LLM | LLM call with prompt template | Rendered prompt | state[output_key] |
| ToolNode | TOOL | Execute a ToolProtocol | state[input_key] | state[output_key] |
| SubworkflowNode | SUBWORKFLOW | Embed nested WorkflowEngine | state[input_key] | state[output_key] |
| Custom | CUSTOM | Subclass AbstractWorkflowNode | Subclass-defined | Subclass-defined |

Edges support conditional routing via WorkflowEdge(is_active=lambda state: ...). Multiple active edges from the same source execute in parallel when GraphConfig.parallel_branches is enabled.
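Conditional edge routing with state merging can be sketched as a small frontier-based loop. Everything here (EdgeSketch, run_graph, the last-writer-wins merge, the step limit) is a hypothetical illustration of the idea, not the WorkflowEngine API.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

State = dict[str, Any]
NodeFn = Callable[[State], Awaitable[State]]


@dataclass(frozen=True)
class EdgeSketch:
    """Hypothetical edge: followed only when is_active(state) is true."""
    source: str
    target: str
    is_active: Callable[[State], bool] = lambda state: True


async def run_graph(
    nodes: dict[str, NodeFn],
    edges: list[EdgeSketch],
    entry: str,
    terminal: str,
    state: State,
    max_steps: int = 50,
) -> State:
    frontier = [entry]
    for _ in range(max_steps):
        # All nodes in the frontier run concurrently on the same state snapshot.
        outputs = await asyncio.gather(*(nodes[name](state) for name in frontier))
        for output in outputs:
            state = {**state, **output}  # assumption: later outputs overwrite earlier keys
        if terminal in frontier:
            return state
        # Follow every active outgoing edge; dict.fromkeys de-duplicates in order.
        frontier = list(dict.fromkeys(
            e.target for e in edges if e.source in frontier and e.is_active(state)
        ))
        if not frontier:
            raise RuntimeError("no active edge leads out of the current nodes")
    raise RuntimeError("step limit reached before the terminal node")
```

When two edges from the same source are active at once, both targets land in the frontier and run in the same asyncio.gather call, which is the parallel-branch behaviour in miniature.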


flowchart LR
    subgraph Execution
        direction LR
        S1[Step 1] --> S2[Step 2] --> S3[Step 3]
    end
    subgraph Compensation
        direction LR
        C3[Compensate 3] --> C2[Compensate 2] --> C1[Compensate 1]
    end
    S3 -.->|fail| C3

Orchestration saga pattern: steps execute in registration order. Each SagaStep has an action and optional compensation. On failure, compensation runs in reverse order for all completed steps.

    PENDING → RUNNING → COMPLETED (all steps succeed)
    PENDING → RUNNING → COMPENSATING → FAILED (step failure)

| SagaStep Property | Description |
|---|---|
| action | Async callable → Result[Any, Error] |
| compensation | Optional async callable to undo action |
| max_retries | Per-step retries before failing (default: 1) |
| retry_delay | Delay between retries (default: 1s) |
| idempotent | Whether compensation is safe to retry (default: True) |

State persists via SagaStoreProtocol. AbstractSaga includes VERSION checking on state load for schema evolution.
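The reverse-order compensation rule can be shown in a few lines. This sketch simplifies heavily: StepSketch and run_saga are hypothetical, failures are signalled with exceptions instead of a Result type, and retries and persistence are left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

AsyncFn = Callable[[], Awaitable[None]]


@dataclass
class StepSketch:
    """Hypothetical stand-in for SagaStep: an action plus optional compensation."""
    name: str
    action: AsyncFn
    compensation: Optional[AsyncFn] = None


async def run_saga(steps: list[StepSketch]) -> str:
    """Run steps in registration order; on failure, undo completed steps in reverse."""
    completed: list[StepSketch] = []
    for step in steps:
        try:
            await step.action()
        except Exception:
            # Only steps that finished are compensated, most recent first.
            for done in reversed(completed):
                if done.compensation is not None:
                    await done.compensation()
            return "FAILED"
        completed.append(step)
    return "COMPLETED"
```

The failing step itself is not compensated here, since its action never completed; only the steps before it are undone.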


BulkOperation[T, R] is a generic batch processor:

| Feature | Implementation |
|---|---|
| Batching | Items chunked by configurable batch_size |
| Concurrency | asyncio.Semaphore limits parallel batches |
| Retry | Exponential backoff with configurable attempts |
| Circuit breaker | Optional CircuitBreakerProtocol wrapper |
| Checkpoint/resume | Cache-backed persistence of completed batch IDs |
| Progress | Callbacks + BulkOperationMetrics |
| Cancellation | cancel() → BulkOperationCancelledError |
| Convenience | bulk_map(), bulk_filter(), bulk_reduce() |

    operation = BulkOperation[int, str](
        config=BulkOperationConfig(batch_size=100, max_concurrency=5),
        processor=process_batch,
    )
    async for r in operation.execute(items, operation_id="op-1"):
        ...
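The first three rows of the table (batching, semaphore-bounded concurrency, retry with exponential backoff) combine roughly as in the sketch below. The function name, its parameters, and the choice to hold the semaphore slot during backoff are assumptions for illustration; checkpointing, metrics, and cancellation are omitted.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunk(items: Sequence[T], size: int) -> list[list[T]]:
    """Split items into consecutive batches of at most `size` elements."""
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


async def process_in_batches(
    items: Sequence[T],
    processor: Callable[[list[T]], Awaitable[list[R]]],
    batch_size: int = 100,
    max_concurrency: int = 5,
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> list[R]:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_batch(batch: list[T]) -> list[R]:
        async with semaphore:  # at most max_concurrency batches in flight
            for attempt in range(max_attempts):
                try:
                    return await processor(batch)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # retries exhausted
                    # Exponential backoff: base_delay, 2x, 4x, ...
                    await asyncio.sleep(base_delay * 2 ** attempt)
        raise AssertionError("unreachable")

    # gather() returns results in batch order, regardless of completion order.
    per_batch = await asyncio.gather(*(run_batch(b) for b in chunk(items, batch_size)))
    return [result for batch in per_batch for result in batch]
```

Unlike the streaming `async for` interface shown above, this sketch collects all results before returning, which keeps the example short at the cost of memory on large inputs.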

sequenceDiagram
    participant App as Application
    participant P as WorkflowProvider
    participant C as Container

    App->>P: WorkflowProvider(config, saga_store, ...)
    App->>P: register(registrar)
    P->>C: singleton(PipelineProtocol, Pipeline)
    P->>C: singleton(BulkOperationConfig, config)
    P->>C: singleton(GraphConfig, GraphConfig)
    opt state_machine provided
        P->>C: singleton(StateMachineProtocol, sm)
    end
    opt db_provider provided
        P->>C: singleton(StatePersistenceProtocol, DatabaseStatePersistence)
    end
    opt saga_store provided
        P->>C: singleton(SagaStoreProtocol, store)
    end
    App->>P: boot(resolver)
    P->>C: resolve config, log active settings
    App->>App: Ready

All registrations use singleton scope. Priority: DOMAIN (after infrastructure, before application). Config key: "workflow", typed as BulkOperationConfig. The provider accepts optional SagaStoreProtocol, StateMachineProtocol, and DatabaseProviderProtocol for state persistence.
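The registration pattern in the diagram, where optional collaborators are bound only when supplied, might look like the following. ContainerSketch and ProviderSketch are hypothetical; string keys stand in for the protocol types, and the real container and provider interfaces are not shown in this document.

```python
from typing import Any, Optional


class ContainerSketch:
    """Hypothetical minimal container that stores singleton bindings."""

    def __init__(self) -> None:
        self._singletons: dict[str, Any] = {}

    def singleton(self, key: str, value: Any) -> None:
        self._singletons[key] = value

    def has(self, key: str) -> bool:
        return key in self._singletons

    def resolve(self, key: str) -> Any:
        return self._singletons[key]


class ProviderSketch:
    """Hypothetical provider: always binds config, binds the store only if given."""

    def __init__(self, config: Any, saga_store: Optional[Any] = None) -> None:
        self.config = config
        self.saga_store = saga_store

    def register(self, container: ContainerSketch) -> None:
        container.singleton("BulkOperationConfig", self.config)
        # Optional dependencies are registered only when the caller supplied them.
        if self.saga_store is not None:
            container.singleton("SagaStoreProtocol", self.saga_store)
```

Consumers that need an optional binding can then check for its presence instead of receiving a placeholder implementation.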


| Protocol | Implemented By | Source |
|---|---|---|
| PipelineProtocol | Pipeline | lexigram.contracts.workflow |
| PipelineStepProtocol | FunctionStep, ConditionalStep, ParallelStep | lexigram.contracts.workflow |
| PipelineContextProtocol | PipelineContext | lexigram.contracts.workflow |
| SagaProtocol | AbstractSaga | lexigram.contracts.workflow |
| SagaStoreProtocol | User-provided | lexigram.contracts.workflow |
| StateMachineProtocol | StateMachine | lexigram.contracts.workflow |
| StatePersistenceProtocol | DatabaseStatePersistence | lexigram.contracts.workflow |
| WorkflowGraphProtocol | WorkflowBuilder | lexigram.contracts.workflow |
| WorkflowNodeProtocol | AbstractWorkflowNode | lexigram.contracts.workflow |
| ExecutionProtocol | WorkflowEngine | lexigram.contracts.workflow |
| BulkProcessorProtocol | BulkOperation | lexigram.contracts.workflow |
| ApprovalProtocol | User-provided | lexigram.contracts.workflow |
| CacheBackendProtocol | User-provided (checkpoints) | lexigram.contracts.infra.cache |
| CircuitBreakerProtocol | User-provided | lexigram.contracts.infra.resilience |
| DatabaseProviderProtocol | User-provided (state) | lexigram.contracts.data |

Shared types (SagaStep, SagaStepError, StateTransitionRecord) and enums (SagaState) live in lexigram.contracts.workflow.


| Point | Mechanism | Example |
|---|---|---|
| Custom node type | Subclass AbstractWorkflowNode, implement execute() | SlackNotifyNode, WebhookNode |
| Custom pipeline step | Subclass PipelineStep | DatabaseTransactionStep |
| Custom saga | Subclass AbstractSaga, define steps + get_id()/is_completed() | OrderSaga, PaymentSaga |
| Durable saga store | Implement SagaStoreProtocol, pass to WorkflowModule.configure() | PostgresSagaStore |
| State persistence | Pass DatabaseProviderProtocol to WorkflowProvider | DatabaseStatePersistence |
| Checkpoint backend | Implement CacheBackendProtocol, pass to Pipeline/BulkOperation | RedisCheckpointStore |
| Circuit breaker | Implement CircuitBreakerProtocol, pass to BulkOperation | ResilienceCircuitBreaker |
| Decorators | @workflow(), @saga_step() | Metadata attachment for discovery |
| Lifecycle hooks | Register via HookRegistryProtocol | Metrics, audit, notifications |
| Events | Subscribe via EventBusProtocol | WorkflowStartedEvent, etc. |
| CLI commands | Automatic via lexigram.cli.contributors entry point | WorkflowCliContributor |
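As an illustration of the first extension point, a custom node reads one key from the shared state and returns a dict for the engine to merge. NodeBaseSketch below is a hypothetical stand-in for AbstractWorkflowNode, whose real constructor and method signatures may differ; WordCountNode is an invented example.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class NodeBaseSketch(ABC):
    """Hypothetical stand-in for AbstractWorkflowNode."""

    def __init__(self, name: str, input_key: str, output_key: str) -> None:
        self.name = name
        self.input_key = input_key
        self.output_key = output_key

    @abstractmethod
    async def execute(self, state: dict[str, Any]) -> dict[str, Any]:
        """Return the output dict to be merged into the shared state."""


class WordCountNode(NodeBaseSketch):
    """Invented custom node: counts whitespace-separated words."""

    async def execute(self, state: dict[str, Any]) -> dict[str, Any]:
        text = state[self.input_key]
        # Return only the new key; merging into the state is left to the caller.
        return {self.output_key: len(text.split())}
```

Returning a partial dict instead of mutating the state keeps the node free of side effects on shared data, which fits the merge step shown in the sequence diagram.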