Skip to content
GitHubDiscord

Workflow (lexigram-workflow)

Workflow orchestration for the Lexigram Framework (pipelines, bulk ops, sagas, graph engine)


lexigram-workflow provides workflow orchestration, state machines, and saga pattern for modeling complex, long-running business processes. It supports durable persistence of transition history, optimistic locking, multi-level approval chains, distributed transaction coordination with automatic rollback, and a graph engine for traversing directed graphs. All services are wired via WorkflowProvider, which registers workflow protocols with the DI container.


Terminal window
uv add lexigram-workflow
from lexigram import Application
from lexigram.di.module import Module, module
# Import the module from the package
from lexigram.workflow import WorkflowModule
@module(imports=[WorkflowModule.configure()])
class AppModule(Module):
pass
app = Application(modules=[AppModule])
if __name__ == "__main__":
app.run()

Zero-config usage: Call WorkflowModule.configure() with no arguments to use defaults.

application.yaml
workflow:
batch_size: 10
max_concurrency: 5
timeout: 300.0
retry_attempts: 3
enable_progress_tracking: true
pipeline_timeout: 300.0
Section titled “Option 2 — Profiles + Environment Variables (recommended)”
Terminal window
export LEX_WORKFLOW__ENABLED=true
# Environment variables for each field
from lexigram.workflow.config import BulkOperationConfig
from lexigram.workflow import WorkflowModule
config = BulkOperationConfig(batch_size=10, max_concurrency=5, retry_attempts=3)
WorkflowModule.configure(config=config)
FieldDefaultEnv varDescription
batch_size10LEX_WORKFLOW__BATCH_SIZEItems processed per batch during bulk operations
max_concurrency5LEX_WORKFLOW__MAX_CONCURRENCYMaximum parallel operations in a bulk run
timeout300.0LEX_WORKFLOW__TIMEOUTOperation timeout in seconds
retry_attempts3LEX_WORKFLOW__RETRY_ATTEMPTSAutomatic retry count on step failure
retry_delay1.0LEX_WORKFLOW__RETRY_DELAYSeconds to wait between retry attempts
enable_progress_trackingtrueLEX_WORKFLOW__ENABLE_PROGRESS_TRACKINGTrack and report bulk operation progress
pipeline_timeout300.0LEX_WORKFLOW__PIPELINE_TIMEOUTDefault pipeline execution timeout in seconds
MethodDescription
WorkflowModule.configure(config, saga_store)Configure with explicit BulkOperationConfig
WorkflowModule.stub()Minimal config for testing
  • State machine — Declarative states and transitions with entry/exit hooks
  • Durable persistence — Transition history persisted to DB via StatePersistenceProtocol
  • Optimistic locking — Prevents concurrent transition conflicts
  • State recovery — Rebuild machine state from persisted history on restart
  • Approval chains — Multi-level approval flows with role and threshold rules
  • Sagas — Distributed transaction coordination with automatic rollback
  • Pipelines — Step-based sequential pipelines with error handling
  • Bulk operations — Apply an operation to many entities in a supervised batch
  • Guard conditions — Transition guards as async def can_confirm(self) -> bool
  • Event hookson_enter_*, on_exit_*, on_transition lifecycle callbacks
async with Application.boot(modules=[WorkflowModule.stub()]) as app:
# your test code
...
FileWhat it contains
src/lexigram/workflow/module.pyWorkflowModule class with factory methods
src/lexigram/workflow/di/provider.pyWorkflowProvider — wires workflow protocols into DI container
src/lexigram/workflow/config.pyBulkOperationConfig and GraphConfig
src/lexigram/workflow/state/State machine implementation with transitions and hooks
src/lexigram/workflow/saga/Saga pattern implementation with compensating transactions
src/lexigram/workflow/pipeline/Pipeline executor for chaining steps
src/lexigram/workflow/approval/Approval chain and levels