
Architecture

Internal design of the lexigram-sql package.


flowchart LR
    subgraph Extensions["lexigram-*"]
        Web["lexigram-web · admin · auth · ..."]
    end
    subgraph Contracts["lexigram-contracts"]
        DP[DatabaseProviderProtocol<br/>UnitOfWorkProtocol<br/>ConnectionPoolProtocol<br/>MigrationRunnerProtocol]
    end
    subgraph SQL["lexigram-sql"]
        DS[DatabaseService · GenericRepository<br/>SimpleUnitOfWork · MigrationRunnerAdapter]
    end
    subgraph Drivers["async drivers"]
        PG[(PostgreSQL · asyncpg)]
        SL[(SQLite · aiosqlite)]
        MY[(MySQL · aiomysql)]
    end
    Extensions -->|resolve via DI| DP
    DP -->|implemented by| DS
    DS --> PG & SL & MY

lexigram-sql is an infrastructure-layer extension providing async SQL database access. Other extensions consume DatabaseProviderProtocol from contracts through the container — never lexigram-sql directly.
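
The dependency rule above can be illustrated with a minimal, standard-library-only sketch. `DatabaseProviderLike`, `FakeDatabase`, `TinyContainer`, and `ReportService` are hypothetical stand-ins invented for this example; they are not the real contracts protocol or container API.

```python
from typing import Any, Protocol


class DatabaseProviderLike(Protocol):
    """Hypothetical stand-in for a contracts-level database protocol."""

    def execute_query(self, sql: str) -> list[Any]: ...


class FakeDatabase:
    """Hypothetical implementation; the SQL package would supply the real one."""

    def execute_query(self, sql: str) -> list[Any]:
        return [("ok", sql)]


class TinyContainer:
    """Hypothetical container that maps an interface to one shared instance."""

    def __init__(self) -> None:
        self._bindings: dict[type, Any] = {}

    def singleton(self, interface: type, instance: Any) -> None:
        self._bindings[interface] = instance

    def resolve(self, interface: type) -> Any:
        return self._bindings[interface]


class ReportService:
    """Consumer that knows only the protocol, never the implementing class."""

    def __init__(self, db: DatabaseProviderLike) -> None:
        self._db = db

    def ping(self) -> list[Any]:
        return self._db.execute_query("SELECT 1")


container = TinyContainer()
container.singleton(DatabaseProviderLike, FakeDatabase())
service = ReportService(container.resolve(DatabaseProviderLike))
```

Because `ReportService` imports nothing from the implementing package, the backend can be swapped by changing a single binding.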


flowchart BT
    CS[Config<br/>DatabaseConfig · PoolConfig · BackendConfig]
    DP[DatabaseProvider / DatabaseModule]
    DS[DatabaseService]
    DRV[Backend Drivers<br/>Postgres · SQLite · MySQL]
    QRY[Query Layer<br/>AsyncQueryBuilder · PredicateCompiler · Operator]
    REPO[Repository Layer<br/>SQLRepository · GenericRepository · CachedRepository]
    UOW[Unit of Work<br/>AbstractUnitOfWork · TransactionManager · IdentityMap]
    MIG[Migrations<br/>AlembicManager · SimpleMigrationManager · CLI]
    STO[Stores<br/>StateStore · SecretStore · LockStore]

    CS --> DP --> DS
    DS --> DRV & QRY & UOW & MIG
    REPO --> QRY
    UOW --> REPO
    DRV --> STO

| Module | Purpose |
| --- | --- |
| di/provider.py | DatabaseProvider: registers all database services |
| config.py | DatabaseConfig, DatabasePoolConfig, NamedDatabaseConfig |
| providers/database_service.py | DatabaseService: unified facade over driver backends |
| providers/*_provider.py | PostgresProvider, SQLiteProvider, MySQLProvider |
| repositories/base.py | SQLRepository: mixin-based repository base |
| repositories/generic_repository.py | GenericRepository[T, TKey]: typed CRUD |
| repositories/cached.py | CachedRepository: cache-aside decorator |
| query/builder.py | QueryBuilder / Query: immutable query descriptor |
| query/compiler.py | PredicateCompiler: filter → parameterized SQL |
| query/operators.py | Operator: Eq, Gt, Lt, In, Like, Between |
| unit_of_work/base.py | AbstractUnitOfWork: change tracking, events, outbox |
| unit_of_work/manager.py | SimpleTransactionManager: ContextVar-scoped transaction |
| migrations/api.py | AlembicManager: Alembic async wrapper |
| migrations/runner.py | MigrationRunnerAdapter: protocol-compliant runner |
| migrations/manager.py | SimpleMigrationManager: version-tracked applier |
| cli/commands.py | db CLI: status, migrate run/rollback/history |
| stores/ | DatabaseStateStore, DatabaseSecretStore, DatabaseLockStore |
| exceptions.py | 31 leaf exceptions under DatabaseError (LEX_ERR_SQL_001 through LEX_ERR_SQL_031) |
| hooks.py | SQLConnectionReadyHook, SQLTransactionBegunHook, SQLTransactionEndedHook |
| identifiers.py | Table, Column, Schema: type-safe SQL identifiers |

sequenceDiagram
    participant App as Application
    participant DP as DatabaseProvider
    participant C as Container
    participant DS as DatabaseService

    App->>DP: DatabaseProvider(config)
    App->>DP: register(container)
    alt single backend
        DP->>DS: DatabaseService(config)
        DP->>C: singleton(DatabaseProviderProtocol, DS)
        DP->>C: scoped(UnitOfWorkProtocol, DS.get_uow)
    else multi backend
        loop each NamedDatabaseConfig
            DP->>DS: DatabaseService(backend)
            DP->>C: singleton(DatabaseProviderProtocol, name=entry.name)
        end
        DP->>C: singleton(DatabaseProviderProtocol, primary) [unnamed]
    end
    DP->>C: register shared stores & admin widgets
    App->>DP: boot(container)
    DP->>DS: .boot() [parallel multi-backend]
    App->>DP: shutdown()
    DP->>DS: .shutdown()

| Phase | What happens |
| --- | --- |
| register() | Registers DatabaseService, protocol bindings, admin widgets, and shared stores. In multi-backend mode, adds one named binding per backend. |
| boot() | Connects all backends (in parallel in multi-backend mode). Wires optional observability and resilience. |
| shutdown() | Disconnects backends in reverse registration order. |
| health_check() | Aggregates health across all backends. |

DatabaseProvider runs at ProviderPriority.INFRASTRUCTURE (10).
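
The boot and shutdown ordering from the table above can be sketched with plain `asyncio`. `FakeBackend` and `LifecycleSketch` are hypothetical names for this illustration, and the sketch assumes only what the table states: parallel connect, reverse-order disconnect.

```python
import asyncio


class FakeBackend:
    """Hypothetical backend that records lifecycle calls in a shared log."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self._log = log

    async def boot(self) -> None:
        await asyncio.sleep(0)  # stand-in for opening a connection pool
        self._log.append(f"boot:{self.name}")

    async def shutdown(self) -> None:
        self._log.append(f"shutdown:{self.name}")


class LifecycleSketch:
    """Keeps backends in registration order."""

    def __init__(self) -> None:
        self._backends: list[FakeBackend] = []

    def register(self, backend: FakeBackend) -> None:
        self._backends.append(backend)

    async def boot(self) -> None:
        # Connect every backend concurrently.
        await asyncio.gather(*(b.boot() for b in self._backends))

    async def shutdown(self) -> None:
        # Disconnect one at a time, last registered first.
        for backend in reversed(self._backends):
            await backend.shutdown()


async def demo() -> list[str]:
    log: list[str] = []
    lifecycle = LifecycleSketch()
    for name in ("primary", "analytics"):
        lifecycle.register(FakeBackend(name, log))
    await lifecycle.boot()
    await lifecycle.shutdown()
    return log
```

Shutting down in reverse order means a backend registered later, which may depend on an earlier one, is closed first.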


SimpleTransactionManager uses a ContextVar for per-task isolation:

async with provider.transaction():
    await provider.execute_query("INSERT INTO users ...")
# commits automatically on exit; rolls back if an exception is raised

AbstractUnitOfWork extends this with change tracking and event collection:

| Operation | Method |
| --- | --- |
| Track new entity | register_new(entity) |
| Track modified entity | register_dirty(entity) |
| Track deleted entity | register_deleted(entity) |
| Collect domain events | register_event(event) |

On commit(): collect_events() → _flush() (persist changes) → write events to the outbox or publish them via the event bus → reset tracking state. IdentityMap ensures there is only one entity instance per row within a unit of work.

flowchart LR
    SVC[Service] -->|async with uow| COMMIT[commit]
    SVC --> NEW[register_new] & DIRTY[register_dirty] & EVT[register_event]
    COMMIT --> FLUSH[_flush]
    FLUSH --> TX[(Transaction)]
    COMMIT -->|with outbox| OUTBOX[(Outbox Table)] --> BUS[Event Bus]
    COMMIT -->|without outbox| BUS

SimpleUnitOfWork provides the SQL-backed _flush() that delegates to DatabaseProviderProtocol.
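
The commit sequence described above can be sketched as follows. This is a simplified, synchronous illustration: `UnitOfWorkSketch` and `IdentityMapSketch` are hypothetical classes, and the `flush` and `publish` callables stand in for the SQL-backed flush and the outbox or event bus.

```python
from typing import Any, Callable


class IdentityMapSketch:
    """Returns the same object for the same (type, key) pair."""

    def __init__(self) -> None:
        self._entities: dict[tuple[type, Any], Any] = {}

    def get_or_add(self, entity_type: type, key: Any, factory: Callable[[], Any]) -> Any:
        ident = (entity_type, key)
        if ident not in self._entities:
            self._entities[ident] = factory()
        return self._entities[ident]


class UnitOfWorkSketch:
    """Tracks changes and events, then applies them in a fixed order on commit."""

    def __init__(
        self,
        flush: Callable[[list[Any], list[Any], list[Any]], None],
        publish: Callable[[Any], None],
    ) -> None:
        self._flush = flush
        self._publish = publish
        self._new: list[Any] = []
        self._dirty: list[Any] = []
        self._deleted: list[Any] = []
        self._events: list[Any] = []

    def register_new(self, entity: Any) -> None:
        self._new.append(entity)

    def register_dirty(self, entity: Any) -> None:
        self._dirty.append(entity)

    def register_deleted(self, entity: Any) -> None:
        self._deleted.append(entity)

    def register_event(self, event: Any) -> None:
        self._events.append(event)

    def commit(self) -> None:
        events = list(self._events)  # 1. collect events
        self._flush(list(self._new), list(self._dirty), list(self._deleted))  # 2. persist
        for event in events:  # 3. hand events to the outbox or bus
            self._publish(event)
        for bucket in (self._new, self._dirty, self._deleted, self._events):
            bucket.clear()  # 4. reset tracking state
```

Publishing only after the flush succeeds means an event is never emitted for a change that failed to persist.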


AbstractRepository[T, TKey]
└── SQLRepository[T, TKey] (mixin-based)
    ├── _ReadMixin       find_by_id · find_many · paginate
    ├── _WriteMixin      create · update · delete · soft_delete
    ├── _FilterMixin     apply filters
    ├── _AdvancedMixin   bulk_insert · upsert
    ├── _RLSMixin        row-level security
    └── GenericRepository[T, TKey]
        ├── CachedRepository
        └── AppendLogRepository

class UserRepository(GenericRepository[User, str]):
    def __init__(self, provider: DatabaseProviderProtocol):
        super().__init__(provider, table_name="users", entity_class=User)


class UserService:
    def __init__(self, repo: UserRepository):
        self.repo = repo

    async def find_active(self) -> list[User]:
        # Keyword arguments other than offset/limit act as equality filters.
        return await self.repo.find_many(status="active", offset=0, limit=100)
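
CachedRepository is described as a cache-aside decorator. The general cache-aside pattern can be sketched as below; `CacheAsideSketch` and its in-memory dict are hypothetical and say nothing about how CachedRepository is actually implemented or which cache it uses.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class CacheAsideSketch:
    """Checks the cache first and falls back to the wrapped loader on a miss."""

    def __init__(self, load: Callable[[str], Awaitable[Optional[Any]]]) -> None:
        self._load = load
        self._cache: dict[str, Any] = {}

    async def find_by_id(self, entity_id: str) -> Optional[Any]:
        if entity_id in self._cache:
            return self._cache[entity_id]  # hit: no database round trip
        entity = await self._load(entity_id)  # miss: ask the inner repository
        if entity is not None:
            self._cache[entity_id] = entity
        return entity

    def invalidate(self, entity_id: str) -> None:
        # Call after a write so the next read reloads fresh data.
        self._cache.pop(entity_id, None)
```

Writes go to the underlying store and invalidate the cached entry, so the cache is populated only by reads.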

Query building with AsyncQueryBuilder:

from lexigram.sql.query import AsyncQueryBuilder, Operator

query = (
    AsyncQueryBuilder()
    .select("id", "name")
    .from_table("users")
    .where("status", Operator.EQ, "active")
    .order_by("created_at", "DESC")
    .limit(20)
)
sql, params = query.build()  # SQL text plus its bound parameters

PredicateCompiler translates filter expressions (FieldEq, FieldGt, FieldIn, etc.) from contracts into parameterized SQL.
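
To make the filter-to-SQL step concrete, here is a minimal sketch of compiling filter objects into a parameterized WHERE clause. The dataclasses reuse the names FieldEq, FieldGt, and FieldIn for readability, but their fields, the `$n` placeholder style, and `compile_predicates` itself are assumptions for this example, not the contracts definitions or the PredicateCompiler API.

```python
from dataclasses import dataclass
from typing import Any, Sequence, Union


@dataclass(frozen=True)
class FieldEq:
    field: str
    value: Any


@dataclass(frozen=True)
class FieldGt:
    field: str
    value: Any


@dataclass(frozen=True)
class FieldIn:
    field: str
    values: tuple


Predicate = Union[FieldEq, FieldGt, FieldIn]


def _quote(name: str) -> str:
    # Column names cannot be bound as parameters, so validate them instead.
    if not name.isidentifier():
        raise ValueError(f"unsafe column name: {name!r}")
    return f'"{name}"'


def compile_predicates(filters: Sequence[Predicate]) -> tuple[str, list[Any]]:
    clauses: list[str] = []
    params: list[Any] = []
    for f in filters:
        column = _quote(f.field)
        if isinstance(f, FieldEq):
            params.append(f.value)
            clauses.append(f"{column} = ${len(params)}")
        elif isinstance(f, FieldGt):
            params.append(f.value)
            clauses.append(f"{column} > ${len(params)}")
        elif isinstance(f, FieldIn):
            if not f.values:
                clauses.append("1 = 0")  # an empty IN list matches nothing
                continue
            start = len(params) + 1
            params.extend(f.values)
            slots = ", ".join(f"${i}" for i in range(start, len(params) + 1))
            clauses.append(f"{column} IN ({slots})")
        else:
            raise TypeError(f"unsupported predicate: {f!r}")
    return " AND ".join(clauses), params
```

Values travel only in `params`, so user input never becomes part of the SQL text.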


flowchart LR
    subgraph CLI["lexigram db migrate"]
        RUN[run] --> RB[rollback] --> HIST[history]
    end
    subgraph Adapter[Adapter Layer]
        RA[MigrationRunnerAdapter<br/>MigrationRunnerProtocol]
    end
    subgraph Core[Core]
        SMM[SimpleMigrationManager<br/>apply_pending · get_applied · initialize_table]
        AB[AlembicManager<br/>upgrade · downgrade · create_revision]
    end
    subgraph DB[(Database)]
        MT[ schema_migrations ]
    end
    CLI --> RA --> SMM --> DB
    AB --> DB
lexigram db migrate run # Apply pending migrations
lexigram db migrate rollback --steps 1 # Roll back one migration
lexigram db migrate history # Show migration history
lexigram db status # Database and migration status
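
The idea of a version-tracked applier can be sketched with the standard-library `sqlite3` module. The function names echo those in the diagram (initialize_table, get_applied, apply_pending), but the signatures, the `MIGRATIONS` list, and the table columns are assumptions for illustration rather than the SimpleMigrationManager implementation.

```python
import sqlite3

# Hypothetical migrations: (version, SQL statement), in the order to apply them.
MIGRATIONS = [
    ("001_create_users", "CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT)"),
    ("002_add_status", "ALTER TABLE users ADD COLUMN status TEXT"),
]


def initialize_table(conn: sqlite3.Connection) -> None:
    # Bookkeeping table that records which versions have already run.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_migrations ("
        "version TEXT PRIMARY KEY, "
        "applied_at TEXT DEFAULT CURRENT_TIMESTAMP)"
    )
    conn.commit()


def get_applied(conn: sqlite3.Connection) -> set[str]:
    return {row[0] for row in conn.execute("SELECT version FROM schema_migrations")}


def apply_pending(conn: sqlite3.Connection, migrations: list[tuple[str, str]]) -> list[str]:
    initialize_table(conn)
    applied = get_applied(conn)
    ran: list[str] = []
    for version, statement in migrations:
        if version in applied:
            continue  # already recorded, skip
        conn.execute(statement)
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
        conn.commit()  # record each migration as soon as it succeeds
        ran.append(version)
    return ran
```

Running `apply_pending` a second time is a no-op, because every version is already recorded in `schema_migrations`. The sketch makes no attempt to wrap schema changes in a transaction.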

| Backend | Provider | Driver | Key Features |
| --- | --- | --- | --- |
| PostgreSQL | PostgresProvider | asyncpg | SSL, FTS, JSONB, LISTEN/NOTIFY |
| SQLite | SQLiteProvider | aiosqlite | WAL, in-memory, foreign keys |
| MySQL | MySQLProvider | aiomysql | TLS, charset config, FTS |

The backend is selected by URL prefix (postgresql:// → PostgresProvider, etc.). Multi-backend setups are declared in DatabaseConfig.backends, and a named backend is resolved with Annotated[DatabaseProviderProtocol, "analytics"].
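
Selection by URL prefix can be sketched as a simple scheme lookup. Only the postgresql:// mapping is stated above; the `postgres`, `sqlite`, and `mysql` scheme names and the `select_provider` function are assumptions made for this example.

```python
from urllib.parse import urlsplit

# Scheme-to-provider table. Entries other than "postgresql" are assumed.
_PROVIDER_BY_SCHEME = {
    "postgresql": "PostgresProvider",
    "postgres": "PostgresProvider",
    "sqlite": "SQLiteProvider",
    "mysql": "MySQLProvider",
}


def select_provider(url: str) -> str:
    """Return the provider name for a database URL, based on its scheme."""
    scheme = urlsplit(url).scheme  # urlsplit already lowercases the scheme
    try:
        return _PROVIDER_BY_SCHEME[scheme]
    except KeyError:
        raise ValueError(f"unsupported database URL scheme: {scheme!r}") from None
```

Failing loudly on an unknown scheme surfaces a mistyped URL at startup instead of at the first query.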


| Protocol | Purpose | Implementation |
| --- | --- | --- |
| DatabaseProviderProtocol | Primary DB interface | DatabaseService |
| ConnectionPoolProtocol | Pool management | SimpleConnectionPool, backend pools |
| QueryLoggerProtocol | SQL query logging | ConsoleQueryLogger, FileQueryLogger |
| UnitOfWorkProtocol | Transaction + change tracking | AbstractUnitOfWork, SimpleUnitOfWork |
| MigrationManagerProtocol | Migration management | SimpleMigrationManager |
| MigrationRunnerProtocol | Migration execution | MigrationRunnerAdapter |
| StateStoreProtocol | State persistence | DatabaseStateStore |
| AsyncSecretStoreProtocol | Secret storage | DatabaseSecretStore |
| LockStoreProtocol | Distributed locks | DatabaseLockStore |
| IdGeneratorProtocol | Entity ID generation | Resolved optionally |
| TracerProtocol | Distributed tracing | Resolved optionally |
| MetricsCollectorProtocol | Render/query metrics | Resolved optionally |
| ResiliencePipelineFactoryProtocol | Circuit breaker, retry | Resolved optionally |
| HookRegistryProtocol | DB lifecycle hooks | Resolved optionally |

Database errors extend DatabaseError from contracts. All 31 leaf types carry _code strings (LEX_ERR_SQL_001 through LEX_ERR_SQL_031). Sub-hierarchy: DatabaseConnectionError (refused/timeout/pool), QueryError (syntax/binding), IntegrityError (duplicate key/fk/null/check), TransactionError (serialization/deadlock/rollback), SchemaError (table/column not found), RepositoryError, UnitOfWorkError, DriverError, LockError, DatabaseTimeoutError.
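
A hierarchy like this lets callers catch a whole family of errors while still reading the specific code from the leaf. In the sketch below the base and family names come from the list above, but the leaf classes (`DuplicateKeyError`, `DeadlockError`), the placeholder `SKETCH_*` codes, and `describe` are hypothetical; the real leaf names and code assignments are not shown in this document.

```python
class DatabaseError(Exception):
    """Root of the sketch hierarchy."""

    _code = "SKETCH_000"  # placeholder, not a real LEX_ERR_SQL code


class IntegrityError(DatabaseError):
    """Family for constraint violations."""


class TransactionError(DatabaseError):
    """Family for transaction failures."""


class DuplicateKeyError(IntegrityError):  # hypothetical leaf
    _code = "SKETCH_001"


class DeadlockError(TransactionError):  # hypothetical leaf
    _code = "SKETCH_002"


def describe(exc: DatabaseError) -> str:
    """Classify by family, but report the leaf's own code."""
    if isinstance(exc, IntegrityError):
        return f"integrity:{exc._code}"
    if isinstance(exc, TransactionError):
        return f"retryable:{exc._code}"
    return f"other:{exc._code}"
```

A caller could, for example, retry anything under TransactionError and surface anything under IntegrityError as a validation failure.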


| Point | Mechanism |
| --- | --- |
| Custom backend | Implement provider interface, register via DatabaseService |
| Custom migration | Subclass SimpleMigrationManager, override apply_pending_migrations() |
| Custom query logger | Implement QueryLoggerProtocol, register in container |
| Custom repository | Subclass SQLRepository or GenericRepository |
| Custom filter operator | Add to Operator enum + PredicateCompiler dispatch |
| Custom dialect | Implement SQLDialect from contracts, register with PredicateCompiler |
| Row-level security | Configure RowLevelSecurityPolicy + ScopeColumn, pass to repository |
| Admin widget | Register widget handler in SqlAdminContributor |
| DB lifecycle hooks | Subscribe to SQLConnectionReadyHook etc. via HookRegistryProtocol |
| Resilience | DatabaseResilienceHandler auto-wires circuit breaker/retry if registered |

class MyRepository(GenericRepository[MyEntity, str]):
    def __init__(self, provider: DatabaseProviderProtocol):
        super().__init__(provider, table_name="my_entities", entity_class=MyEntity)

    async def find_by_custom(self, value: str) -> list[MyEntity]:
        # The value is passed as a bound parameter, not interpolated into the SQL.
        result = await self.provider.execute_query(
            "SELECT * FROM my_entities WHERE custom_field = $1", [value]
        )
        return [self._row_to_entity(row) for row in result.rows]

@module()
class DatabaseModule(Module):
    @classmethod
    def configure(cls, config=None, migration_dir="migrations") -> DynamicModule:
        return DynamicModule(
            module=cls,
            providers=[DatabaseProvider(config=config, migration_dir=migration_dir)],
            exports=[DatabaseProviderProtocol, UnitOfWorkProtocol],
        )

    @classmethod
    def scope(cls, *repositories: type) -> DynamicModule:
        """Register repo classes for injection without new DB connections."""
        ...

    @classmethod
    def stub(cls, config=None) -> DynamicModule:
        """In-memory SQLite backend for testing."""
        ...

Usage: @module(imports=[DatabaseModule.configure("postgresql://...")]) or combined with scope() for feature modules.


constants.py defines:

| Symbol | Value |
| --- | --- |
| ENV_PREFIX | LEX_SQL__ |
| DEFAULT_POOL_MIN_SIZE | 1 |
| DEFAULT_POOL_MAX_SIZE | 10 |
| DEFAULT_MIGRATIONS_DIR | migrations |
| DEFAULT_MIGRATIONS_TABLE | schema_migrations |
| BACKEND_SQLITE | sqlite |
| BACKEND_POSTGRES | postgres |
| BACKEND_MYSQL | mysql |
| DEFAULT_PAGE_SIZE | 20 |
| MAX_PAGE_SIZE | 1000 |
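
As an illustration of how constants like these are typically used, the sketch below filters environment variables by the prefix and clamps a requested page size. The three constant values are copied from the table; `read_prefixed`, `clamp_page_size`, the example variable name, and the clamping behavior are assumptions, not documented lexigram-sql behavior.

```python
from typing import Mapping, Optional

ENV_PREFIX = "LEX_SQL__"
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 1000


def read_prefixed(environ: Mapping[str, str], prefix: str = ENV_PREFIX) -> dict[str, str]:
    """Keep only variables that start with the prefix, stripped and lowercased."""
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


def clamp_page_size(requested: Optional[int]) -> int:
    """Fall back to the default when unset or invalid, and cap at the maximum."""
    if requested is None or requested < 1:
        return DEFAULT_PAGE_SIZE
    return min(requested, MAX_PAGE_SIZE)
```

In an application, `read_prefixed(os.environ)` would collect the package's settings without picking up unrelated variables.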