
API Reference

MongoDB aggregation accumulator operators.

Add a field with a default value to all documents missing it.

Example

AddField(collection="users", field="is_active", default_value=True)
def __init__(
    collection: str,
    field: str,
    default_value: Any
) -> None

Initialize add field operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collection` | `str` | Target collection name. |
| `field` | `str` | Field name to add. |
| `default_value` | `Any` | Default value to set for the new field. |
async def execute(store: DocumentStoreProtocol) -> None

Add the field to documents that don’t have it.
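The operation can be pictured as a single bulk update: filter for documents missing the field, then `$set` the default. A hedged sketch using plain MongoDB filter/update expressions (the actual `execute()` implementation is not shown in this reference):

```python
def add_field_update(field: str, default_value):
    """Build (filter, update) for adding a missing field (illustrative)."""
    # Only touch documents where the field does not exist yet.
    filter_doc = {field: {"$exists": False}}
    # Set the default value on those documents.
    update_doc = {"$set": {field: default_value}}
    return filter_doc, update_doc


f, u = add_field_update("is_active", True)
```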


MongoDB aggregation stage operators.

Build MongoDB aggregation pipelines with a fluent API.

Usage

pipeline = (
    AggregationPipeline()
    .match({"status": "active"})
    .group(
        "$department",
        count={"$sum": 1},
        avg_salary={"$avg": "$salary"},
    )
    .sort("count", descending=True)
    .limit(10)
    .project(department="$_id", count=1, avg_salary=1)
    .build()
)
async for doc in collection.aggregate(pipeline):
    ...
def __init__() -> None
def match(filter: dict[str, Any]) -> AggregationPipeline

Filter documents ($match stage).

def group(
    by: str | dict[str, Any] | None,
    **accumulators: Any
) -> AggregationPipeline

Group documents ($group stage).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `by` | `str \| dict[str, Any] \| None` | Group key expression (e.g. `"$department"`). |
| `**accumulators` | `Any` | Accumulator expressions (e.g. `count={"$sum": 1}`). |
def project(**fields: Any) -> AggregationPipeline

Reshape documents ($project stage).

def sort(
    field: str,
    *,
    descending: bool = False
) -> AggregationPipeline

Sort documents ($sort stage).

def limit(count: int) -> AggregationPipeline

Limit results ($limit stage).

def skip(count: int) -> AggregationPipeline

Skip results ($skip stage).

def unwind(
    path: str,
    *,
    preserve_null: bool = False
) -> AggregationPipeline

Deconstruct array field ($unwind stage).
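Null preservation maps onto MongoDB's `preserveNullAndEmptyArrays` option for `$unwind`. A sketch of the stage dict this presumably emits (the `$tags` field name is illustrative):

```python
def unwind_stage(path: str, *, preserve_null: bool = False) -> dict:
    """Build a $unwind stage dict (illustrative, not library code)."""
    if preserve_null:
        # Long form keeps documents whose array is null, missing, or empty.
        return {"$unwind": {"path": path, "preserveNullAndEmptyArrays": True}}
    # Short form drops such documents.
    return {"$unwind": path}


stage = unwind_stage("$tags", preserve_null=True)
```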

def lookup(
    from_collection: str,
    local_field: str,
    foreign_field: str,
    as_field: str
) -> AggregationPipeline

Join with another collection ($lookup stage).

def add_fields(**fields: Any) -> AggregationPipeline

Add computed fields ($addFields stage).

def facet(**facets: list[dict[str, Any]]) -> AggregationPipeline

Multi-facet aggregation ($facet stage).

def count(field_name: str = 'count') -> AggregationPipeline

Count documents ($count stage).

def build() -> list[dict[str, Any]]

Compile the pipeline to a list of stage dicts.


MongoDB comparison operators.

Create an index on a collection.

Example

CreateIndex(
    collection="users",
    keys=[("email", 1)],
    unique=True,
    name="idx_users_email",
)
def __init__(
    collection: str,
    keys: list[tuple[str, int]],
    *,
    unique: bool = False,
    name: str | None = None
) -> None

Initialize create index operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collection` | `str` | Target collection name. |
| `keys` | `list[tuple[str, int]]` | Index key specification as (field, direction) pairs. |
| `unique` | `bool` | Whether the index enforces uniqueness. |
| `name` | `str \| None` | Optional custom index name. |
async def execute(store: DocumentStoreProtocol) -> None

Create the index on the specified collection.


Fluent builder for document store queries.

Provides a chainable API that compiles to MongoDB-compatible filter expressions while remaining driver-agnostic at the builder interface level.

Usage

query = (
    DocumentQueryBuilder()
    .where("status", "active")
    .where_gt("age", 18)
    .where_in("role", ["admin", "moderator"])
    .sort_by("created_at", descending=True)
    .skip(20)
    .limit(10)
    .select("name", "email", "role")
    .build()
)
async for doc in collection.find(
    query.filter,
    projection=query.projection,
    sort=query.sort,
    skip=query.skip,
    limit=query.limit,
):
    ...
def __init__() -> None
def where(
    field: str,
    value: Any
) -> DocumentQueryBuilder

Exact match filter.

def where_ne(
    field: str,
    value: Any
) -> DocumentQueryBuilder

Not-equal filter.

def where_gt(
    field: str,
    value: Any
) -> DocumentQueryBuilder

Greater-than filter.

def where_gte(
    field: str,
    value: Any
) -> DocumentQueryBuilder

Greater-than-or-equal filter.

def where_lt(
    field: str,
    value: Any
) -> DocumentQueryBuilder

Less-than filter.

def where_lte(
    field: str,
    value: Any
) -> DocumentQueryBuilder

Less-than-or-equal filter.

def where_between(
    field: str,
    low: Any,
    high: Any
) -> DocumentQueryBuilder

Range filter (inclusive on both ends).
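"Inclusive on both ends" corresponds to MongoDB's `$gte`/`$lte` pair; a hedged sketch of the filter presumably produced:

```python
def between_filter(field: str, low, high) -> dict:
    """Inclusive range filter, as where_between presumably compiles it."""
    return {field: {"$gte": low, "$lte": high}}


f = between_filter("age", 18, 65)
```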

def where_in(
    field: str,
    values: list[Any]
) -> DocumentQueryBuilder

In-set filter.

def where_not_in(
    field: str,
    values: list[Any]
) -> DocumentQueryBuilder

Not-in-set filter.

def where_exists(
    field: str,
    exists: bool = True
) -> DocumentQueryBuilder

Field existence filter.

def where_type(
    field: str,
    bson_type: str
) -> DocumentQueryBuilder

BSON type filter.

def where_regex(
    field: str,
    pattern: str,
    options: str = ''
) -> DocumentQueryBuilder

Regular expression filter.

def where_text(search: str) -> DocumentQueryBuilder

Full-text search filter.

def and_where(*conditions: dict[str, Any]) -> DocumentQueryBuilder

Logical AND of multiple conditions.

def or_where(*conditions: dict[str, Any]) -> DocumentQueryBuilder

Logical OR of multiple conditions.
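These presumably wrap their conditions in MongoDB's `$and`/`$or` operators. An illustrative sketch of the `$or` case (not the library's code):

```python
def or_filter(*conditions: dict) -> dict:
    """Combine conditions with $or (illustrative)."""
    return {"$or": list(conditions)}


f = or_filter({"role": "admin"}, {"age": {"$gte": 65}})
```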

def select(*fields: str) -> DocumentQueryBuilder

Include only the specified fields.

def exclude(*fields: str) -> DocumentQueryBuilder

Exclude the specified fields.

def sort_by(
    field: str,
    *,
    descending: bool = False
) -> DocumentQueryBuilder

Add a sort key.

def skip(count: int) -> DocumentQueryBuilder

Skip the first count results.

def limit(count: int) -> DocumentQueryBuilder

Limit results to count.

def build() -> DocumentQuery

Compile the builder state into a DocumentQuery.
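For orientation, the chained calls in the usage example above presumably compile to MongoDB expressions along these lines. The exact attribute layout of DocumentQuery is an assumption here:

```python
# Hedged sketch of the compiled pieces for the usage example above.
expected_filter = {
    "status": "active",                      # .where("status", "active")
    "age": {"$gt": 18},                      # .where_gt("age", 18)
    "role": {"$in": ["admin", "moderator"]}, # .where_in(...)
}
expected_sort = [("created_at", -1)]         # descending=True
expected_projection = {"name": 1, "email": 1, "role": 1}
expected_skip, expected_limit = 20, 10
```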


Drop an entire collection.

Example

DropCollection(collection="legacy_events")
def __init__(collection: str) -> None

Initialize drop collection operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collection` | `str` | Collection name to drop. |
async def execute(store: DocumentStoreProtocol) -> None

Drop the collection from the store.


Drop an index from a collection.

Example

DropIndex(collection="users", name="idx_users_email")
def __init__(
    collection: str,
    name: str
) -> None

Initialize drop index operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collection` | `str` | Target collection name. |
| `name` | `str` | Index name to drop. |
async def execute(store: DocumentStoreProtocol) -> None

Drop the index from the specified collection.


AWS DynamoDB configuration.

MongoDB logical operators.

Migration was applied successfully.

Migration failed.

Manage schema migrations for a document store.

Tracks applied migrations in a `_migrations` collection and executes pending ones in version order.

Example

manager = MigrationManager(store)
manager.add("001", "Create users index", CreateIndex(
    collection="users",
    keys=[("email", 1)],
    unique=True,
))
manager.add("002", "Create events compound index", CreateIndex(
    collection="events",
    keys=[("stream_id", 1), ("stream_version", 1)],
    unique=True,
))
await manager.migrate()
def __init__(
    store: DocumentStoreProtocol,
    *,
    migrations_collection: str = MIGRATIONS_COLLECTION
) -> None

Initialize migration manager.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `store` | `DocumentStoreProtocol` | The document store to manage. |
| `migrations_collection` | `str` | Name of the collection tracking applied migrations. |
def add(
    version: str,
    description: str,
    operation: MigrationOperation
) -> MigrationManager

Register a migration to be applied.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `version` | `str` | Unique version identifier (e.g. `"001"`). |
| `description` | `str` | Human-readable description of the migration. |
| `operation` | `MigrationOperation` | The migration operation to execute. |

Returns

| Type | Description |
| --- | --- |
| `MigrationManager` | Self for chaining. |
async def get_applied_versions() -> set[str]

Return the set of already-applied migration versions.

async def migrate() -> list[str]

Apply all registered migrations that have not yet been applied.

Returns

| Type | Description |
| --- | --- |
| `list[str]` | List of version strings that were applied. |
async def status() -> list[dict[str, Any]]

Return migration status for all registered migrations.

Returns

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | List of dicts with `version`, `description`, and `applied` status. |
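An illustrative shape of the returned list, using the versions from the example above (values are examples, not guaranteed output):

```python
# Hypothetical status() result for the two migrations registered above,
# where only "001" has been applied.
status = [
    {"version": "001", "description": "Create users index", "applied": True},
    {"version": "002", "description": "Create events compound index", "applied": False},
]
```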

MongoDB-specific configuration.

Configuration for a single named NoSQL backend.

Used in NoSQLConfig.backends to declare multiple document stores that the framework registers as named DI bindings.

Example

backends:
  - name: primary
    driver: mongodb
    mongodb:
      uri: mongodb://localhost:27017
      database: app
  - name: analytics
    driver: mongodb
    mongodb:
      uri: mongodb://analytics-host:27017
      database: analytics
Parameters

| Parameter | Description |
| --- | --- |
| `name` | Unique backend identifier. Used as the `Named()` DI key. |
| `primary` | Whether this is the primary backend. Primary backends also receive the unnamed `DocumentStoreProtocol` binding. |
| `driver` | NoSQL driver. One of `'mongodb'` or `'firestore'`. |
| `mongodb` | MongoDB-specific connection config. |
| `firestore` | Firestore-specific connection config. |

Top-level NoSQL configuration.

Loaded from the `nosql:` key in `application.yaml`, with environment variable overrides via the `LEX_NOSQL__*` prefix.

def validate_for_environment(env: Environment | None = None) -> list[ConfigIssue]

Check config is safe for the target environment.

def from_named(
    cls,
    entry: NamedNoSQLConfig,
    base: NoSQLConfig | None = None
) -> NoSQLConfig

Build a single-backend NoSQLConfig from a NamedNoSQLConfig entry.

Used internally by NoSQLProvider to create per-backend configs from a multi-backend declaration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `entry` | `NamedNoSQLConfig` | The named backend entry to materialise. |
| `base` | `NoSQLConfig \| None` | Optional base config to inherit top-level settings from. |

Returns

| Type | Description |
| --- | --- |
| `NoSQLConfig` | A NoSQLConfig configured for the single named backend. |

NoSQL database connected.

Payload fired when a NoSQL backend connection is established.

Attributes

`backend`: Identifier of the backend that connected (e.g. `"mongodb"`).


NoSQL database disconnected.

Payload fired when a NoSQL backend connection is closed.

Attributes

`backend`: Identifier of the backend that disconnected.


Document store integration (MongoDB, and future drivers).

Registers DocumentStoreProtocol for constructor injection.

Call configure to set up and register the document-store backend, or stub for an isolated setup with no external service dependencies.

Usage

from lexigram.nosql.config import NoSQLConfig

@module(
    imports=[NoSQLModule.configure(NoSQLConfig(driver="mongodb"))]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: NoSQLConfig | Any | None = None,
    enable_ttl: bool = True
) -> DynamicModule

Create a NoSQLModule with explicit configuration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `NoSQLConfig \| Any \| None` | NoSQLConfig or `None` to use defaults (reads from environment variables). |
| `enable_ttl` | `bool` | Enable TTL (time-to-live) index support for documents. Defaults to `True`. |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule descriptor. |
def scope(
    cls,
    *repositories: type
) -> DynamicModule

Scope document repository classes into a feature module.

Registers the given repository classes as providers and exports them for constructor injection within the feature. The parent module graph must already include NoSQLModule.configure — this does not create a new document store connection.

Uses the anonymous token pattern so both configure() and scope() can coexist in the same compiled graph without a ModuleDuplicateError.

Example

@module(
    imports=[
        NoSQLModule.configure(config),
        NoSQLModule.scope(ProductRepository, CategoryRepository),
    ]
)
class CatalogFeatureModule(Module):
    pass
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `*repositories` | `type` | Document repository classes to register as providers and export. |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule scoped to this feature. |
def stub(
    cls,
    config: NoSQLConfig | None = None
) -> DynamicModule

Create a NoSQLModule suitable for unit and integration testing.

Uses in-memory or minimal backends with no external service dependencies.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `NoSQLConfig \| None` | Optional NoSQLConfig override. Uses safe defaults when `None`. |

Returns

| Type | Description |
| --- | --- |
| `DynamicModule` | A DynamicModule descriptor. |

Register NoSQL services into the DI container.

Reads NoSQLConfig, creates the appropriate driver, and registers it as DocumentStoreProtocol.

Supports single-backend and multi-backend (NoSQLConfig.backends) modes. In multi-backend mode each entry is registered under its name via container.singleton(name=entry.name). The primary backend (primary=True or the first entry) also receives the unnamed bindings for backward compatibility.

def __init__(config: NoSQLConfig | None = None) -> None
def from_config(
    cls,
    config: NoSQLConfig,
    **context
) -> NoSQLProvider

Factory method for DI container setup.

async def register(container: ContainerRegistrarProtocol) -> None

Register the NoSQL services.

async def boot(container: ContainerResolverProtocol) -> None

Boot phase — connect to all document stores.

In multi-backend mode all stores are connected in parallel via asyncio.gather. In single-backend mode the existing sequential boot is preserved.
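The parallel path can be sketched with `asyncio.gather`; the `connect()` method name on each store is an illustrative stand-in for the real driver interface:

```python
import asyncio


class FakeStore:
    """Stand-in for a document store driver (illustrative only)."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True


async def boot_all(stores):
    # Connect every backend concurrently, as the multi-backend boot does.
    await asyncio.gather(*(s.connect() for s in stores))


stores = [FakeStore(), FakeStore()]
asyncio.run(boot_all(stores))
```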

async def shutdown() -> None

Shutdown phase — disconnect from all document stores in reverse order.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check provider health across all registered backends.

In multi-backend mode the overall status is the worst individual status.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | `float` | Maximum seconds to wait for a health check response. |

Returns

| Type | Description |
| --- | --- |
| `HealthCheckResult` | HealthCheckResult with status and component details. |
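"Worst individual status" can be modelled as a maximum over a severity ordering. The three status names below are an assumed convention for illustration, not necessarily the library's exact values:

```python
# Assumed severity ordering: healthy < degraded < unhealthy.
SEVERITY = {"healthy": 0, "degraded": 1, "unhealthy": 2}


def overall_status(statuses: list[str]) -> str:
    """Aggregate per-backend statuses to the worst one."""
    return max(statuses, key=SEVERITY.__getitem__)


worst = overall_status(["healthy", "degraded", "healthy"])
```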

Rename a field across all documents in a collection.

Example

RenameField(collection="users", old_name="username", new_name="name")
def __init__(
    collection: str,
    old_name: str,
    new_name: str
) -> None

Initialize rename field operation.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `collection` | `str` | Target collection name. |
| `old_name` | `str` | Current field name. |
| `new_name` | `str` | New field name. |
async def execute(store: DocumentStoreProtocol) -> None

Rename the field in all documents.
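MongoDB's `$rename` update operator does exactly this; a sketch of the update the operation presumably issues, matching the example above:

```python
def rename_update(old_name: str, new_name: str) -> dict:
    """Build a $rename update document (illustrative)."""
    return {"$rename": {old_name: new_name}}


u = rename_update("username", "name")
```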


MongoDB update operators.

Requested document does not exist.

Document failed schema validation.

Insert/update violated a unique constraint.

Failed to connect to the document store.

Base exception for all NoSQL operations.

Multi-document transaction failed.