Workflow (lexigram-workflow)
Workflow orchestration for the Lexigram Framework (pipelines, bulk ops, sagas, graph engine)
Overview
lexigram-workflow provides workflow orchestration, state machines, and the saga pattern for modeling complex, long-running business processes. It supports durable persistence of transition history, optimistic locking, multi-level approval chains, distributed transaction coordination with automatic rollback, and a graph engine for traversing directed graphs. All services are wired via WorkflowProvider, which registers workflow protocols with the DI container.
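To make "distributed transaction coordination with automatic rollback" concrete, here is a minimal, framework-independent sketch of the saga idea: each step pairs a forward action with a compensating action, and a failure triggers the compensations of the already-completed steps in reverse order. The names `SagaStep`, `run_saga`, and `record` are hypothetical and are not part of the lexigram-workflow API.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

Action = Callable[[], Awaitable[None]]


@dataclass
class SagaStep:
    # Hypothetical type for illustration only, not a lexigram class.
    name: str
    action: Action       # forward operation
    compensate: Action   # undoes the forward operation


async def run_saga(steps: list[SagaStep]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: list[SagaStep] = []
    for step in steps:
        try:
            await step.action()
        except Exception:
            # Roll back only the steps that actually finished.
            for done in reversed(completed):
                await done.compensate()
            return False
        completed.append(step)
    return True


log: list[str] = []


def record(message: str) -> Action:
    """Build an action that appends a message to the log."""
    async def _run() -> None:
        log.append(message)
    return _run


async def fail() -> None:
    raise RuntimeError("payment declined")


steps = [
    SagaStep("reserve", record("reserve stock"), record("release stock")),
    SagaStep("charge", fail, record("refund payment")),
]
succeeded = asyncio.run(run_saga(steps))
```

In this run the second step fails, so only the first step's compensation executes; the failed step is never compensated because it never completed.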
Install
    uv add lexigram-workflow
Quick Start
    from lexigram import Application
    from lexigram.di.module import Module, module

    # Import the module from the package
    from lexigram.workflow import WorkflowModule


    @module(imports=[WorkflowModule.configure()])
    class AppModule(Module):
        pass


    app = Application(modules=[AppModule])

    if __name__ == "__main__":
        app.run()
Configuration
Zero-config usage: call WorkflowModule.configure() with no arguments to use the defaults.
Option 1: YAML file
    workflow:
      batch_size: 10
      max_concurrency: 5
      timeout: 300.0
      retry_attempts: 3
      enable_progress_tracking: true
      pipeline_timeout: 300.0
Option 2: Profiles + Environment Variables (recommended)
    export LEX_WORKFLOW__ENABLED=true
    # Environment variables for each field
Option 3: Python
    from lexigram.workflow.config import BulkOperationConfig
    from lexigram.workflow import WorkflowModule

    config = BulkOperationConfig(batch_size=10, max_concurrency=5, retry_attempts=3)
    WorkflowModule.configure(config=config)
Config reference
| Field | Default | Env var | Description |
|---|---|---|---|
| batch_size | 10 | LEX_WORKFLOW__BATCH_SIZE | Items processed per batch during bulk operations |
| max_concurrency | 5 | LEX_WORKFLOW__MAX_CONCURRENCY | Maximum parallel operations in a bulk run |
| timeout | 300.0 | LEX_WORKFLOW__TIMEOUT | Operation timeout in seconds |
| retry_attempts | 3 | LEX_WORKFLOW__RETRY_ATTEMPTS | Automatic retry count on step failure |
| retry_delay | 1.0 | LEX_WORKFLOW__RETRY_DELAY | Seconds to wait between retry attempts |
| enable_progress_tracking | true | LEX_WORKFLOW__ENABLE_PROGRESS_TRACKING | Track and report bulk operation progress |
| pipeline_timeout | 300.0 | LEX_WORKFLOW__PIPELINE_TIMEOUT | Default pipeline execution timeout in seconds |
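The following sketch illustrates how `batch_size`, `max_concurrency`, `retry_attempts`, and `retry_delay` could interact in a bulk run. It is plain asyncio, not the library's implementation: `BulkSettings` and `run_bulk` are hypothetical names, and it assumes `retry_attempts` counts retries made after the first try. The `timeout` field is left out to keep the example short.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class BulkSettings:
    # Hypothetical stand-in; field names and defaults mirror the config table.
    batch_size: int = 10
    max_concurrency: int = 5
    retry_attempts: int = 3
    retry_delay: float = 1.0


async def run_bulk(
    items: list[int],
    operation: Callable[[int], Awaitable[None]],
    settings: BulkSettings,
) -> tuple[list[int], list[int]]:
    """Process items batch by batch; return (succeeded, failed)."""
    semaphore = asyncio.Semaphore(settings.max_concurrency)

    async def attempt(item: int) -> bool:
        async with semaphore:  # cap the number of operations in flight
            # Assumption: one initial try plus retry_attempts retries.
            for attempt_no in range(settings.retry_attempts + 1):
                try:
                    await operation(item)
                    return True
                except Exception:
                    if attempt_no < settings.retry_attempts:
                        await asyncio.sleep(settings.retry_delay)
            return False

    succeeded: list[int] = []
    failed: list[int] = []
    for start in range(0, len(items), settings.batch_size):
        batch = items[start:start + settings.batch_size]
        results = await asyncio.gather(*(attempt(item) for item in batch))
        for item, ok in zip(batch, results):
            (succeeded if ok else failed).append(item)
    return succeeded, failed


calls: dict[int, int] = {}


async def flaky(item: int) -> None:
    """Item 3 fails once and then succeeds; item 7 always fails."""
    calls[item] = calls.get(item, 0) + 1
    if item == 3 and calls[item] < 2:
        raise RuntimeError("transient failure")
    if item == 7:
        raise RuntimeError("permanent failure")


settings = BulkSettings(batch_size=4, max_concurrency=2, retry_attempts=2, retry_delay=0.0)
succeeded, failed = asyncio.run(run_bulk(list(range(10)), flaky, settings))
```

Here item 3 recovers on its first retry, while item 7 exhausts its initial try and both retries and ends up in `failed`.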
Module Factory Methods
| Method | Description |
|---|---|
| WorkflowModule.configure(config, saga_store) | Configure with explicit BulkOperationConfig |
| WorkflowModule.stub() | Minimal config for testing |
Key Features
- State machine: declarative states and transitions with entry/exit hooks
- Durable persistence: transition history persisted to the DB via StatePersistenceProtocol
- Optimistic locking: prevents concurrent transition conflicts
- State recovery: rebuilds machine state from persisted history on restart
- Approval chains: multi-level approval flows with role and threshold rules
- Sagas: distributed transaction coordination with automatic rollback
- Pipelines: step-based sequential pipelines with error handling
- Bulk operations: apply an operation to many entities in a supervised batch
- Guard conditions: transition guards as async def can_confirm(self) -> bool
- Event hooks: on_enter_*, on_exit_*, on_transition lifecycle callbacks
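As an illustration of how guard conditions, lifecycle hooks, and optimistic locking fit together, here is a small self-contained sketch. It borrows the hook and guard naming from the list above (`can_confirm`, `on_exit_*`, `on_enter_*`, `on_transition`), but the `Order` class, the `confirm` method, the version counter, and the order in which hooks fire are assumptions for illustration, not the library's actual behavior.

```python
import asyncio
from dataclasses import dataclass, field


class StaleVersionError(Exception):
    """Raised when a caller acts on an outdated version of the entity."""


@dataclass
class Order:
    # Hypothetical entity for illustration only.
    state: str = "pending"
    version: int = 0
    paid: bool = False
    history: list[tuple[str, str]] = field(default_factory=list)
    events: list[str] = field(default_factory=list)

    async def can_confirm(self) -> bool:
        # Guard: the transition is allowed only for paid, pending orders.
        return self.state == "pending" and self.paid

    async def on_exit_pending(self) -> None:
        self.events.append("exit:pending")

    async def on_enter_confirmed(self) -> None:
        self.events.append("enter:confirmed")

    async def on_transition(self, source: str, target: str) -> None:
        self.events.append(f"{source}->{target}")

    async def confirm(self, expected_version: int) -> bool:
        # Optimistic lock: reject callers whose view of the entity is stale.
        if expected_version != self.version:
            raise StaleVersionError(f"expected {expected_version}, found {self.version}")
        if not await self.can_confirm():
            return False
        source, target = self.state, "confirmed"
        await self.on_exit_pending()
        self.state = target
        self.version += 1
        self.history.append((source, target))  # transition history for recovery
        await self.on_enter_confirmed()
        await self.on_transition(source, target)
        return True


async def demo() -> tuple[Order, bool, bool, bool]:
    order = Order()
    blocked = await order.confirm(order.version)  # unpaid, so the guard rejects
    order.paid = True
    seen = order.version                          # two callers both read this version
    first = await order.confirm(seen)
    try:
        await order.confirm(seen)                 # the second caller is now stale
        conflict = False
    except StaleVersionError:
        conflict = True
    return order, blocked, first, conflict


order, blocked, first, conflict = asyncio.run(demo())
```

The first confirmation bumps the version, so the second caller, still holding the old version, gets a conflict instead of silently repeating the transition.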
Testing
    async with Application.boot(modules=[WorkflowModule.stub()]) as app:
        # your test code
        ...
Key Source Files
| File | What it contains |
|---|---|
| src/lexigram/workflow/module.py | WorkflowModule class with factory methods |
| src/lexigram/workflow/di/provider.py | WorkflowProvider: wires workflow protocols into the DI container |
| src/lexigram/workflow/config.py | BulkOperationConfig and GraphConfig |
| src/lexigram/workflow/state/ | State machine implementation with transitions and hooks |
| src/lexigram/workflow/saga/ | Saga pattern implementation with compensating transactions |
| src/lexigram/workflow/pipeline/ | Pipeline executor for chaining steps |
| src/lexigram/workflow/approval/ | Approval chain and levels |