API Reference
Protocols
Protocol for time sources — allows swapping real/fake clocks.
Classes
AITestBed
AITestClient
Test client that provides ergonomic helpers for exercising AI service boundaries.
Wraps a TestEnvironment (or AITestBed) and provides recording wrappers
for common AI operations — LLM completions, vector searches, and ML predictions —
so tests can make assertions about what the system under test invoked.
| Parameter | Type | Description |
|---|---|---|
| `test_bed` | TestEnvironment \| AITestBed | The test environment that owns AI providers and test data. |
| `max_tokens_per_run` | int | Maximum token budget for all LLM operations in this test run. Set to ``0`` to disable enforcement. Defaults to 10 000. |
property test_data() -> AITestData
Return the AITestData instance owned by the test bed.
Total tokens consumed by LLM completions in this run.
Configured maximum tokens per run (0 = unlimited).
Reset the token usage counter to zero.
Simulate an LLM completion, enforce the token budget, and record it.
Token cost is read from the mock response’s "tokens" field if present;
otherwise estimated as ceil(len(prompt.split()) * 1.3).
| Parameter | Type | Description |
|---|---|---|
| `prompt` | str | The prompt text sent to the (mock) LLM. |
| `**kwargs` | Any | Extra fields included in the recorded entry. |
| Type | Description |
|---|---|
| dict | The recorded completion dict. |
| Exception | Description |
|---|---|
| TokenBudgetExceededError | When the token budget is set and would be exceeded. |
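The budget enforcement described above can be sketched as a small stand-alone function. This is an illustrative stand-in, not the library's implementation; the `TokenBudgetExceededError` name comes from the table above, and the `ceil(words * 1.3)` fallback mirrors the documented estimate:

```python
import math


class TokenBudgetExceededError(Exception):
    """Raised when a completion would push usage past the budget."""


def estimate_tokens(prompt: str) -> int:
    # Mirrors the documented fallback: ceil(word_count * 1.3).
    return math.ceil(len(prompt.split()) * 1.3)


def charge_tokens(used: int, budget: int, prompt: str) -> int:
    """Return the new running total, enforcing the budget (0 = unlimited)."""
    cost = estimate_tokens(prompt)
    if budget and used + cost > budget:
        raise TokenBudgetExceededError(
            f"would use {used + cost} of {budget} tokens"
        )
    return used + cost
```

With a budget of 0 the check is skipped entirely, matching the "0 = unlimited" convention used by `max_tokens_per_run`.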
Simulate a vector store search and record it in test_data.
Returns the mock response keyed "vector" if configured, otherwise
an empty results structure.
Simulate an ML model prediction and record it in test_data.
Returns the mock response keyed "ml" if configured, otherwise a default
confidence structure.
Assert that exactly ``expected`` LLM completions were recorded.
Assert that exactly ``expected`` vector searches were recorded.
Assert that exactly ``expected`` ML predictions were recorded.
AITestData
Container for AI-related test data and mock responses.
Tracks AI operations (LLM completions, vector searches, ML predictions, etc.) performed during a test and provides mock response configuration.
| Parameter | Type | Description |
|---|---|---|
| `prefix` | str | Prefix used when generating unique identifiers for test data. |
Record an LLM completion operation.
Record a vector store search operation.
Record an ML model prediction operation.
Record an AI pipeline stage operation.
Record a retrieval-augmented generation operation.
Record an AI agent action/operation.
Record a simulated time advance (for time-dependent AI logic).
Register a canned response for a given AI service key.
Retrieve the canned response for key, or None if not configured.
Clear all recorded operations and mock responses.
AdminResponse
Lightweight wrapper around a raw response for admin assertions.
Attributes:
status_code: HTTP status code.
headers: Response headers dict.
text: Response body as text.
json_data: Parsed JSON body (None if not JSON).
Return True if the response contains an ``HX-Redirect`` header.
Return the HX-Redirect URL if present.
Return the HX-Trigger header value if present.
AdminTestClient
Test client with helpers for lexigram-admin endpoints.
Wraps a Starlette TestClient (or any httpx-compatible sync client)
and provides assertion helpers and convenience methods for CRUD operations.
| Parameter | Type | Description |
|---|---|---|
| `client` | Any | An httpx-compatible test client (e.g. Starlette ``TestClient``). |
| `prefix` | str | Admin URL prefix (default ``"/admin"``). |
| `default_headers` | dict[str, str] \| None | Headers added to every request. |
def __init__( client: Any, prefix: str = '/admin', default_headers: dict[str, str] | None = None ) -> None
Build an admin resource URL.
| Parameter | Type | Description |
|---|---|---|
| `resource` | str | Resource name (e.g. ``"user"``). |
| `*parts` | str | Additional path segments. |
| Type | Description |
|---|---|
| str | Full URL string, e.g. ``"/admin/user/123/edit"``. |
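The URL-building behaviour can be sketched as a plain function (a hypothetical stand-in for the method, assuming simple slash-joining of prefix, resource, and extra segments as the example return value suggests):

```python
def admin_url(prefix: str, resource: str, *parts: str) -> str:
    """Sketch of the documented builder: prefix + resource + extra segments."""
    # rstrip guards against a prefix configured with a trailing slash.
    return "/".join([prefix.rstrip("/"), resource, *parts])
```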
def list( resource: str, params: dict[str, str] | None = None ) -> AdminResponse
GET the list page for a resource.
def detail( resource: str, record_id: str ) -> AdminResponse
GET the detail page for a record.
def create( resource: str, data: dict[str, Any] ) -> AdminResponse
POST to create a new record.
def update( resource: str, record_id: str, data: dict[str, Any] ) -> AdminResponse
POST to update an existing record.
def delete( resource: str, record_id: str ) -> AdminResponse
POST (or DELETE) to delete a record.
def bulk_action( resource: str, action: str, ids: list[str] ) -> AdminResponse
POST a bulk action.
def search( resource: str, query: str ) -> AdminResponse
GET the list page with a search query.
def assert_status( resp: AdminResponse, expected: int ) -> None
Assert response has expected HTTP status code.
def assert_ok(resp: AdminResponse) -> None
Assert response is 200 OK.
def assert_redirect(resp: AdminResponse) -> None
Assert response is an HTMX or standard redirect.
def assert_unprocessable(resp: AdminResponse) -> None
Assert response is HTTP 422 (validation error).
def assert_contains( resp: AdminResponse, text: str ) -> None
Assert response body contains text.
def assert_htmx_trigger( resp: AdminResponse, event: str ) -> None
Assert HX-Trigger header contains event.
AppTestBed
Test harness that boots the application with optional DI overrides.
Attributes:

- `app`: The booted Application.
- `container`: The DI container for direct service resolution.
- `client`: HTTP test client backed by the real ASGI app.
async def from_factory( cls, factory: str | Any, overrides: dict[type, Any] | None = None ) -> AsyncIterator[AppTestBed]
Create a test bed from an application factory.
| Parameter | Type | Description |
|---|---|---|
| `factory` | str | Any | Either a dotted import string (``"my_app.app:create_app"``) or a callable that returns an Application. |
| `overrides` | dict[type, Any] | None | Optional dict mapping service types to replacement instances. Applied after providers register but before boot. |
| Type | Description |
|---|---|
| AsyncIterator[AppTestBed] | A fully booted AppTestBed. |
Example
```python
async with AppTestBed.from_factory(
    "my_app.app:create_app",
    overrides={EmailService: MockEmailService()},
) as bed:
    resp = await bed.client.get("/health")
    assert resp.status_code == 200
```

async def from_app( cls, app: Any, overrides: dict[type, Any] | None = None ) -> AsyncIterator[AppTestBed]
Create a test bed from an already-constructed Application.
| Parameter | Type | Description |
|---|---|---|
| `app` | Any | An Application instance. |
| `overrides` | dict[type, Any] | None | Optional DI overrides (same as from_factory). |
| Type | Description |
|---|---|
| AsyncIterator[AppTestBed] | A fully booted AppTestBed. |
AsyncTestHelper
Helper utilities for async testing.
AuditLoggerCompliance
Compliance suite for AuditLoggerProtocol implementations.
Subclass and implement create_logger() to run all compliance tests.
Create the AuditLoggerProtocol implementation under test.
| Type | Description |
|---|---|
| Any | A fresh instance implementing AuditLoggerProtocol. |
log() completes without raising for a valid entry.
log() creates an entry that is returned by query().
log() stores extra metadata fields.
query() filters by actor_id.
query() filters by action name.
query() always returns a list, even when empty.
AuditStoreCompliance
Compliance suite for AuditStoreProtocol implementations.
Subclass and implement create_store() to run all compliance tests.
Create the AuditStoreProtocol implementation under test.
| Type | Description |
|---|---|
| Any | A fresh instance implementing AuditStoreProtocol. |
append() completes without raising for a valid entry.
append() and query() round-trip an audit entry.
count() returns correct count after appends.
query() filters by actor_id.
query() returns an empty list for unknown actor.
BlobStoreCompliance
Reusable test suite for any ``BlobStoreProtocol`` implementation.
Subclass and implement create_store:
.. code-block:: python
   class TestMyBlobStore(BlobStoreCompliance):
       async def create_store(self):
           return LocalDriver(root="/tmp/test")

Return a ready-to-use, empty BlobStoreProtocol under test.
upload then download returns the same bytes.
exists returns True after a file is uploaded.
exists returns False for a path that has never been uploaded.
delete removes the file; exists returns False afterwards.
list with a prefix returns paths of uploaded objects.
get_url returns a non-empty string.
health_check returns a HealthCheckResult.
CacheBackendCompliance
Reusable test suite for any ``CacheBackendProtocol`` implementation.
Subclass and implement create_backend:
.. code-block:: python
   class TestRedisCache(CacheBackendCompliance):
       async def create_backend(self):
           return RedisCacheBackend("redis://localhost")

Return a fresh, empty instance of the backend under test.
set then get returns the stored value.
get on a missing key returns the default.
get on a missing key returns None when no default is given.
delete returns True and removes the key.
delete on a missing key returns False.
clear removes all stored entries.
set on an existing key overwrites the value.
Values with TTL expire and are no longer returned.
Values without TTL persist and are accessible.
The backend accepts strings, ints, dicts, and lists.
ContainerFactory
Factory for creating test containers with common provider registrations.
This factory provides a standardized way to create DI containers for testing that automatically register common protocols and mock implementations.
Example
```python
factory = ContainerFactory()
container = factory.create_test_container()
# Container has mock providers registered for common protocols
```
Create a test container with common protocol registrations.
| Type | Description |
|---|---|
| Any | Configured container with mock implementations for common protocols. |
ContainerTestFixture
Isolated DI container scoped to a single test.
Wraps LexigramContainerHarness with automatic disposal and convenience helpers for registering overrides.
Intended to be used through the test_container pytest fixture so each test receives a fully isolated container with fresh mock registrations.
Typical usage (via pytest fixture):

```python
async def test_my_service(test_container: ContainerTestFixture) -> None:
    test_container.mock(UserRepository, FakeUserRepository())
    service = await test_container.get(UserService)
    result = await service.create(email="a@b.com")
    assert result.is_ok()
```

Direct usage (as async context manager):

```python
async with ContainerTestFixture() as fixture:
    fixture.mock(CacheBackendProtocol, FakeCache())
    svc = await fixture.get(MyService)
```

Resolve interface from the container.
| Parameter | Type | Description |
|---|---|---|
| `interface` | type[T] | The protocol or class to resolve. |
| Type | Description |
|---|---|
| T | The resolved instance. |
| Exception | Description |
|---|---|
| RuntimeError | If the fixture has already been disposed. |
Resolve interface, returning None if not registered.
| Parameter | Type | Description |
|---|---|---|
| `interface` | type[T] | The protocol or class to resolve. |
| Type | Description |
|---|---|
| T | None | The resolved instance, or ``None`` if not found. |
Register mock_obj as the singleton for interface.
This is the primary way to wire test doubles into the container.
| Parameter | Type | Description |
|---|---|---|
| `interface` | type[T] | The protocol or class to override. |
| `mock_obj` | Any | The mock/fake/stub to register. |
Register mock_obj for interface for the duration of the block.
Because each test receives a fresh container, this is equivalent to calling mock for the lifetime of the scope. It exists purely for readability in tests that want to be explicit about scope.
| Parameter | Type | Description |
|---|---|---|
| `interface` | type[T] | The protocol or class to override. |
| `mock_obj` | Any | The mock/fake/stub to register. |
| Type | Description |
|---|---|
| AsyncGenerator[None, None] | Nothing; the override is active for the body of the block. |
Dispose the container and release all resources.
Idempotent — safe to call multiple times.
property container() -> LexigramContainerHarness
Return the underlying LexigramContainerHarness.
Use this for advanced scenarios that require direct container access. Prefer get and mock for normal test usage.
DatabaseProviderCompliance
Reusable test suite for any ``DatabaseProviderProtocol`` implementation.
Subclass and implement create_provider:
.. code-block:: python
   class TestMyProvider(DatabaseProviderCompliance):
       async def create_provider(self):
           return MyDatabaseProvider(dsn="sqlite+aiosqlite:///:memory:")

Return a ready-to-use provider instance under test.
health_check returns a HealthCheckResult.
get_scoped_connection returns a connection within a scoped context.
Nested scoped_context blocks do not raise.
execute can run DDL statements without error.
Data inserted via execute is retrievable via fetch_one.
Data written inside a rolled-back transaction is not persisted.
DatabaseTestBed
def __init__( name: str = 'db-test-bed', connection_string: str = ':memory:', auto_cleanup: bool = True )
Create a test table with the given name and SQL schema string.
Seed test data into a table.
Clear all rows from a test table.
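The create/seed/clear workflow can be illustrated with the standard-library `sqlite3` module, which matches the `:memory:` default above. This is an illustrative sketch, not the DatabaseTestBed implementation; the method names mirror the descriptions above:

```python
import sqlite3


class DatabaseBedSketch:
    """Illustrative in-memory stand-in for the create/seed/clear workflow."""

    def __init__(self, connection_string: str = ":memory:") -> None:
        self.conn = sqlite3.connect(connection_string)

    def create_table(self, name: str, schema_sql: str) -> None:
        # schema_sql is the column definition list, e.g. "id INTEGER, name TEXT".
        self.conn.execute(f"CREATE TABLE {name} ({schema_sql})")

    def seed(self, table: str, rows: list[tuple]) -> None:
        placeholders = ", ".join("?" for _ in rows[0])
        self.conn.executemany(
            f"INSERT INTO {table} VALUES ({placeholders})", rows
        )

    def clear(self, table: str) -> None:
        self.conn.execute(f"DELETE FROM {table}")
```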
DatabaseTestClient
Drop tracked tables and clear table data.
DistributedLockCompliance
Compliance suite for DistributedLockProtocol implementations.
Subclass and implement create_lock() to run all compliance tests.
Create a DistributedLockProtocol implementation under test.
| Parameter | Type | Description |
|---|---|---|
| `key` | str | Unique resource key for the lock. |
| Type | Description |
|---|---|
| Any | A fresh DistributedLockProtocol instance. |
acquire() returns True when the lock is free.
release() returns True when the lock is currently held.
is_held() returns True after acquire().
is_held() returns False after release().
Second acquire() on the same key returns False while held.
acquire() returns True again after lock is released.
Context manager acquires on entry and releases on exit.
extend() returns True when the lock is currently held.
extend() returns False when the lock is not held.
acquire_blocking() returns True when the lock is free.
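The acquire/release/extend contract the suite checks can be demonstrated with a single-process toy lock. This is an illustrative stand-in (a real implementation would coordinate across processes via Redis or similar); the method names follow the behaviours listed above:

```python
class LockSketch:
    """Single-process illustration of the acquire/release/extend contract."""

    _held: set[str] = set()  # class-level registry keyed by resource

    def __init__(self, key: str) -> None:
        self.key = key

    def acquire(self) -> bool:
        # Second acquire on the same key fails while held.
        if self.key in LockSketch._held:
            return False
        LockSketch._held.add(self.key)
        return True

    def release(self) -> bool:
        if self.key not in LockSketch._held:
            return False
        LockSketch._held.discard(self.key)
        return True

    def is_held(self) -> bool:
        return self.key in LockSketch._held

    def extend(self) -> bool:
        # Only meaningful while held, as the suite asserts.
        return self.is_held()
```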
EventBusCompliance
Reusable test suite for any ``EventBusProtocol`` implementation.
Subclass and implement create_bus and create_event:
.. code-block:: python
   class TestMyEventBus(EventBusCompliance):
       async def create_bus(self):
           return MyEventBus()

       def create_event(self):
           return MyEvent(id="1")

Return a fresh EventBusProtocol instance.
Return a new event instance for testing.
Subscribing a handler causes it to be called on publish.
Unsubscribing a handler prevents further delivery.
All subscribers for an event type are invoked.
A handler subscribed to one type does not receive another type.
Publishing when there are no subscribers does not raise.
FakeCache
In-memory fake satisfying ``CacheBackendProtocol`` / ``CacheProtocol``.
Supports TTL expiry using time.monotonic.
Example
```python
cache = FakeCache()
await cache.set("key", "value", ttl=60)
cache.assert_has_key("key")
```

Return the stored value for key, or default if absent/expired.
Store value under key, optionally expiring after ttl seconds.
Delete key; return True if it existed.
Remove all entries.
Return a snapshot of currently stored keys.
Assert key is present in the cache.
Assert key maps to expected.
FakeClock
Controllable clock for deterministic time-based testing.
Example
```python
clock = FakeClock(datetime(2026, 1, 1, tzinfo=UTC))
assert clock.now().year == 2026
clock.advance(3600)
assert clock.now().hour == 1
```

Return the current fake time.
Return a monotonic counter (starts at 0).
Return Unix timestamp of the current fake time.
Move time forward by seconds.
Set the clock to a specific point in time.
Advance by exactly 1 second.
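A minimal controllable clock in this spirit can be written in a few lines. This is an illustrative sketch, not the FakeClock source; it shows why `now()` and `monotonic()` must advance together for deterministic tests:

```python
from datetime import datetime, timedelta, timezone


class ClockSketch:
    """Minimal controllable clock in the spirit of FakeClock."""

    def __init__(self, start: datetime) -> None:
        self._now = start
        self._mono = 0.0  # monotonic counter starts at 0, as documented

    def now(self) -> datetime:
        return self._now

    def monotonic(self) -> float:
        return self._mono

    def advance(self, seconds: float) -> None:
        # Both wall-clock and monotonic views move forward in lockstep.
        self._now += timedelta(seconds=seconds)
        self._mono += seconds

    def tick(self) -> None:
        self.advance(1)
```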
FakeCommandBus
Records dispatched commands for test assertions.
Satisfies a CommandBusProtocol interface: register() + dispatch().
Example
```python
bus = FakeCommandBus()
await bus.dispatch(CreateUser(email="a@example.com"))
bus.assert_dispatched(CreateUser, count=1)
```

Register handler for command_type.
Record command and invoke any registered handler.
All dispatched commands.
Return dispatched commands of command_type.
Assert command_type was dispatched, optionally count times.
Assert command_type was NOT dispatched.
Reset all recorded dispatches.
FakeConfig
In-memory config satisfying a ``ConfigProtocol``-like interface.
Supports dot-notation key access ("database.url") and optional
section retrieval with model instantiation.
Example
```python
config = FakeConfig({"database": {"url": "sqlite:///:memory:"}})
assert config.get("database.url") == "sqlite:///:memory:"
config.set("cache.backend", "memory")
```

The active deployment environment (returns test environment).
Return False for test environments.
Return False for test environments.
Return whether this is development.
Return whether this is testing.
Return whether this is staging.
Check whether a configuration section exists.
Retrieve key (dot-separated path) from config, returning default if absent.
Return the config section name, optionally instantiated as model_cls.
Set key (dot-separated path) to value — test-only helper.
FakeEventBus
Records all published events for test assertions.
Satisfies the EventBusProtocol protocol from lexigram.contracts.events.
Example
```python
bus = FakeEventBus()
await bus.publish(UserCreated(user_id="abc"))
bus.assert_published(UserCreated, count=1, user_id="abc")
```

Register handler for event_type.
Remove handler for event_type.
Record event and dispatch to any registered handlers.
All published events.
Return published events matching event_type.
Assert that event_type was published.
Optionally verify count and attribute values.
Assert that event_type was NOT published.
Assert exactly one event_type was published.
Assert that events were published in the given positional order.
Checks that position i in the published event stream matches
event_types[i]. Use assert_published to verify events
that may appear anywhere in the stream.
| Parameter | Type | Description |
|---|---|---|
| `*event_types` | type | Expected event types, in publish order. |
| Exception | Description |
|---|---|
| AssertionError | If any position does not match or the stream is shorter than the expected sequence. |
Example
```python
await bus.publish(UserRegistered(user_id="1"))
await bus.publish(EmailSent(user_id="1"))
bus.assert_events_in_order(UserRegistered, EmailSent)
```

Reset the recorded events list.
FakeLogger
Captures log entries for test assertions.
Satisfies LoggerProtocol from lexigram.contracts.core.logging.
Example
```python
logger = FakeLogger()
logger.info("user_created", user_id="abc")
logger.assert_logged("info", "user_created")
```

Capture a DEBUG-level log entry.
Capture an INFO-level log entry.
Capture a WARNING-level log entry.
Capture an ERROR-level log entry.
Capture a CRITICAL-level log entry.
Capture an EXCEPTION-level log entry.
def bind(**kwargs: Any) -> FakeLogger
Return a new FakeLogger sharing entries with merged context.
def unbind(*keys: str) -> FakeLogger
Return a new FakeLogger with specified context keys removed.
property entries() -> list[LogEntry]
All captured log entries.
Assert that a log entry at ``level`` whose message contains ``msg_contains`` exists.
Assert no matching log entry exists.
Reset all captured entries.
FakeMetricsCollector
Records counter, gauge, and histogram observations for test assertions.
Example
```python
metrics = FakeMetricsCollector()
metrics.increment("requests.total")
metrics.assert_counter_incremented("requests.total")
```

Increment counter name by value.
Record gauge name = value.
Record a histogram observation for name.
Return the current value of counter name (0 if never incremented).
Assert counter name equals expected.
Assert counter name was incremented at least once.
Assert gauge name equals expected.
Reset all recorded observations.
FakeQueryBus
Records executed queries and returns configured canned results.
Example
```python
bus = FakeQueryBus()
bus.when(GetUser, return_value=user)
result = await bus.execute(GetUser(user_id="123"))
assert result == user
```

Register handler for query_type.
Configure a canned response to return for query_type.
Record and execute query, returning canned results first.
All executed queries.
Return executed queries of query_type.
Reset all recorded executions.
FakeStateStore
In-memory fake satisfying the ``StateStoreProtocol`` protocol.
Example
```python
store = FakeStateStore()
await store.set("session:abc", {"user_id": "u1"})
value = await store.get("session:abc")
```

Return the value for key, or None if absent.
Store value under key.
Delete key; return True if it existed.
Return True if key has a stored value.
Remove all stored entries.
FakeUnitOfWork
Tracks entity changes and events without persistence.
Optionally integrates with FakeEventBus to dispatch collected events on commit.
Example
```python
async with FakeUnitOfWork() as uow:
    uow.register_new(user)
    uow.register_event(UserCreated(user_id=user.user_id))
assert uow.committed
```

def __init__(event_bus: FakeEventBus | None = None) -> None
Mark entity as newly created.
Mark entity as modified.
Mark entity as deleted.
Queue event for dispatch on commit.
Return all queued events without consuming them.
Mark as committed and optionally publish queued events.
Mark as rolled back and clear all tracked changes.
FlagProviderCompliance
Reusable compliance suite for any ``FlagProvider`` implementation.
Subclass and implement create_provider. Override enabled_flag_name / disabled_flag_name to specify which flags the provider will return for enabled/disabled test cases.
The provider created by create_provider must have at least:
- one flag named ``enabled_flag_name`` that evaluates to ``True``
- one flag named ``disabled_flag_name`` that evaluates to ``False``
Return a fully initialised FlagProvider for testing.
Name of a flag that the provider reports as enabled.
Name of a flag that the provider reports as disabled.
Name of a flag that does not exist in the provider.
get_flag returns a bool for a flag that exists in the provider.
get_flag on an unknown flag returns the default (or raises FlagNotFoundError).
An enabled flag evaluates to True.
A disabled flag evaluates to False.
Async evaluation via get_flag is awaitable and returns a bool.
default parameter is honoured when the flag is absent.
IntegrationEnvironment
A TestEnvironment pre-configured for integration tests.
Inherits the full TestEnvironment API (use_provider, fake,
override, resolve, async context-manager) and adds:
- with_database — boots a real database provider (defaults to SQLite in-memory so tests don’t need an external process).
- with_cache — boots a real cache provider (defaults to the in-memory cache backend so tests can run without Redis).
- with_all — combines database + cache in one call.
- Config override shortcut via the ``config=`` constructor argument.
None of the factory methods import the provider packages at class definition time — they use lazy imports so the test can collect and skip cleanly when a package isn’t installed.
def with_database( cls, url: str = 'sqlite+aiosqlite:///:memory:', *, name: str = 'integration-test', config: dict[str, Any] | None = None ) -> IntegrationEnvironment
Return an environment wired with a real database provider.
| Parameter | Type | Description |
|---|---|---|
| `url` | str | Database URL. Defaults to an in-memory SQLite database so tests run without any external process. |
| `name` | str | Environment name shown in log output. |
| `config` | dict[str, Any] | None | Additional config overrides (merged with the database URL). |
| Type | Description |
|---|---|
| IntegrationEnvironment | An ``IntegrationEnvironment`` with the database provider registered. |
| Exception | Description |
|---|---|
| ImportError | If ``lexigram-sql`` is not installed. |
def with_cache( cls, backend: str = 'memory', *, url: str | None = None, name: str = 'integration-test', config: dict[str, Any] | None = None ) -> IntegrationEnvironment
Return an environment wired with a real cache provider.
| Parameter | Type | Description |
|---|---|---|
| `backend` | str | Backend type — ``"memory"`` (default), ``"redis"``, or ``"memcached"``. |
| `url` | str | None | Backend connection URL (required for Redis / Memcached; omit for the in-memory backend). |
| `name` | str | Environment name. |
| `config` | dict[str, Any] | None | Additional config overrides. |
| Type | Description |
|---|---|
| IntegrationEnvironment | An ``IntegrationEnvironment`` with the cache provider registered. |
| Exception | Description |
|---|---|
| ImportError | If ``lexigram-cache`` is not installed. |
def with_all( cls, *, database_url: str = 'sqlite+aiosqlite:///:memory:', cache_backend: str = 'memory', cache_url: str | None = None, name: str = 'integration-test', config: dict[str, Any] | None = None ) -> IntegrationEnvironment
Return an environment with both database and cache providers.
This is a convenience factory combining with_database and with_cache. Both providers are registered before the environment is returned.
| Parameter | Type | Description |
|---|---|---|
| `database_url` | str | Passed to with_database. |
| `cache_backend` | str | Passed to with_cache. |
| `cache_url` | str | None | Passed to with_cache. |
| `name` | str | Environment name. |
| `config` | dict[str, Any] | None | Additional config overrides. |
| Exception | Description |
|---|---|
| ImportError | If any required extension package is missing. |
Set up the environment, injecting FakeConfig for config overrides.
If config was passed to the constructor, it is registered as an
override before the application boots so that it lands in the
container before it is frozen. The override is keyed by
ConfigProtocol when available, falling back to the FakeConfig
class itself.
IntegrationTestConfig
Configuration for integration tests.
All values read from environment variables with Docker Compose defaults.
Attributes:

- `postgres_dsn`: SQLAlchemy-style async DSN for PostgreSQL.
- `postgres_dsn_raw`: Plain asyncpg DSN (no ``+asyncpg`` driver prefix).
- `redis_url`: Redis connection URL.
- `kafka_bootstrap`: Kafka bootstrap servers.
- `minio_endpoint`: MinIO endpoint (host:port).
- `minio_access_key`: MinIO access key.
- `minio_secret_key`: MinIO secret key.
- `elasticsearch_url`: Elasticsearch base URL.
- `mongodb_dsn`: MongoDB connection string.
- `qdrant_url`: Qdrant HTTP URL.
- `neo4j_url`: Neo4j bolt URL.
- `neo4j_auth`: Neo4j auth string (user/password).
def from_env(cls) -> IntegrationTestConfig
Create config from current environment variables.
| Type | Description |
|---|---|
| IntegrationTestConfig | IntegrationTestConfig populated from environment with Docker Compose defaults. |
LexigramContainerHarness
Standardized DI container for isolated component testing.
This container automatically registers common mock providers and mock component implementations (LockStore, PubSubProtocol, etc.) to provide a stable baseline for unit and integration tests.
Usage
```python
container = LexigramContainerHarness()
# Common mocks are already registered

# Add your concrete provider for testing
container.register(MyProvider())

# Override specific mocks if needed
container.singleton(DatabaseProviderProtocol, MockDB())

service = await container.resolve(MyService)
```

Context manager to temporarily override a dependency.
Example
```python
with container.override(UserService, MockUserService()):
    ...
```
Register a mock implementation directly as a singleton.
Resolve interface from the container, wrapping the outcome in Result.
This is the Result-returning counterpart to resolve. Use
it when you need to assert that a dependency fails to resolve — for
example to verify that a required binding is absent or mis-configured.
| Type | Description |
|---|---|
| Result[T, Exception] | ``Ok(instance)`` on success, ``Err(exception)`` if resolution raises for any reason (missing binding, circular dependency, etc.). |
Example
```python
result = await container.try_resolve(MyService)
assert result.is_err()
assert isinstance(result.unwrap_err(), ResolutionError)
```

Register implementation as protocol with runtime protocol validation.
If protocol is decorated with @runtime_checkable, this method
verifies that implementation is an instance of protocol before
registering it. Non-runtime-checkable protocols are accepted as-is
(mypy / pyright catch mismatches at type-check time).
| Parameter | Type | Description |
|---|---|---|
| `protocol` | type[T] | The contract / protocol type to register. |
| `implementation` | Any | The concrete object implementing *protocol*. |
| Exception | Description |
|---|---|
| TypeError | When *protocol* is ``@runtime_checkable`` and *implementation* does not satisfy ``isinstance`` check. |
Example
```python
container.bind(CacheBackendProtocol, FakeCache())
```

LogEntry
A single captured log entry.
MiddlewareCompliance
Reusable compliance suite for any ``MiddlewareProtocol`` implementation.
Subclass and implement create_middleware. All tests exercise the
async __call__(context, next) contract defined by
MiddlewareProtocol.
The before / after / error terminology maps to the three observable
phases of a __call__-style middleware:
- before — code that runs prior to invoking ``next``
- after — code that runs after ``next`` returns
- error — behaviour when the downstream ``next`` raises an exception
Return a fresh instance of the middleware under test.
Return a minimal request context suitable for the middleware.
Middleware is invoked before the downstream handler is called.
Verified by asserting that __call__ is awaitable and executes
without error when a valid context and next are provided.
Middleware receives and can observe the result from the downstream handler.
The result returned by __call__ must equal the value returned by
the inner next handler (pass-through behaviour is the minimum
requirement; middleware may wrap or transform but must not discard).
An exception raised by the downstream handler propagates through the middleware.
A middleware that does not explicitly handle errors must let the exception bubble up. If the middleware catches and re-raises the exception that is also acceptable, provided the original exception type is preserved (or wrapped in a framework-specific exception).
The context object passed to next is the same (or an enriched) version.
The middleware must not silently drop the context before forwarding it to the inner handler.
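A minimal middleware satisfying the pass-through contract described above can be sketched as follows. This is an illustrative example written for this reference (the context shape and handler are hypothetical), not part of the library:

```python
import asyncio
from typing import Any, Awaitable, Callable


class PassThroughMiddleware:
    """Minimal middleware satisfying the before/after/error contract."""

    async def __call__(
        self,
        context: dict[str, Any],
        next: Callable[[dict[str, Any]], Awaitable[Any]],
    ) -> Any:
        # "before" phase: observe/enrich the context, never drop it.
        context.setdefault("seen_by", []).append("passthrough")
        # An unhandled exception from next() bubbles up unchanged ("error" phase).
        result = await next(context)
        # "after" phase: the downstream result must not be discarded.
        return result


async def demo() -> Any:
    async def handler(ctx: dict[str, Any]) -> str:
        return f"handled:{ctx['path']}"

    mw = PassThroughMiddleware()
    return await mw({"path": "/health"}, handler)
```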
MockProvider
Base class for mock providers in testing.
async def register(container: Container) -> None
No-op register for mock providers.
async def boot(container: Container) -> None
MockVectorStore
Mock vector store for testing.
Provides simple in-memory storage with simulated similarity search. Does not require actual embeddings - uses text similarity instead.
Example
```python
store = MockVectorStore()
await store.add([
    Document(text="Python is a programming language", id="1"),
    Document(text="JavaScript is for web development", id="2"),
])
results = await store.search(
    query_vector=[],  # Ignored in mock
    top_k=1,
)
```
Initialize mock vector store.
| Parameter | Type | Description |
|---|---|---|
| `config` | Any | None | Optional vector store configuration. |
| `**kwargs` | Any | Dependencies, including `similarity_threshold` (minimum similarity score for results) and `dimension_size` (dimension size of embeddings to validate). |
Add documents to store.
| Parameter | Type | Description |
|---|---|---|
| `documents` | list[Document] | Documents to add |
| Type | Description |
|---|---|
| Result[list[str], VectorStoreError] | ``Ok(list[str])`` with document IDs on success. |
async def batch_upsert( documents: list[Document], batch_size: int = 100 ) -> Result[int, VectorStoreError]
Upsert documents in batches.
| Parameter | Type | Description |
|---|---|---|
| `documents` | list[Document] | List of documents to upsert. |
| `batch_size` | int | Number of documents per batch. Defaults to 100. |
| Type | Description |
|---|---|
| Result[int, VectorStoreError] | ``Ok(int)`` with the total count of documents upserted, or ``Err(VectorStoreError)`` on failure. |
async def search( query_vector: list[float] | None = None, query: list[float] | str | None = None, k: int | None = None, top_k: int | None = None, filter: dict | None = None, filters: dict[str, Any] | None = None, filter_: dict | None = None, **kwargs: Any ) -> Result[list[SearchResult], VectorStoreError]
Search for similar documents.
Uses simple text matching instead of vector similarity. This is sufficient for testing most RAG workflows.
| Parameter | Type | Description |
|---|---|---|
| `query_vector` | list[float] | None | Query embedding (ignored in mock). |
| `k` / `top_k` | int | None | Number of results. |
| `filters` / `filter_` | dict | None | Metadata filters. |
| Type | Description |
|---|---|
| Result[list[SearchResult], VectorStoreError] | ``Ok(list[SearchResult])`` on success. |
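The simple text matching described above can be sketched as a token-overlap score. This is an illustrative stand-in metric; the mock's actual matching logic may differ.

```python
def text_similarity(query: str, document: str) -> float:
    """Jaccard overlap of lowercase word sets, an illustrative stand-in metric."""
    q, d = set(query.lower().split()), set(document.lower().split())
    if not q or not d:
        return 0.0
    # Fraction of shared words relative to all distinct words seen.
    return len(q & d) / len(q | d)


# Documents sharing more query words score higher:
score = text_similarity("python language", "Python is a programming language")
```

A scorer of this shape is why the mock is "sufficient for testing most RAG workflows": relevance ordering is preserved for keyword-bearing test documents without computing any embeddings.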
Delete documents by ID.
| Parameter | Type | Description |
|---|---|---|
| `ids` | list[str] | Document IDs to delete |
| Type | Description |
|---|---|
| Result[int, VectorStoreError] | ``Ok(int)`` with the count of deleted documents. |
Get total number of documents.
| Type | Description |
|---|---|
| int | Document count |
Clear all documents.
Close store.
Perform health check.
| Type | Description |
|---|---|
| HealthCheckResult | Structured health check result. |
async def add_texts( texts: list[str], embeddings: list[list[float]] | None = None, metadatas: list[dict[str, Any]] | None = None, collection_name: str | None = None ) -> list[str]
Convenience wrapper to add raw texts with optional embeddings/metadata.
| Exception | Description |
|---|---|
| VectorStoreError | If adding documents fails (unwraps the Result). |
MockVectorStoreWithErrors
Mock vector store that can simulate errors.
Useful for testing error handling.
Example
store = MockVectorStoreWithErrors(fail_on_search=True)
await store.search([0.1, 0.2], top_k=5)  # Returns Err(VectorStoreError)
def __init__( fail_on_add: bool = False, fail_on_search: bool = False, fail_on_delete: bool = False, error_rate: float | None = None, error_message: str = 'Mock vector store error' )
Initialize error-simulating mock.
| Parameter | Type | Description |
|---|---|---|
| `fail_on_add` | bool | Whether to fail on add() |
| `fail_on_search` | bool | Whether to fail on search() |
| `fail_on_delete` | bool | Whether to fail on delete() |
| `error_rate` | float | None | Probabilistic error rate (0-1), compatible with fixtures |
| `error_message` | str | Error message to raise |
Add with possible error.
async def search( query_vector: list[float] | None = None, query: list[float] | str | None = None, k: int | None = None, top_k: int | None = None, filter: dict | None = None, filters: dict[str, Any] | None = None, filter_: dict | None = None, **kwargs: Any ) -> Result[list[SearchResult], VectorStoreError]
Search with possible error.
Delete with possible error.
MockVectorStoreWithSimilarity
Mock vector store with actual similarity calculation.
Uses cosine similarity on provided embeddings if available, falls back to text matching otherwise.
Example
store = MockVectorStoreWithSimilarity()
await store.add([
    Document(
        text="Python programming",
        embedding=[0.1, 0.2, 0.3],
        id="1",
    )
])
results = await store.search(
    query_vector=[0.1, 0.2, 0.3],
    top_k=1,
)
results[0].score  # High similarity, ~1.0
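The cosine-similarity calculation referred to above is the standard formula; a self-contained sketch:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # degenerate vectors score zero rather than dividing by zero
    return dot / (norm_a * norm_b)
```

Identical vectors score 1.0 and orthogonal vectors score 0.0, which is why the example above expects a near-perfect score for a query vector equal to the stored embedding.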
async def search( query_vector: list[float] | None = None, query: list[float] | str | None = None, k: int | None = None, top_k: int | None = None, filter: dict | None = None, filters: dict[str, Any] | None = None, filter_: dict | None = None, **kwargs: Any ) -> Result[list[SearchResult], VectorStoreError]
Search with actual similarity calculation.
| Parameter | Type | Description |
|---|---|---|
| `query_vector` | list[float] | None | Query embedding vector. |
| `k` / `top_k` | int | None | Number of results. |
| `filters` / `filter_` | dict | None | Metadata filters. |
| Type | Description |
|---|---|
| Result[list[SearchResult], VectorStoreError] | ``Ok(list[SearchResult])`` sorted by similarity. |
QueueBackendCompliance
Compliance suite for queue backend implementations.
Subclass and implement create_backend() to run all compliance tests.
Create the queue backend implementation under test.
| Parameter | Type | Description |
|---|---|---|
| `queue_name` | str | Name of the queue to use for testing. |
| Type | Description |
|---|---|
| Any | A fresh queue backend instance. |
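To make the contract concrete, here is a hypothetical in-memory backend satisfying the behaviours the compliance tests below verify. The name `MemoryQueueBackend` and the message-tuple shape are illustrative assumptions, not part of the library.

```python
import asyncio
import itertools
from collections import deque


class MemoryQueueBackend:
    """Hypothetical backend implementing the documented queue contract."""

    def __init__(self) -> None:
        self._pending: deque = deque()
        self._in_flight: dict[str, object] = {}
        self._ids = itertools.count(1)

    async def enqueue(self, message) -> str:
        msg_id = f"msg-{next(self._ids)}"            # enqueue() returns a message ID string
        self._pending.append((msg_id, message))
        return msg_id

    async def dequeue(self):
        if not self._pending:
            return None                               # None when no messages are available
        msg_id, message = self._pending.popleft()
        self._in_flight[msg_id] = message
        return msg_id, message

    async def ack(self, msg_id: str) -> None:
        self._in_flight.pop(msg_id, None)             # completes without error

    async def nack(self, msg_id: str, requeue: bool = True) -> None:
        message = self._in_flight.pop(msg_id, None)
        if requeue and message is not None:
            self._pending.append((msg_id, message))   # message becomes available again
        # requeue=False: the message is permanently discarded


async def demo():
    backend = MemoryQueueBackend()
    msg_id = await backend.enqueue({"task": "send-email"})
    dequeued = await backend.dequeue()
    await backend.nack(msg_id, requeue=True)          # back on the queue
    again = await backend.dequeue()
    await backend.ack(msg_id)
    return msg_id, dequeued, again
```

A `QueueBackendCompliance` subclass would then simply return an instance of such a backend from `create_backend()`.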
enqueue() returns a message ID string.
dequeue() returns the message that was enqueued.
dequeue() returns None when no messages are available.
ack() completes without error for an in-flight message.
nack() with requeue=True makes the message available again.
nack() with requeue=False permanently discards the message.
RepositoryCompliance
Reusable test suite for any ``RepositoryProtocol[T]`` implementation.
Verifies that the repository satisfies the standard persistence contract: save, get, delete, list.
Subclass and implement create_repository and create_entity:
.. code-block:: python
class TestInMemoryUserRepo(RepositoryCompliance[User]):
    async def create_repository(self):
        return InMemoryUserRepository()

    def create_entity(self, **overrides):
        return User(id=str(uuid4()), name="Alice")

Return a fresh, empty repository instance.
Return a new entity instance suitable for persistence.
save then get returns the entity.
get on a non-existent id returns None.
delete removes the entity and returns True.
delete on a non-existent id returns False.
list returns all saved entities.
Saving an entity with the same id replaces the old version.
SearchEngineCompliance
Reusable test suite for any ``SearchEngineProtocol`` implementation.
Subclass and implement create_engine:
.. code-block:: python
class TestMySearch(SearchEngineCompliance):
    async def create_engine(self):
        return MySearchEngine(url="http://localhost:9200")

Return a ready-to-use search engine under test.
create_index and delete_index do not raise errors.
index_document succeeds and the document is searchable.
index_many successfully indexes multiple documents.
delete_document removes an indexed document without error.
search returns an object with a hits or results attribute.
health_check returns a HealthCheckResult.
ServiceProbe
Async probes for common external services.
Each method attempts a real network connection and returns True
if the service is reachable, False otherwise. All probes
suppress all exceptions — they are purely availability indicators,
not functional tests.
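Under the hood, a probe of this style can be as small as a guarded TCP connection attempt. The following is an illustrative sketch, not the library's actual implementation; the timeout default is an assumption.

```python
import asyncio


async def probe_tcp(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection succeeds within the timeout.

    Suppresses *all* exceptions, mirroring the availability-indicator
    semantics described above: this tells you a port is open, nothing more.
    """
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
        writer.close()
        await writer.wait_closed()
        return True
    except Exception:
        return False
```

Service-specific probes layer a protocol handshake (ping, HTTP health endpoint) on top of the same guarded pattern.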
Example
@pytest.fixture
async def redis_client():
    if not await ServiceProbe.check_redis():
        pytest.skip("Redis not available")
    # ... connect and return client

Return True if a Redis server is reachable at url.
Return True if a PostgreSQL server is reachable at dsn.
Return True if an Elasticsearch HTTP endpoint is reachable.
Return True if a RabbitMQ AMQP broker is reachable.
Return True if a Meilisearch HTTP endpoint is reachable.
Return True if an SMTP server is accepting connections.
Return True if a Kafka broker is reachable.
| Parameter | Type | Description |
|---|---|---|
| `bootstrap` | str | Kafka bootstrap server address. |
| Type | Description |
|---|---|
| bool | True if a broker connection succeeds. |
Return True if a MinIO endpoint is reachable.
| Parameter | Type | Description |
|---|---|---|
| `endpoint` | str | MinIO endpoint in host:port format. |
| Type | Description |
|---|---|
| bool | True if the HTTP endpoint responds. |
Return True if a MongoDB server is reachable.
| Parameter | Type | Description |
|---|---|---|
| `dsn` | str | MongoDB connection string. |
| Type | Description |
|---|---|
| bool | True if the server responds to a ping. |
Return True if a Qdrant vector store is reachable.
| Parameter | Type | Description |
|---|---|---|
| `url` | str | Qdrant HTTP URL. |
| Type | Description |
|---|---|
| bool | True if the healthz endpoint responds with 200. |
Return True if a Neo4j graph database is reachable.
| Parameter | Type | Description |
|---|---|---|
| `url` | str | Neo4j bolt URL. |
| Type | Description |
|---|---|
| bool | True if a TCP connection to the bolt port succeeds. |
SnapshotAsserter
Assert that a value matches a stored snapshot.
On first run (no snapshot file exists), the current value is persisted to
disk and the test is skipped via pytest.skip. On subsequent runs the
value is compared to the stored snapshot and the test fails if they differ.
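The first-run/compare flow can be sketched as follows. The JSON storage format and the injected `skip` callable are assumptions for illustration; the real class calls `pytest.skip` directly, and the error class here merely mirrors the library's `SnapshotMismatchError`.

```python
import json
from pathlib import Path


class SnapshotMismatchError(AssertionError):
    """Stand-in mirroring the library's exception of the same name."""


def assert_match_sketch(snapshot_dir: Path, name: str, value, skip) -> None:
    """Illustrative first-run/compare flow for snapshot assertions."""
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    path = snapshot_dir / f"{name}.json"
    if not path.exists():
        # First run: persist the value, then skip the test.
        path.write_text(json.dumps(value, indent=2, sort_keys=True))
        skip(f"snapshot {name!r} created")
        return
    # Subsequent runs: compare against the stored snapshot.
    stored = json.loads(path.read_text())
    if stored != value:
        raise SnapshotMismatchError(f"{name!r} does not match stored snapshot")
```

Skipping (rather than passing) on first run makes snapshot creation visible in the test report, so a green run always means a real comparison happened.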
| Parameter | Type | Description |
|---|---|---|
| `snapshot_dir` |  | Directory where snapshot files are stored. Defaults to a ``__snapshots__`` subdirectory of the current working directory. |
Example
asserter = SnapshotAsserter(Path("tests/__snapshots__"))
asserter.assert_match("create_user_response", response_body)

Assert value matches the stored snapshot for name.
Creates the snapshot and skips the test on first call.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique snapshot identifier (used as the filename stem). |
| `value` | Any | The value to compare. Must be JSON-serialisable. |
| Exception | Description |
|---|---|
| SnapshotMismatchError | If the value does not match the stored snapshot. |
Persist value as the stored snapshot for name.
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique snapshot identifier. |
| `value` | Any | The value to store. |
| Type | Description |
|---|---|
| Path | Path to the written snapshot file. |
Delete the snapshot file for name.
| Type | Description |
|---|---|
| bool | ``True`` if the file existed and was deleted; ``False`` otherwise. |
Return True if a snapshot for name exists on disk.
SystemClock
Real system clock satisfying the Clock protocol.
Return the current UTC time.
Return the monotonic clock value.
Return the current Unix timestamp.
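Together with the Clock protocol mentioned at the top of this reference, the three methods map directly onto the standard library. The method names below are inferred from the descriptions above and may differ from the real protocol; this is a sketch of the swappable-clock pattern, not the actual classes.

```python
import time
from datetime import datetime, timezone
from typing import Protocol


class Clock(Protocol):
    """Assumed shape of the time-source protocol (names inferred from the docs)."""

    def now(self) -> datetime: ...
    def monotonic(self) -> float: ...
    def timestamp(self) -> float: ...


class SystemClockSketch:
    """Real-clock implementation; a fake clock would return scripted values."""

    def now(self) -> datetime:
        return datetime.now(timezone.utc)  # current UTC time

    def monotonic(self) -> float:
        return time.monotonic()            # monotonic clock value

    def timestamp(self) -> float:
        return time.time()                 # current Unix timestamp
```

Because `Clock` is a structural protocol, any object with these three methods satisfies it, which is what makes swapping in a deterministic fake trivial in tests.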
TaskQueueCompliance
Reusable test suite for any ``TaskQueueProtocol`` implementation.
Subclass and implement create_queue:
.. code-block:: python
class TestMyQueue(TaskQueueCompliance):
    async def create_queue(self):
        return MemoryTaskQueue()

Return a fresh, empty instance of the queue under test.
enqueue returns the task id.
dequeue returns the task that was enqueued.
dequeue on an empty queue returns None.
ack on a known in-flight task completes without error.
ack on an unknown task id is a safe no-op.
nack with requeue=True makes the task available for dequeue again.
nack with requeue=False permanently discards the task.
nack on an unknown task id is a safe no-op.
get_task_count returns the number of tasks waiting to be dequeued.
clear removes all pending tasks.
TaskTestBed
Test environment pre-wired with mock task queue and executor.
Unlike the generic TestEnvironment,
TaskTestBed overrides setup and teardown to manage
the mock components directly without starting a full Application.
Attributes:
mock_queue: In-memory MockTaskQueue instance created during setup.
mock_executor: MockTaskExecutor instance created during setup.
Example
bed = TaskTestBed()
await bed.setup()

client = TaskTestClient(bed)
task_ids = await client.enqueue_test_tasks()
# ...

await bed.teardown()

Initialise mock components and pre-populate the queue.
Creates a fresh MockTaskQueue
and MockTaskExecutor, then
enqueues the three sample tasks returned by
sample_tasks.
Does not start an Application or DI container.
Clear the mock queue and reset state.
Safe to call even if setup was never called.
Return all tasks currently waiting in the mock queue.
| Type | Description |
|---|---|
| list[Any] | A list of JobProtocol objects, in enqueue order. |
| Exception | Description |
|---|---|
| RuntimeError | If called before setup. |
TaskTestClient
High-level task-system test helper.
Wraps a TaskTestBed and provides convenience methods for testing the task lifecycle without a running application or external queue backend.
Attributes:
test_bed: The underlying TaskTestBed that owns the mocks.
provider: The currently active MockTasksProvider, or
None when no provider is running.
Example
bed = TaskTestBed()
await bed.setup()

client = TaskTestClient(bed)

async with client.task_context() as provider:
    task_ids = await client.enqueue_test_tasks()
    ...

await bed.teardown()

def __init__(test_bed: TaskTestBed) -> None
Create and start a MockTasksProvider bound to the test bed.
Sets self.provider and returns the new provider so callers can
assert on its identity.
| Type | Description |
|---|---|
| MockTasksProvider | The newly created MockTasksProvider. |
Shut down the active provider and clear self.provider.
No-op if no provider is running.
Enqueue the three standard sample tasks into the mock queue.
Creates JobProtocol instances from sample_tasks and enqueues them.
| Type | Description |
|---|---|
| list[str] | A list of three task-id strings in enqueue order. |
Execute task via the mock executor.
| Parameter | Type | Description |
|---|---|---|
| `task` | Any | A JobProtocol (or any object compatible with MockTaskExecutor). |
| Type | Description |
|---|---|
| MockTaskResult[dict[str, Any]] | A MockTaskResult describing the outcome. |
Async context manager that starts a provider and stops it on exit.
| Type | Description |
|---|---|
| AsyncGenerator[MockTasksProvider, None] | The active MockTasksProvider. |
Example
async with client.task_context() as provider:
    assert provider is not None
assert client.provider is None

TaskTestData
Static collection of canned task/job fixtures for testing.
All methods return plain dict objects so that callers can use them without importing task-specific model classes. The dicts are intentionally simple and cover the same task names that MockTaskExecutor recognises.
Return three representative task definitions.
| Type | Description |
|---|---|
| list[dict[str, Any]] | A list of three task dicts covering email_notification, data_processing, and cleanup_job. |
Return two representative background-job definitions.
| Type | Description |
|---|---|
| list[dict[str, Any]] | A list with batch_import and maintenance dicts. |
Return two scheduled-job definitions.
| Type | Description |
|---|---|
| list[dict[str, Any]] | A list with daily_backup and hourly_cleanup dicts. |
TestAssertions
Custom assertion helpers for testing.
def assert_eventually_true( condition_func: Callable[[], bool], timeout: float = 5.0, message: str = 'Condition never became true' ) -> None
def assert_dict_contains_subset( subset: dict[str, Any], superset: dict[str, Any], message: str | None = None ) -> None
TestDataFactory
Factory for generating test data.
TestEnvironment
Isolated test environment for provider testing.
test = False
The TestEnvironment provides a controlled testing environment for Lexigram providers and applications. It manages the application lifecycle, dependency injection container, and provider registration for tests.
Attributes:
name: Name of the test environment.
app: The Application instance, if created.
container: The DI container for service resolution.
providers: Dictionary of registered providers.
mock_providers: Dictionary of registered mock providers.
Example
Creating an environment
env = TestEnvironment("my-test")
# Add providers
env.use_provider(MyProvider())

# Add mock providers
env.use_mock_provider(MockDatabaseProvider())

# Override services
env.override(DbService, MockDbService)

# Run the test
async with env.run():
    service = await env.container.resolve(MyService)

Using with pytest fixtures

@pytest.fixture
async def test_env():
    env = TestEnvironment("test")
    async with env.run():
        yield env

def __init__(name_or_app: str | Application = 'test-bed') -> None
def use_provider(provider: Provider) -> TestEnvironment
Add a provider to the test bed.
def use_mock_provider(provider: MockProvider) -> TestEnvironment
Add a mock provider to the test bed.
def override( interface: type, implementation: Any ) -> TestEnvironment
Override a service registration.
def add_fixture( name: str, fixture_func: Callable ) -> TestEnvironment
Add a test fixture.
def fake(contract: type) -> TestEnvironment
Register the well-known fake for contract and return self.
The fake-registry maps core protocol types to their in-memory doubles
from lexigram.testing.fakes. Raises ValueError if no
fake is registered for the given contract.
Example
env.fake(FakeEventBus).fake(FakeLogger)

Return the fake instance registered for contract.
Raises ValueError if contract has no registered fake.
Use this in tests to inspect recorded interactions after the
code under test has run.
Example
bus = env.get_fake(FakeEventBus)
bus.assert_published(UserCreated)

async def setup() -> Application
Setup the test environment.
If called on an already-running environment, automatically tears down and
restarts if new providers have been added since the last setup — enabling
the pattern of incremental provider registration followed by a second
setup() call.
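The restart-on-new-providers behaviour can be illustrated with a self-contained model. This is a sketch of the described logic only, not the actual TestEnvironment code; names like `IncrementalEnvSketch` are hypothetical.

```python
class IncrementalEnvSketch:
    """Models the documented restart-if-new-providers behaviour of setup()."""

    def __init__(self) -> None:
        self.providers: list[str] = []
        self._booted: list[str] = []
        self.running = False
        self.setup_count = 0

    def use_provider(self, name: str) -> "IncrementalEnvSketch":
        self.providers.append(name)
        return self

    def setup(self) -> None:
        # Already running, but new providers were added since the last setup:
        # tear down so the fresh provider set can be booted together.
        if self.running and self.providers != self._booted:
            self.teardown()
        if not self.running:
            self._booted = list(self.providers)
            self.running = True
            self.setup_count += 1

    def teardown(self) -> None:
        self.running = False
```

Calling `setup()` twice without changes is a no-op, while registering a provider between calls triggers a full teardown and restart, which is the incremental-registration pattern the paragraph above describes.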
Teardown the test environment.
Hook for subclasses to tear down providers (no-op default).
async def context() -> AsyncGenerator[TestEnvironment, None]
Context manager for test bed lifecycle.
def get_provider(name: str) -> Provider | None
Get a provider by name.
def get_mock_provider(name: str) -> MockProvider | None
Get a mock provider by name.
Resolve a service from the container synchronously.
Resolve a service from the container asynchronously.
Get health status of all providers.
def create_mock( provider_class: type[MockProvider], **kwargs: Any ) -> MockProvider
Create a mock provider instance.
TestingModule
Testing utilities and fixtures module.
Provides test doubles, container overrides, and lifecycle trackers for use in tests.
Usage
@module(imports=[TestingModule.configure()])
class TestAppModule(Module):
    pass

def configure( cls, **kwargs: Any ) -> DynamicModule
Create a TestingModule for use in test suites.
| Parameter | Type | Description |
|---|---|---|
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
VectorStoreCompliance
Compliance suite for VectorStoreProtocol implementations.
Subclass and implement create_store() to run all compliance tests. The store is connected before tests and disconnected after.
Create a connected VectorStoreProtocol implementation under test.
| Type | Description |
|---|---|
| Any | A connected VectorStoreProtocol instance. |
health_check() returns a healthy result.
list_collections() returns a list.
create_collection() creates a collection visible via collection_exists().
delete_collection() removes the collection.
collection_exists() returns False for a non-existent collection.
upsert() increases count in the collection.
search() returns results after upsert.
get() retrieves vectors by their IDs.
delete() removes vectors by ID.
WebTestBed
TestBed specialized for web applications.
Extends TestEnvironment to add
web-specific functionality. Accepts a WebProvider or an Application
and exposes a fully functional HTTP client for making requests against
the mounted ASGI app.
After setup, the following attributes are available:
bed.client: A WebTestClient backed by the real ASGI app.
bed.web_provider: The WebProvider instance.
bed.container: The DI container (for overrides and resolution).
| Parameter | Type | Description |
|---|---|---|
| `provider_or_app` | WebProvider | Application | A WebProvider instance or a Lexigram Application. |
| `name` | str | Name of the test environment (defaults to "web-test-bed"). |
| `raise_server_exceptions` | bool | Whether the test client re-raises server errors. Defaults to True. Set to False to inspect 5xx responses as assertions. |
def __init__( provider_or_app: WebProvider | Application, name: str = 'web-test-bed', raise_server_exceptions: bool = True ) -> None
Boot the application and wire up the test client.
def override( interface: type, implementation: Any ) -> WebTestBed
Override a DI registration for testing (fluent interface).
Must be called before setup. Returns self for chaining.
Perform a GET request via the test client.
Perform a POST request via the test client.
Perform a PUT request via the test client.
Perform a DELETE request via the test client.
Perform a PATCH request via the test client.
WebTestClient
Test client for testing web applications.
Wraps Starlette’s TestClient to provide a seamless testing experience for Lexigram web applications. It automatically extracts the underlying ASGI application from a Lexigram Application instance and provides assertion-aware TestResponse objects.
Note: This class was renamed from TestClient to avoid pytest collecting it as a test class.
def __init__( app: Any, base_url: str = 'http://testserver', raise_server_exceptions: bool = True, **kwargs: Any ) -> None
Initialize the WebTestClient.
| Parameter | Type | Description |
|---|---|---|
| `app` | Any | A Lexigram Application, WebProvider, or raw ASGI application. |
| `base_url` | str | The base URL for requests. |
| `raise_server_exceptions` | bool | Whether to raise exceptions that occur in the ASGI app. |
| `**kwargs` | Any | Additional arguments passed to the underlying Starlette TestClient. |
def as_user(user: Any) -> WebTestClient
Simulate an authenticated user for subsequent requests.
| Parameter | Type | Description |
|---|---|---|
| `user` | Any | The user object (must have an 'id' attribute or be convertible to str). |
| Type | Description |
|---|---|
| WebTestClient | self for chaining. |
Perform a GET request.
Perform an OPTIONS request.
Perform a HEAD request.
Perform a POST request.
Perform a PUT request.
Perform a PATCH request.
Perform a DELETE request.
def request( method: str, url: str, headers: dict[str, str] | None = None, **kwargs: Any ) -> TestResponse
Perform a generic HTTP request.
Perform a WebSocket connection.
Functions
assert_all_ok
Assert every Result in results is Ok and return the unwrapped values.
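Behaviourally, assuming a Result type with `is_err()`/`ok()`/`err()` accessors (an assumption about the library's Result API, with minimal stand-ins defined here for illustration), the helper amounts to:

```python
from dataclasses import dataclass
from typing import Any


# Minimal Result stand-ins for illustration only.
@dataclass
class Ok:
    value: Any
    def is_err(self) -> bool: return False
    def ok(self) -> Any: return self.value
    def err(self) -> Any: return None


@dataclass
class Err:
    error: Any
    def is_err(self) -> bool: return True
    def ok(self) -> Any: return None
    def err(self) -> Any: return self.error


def assert_all_ok_sketch(results: list) -> list:
    """Unwrap every Ok value; fail fast with the index of the first Err."""
    values = []
    for i, result in enumerate(results):
        if result.is_err():
            raise AssertionError(f"result {i} is Err: {result.err()!r}")
        values.append(result.ok())
    return values
```

Reporting the index of the offending Result makes batch assertions like the example below debuggable without re-running item by item.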
Example
values = assert_all_ok([service.find(id) for id in ids])

assert_err
Assert result is Err, optionally checking the error type.
Short alias combining assert_result_err and assert_result_err_type. Intended for tests that want concise, idiomatic assertions.
error = assert_err(service.find_user("missing"), UserNotFound)
assert error.user_id == "missing"

| Parameter | Type | Description |
|---|---|---|
| `result` | Result[Any, E] | The ``Result`` to inspect. |
| `error_type` | type[E] | None | When provided, asserts that the error is an instance of this type. |
| Type | Description |
|---|---|
| E | The unwrapped ``Err`` value. |
| Exception | Description |
|---|---|
| AssertionError | If *result* is ``Ok``, or if *error_type* is given and the error is not an instance of that type. |
assert_healthy
Assert every health check in results passed.
Example
results = await health_checker.run_all()
assert_healthy(results)

assert_ok
Assert result is Ok and return the inner value.
Short alias for assert_result_ok. Intended for tests that want concise, idiomatic assertions.
user = assert_ok(service.find_user("123"))
assert user.email == "test@example.com"

| Parameter | Type | Description |
|---|---|---|
| `result` | Result[T, E] | The ``Result`` to inspect. |
| Type | Description |
|---|---|
| T | The unwrapped ``Ok`` value. |
| Exception | Description |
|---|---|
| AssertionError | If *result* is ``Err``. |
assert_result_err
Unwrap and return the Err value; raise AssertionError if Ok.
Example
error = assert_result_err(service.create_user(bad_data))
assert "email" in str(error)

assert_result_err_contains
Assert Result is Err and str(error) contains substring.
Example
assert_result_err_contains(result, "not found")

assert_result_err_type
Assert the result is Err with an error of the given type.
Example
assert_result_err_type(result, ValidationError)

assert_result_maps_to
Assert the Ok value transforms to expected via mapper.
Example
assert_result_maps_to(result, lambda u: u.email, "user@example.com")

assert_result_ok
Unwrap and return the Ok value; raise AssertionError if Err.
Example
user = assert_result_ok(service.create_user(data))
assert user.name == "Jo"

assert_result_ok_value
Assert Result is Ok with a specific value.
Example
assert_result_ok_value(result, expected_user)

make_resource_record
Create a minimal resource record dict for testing.
Provides sensible defaults so callers only need to specify the fields relevant to the test.
| Parameter | Type | Description |
|---|---|---|
| `resource_type` | str | Record type tag (stored in ``_type``). |
| `**fields` | Any | Field values to override defaults. |
| Type | Description |
|---|---|
| dict[str, Any] | Dict representing the resource record. |
Example
user = make_resource_record("user", name="Alice", email="a@b.com")
# {"id": "test-user-1", "_type": "user", "name": "Alice", ...}

override
def override( container: Container, interface: type[T], implementation: Any ) -> Iterator[None]
Context manager to temporarily override a dependency in the container.
Example
with override(container, UserService, MockUserService()):
    # UserService resolves to MockUserService here
    ...
Exceptions
SnapshotMismatchError
Raised when a value does not match the stored snapshot.