Skip to content
GitHub

API Reference

Protocol for time sources — allows swapping real/fake clocks.
now
def now() -> datetime
monotonic
def monotonic() -> float
time
def time() -> float

__init__
def __init__(
    name: str = 'ai-test',
    **kwargs
) -> None

Test client that provides ergonomic helpers for exercising AI service boundaries.

Wraps a TestEnvironment (or AITestBed) and provides recording wrappers for common AI operations — LLM completions, vector searches, and ML predictions — so tests can make assertions about what the system under test invoked.

Parameters
ParameterTypeDescription
`test_bed`The test environment that owns AI providers and test data.
`max_tokens_per_run`Maximum token budget for all LLM operations in this test run. Set to ``0`` to disable enforcement. Defaults to 10 000.
__init__
def __init__(
    test_bed: object,
    *,
    max_tokens_per_run: int = 10000
) -> None
test_data
property test_data() -> AITestData

Return the AITestData instance owned by the test bed.

tokens_used
property tokens_used() -> int

Total tokens consumed by LLM completions in this run.

token_budget
property token_budget() -> int

Configured maximum tokens per run (0 = unlimited).

reset_token_budget
def reset_token_budget() -> None

Reset the token usage counter to zero.

complete_with_llm
async def complete_with_llm(
    prompt: str,
    **kwargs: object
) -> dict

Simulate an LLM completion, enforce the token budget, and record it.

Token cost is read from the mock response’s "tokens" field if present; otherwise estimated as ceil(len(prompt.split()) * 1.3).

Parameters
ParameterTypeDescription
`prompt`strThe prompt text sent to the (mock) LLM. **kwargs: Extra fields included in the recorded entry.
Returns
TypeDescription
dictThe recorded completion dict.
Raises
ExceptionDescription
TokenBudgetExceededErrorWhen the token budget is set and would be exceeded.
search_vector_store
async def search_vector_store(
    query: str,
    **kwargs: object
) -> dict

Simulate a vector store search and record it in test_data.

Returns the mock response keyed "vector" if configured, otherwise an empty results structure.

predict_with_ml
async def predict_with_ml(
    input_data: object,
    **kwargs: object
) -> dict

Simulate an ML model prediction and record it in test_data.

Returns the mock response keyed "ml" if configured, otherwise a default confidence structure.

assert_llm_completions_count
def assert_llm_completions_count(expected: int) -> None

Assert that exactly expected LLM completions were recorded.

assert_vector_searches_count
def assert_vector_searches_count(expected: int) -> None

Assert that exactly expected vector searches were recorded.

assert_ml_predictions_count
def assert_ml_predictions_count(expected: int) -> None

Assert that exactly expected ML predictions were recorded.


Lightweight wrapper around a raw response for admin assertions.

Attributes: status_code: HTTP status code. headers: Response headers dict. text: Response body as text. json_data: Parsed JSON body (None if not JSON).

is_htmx_redirect
def is_htmx_redirect() -> bool

Return True if the response contains HX-Redirect header.

htmx_redirect_url
def htmx_redirect_url() -> str | None

Return the HX-Redirect URL if present.

htmx_trigger
def htmx_trigger() -> str | None

Return the HX-Trigger header value if present.


Test client with helpers for lexigram-admin endpoints.

Wraps a Starlette TestClient (or any httpx-compatible sync client) and provides assertion helpers and convenience methods for CRUD operations.

Parameters
ParameterTypeDescription
`client`An httpx-compatible test client (e.g. Starlette ``TestClient``).
`prefix`Admin URL prefix (default ``"/admin"``).
`default_headers`Headers added to every request.
__init__
def __init__(
    client: Any,
    prefix: str = '/admin',
    default_headers: dict[str, str] | None = None
) -> None
url
def url(
    resource: str,
    *parts: str
) -> str

Build an admin resource URL.

Parameters
ParameterTypeDescription
`resource`strResource name (e.g. ``"user"``). *parts: Additional path segments.
Returns
TypeDescription
strFull URL string, e.g. ``"/admin/user/123/edit"``.
list
def list(
    resource: str,
    params: dict[str, str] | None = None
) -> AdminResponse

GET the list page for a resource.

detail
def detail(
    resource: str,
    record_id: str
) -> AdminResponse

GET the detail page for a record.

create
def create(
    resource: str,
    data: dict[str, Any]
) -> AdminResponse

POST to create a new record.

update
def update(
    resource: str,
    record_id: str,
    data: dict[str, Any]
) -> AdminResponse

POST to update an existing record.

delete
def delete(
    resource: str,
    record_id: str
) -> AdminResponse

POST (or DELETE) to delete a record.

bulk_action
def bulk_action(
    resource: str,
    action: str,
    ids: list[str]
) -> AdminResponse

POST a bulk action.

search
def search(
    resource: str,
    query: str
) -> AdminResponse

GET the list page with a search query.

assert_status
def assert_status(
    resp: AdminResponse,
    expected: int
) -> None

Assert response has expected HTTP status code.

assert_ok
def assert_ok(resp: AdminResponse) -> None

Assert response is 200 OK.

assert_redirect
def assert_redirect(resp: AdminResponse) -> None

Assert response is an HTMX or standard redirect.

assert_unprocessable
def assert_unprocessable(resp: AdminResponse) -> None

Assert response is HTTP 422 (validation error).

assert_contains
def assert_contains(
    resp: AdminResponse,
    text: str
) -> None

Assert response body contains text.

assert_htmx_trigger
def assert_htmx_trigger(
    resp: AdminResponse,
    event: str
) -> None

Assert HX-Trigger header contains event.


Test harness that boots the application with optional DI overrides.

Attributes: app: The booted Application. container: The DI container for direct service resolution. client: HTTP test client backed by the real ASGI app.

__init__
def __init__() -> None
from_factory
async def from_factory(
    cls,
    factory: str | Any,
    overrides: dict[type, Any] | None = None
) -> AsyncIterator[AppTestBed]

Create a test bed from an application factory.

Parameters
ParameterTypeDescription
`factory`str | AnyEither a dotted import string (``"my_app.app:create_app"``) or a callable that returns an Application.
`overrides`dict[type, Any] | NoneOptional dict mapping service types to replacement instances. Applied after providers register but before boot.
Yields
TypeDescription
AsyncIterator[AppTestBed]A fully booted AppTestBed.

Example

async with AppTestBed.from_factory(
"my_app.app:create_app",
overrides={EmailService: MockEmailService()},
) as bed:
resp = await bed.client.get("/health")
assert resp.status_code == 200
async with AppTestBed.from_factory(
"my_app.app:create_app",
overrides={EmailService: MockEmailService()},
) as bed:
resp = await bed.client.get("/health")
assert resp.status_code == 200
from_app
async def from_app(
    cls,
    app: Any,
    overrides: dict[type, Any] | None = None
) -> AsyncIterator[AppTestBed]

Create a test bed from an already-constructed Application.

Parameters
ParameterTypeDescription
`app`AnyAn Application instance.
`overrides`dict[type, Any] | NoneOptional DI overrides (same as from_factory).
Yields
TypeDescription
AsyncIterator[AppTestBed]A fully booted AppTestBed.

Helper utilities for async testing.
wait_for_condition
async def wait_for_condition(
    condition_func: Callable[[], bool],
    timeout: float = 5.0,
    interval: float = 0.1
) -> bool
collect_async_results
async def collect_async_results(coros: list[Coroutine[Any, Any, Any]]) -> list[Any]
run_with_timeout
async def run_with_timeout(
    coro: Coroutine[Any, Any, Any],
    timeout: float
) -> Any

Compliance suite for AuditLoggerProtocol implementations.

Subclass and implement create_logger() to run all compliance tests.

create_logger
async def create_logger() -> Any

Create the AuditLoggerProtocol implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing AuditLoggerProtocol.
test_log_does_not_raise
async def test_log_does_not_raise() -> None

log() completes without raising for a valid entry.

test_log_creates_queryable_entry
async def test_log_creates_queryable_entry() -> None

log() creates an entry that is returned by query().

test_log_with_metadata
async def test_log_with_metadata() -> None

log() stores extra metadata fields.

test_query_by_actor
async def test_query_by_actor() -> None

query() filters by actor_id.

test_query_by_action
async def test_query_by_action() -> None

query() filters by action name.

test_query_returns_list
async def test_query_returns_list() -> None

query() always returns a list, even when empty.


Compliance suite for AuditStoreProtocol implementations.

Subclass and implement create_store() to run all compliance tests.

create_store
async def create_store() -> Any

Create the AuditStoreProtocol implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing AuditStoreProtocol.
test_append_does_not_raise
async def test_append_does_not_raise() -> None

append() completes without raising for a valid entry.

test_append_and_query_round_trip
async def test_append_and_query_round_trip() -> None

append() and query() round-trip an audit entry.

test_count_reflects_appended
async def test_count_reflects_appended() -> None

count() returns correct count after appends.

test_query_by_actor
async def test_query_by_actor() -> None

query() filters by actor_id.

test_query_empty_returns_list
async def test_query_empty_returns_list() -> None

query() returns an empty list for unknown actor.


Reusable test suite for any ``BlobStoreProtocol`` implementation.

Subclass and implement create_store:

.. code-block:: python

class TestMyBlobStore(BlobStoreCompliance):
async def create_store(self):
return LocalDriver(root="/tmp/test")
create_store
async def create_store() -> Any

Return a ready-to-use, empty BlobStoreProtocol under test.

test_upload_and_download
async def test_upload_and_download() -> None

upload then download returns the same bytes.

test_exists_after_upload
async def test_exists_after_upload() -> None

exists returns True after a file is uploaded.

test_not_exists_for_missing_path
async def test_not_exists_for_missing_path() -> None

exists returns False for a path that has never been uploaded.

test_delete_removes_file
async def test_delete_removes_file() -> None

delete removes the file; exists returns False afterwards.

test_list_returns_uploaded_paths
async def test_list_returns_uploaded_paths() -> None

list with a prefix returns paths of uploaded objects.

test_get_url_returns_string
async def test_get_url_returns_string() -> None

get_url returns a non-empty string.

test_health_check_returns_result
async def test_health_check_returns_result() -> None

health_check returns a HealthCheckResult.


Reusable test suite for any ``CacheBackendProtocol`` implementation.

Subclass and implement create_backend:

.. code-block:: python

class TestRedisCache(CacheBackendCompliance):
async def create_backend(self):
return RedisCacheBackend("redis://localhost")
create_backend
async def create_backend() -> Any

Return a fresh, empty instance of the backend under test.

test_set_and_get
async def test_set_and_get() -> None

set then get returns the stored value.

test_get_missing_returns_default
async def test_get_missing_returns_default() -> None

get on a missing key returns the default.

test_get_missing_returns_none_without_default
async def test_get_missing_returns_none_without_default() -> None

get on a missing key returns None when no default is given.

test_delete_existing_key
async def test_delete_existing_key() -> None

delete returns True and removes the key.

test_delete_missing_key
async def test_delete_missing_key() -> None

delete on a missing key returns False.

test_clear
async def test_clear() -> None

clear removes all stored entries.

test_overwrite_existing_key
async def test_overwrite_existing_key() -> None

set on an existing key overwrites the value.

test_ttl_expiry
async def test_ttl_expiry() -> None

Values with TTL expire and are no longer returned.

test_no_ttl_does_not_expire
async def test_no_ttl_does_not_expire() -> None

Values without TTL persist and are accessible.

test_stores_various_value_types
async def test_stores_various_value_types() -> None

The backend accepts strings, ints, dicts, and lists.


Factory for creating test containers with common provider registrations.

This factory provides a standardized way to create DI containers for testing that automatically register common protocols and mock implementations.

Example

factory = ContainerFactory()
container = factory.create_test_container()
# Container has mock providers registered for common protocols
__init__
def __init__() -> None
create_test_container
def create_test_container() -> Any

Create a test container with common protocol registrations.

Returns
TypeDescription
AnyConfigured container with mock implementations for common protocols.

Isolated DI container scoped to a single test.

Wraps LexigramContainerHarness with automatic disposal and convenience helpers for registering overrides.

Intended to be used through the test_container pytest fixture so each test receives a fully isolated container with fresh mock registrations.

Typical usage (via pytest fixture)

async def test_my_service(test_container: ContainerTestFixture) -> None:
test_container.mock(UserRepository, FakeUserRepository())
service = await test_container.get(UserService)
result = await service.create(email="a@b.com")
assert result.is_ok()
async def test_my_service(test_container: ContainerTestFixture) -> None:
test_container.mock(UserRepository, FakeUserRepository())
service = await test_container.get(UserService)
result = await service.create(email="a@b.com")
assert result.is_ok()

Direct usage (as async context manager)

async with ContainerTestFixture() as fixture:
fixture.mock(CacheBackendProtocol, FakeCache())
svc = await fixture.get(MyService)
async with ContainerTestFixture() as fixture:
fixture.mock(CacheBackendProtocol, FakeCache())
svc = await fixture.get(MyService)
__init__
def __init__(
    *,
    register_mocks: bool = False
) -> None
get
async def get(interface: type[T]) -> T

Resolve interface from the container.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to resolve.
Returns
TypeDescription
TThe resolved instance.
Raises
ExceptionDescription
RuntimeErrorIf the fixture has already been disposed.
get_optional
async def get_optional(interface: type[T]) -> T | None

Resolve interface, returning None if not registered.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to resolve.
Returns
TypeDescription
T | NoneThe resolved instance, or ``None`` if not found.
mock
def mock(
    interface: type[T],
    mock_obj: Any
) -> None

Register mock_obj as the singleton for interface.

This is the primary way to wire test doubles into the container.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to override.
`mock_obj`AnyThe mock/fake/stub to register.
override
async def override(
    interface: type[T],
    mock_obj: Any
) -> AsyncGenerator[None, None]

Register mock_obj for interface for the duration of the block.

Because each test receives a fresh container, this is equivalent to calling mock for the lifetime of the scope. It exists purely for readability in tests that want to be explicit about scope.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to override.
`mock_obj`AnyThe mock/fake/stub to register.
Yields
TypeDescription
AsyncGenerator[None, None]Nothing; the override is active for the body of the block.
dispose
async def dispose() -> None

Dispose the container and release all resources.

Idempotent — safe to call multiple times.

container
property container() -> LexigramContainerHarness

Return the underlying LexigramContainerHarness.

Use this for advanced scenarios that require direct container access. Prefer get and mock for normal test usage.


Reusable test suite for any ``DatabaseProviderProtocol`` implementation.

Subclass and implement create_provider:

.. code-block:: python

class TestMyProvider(DatabaseProviderCompliance):
async def create_provider(self):
return MyDatabaseProvider(dsn="sqlite+aiosqlite:///:memory:")
create_provider
async def create_provider() -> Any

Return a ready-to-use provider instance under test.

test_health_check_returns_result
async def test_health_check_returns_result() -> None

health_check returns a HealthCheckResult.

test_scoped_context_provides_connection
async def test_scoped_context_provides_connection() -> None

get_scoped_connection returns a connection within a scoped context.

test_scoped_context_is_reentrant
async def test_scoped_context_is_reentrant() -> None

Nested scoped_context blocks do not raise.

test_execute_ddl
async def test_execute_ddl() -> None

execute can run DDL statements without error.

test_execute_and_fetch
async def test_execute_and_fetch() -> None

Data inserted via execute is retrievable via fetch_one.

test_transaction_rollback_discards_writes
async def test_transaction_rollback_discards_writes() -> None

Data written inside a rolled-back transaction is not persisted.


__init__
def __init__(
    name: str = 'db-test-bed',
    connection_string: str = ':memory:',
    auto_cleanup: bool = True
)
create_test_table
async def create_test_table(
    name: str,
    schema: str
) -> None

Create a test table with the given name and SQL schema string.

seed_test_data
async def seed_test_data(
    table: str,
    data: list[dict] | dict
) -> None

Seed test data into a table.

clear_test_data
async def clear_test_data(table: str) -> None

Clear all rows from a test table.


__init__
def __init__(
    connection_string: str = ':memory:',
    auto_cleanup: bool = True
)
cleanup
async def cleanup() -> None

Drop tracked tables and clear table data.

connect
async def connect() -> None
disconnect
async def disconnect() -> None
execute_query
async def execute_query(
    query: str,
    params: list | None = None
) -> list[dict]
execute
async def execute(
    query: str,
    params: list | None = None,
    fetch: bool = True
) -> list[dict] | int
create_table
async def create_table(
    name: str,
    schema: dict | str
) -> None
drop_table
async def drop_table(name: str) -> None
clear_table
async def clear_table(name: str) -> None
get_table_count
async def get_table_count(table_name: str) -> int
insert_data
async def insert_data(
    table: str,
    data: dict | list[dict]
) -> int
insert
async def insert(
    table: str,
    data: dict | list[dict]
) -> int

Compliance suite for DistributedLockProtocol implementations.

Subclass and implement create_lock() to run all compliance tests.

create_lock
async def create_lock(key: str) -> Any

Create a DistributedLockProtocol implementation under test.

Parameters
ParameterTypeDescription
`key`strUnique resource key for the lock.
Returns
TypeDescription
AnyA fresh DistributedLockProtocol instance.
test_acquire_returns_true
async def test_acquire_returns_true() -> None

acquire() returns True when the lock is free.

test_release_returns_true_when_held
async def test_release_returns_true_when_held() -> None

release() returns True when the lock is currently held.

test_is_held_after_acquire
async def test_is_held_after_acquire() -> None

is_held() returns True after acquire().

test_is_not_held_after_release
async def test_is_not_held_after_release() -> None

is_held() returns False after release().

test_acquire_exclusive
async def test_acquire_exclusive() -> None

Second acquire() on the same key returns False while held.

test_acquire_after_release
async def test_acquire_after_release() -> None

acquire() returns True again after lock is released.

test_context_manager_acquires_and_releases
async def test_context_manager_acquires_and_releases() -> None

Context manager acquires on entry and releases on exit.

test_extend_returns_true_when_held
async def test_extend_returns_true_when_held() -> None

extend() returns True when the lock is currently held.

test_extend_returns_false_when_not_held
async def test_extend_returns_false_when_not_held() -> None

extend() returns False when the lock is not held.

test_acquire_blocking_succeeds
async def test_acquire_blocking_succeeds() -> None

acquire_blocking() returns True when the lock is free.


Reusable test suite for any ``EventBusProtocol`` implementation.

Subclass and implement create_bus and create_event:

.. code-block:: python

class TestMyEventBus(EventBusCompliance):
async def create_bus(self):
return MyEventBus()
def create_event(self):
return MyEvent(id="1")
create_bus
async def create_bus() -> Any

Return a fresh EventBusProtocol instance.

create_event
def create_event() -> Any

Return a new event instance for testing.

test_subscribe_and_publish
async def test_subscribe_and_publish() -> None

Subscribing a handler causes it to be called on publish.

test_unsubscribe_stops_delivery
async def test_unsubscribe_stops_delivery() -> None

Unsubscribing a handler prevents further delivery.

test_multiple_handlers_all_called
async def test_multiple_handlers_all_called() -> None

All subscribers for an event type are invoked.

test_publish_unrelated_event_type_not_delivered
async def test_publish_unrelated_event_type_not_delivered() -> None

A handler subscribed to one type does not receive another type.

test_publish_without_subscribers_is_silent
async def test_publish_without_subscribers_is_silent() -> None

Publishing when there are no subscribers does not raise.


In-memory fake implementing AuditLoggerProtocol and AuditStoreProtocol.

Exposes entries list for test assertions.

Example

fake = FakeAuditLogger()
await fake.log(AuditEntry(action="user.login", actor_id="u-1"))
assert len(fake.entries) == 1
assert fake.entries[0].action == "user.login"
fake = FakeAuditLogger()
await fake.log(AuditEntry(action="user.login", actor_id="u-1"))
assert len(fake.entries) == 1
assert fake.entries[0].action == "user.login"
__init__
def __init__() -> None
log
async def log(entry: AuditEntry) -> None

Implement AuditLoggerProtocol.log.

query
async def query(query: AuditQuery) -> list[AuditEntry]

Implement AuditLoggerProtocol.query — filter entries by query fields.

append
async def append(entry: AuditEntry) -> None

Implement AuditStoreProtocol.append.

count
async def count(query: AuditQuery) -> int

Implement AuditStoreProtocol.count.

clear
def clear() -> None

Clear all entries (test helper).


In-memory fake satisfying ``CacheBackendProtocol`` / ``CacheProtocol``.

Supports TTL expiry using time.monotonic.

Example

cache = FakeCache()
await cache.set("key", "value", ttl=60)
cache.assert_has_key("key")
cache = FakeCache()
await cache.set("key", "value", ttl=60)
cache.assert_has_key("key")
__init__
def __init__() -> None
get
async def get(
    key: str,
    default: Any = None
) -> Any

Return the stored value for key, or default if absent/expired.

set
async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> None

Store value under key, optionally expiring after ttl seconds.

delete
async def delete(key: str) -> bool

Delete key; return True if it existed.

clear
async def clear() -> None

Remove all entries.

stored_keys
property stored_keys() -> list[str]

Return a snapshot of currently stored keys.

assert_has_key
def assert_has_key(key: str) -> None

Assert key is present in the cache.

assert_value
def assert_value(
    key: str,
    expected: Any
) -> None

Assert key maps to expected.


Controllable clock for deterministic time-based testing.

Example

clock = FakeClock(datetime(2026, 1, 1, tzinfo=UTC))
assert clock.now().year == 2026
clock.advance(3600)
assert clock.now().hour == 1
clock = FakeClock(datetime(2026, 1, 1, tzinfo=UTC))
assert clock.now().year == 2026
clock.advance(3600)
assert clock.now().hour == 1
__init__
def __init__(now: datetime | None = None) -> None
now
def now() -> datetime

Return the current fake time.

monotonic
def monotonic() -> float

Return a monotonic counter (starts at 0).

time
def time() -> float

Return Unix timestamp of the current fake time.

advance
def advance(seconds: float) -> None

Move time forward by seconds.

freeze
def freeze(at: datetime) -> None

Set the clock to a specific point in time.

tick
def tick() -> None

Advance by exactly 1 second.


Records dispatched commands for test assertions.

Satisfies a CommandBusProtocol interface: register() + dispatch().

Example

bus = FakeCommandBus()
await bus.dispatch(CreateUser(email="a@example.com"))
bus.assert_dispatched(CreateUser, count=1)
bus = FakeCommandBus()
await bus.dispatch(CreateUser(email="a@example.com"))
bus.assert_dispatched(CreateUser, count=1)
__init__
def __init__() -> None
register
def register(
    command_type: type,
    handler: Any
) -> None

Register handler for command_type.

dispatch
async def dispatch(command: Any) -> Any

Record command and invoke any registered handler.

dispatched
property dispatched() -> list[Any]

All dispatched commands.

dispatched_of_type
def dispatched_of_type(command_type: type[T]) -> list[T]

Return dispatched commands of command_type.

assert_dispatched
def assert_dispatched(
    command_type: type[T],
    count: int | None = None
) -> None

Assert command_type was dispatched, optionally count times.

assert_not_dispatched
def assert_not_dispatched(command_type: type[T]) -> None

Assert command_type was NOT dispatched.

clear
def clear() -> None

Reset all recorded dispatches.


In-memory config satisfying a ``ConfigProtocol``-like interface.

Supports dot-notation key access ("database.url") and optional section retrieval with model instantiation.

Example

config = FakeConfig({"database": {"url": "sqlite:///:memory:"}})
assert config.get("database.url") == "sqlite:///:memory:"
config.set("cache.backend", "memory")
config = FakeConfig({"database": {"url": "sqlite:///:memory:"}})
assert config.get("database.url") == "sqlite:///:memory:"
config.set("cache.backend", "memory")
__init__
def __init__(data: dict[str, Any] | None = None) -> None
environment
property environment() -> Any

The active deployment environment (returns test environment).

is_debug
property is_debug() -> bool

Return False for test environments.

is_production
property is_production() -> bool

Return False for test environments.

is_development
property is_development() -> bool

Return whether this is development.

is_testing
property is_testing() -> bool

Return whether this is testing.

is_staging
property is_staging() -> bool

Return whether this is staging.

has_section
def has_section(name: str) -> bool

Check whether a configuration section exists.

get
def get(
    key: str,
    default: Any = None
) -> Any

Retrieve key (dot-separated path) from config, returning default if absent.

get_section
def get_section(
    name: str,
    model_cls: type | None = None
) -> Any

Return the config section name, optionally instantiated as model_cls.

set
def set(
    key: str,
    value: Any
) -> None

Set key (dot-separated path) to value — test-only helper.


Records all published events for test assertions.

Satisfies the EventBusProtocol protocol from lexigram.contracts.events.

Example

bus = FakeEventBus()
await bus.publish(UserCreated(user_id="abc"))
bus.assert_published(UserCreated, count=1, user_id="abc")
bus = FakeEventBus()
await bus.publish(UserCreated(user_id="abc"))
bus.assert_published(UserCreated, count=1, user_id="abc")
__init__
def __init__() -> None
subscribe
def subscribe(
    event_type: type,
    handler: Any,
    priority: int = 0
) -> None

Register handler for event_type.

unsubscribe
def unsubscribe(
    event_type: type,
    handler: Any
) -> None

Remove handler for event_type.

publish
async def publish(event: Any) -> None

Record event and dispatch to any registered handlers.

published
property published() -> list[DomainEvent]

All published events.

published_of_type
def published_of_type(event_type: type[T]) -> list[T]

Return published events matching event_type.

assert_published
def assert_published(
    event_type: type[T],
    count: int | None = None,
    **attrs: Any
) -> None

Assert that event_type was published.

Optionally verify count and attribute values.

assert_not_published
def assert_not_published(event_type: type[T]) -> None

Assert that event_type was NOT published.

assert_published_once
def assert_published_once(
    event_type: type,
    **attrs: Any
) -> None

Assert exactly one event_type was published.

assert_events_in_order
def assert_events_in_order(*event_types: type) -> None

Assert that events were published in the given positional order.

Checks that position i in the published event stream matches event_types[i]. Use assert_published to verify events that may appear anywhere in the stream.

Parameters
ParameterTypeDescription
Raises
ExceptionDescription
AssertionErrorIf any position does not match or the stream is shorter than the expected sequence.

Example

await bus.publish(UserRegistered(user_id="1"))
await bus.publish(EmailSent(user_id="1"))
bus.assert_events_in_order(UserRegistered, EmailSent)
await bus.publish(UserRegistered(user_id="1"))
await bus.publish(EmailSent(user_id="1"))
bus.assert_events_in_order(UserRegistered, EmailSent)
clear
def clear() -> None

Reset the recorded events list.


Captures log entries for test assertions.

Satisfies LoggerProtocol from lexigram.contracts.core.logging.

Example

logger = FakeLogger()
logger.info("user_created", user_id="abc")
logger.assert_logged("info", "user_created")
logger = FakeLogger()
logger.info("user_created", user_id="abc")
logger.assert_logged("info", "user_created")
__init__
def __init__(bound_context: dict[str, Any] | None = None) -> None
debug
def debug(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture a DEBUG-level log entry.

info
def info(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture an INFO-level log entry.

warning
def warning(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture a WARNING-level log entry.

error
def error(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture an ERROR-level log entry.

critical
def critical(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture a CRITICAL-level log entry.

exception
def exception(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture an EXCEPTION-level log entry.

bind
def bind(**kwargs: Any) -> FakeLogger

Return a new FakeLogger sharing entries with merged context.

unbind
def unbind(*keys: str) -> FakeLogger

Return a new FakeLogger with specified context keys removed.

entries
property entries() -> list[LogEntry]

All captured log entries.

assert_logged
def assert_logged(
    level: str,
    msg_contains: str
) -> None

Assert that a log entry with level containing msg_contains exists.

assert_not_logged
def assert_not_logged(
    level: str,
    msg_contains: str | None = None
) -> None

Assert no matching log entry exists.

clear
def clear() -> None

Reset all captured entries.


Records counter, gauge, and histogram observations for test assertions.

Example

metrics = FakeMetricsCollector()
metrics.increment("requests.total")
metrics.assert_counter_incremented("requests.total")
metrics = FakeMetricsCollector()
metrics.increment("requests.total")
metrics.assert_counter_incremented("requests.total")
__init__
def __init__() -> None
increment
def increment(
    name: str,
    value: float = 1.0,
    tags: dict[str, str] | None = None
) -> None

Increment counter name by value.

gauge
def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Record gauge name = value.

histogram
def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Record a histogram observation for name.

counter
def counter(name: str) -> float

Return the current value of counter name (0 if never incremented).

assert_counter
def assert_counter(
    name: str,
    expected: float
) -> None

Assert counter name equals expected.

assert_counter_incremented
def assert_counter_incremented(name: str) -> None

Assert counter name was incremented at least once.

assert_gauge
def assert_gauge(
    name: str,
    expected: float
) -> None

Assert gauge name equals expected.

clear
def clear() -> None

Reset all recorded observations.


Records executed queries and returns configured canned results.

Example

bus = FakeQueryBus()
bus.when(GetUser, return_value=user)
result = await bus.execute(GetUser(user_id="123"))
assert result == user
bus = FakeQueryBus()
bus.when(GetUser, return_value=user)
result = await bus.execute(GetUser(user_id="123"))
assert result == user
__init__
def __init__() -> None
register
def register(
    query_type: type,
    handler: Any
) -> None

Register handler for query_type.

when
def when(
    query_type: type,
    return_value: Any
) -> None

Configure a canned response to return for query_type.

execute
async def execute(query: Any) -> Any

Record and execute query, returning canned results first.

executed
property executed() -> list[Any]

All executed queries.

executed_of_type
def executed_of_type(query_type: type[T]) -> list[T]

Return executed queries of query_type.

clear
def clear() -> None

Reset all recorded executions.


Minimal fake span for testing distributed tracing.

Attributes: name: The span operation name. context: Opaque context object (trace_id, span_id pair). parent_context: Parent context if this span is linked to another trace. attributes: Span attributes dictionary. events: List of span events. status: Current span status.

__init__
def __init__(
    name: str,
    context: tuple[str, str],
    parent_context: tuple[str, str] | None = None
) -> None
set_attribute
def set_attribute(
    key: str,
    value: Any
) -> None

Set a span attribute.

add_event
def add_event(
    name: str,
    attributes: dict[str, Any] | None = None
) -> None

Add an event to the span.

record_exception
def record_exception(exception: Exception) -> None

Record an exception on the span.

set_status
def set_status(status: str) -> None

Set the span status.


In-memory fake satisfying ``StateStoreProtocol`` protocol.

Example

store = FakeStateStore()
await store.set("session:abc", {"user_id": "u1"})
value = await store.get("session:abc")
store = FakeStateStore()
await store.set("session:abc", {"user_id": "u1"})
value = await store.get("session:abc")
__init__
def __init__() -> None
get
async def get(key: str) -> Any | None

Return the value for key, or None if absent.

set
async def set(
    key: str,
    value: Any
) -> None

Store value under key.

delete
async def delete(key: str) -> bool

Delete key; return True if it existed.

exists
async def exists(key: str) -> bool

Return True if key has a stored value.

clear
def clear() -> None

Remove all stored entries.


Minimal fake tracer implementing TracerProtocol.

Generates W3C traceparent headers and tracks created spans for test inspection.

Attributes: spans: List of all spans created by this tracer (for test inspection).

__init__
def __init__() -> None
start_span
def start_span(
    name: str,
    attributes: dict[str, Any] | None = None,
    context: tuple[str, str] | None = None
) -> FakeSpan

Start a new span with optional parent context.

Parameters
ParameterTypeDescription
`name`strSpan name.
`attributes`dict[str, Any] | NoneOptional span attributes.
`context`tuple[str, str] | NoneOptional parent context (trace_id, span_id). If provided, this span continues the same trace with a new span_id.
Returns
TypeDescription
FakeSpanNew FakeSpan instance.
get_current_span
def get_current_span() -> FakeSpan | None

Get the currently active span.

inject_context
def inject_context(
    carrier: dict[str, str],
    context: tuple[str, str] | None = None
) -> None

Inject trace context into a carrier dict (W3C traceparent format).

Parameters
ParameterTypeDescription
`carrier`dict[str, str]Mutable dict that will receive the trace headers.
`context`tuple[str, str] | NoneOptional context tuple (trace_id, span_id). If None, uses current span.
extract_context
def extract_context(carrier: dict[str, str]) -> tuple[str, str] | None

Extract trace context from a carrier dict (W3C traceparent).

Parameters
ParameterTypeDescription
`carrier`dict[str, str]Dict containing trace headers.
Returns
TypeDescription
tuple[str, str] | NoneContext tuple (trace_id, span_id) or None if no valid context found.

Tracks entity changes and events without persistence.

Optionally integrates with FakeEventBus to dispatch collected events on commit.

Example

async with FakeUnitOfWork() as uow:
uow.register_new(user)
uow.register_event(UserCreated(user_id=user.user_id))
assert uow.committed
async with FakeUnitOfWork() as uow:
uow.register_new(user)
uow.register_event(UserCreated(user_id=user.user_id))
assert uow.committed
__init__
def __init__(event_bus: FakeEventBus | None = None) -> None
register_new
def register_new(entity: Any) -> None

Mark entity as newly created.

register_dirty
def register_dirty(entity: Any) -> None

Mark entity as modified.

register_deleted
def register_deleted(entity: Any) -> None

Mark entity as deleted.

register_event
def register_event(event: Any) -> None

Queue event for dispatch on commit.

collect_events
def collect_events() -> list[Any]

Return all queued events without consuming them.

commit
async def commit() -> None

Mark as committed and optionally publish queued events.

rollback
async def rollback() -> None

Mark as rolled back and clear all tracked changes.


Deterministic clock for tests.
__init__
def __init__(initial: datetime | None = None) -> None
set
def set(dt: datetime) -> None

Set the clock to a specific time.

freeze
def freeze(dt: datetime) -> None

Backward-compatible alias for set.

advance
def advance(delta: float | timedelta | Duration) -> None

Advance the clock by a duration.

tick
def tick() -> None

Advance the clock by one second.

now
def now() -> datetime

Return the current frozen time.

monotonic
def monotonic() -> float

Return elapsed seconds since the clock was created or set.

timestamp
def timestamp() -> float

Return the Unix timestamp for the current frozen time.

time
def time() -> float

Backward-compatible alias for timestamp.


Reusable compliance suite for any ``FlagProvider`` implementation.

Subclass and implement create_provider. Override enabled_flag_name / disabled_flag_name to specify which flags the provider will return for enabled/disabled test cases.

The provider created by create_provider must have at least:

  • one flag named enabled_flag_name that evaluates to True
  • one flag named disabled_flag_name that evaluates to False
create_provider
async def create_provider() -> Any

Return a fully initialised FlagProvider for testing.

enabled_flag_name
def enabled_flag_name() -> str

Name of a flag that the provider reports as enabled.

disabled_flag_name
def disabled_flag_name() -> str

Name of a flag that the provider reports as disabled.

unknown_flag_name
def unknown_flag_name() -> str

Name of a flag that does not exist in the provider.

test_get_flag_found
async def test_get_flag_found() -> None

get_flag returns a bool for a flag that exists in the provider.

test_get_flag_not_found
async def test_get_flag_not_found() -> None

get_flag on an unknown flag returns the default (or raises FlagNotFoundError).

test_evaluate_flag_enabled
async def test_evaluate_flag_enabled() -> None

An enabled flag evaluates to True.

test_evaluate_flag_disabled
async def test_evaluate_flag_disabled() -> None

A disabled flag evaluates to False.

test_async_evaluate
async def test_async_evaluate() -> None

Async evaluation via get_flag is awaitable and returns a bool.

test_get_flag_respects_default
async def test_get_flag_respects_default() -> None

default parameter is honoured when the flag is absent.


A TestEnvironment pre-configured for integration tests.

Inherits the full TestEnvironment API (use_provider, fake, override, resolve, async context-manager) and adds:

  • with_database — boots a real database provider (defaults to SQLite in-memory so tests don’t need an external process).
  • with_cache — boots a real cache provider (defaults to the in-memory cache backend so tests can run without Redis).
  • with_all — combines database + cache in one call.
  • Config override shortcut via config= constructor argument.

None of the factory methods import the provider packages at class definition time — they use lazy imports so the test can collect and skip cleanly when a package isn’t installed.

__init__
def __init__(
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> None
with_database
def with_database(
    cls,
    url: str = 'sqlite+aiosqlite:///:memory:',
    *,
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> IntegrationEnvironment

Return an environment wired with a real database provider.

Parameters
ParameterTypeDescription
`url`strDatabase URL. Defaults to an in-memory SQLite database so tests run without any external process.
`name`strEnvironment name shown in log output.
`config`dict[str, Any] | NoneAdditional config overrides (merged with the database URL).
Returns
TypeDescription
IntegrationEnvironmentAn ``IntegrationEnvironment`` with the database provider registered.
Raises
ExceptionDescription
ImportErrorIf ``lexigram-sql`` is not installed.
with_cache
def with_cache(
    cls,
    backend: str = 'memory',
    *,
    url: str | None = None,
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> IntegrationEnvironment

Return an environment wired with a real cache provider.

Parameters
ParameterTypeDescription
`backend`strBackend type — ``"memory"`` (default), ``"redis"``, or ``"memcached"``.
`url`str | NoneBackend connection URL (required for Redis / Memcached; omit for the in-memory backend).
`name`strEnvironment name.
`config`dict[str, Any] | NoneAdditional config overrides.
Returns
TypeDescription
IntegrationEnvironmentAn ``IntegrationEnvironment`` with the cache provider registered.
Raises
ExceptionDescription
ImportErrorIf ``lexigram-cache`` is not installed.
with_all
def with_all(
    cls,
    *,
    database_url: str = 'sqlite+aiosqlite:///:memory:',
    cache_backend: str = 'memory',
    cache_url: str | None = None,
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> IntegrationEnvironment

Return an environment with both database and cache providers.

This is a convenience factory combining with_database and with_cache. Both providers are registered before the environment is returned.

Parameters
ParameterTypeDescription
`database_url`strPassed to with_database.
`cache_backend`strPassed to with_cache.
`cache_url`str | NonePassed to with_cache.
`name`strEnvironment name.
`config`dict[str, Any] | NoneAdditional config overrides.
Raises
ExceptionDescription
ImportErrorIf any required extension package is missing.
setup
async def setup() -> Any

Set up the environment, injecting FakeConfig for config overrides.

If config was passed to the constructor, it is registered as an override before the application boots so that it lands in the container before it is frozen. The override is keyed by ConfigProtocol when available, falling back to the FakeConfig class itself.


Standardized DI container for isolated component testing.

This container automatically registers common mock providers and mock component implementations (LockStore, PubSubProtocol, etc.) to provide an stable baseline for unit and integration tests.

Usage

container = LexigramContainerHarness()
# Common mocks are already registered
# Add your concrete provider for testing
container.register(MyProvider())
# Override specific mocks if needed
container.singleton(DatabaseProviderProtocol, MockDB())
service = await container.resolve(MyService)
container = LexigramContainerHarness()
# Common mocks are already registered
# Add your concrete provider for testing
container.register(MyProvider())
# Override specific mocks if needed
container.singleton(DatabaseProviderProtocol, MockDB())
service = await container.resolve(MyService)
__init__
def __init__(register_mocks: bool = True) -> None
override
def override(
    interface: type[T],
    implementation: Any
) -> Any

Context manager to temporarily override a dependency.

Example

mock
def mock(
    interface: type[T],
    mock_obj: Any
) -> None

Register a mock implementation directly as a singleton.

try_resolve
async def try_resolve(interface: type[T]) -> Result[T, Exception]

Resolve interface from the container, wrapping the outcome in Result.

This is the Result-returning counterpart to resolve. Use it when you need to assert that a dependency fails to resolve — for example to verify that a required binding is absent or mis-configured.

Returns
TypeDescription
Result[T, Exception]``Ok(instance)`` on success, ``Err(exception)`` if resolution raises for any reason (missing binding, circular dependency, etc.).

Example

result = await container.try_resolve(MyService)
assert result.is_err()
assert isinstance(result.unwrap_err(), ResolutionError)
result = await container.try_resolve(MyService)
assert result.is_err()
assert isinstance(result.unwrap_err(), ResolutionError)
bind
def bind(
    protocol: type[T],
    implementation: Any
) -> None

Register implementation as protocol with runtime protocol validation.

If protocol is decorated with @runtime_checkable, this method verifies that implementation is an instance of protocol before registering it. Non-runtime-checkable protocols are accepted as-is (mypy / pyright catch mismatches at type-check time).

Parameters
ParameterTypeDescription
`protocol`type[T]The contract / protocol type to register.
`implementation`AnyThe concrete object implementing *protocol*.
Raises
ExceptionDescription
TypeErrorWhen *protocol* is ``@runtime_checkable`` and *implementation* does not satisfy ``isinstance`` check.

Example

container.bind(CacheBackendProtocol, FakeCache())
container.bind(CacheBackendProtocol, FakeCache())

A single captured log entry.

Reusable compliance suite for any ``MiddlewareProtocol`` implementation.

Subclass and implement create_middleware. All tests exercise the async __call__(context, next) contract defined by MiddlewareProtocol.

The before / after / error terminology maps to the three observable phases of a __call__-style middleware:

  • before — code that runs prior to invoking next
  • after — code that runs after next returns
  • error — behaviour when the downstream next raises an exception
create_middleware
async def create_middleware() -> Any

Return a fresh instance of the middleware under test.

make_context
def make_context() -> dict[str, Any]

Return a minimal request context suitable for the middleware.

test_before_called
async def test_before_called() -> None

Middleware is invoked before the downstream handler is called.

Verified by asserting that __call__ is awaitable and executes without error when a valid context and next are provided.

test_after_called
async def test_after_called() -> None

Middleware receives and can observe the result from the downstream handler.

The result returned by __call__ must equal the value returned by the inner next handler (pass-through behaviour is the minimum requirement; middleware may wrap or transform but must not discard).

test_error_flow
async def test_error_flow() -> None

An exception raised by the downstream handler propagates through the middleware.

A middleware that does not explicitly handle errors must let the exception bubble up. If the middleware catches and re-raises the exception that is also acceptable, provided the original exception type is preserved (or wrapped in a framework-specific exception).

test_context_passed_to_next
async def test_context_passed_to_next() -> None

The context object passed to next is the same (or an enriched) version.

The middleware must not silently drop the context before forwarding it to the inner handler.


Compliance suite for notification channel implementations.

Subclass and implement create_channel and make_message to run all compliance tests against any SMSChannelProtocol or PushChannelProtocol implementation.

create_channel
async def create_channel() -> Any

Create the channel implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing ``SMSChannelProtocol`` or ``PushChannelProtocol``.
make_message
async def make_message() -> Any

Create an appropriate message for the channel under test.

Returns
TypeDescription
AnyAn ``SMSMessage`` or ``PushMessage`` ready for delivery.
test_send_returns_ok_on_success
async def test_send_returns_ok_on_success() -> None

send() returns an Ok result on successful acceptance.

test_send_returns_delivery_receipt
async def test_send_returns_delivery_receipt() -> None

send() Ok result contains a receipt with a non-empty message_id.

test_health_check_returns_result
async def test_health_check_returns_result() -> None

health_check() returns a HealthCheckResult instance.

test_health_check_has_status
async def test_health_check_has_status() -> None

health_check() status is HEALTHY or DEGRADED.

test_health_check_custom_timeout_accepted
async def test_health_check_custom_timeout_accepted() -> None

health_check() accepts an explicit timeout without raising.

test_health_check_has_component_name
async def test_health_check_has_component_name() -> None

health_check() result includes a non-empty component name.


Compliance suite for queue backend implementations.

Subclass and implement create_backend() to run all compliance tests.

create_backend
async def create_backend(queue_name: str = 'test-queue') -> Any

Create the queue backend implementation under test.

Parameters
ParameterTypeDescription
`queue_name`strName of the queue to use for testing.
Returns
TypeDescription
AnyA fresh queue backend instance.
test_enqueue_returns_message_id
async def test_enqueue_returns_message_id() -> None

enqueue() returns a message ID string.

test_dequeue_returns_enqueued_message
async def test_dequeue_returns_enqueued_message() -> None

dequeue() returns the message that was enqueued.

test_dequeue_empty_returns_none
async def test_dequeue_empty_returns_none() -> None

dequeue() returns None when no messages are available.

test_ack_removes_message
async def test_ack_removes_message() -> None

ack() completes without error for an in-flight message.

test_nack_requeues_message
async def test_nack_requeues_message() -> None

nack() with requeue=True makes the message available again.

test_nack_discards_message
async def test_nack_discards_message() -> None

nack() with requeue=False permanently discards the message.


Reusable test suite for any ``RepositoryProtocol[T]`` implementation.

Verifies that the repository satisfies the standard persistence contract: save, get, delete, list.

Subclass and implement create_repository and create_entity:

.. code-block:: python

class TestInMemoryUserRepo(RepositoryCompliance[User]):
async def create_repository(self):
return InMemoryUserRepository()
def create_entity(self, **overrides):
return User(id=str(uuid4()), name="Alice")
create_repository
async def create_repository() -> Any

Return a fresh, empty repository instance.

create_entity
def create_entity(**overrides: Any) -> T

Return a new entity instance suitable for persistence.

test_save_and_get
async def test_save_and_get() -> None

save then get returns the entity.


Reusable test suite for any ``SearchEngineProtocol`` implementation.

Subclass and implement create_engine:

.. code-block:: python

class TestMySearch(SearchEngineCompliance):
async def create_engine(self):
return MySearchEngine(url="http://localhost:9200")
create_engine
async def create_engine() -> Any

Return a ready-to-use search engine under test.

test_create_and_delete_index
async def test_create_and_delete_index() -> None

create_index and delete_index do not raise errors.

test_index_single_document
async def test_index_single_document() -> None

index_document succeeds and the document is searchable.

test_index_many_documents
async def test_index_many_documents() -> None

index_many successfully indexes multiple documents.

test_delete_document
async def test_delete_document() -> None

delete_document removes an indexed document without error.

test_search_returns_results_object
async def test_search_returns_results_object() -> None

search returns an object with a hits or results attribute.

test_health_check_returns_result
async def test_health_check_returns_result() -> None

health_check returns a HealthCheckResult.


Async probes for common external services.

Each method attempts a real network connection and returns True if the service is reachable, False otherwise. All probes suppress all exceptions — they are purely availability indicators, not functional tests.

Example

@pytest.fixture
async def redis_client():
if not await ServiceProbe.check_redis():
pytest.skip("Redis not available")
# ... connect and return client
@pytest.fixture
async def redis_client():
if not await ServiceProbe.check_redis():
pytest.skip("Redis not available")
# ... connect and return client
check_redis
async def check_redis(url: str = 'redis://localhost:6379') -> bool

Return True if a Redis server is reachable at url.

check_postgres
async def check_postgres(dsn: str = 'postgresql://localhost/test') -> bool

Return True if a PostgreSQL server is reachable at dsn.

check_elasticsearch
async def check_elasticsearch(url: str = 'http://localhost:9200') -> bool

Return True if an Elasticsearch HTTP endpoint is reachable.

check_rabbitmq
async def check_rabbitmq(url: str = 'amqp://guest:guest@localhost:5672/') -> bool

Return True if a RabbitMQ AMQP broker is reachable.

check_meilisearch
async def check_meilisearch(url: str = 'http://localhost:7700') -> bool

Return True if a Meilisearch HTTP endpoint is reachable.

check_smtp
async def check_smtp(
    host: str = 'localhost',
    port: int = 25
) -> bool

Return True if an SMTP server is accepting connections.

check_kafka
async def check_kafka(bootstrap: str = 'localhost:19092') -> bool

Return True if a Kafka broker is reachable.

Parameters
ParameterTypeDescription
`bootstrap`strKafka bootstrap server address.
Returns
TypeDescription
boolTrue if a broker connection succeeds.
check_minio
async def check_minio(endpoint: str = 'localhost:19000') -> bool

Return True if a MinIO endpoint is reachable.

Parameters
ParameterTypeDescription
`endpoint`strMinIO endpoint in host:port format.
Returns
TypeDescription
boolTrue if the HTTP endpoint responds.
check_mongodb
async def check_mongodb(dsn: str = 'mongodb://localhost:17017') -> bool

Return True if a MongoDB server is reachable.

Parameters
ParameterTypeDescription
`dsn`strMongoDB connection string.
Returns
TypeDescription
boolTrue if the server responds to a ping.
check_qdrant
async def check_qdrant(url: str = 'http://localhost:16333') -> bool

Return True if a Qdrant vector store is reachable.

Parameters
ParameterTypeDescription
`url`strQdrant HTTP URL.
Returns
TypeDescription
boolTrue if the healthz endpoint responds with 200.
check_neo4j
async def check_neo4j(url: str = 'bolt://localhost:17687') -> bool

Return True if a Neo4j graph database is reachable.

Parameters
ParameterTypeDescription
`url`strNeo4j bolt URL.
Returns
TypeDescription
boolTrue if a TCP connection to the bolt port succeeds.

Assert that a value matches a stored snapshot.

On first run (no snapshot file exists), the current value is persisted to disk and the test is skipped via pytest.skip. On subsequent runs the value is compared to the stored snapshot and the test fails if they differ.

Parameters
ParameterTypeDescription
`snapshot_dir`Directory where snapshot files are stored. Defaults to a ``__snapshots__`` subdirectory of the current working directory.

Example

asserter = SnapshotAsserter(Path("tests/__snapshots__"))
asserter.assert_match("create_user_response", response_body)
asserter = SnapshotAsserter(Path("tests/__snapshots__"))
asserter.assert_match("create_user_response", response_body)
__init__
def __init__(snapshot_dir: Path | None = None) -> None
assert_match
def assert_match(
    name: str,
    value: Any
) -> None

Assert value matches the stored snapshot for name.

Creates the snapshot and skips the test on first call.

Parameters
ParameterTypeDescription
`name`strUnique snapshot identifier (used as the filename stem).
`value`AnyThe value to compare. Must be JSON-serialisable.
Raises
ExceptionDescription
SnapshotMismatchErrorIf the value does not match the stored snapshot.
update
def update(
    name: str,
    value: Any
) -> Path

Persist value as the stored snapshot for name.

Parameters
ParameterTypeDescription
`name`strUnique snapshot identifier.
`value`AnyThe value to store.
Returns
TypeDescription
PathPath to the written snapshot file.
delete
def delete(name: str) -> bool

Delete the snapshot file for name.

Returns
TypeDescription
bool``True`` if the file existed and was deleted; ``False`` otherwise.
exists
def exists(name: str) -> bool

Return True if a snapshot for name exists on disk.


Reusable test suite for any ``TaskQueueProtocol`` implementation.

Subclass and implement create_queue:

.. code-block:: python

class TestMyQueue(TaskQueueCompliance):
async def create_queue(self):
return MemoryTaskQueue()
create_queue
async def create_queue() -> Any

Return a fresh, empty instance of the queue under test.

test_enqueue_returns_task_id
async def test_enqueue_returns_task_id() -> None

enqueue returns the task id.

test_dequeue_returns_enqueued_task
async def test_dequeue_returns_enqueued_task() -> None

dequeue returns the task that was enqueued.

test_dequeue_empty_returns_none
async def test_dequeue_empty_returns_none() -> None

dequeue on an empty queue returns None.

test_ack_does_not_raise
async def test_ack_does_not_raise() -> None

ack on a known in-flight task completes without error.

test_ack_unknown_id_is_noop
async def test_ack_unknown_id_is_noop() -> None

ack on an unknown task id is a safe no-op.

test_nack_requeue_returns_task_to_queue
async def test_nack_requeue_returns_task_to_queue() -> None

nack with requeue=True makes the task available for dequeue again.

test_nack_discard_does_not_requeue
async def test_nack_discard_does_not_requeue() -> None

nack with requeue=False permanently discards the task.

test_nack_unknown_id_is_noop
async def test_nack_unknown_id_is_noop() -> None

nack on an unknown task id is a safe no-op.

test_get_task_count_reflects_pending
async def test_get_task_count_reflects_pending() -> None

get_task_count returns the number of tasks waiting to be dequeued.

test_clear_empties_the_queue
async def test_clear_empties_the_queue() -> None

clear removes all pending tasks.


Test environment pre-wired with mock task queue and executor.

Unlike the generic TestEnvironment, TaskTestBed overrides setup and teardown to manage the mock components directly without starting a full Application.

Attributes: mock_queue: In-memory MockTaskQueue instance created during setup. mock_executor: MockTaskExecutor instance created during setup.

Example

await bed.teardown()
```python
__init__
def __init__(config: Any = None) -> None
setup
async def setup() -> None

Initialise mock components and pre-populate the queue.

Creates a fresh MockTaskQueue and MockTaskExecutor, then enqueues the three sample tasks returned by sample_tasks. Does not start an Application or DI container.

teardown
async def teardown() -> None

Clear the mock queue and reset state.

Safe to call even if setup was never called.

get_enqueued_tasks
def get_enqueued_tasks() -> list[Any]

Return all tasks currently waiting in the mock queue.

Returns
TypeDescription
list[Any]A list of JobProtocol objects, in enqueue order.
Raises
ExceptionDescription
RuntimeErrorIf called before setup.

High-level task-system test helper.

Wraps a TaskTestBed and provides convenience methods for testing the task lifecycle without a running application or external queue backend.

Attributes: test_bed: The underlying TaskTestBed that owns the mocks. provider: The currently active MockTasksProvider, or None when no provider is running.

Example

await bed.teardown()
```python
__init__
def __init__(test_bed: TaskTestBed) -> None
start_provider
async def start_provider() -> MockTasksProvider

Create and start a MockTasksProvider bound to the test bed.

Sets self.provider and returns the new provider so callers can assert on its identity.

Returns
TypeDescription
MockTasksProviderThe newly created MockTasksProvider.
stop_provider
async def stop_provider() -> None

Shut down the active provider and clear self.provider.

No-op if no provider is running.

enqueue_test_tasks
async def enqueue_test_tasks() -> list[str]

Enqueue the three standard sample tasks into the mock queue.

Creates JobProtocol instances from sample_tasks and enqueues them.

Returns
TypeDescription
list[str]A list of three task-id strings in enqueue order.
execute_test_task
async def execute_test_task(task: Any) -> MockTaskResult[dict[str, Any]]

Execute task via the mock executor.

Parameters
ParameterTypeDescription
`task`AnyA JobProtocol (or any object compatible with MockTaskExecutor).
Returns
TypeDescription
MockTaskResult[dict[str, Any]]A MockTaskResult describing the outcome.
task_context
async def task_context() -> AsyncGenerator[MockTasksProvider, None]

Async context manager that starts a provider and stops it on exit.

Yields
TypeDescription
AsyncGenerator[MockTasksProvider, None]The active MockTasksProvider.

Example

async with client.task_context() as provider:
assert provider is not None
assert client.provider is None

Custom assertion helpers for testing.
assert_eventually_true
def assert_eventually_true(
    condition_func: Callable[[], bool],
    timeout: float = 5.0,
    message: str = 'Condition never became true'
) -> None
assert_dict_contains_subset
def assert_dict_contains_subset(
    subset: dict[str, Any],
    superset: dict[str, Any],
    message: str | None = None
) -> None
assert_async_raises
async def assert_async_raises(
    exc_class: type[Exception],
    coro: Awaitable[Any],
    message: str | None = None
) -> None
assert_metrics_contain
def assert_metrics_contain(
    metrics: dict[str, Any],
    key: str,
    min_value: float | None = None
) -> None

Factory for generating test data.
create_user
def create_user(
    user_id: str | None = None,
    **kwargs: Any
) -> dict[str, Any]
create_task
def create_task(
    task_id: str | None = None,
    **kwargs: Any
) -> dict[str, Any]
create_message
def create_message(
    topic: str = 'test',
    **kwargs: Any
) -> dict[str, Any]
create_request
def create_request(
    method: str = 'GET',
    path: str = '/',
    **kwargs: Any
) -> dict[str, Any]

Pre-wired container environment for unit tests.

Instantiates all in-memory fakes and registers them as singletons in a fresh Container. Access the fakes directly via attributes or resolve them from container.

Parameters
ParameterTypeDescription
`extra_registrations`Optional callable that accepts the Container for additional singleton registrations before setup completes.

Example

env = TestEnvironment()
await env.setup()
assert env.event_bus is not None
env.teardown()
env = TestEnvironment()
await env.setup()
assert env.event_bus is not None
env.teardown()
__init__
def __init__(extra_registrations: Any = None) -> None
setup
async def setup() -> TestEnvironment

Instantiate fakes and register them with the container.

Returns
TypeDescription
TestEnvironmentSelf, to allow ``env = await TestEnvironment().setup()``.
make_repository
def make_repository() -> InMemoryRepository

Create a fresh InMemoryRepository.

make_uow
def make_uow() -> InMemoryUnitOfWork

Create a fresh InMemoryUnitOfWork.

teardown
def teardown() -> None

Reset all fakes to their initial state.

Note: audit logger entries are cleared synchronously via the internal list. Call audit_logger.clear (async) for thread-safe clearing.


Testing utilities and fixtures module.

Provides test doubles, container overrides, and lifecycle trackers for use in tests.

Usage

@module(imports=[TestingModule.configure()])
class TestAppModule(Module):
pass
@module(imports=[TestingModule.configure()])
class TestAppModule(Module):
pass
configure
def configure(
    cls,
    **kwargs: Any
) -> DynamicModule

Create a TestingModule for use in test suites.

Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Compliance suite for VectorStoreProtocol implementations.

Subclass and implement create_store() to run all compliance tests. The store is connected before tests and disconnected after.

create_store
async def create_store() -> Any

Create a connected VectorStoreProtocol implementation under test.

Returns
TypeDescription
AnyA connected VectorStoreProtocol instance.
test_health_check_passes
async def test_health_check_passes() -> None

health_check() returns a healthy result.

test_list_collections_returns_list
async def test_list_collections_returns_list() -> None

list_collections() returns a list.

test_create_and_check_collection
async def test_create_and_check_collection() -> None

create_collection() creates a collection visible via collection_exists().

test_delete_collection_removes_it
async def test_delete_collection_removes_it() -> None

delete_collection() removes the collection.

test_collection_not_exists_for_unknown
async def test_collection_not_exists_for_unknown() -> None

collection_exists() returns False for a non-existent collection.

test_upsert_and_count
async def test_upsert_and_count() -> None

upsert() increases count in the collection.

test_search_returns_results
async def test_search_returns_results() -> None

search() returns results after upsert.

test_get_by_ids
async def test_get_by_ids() -> None

get() retrieves vectors by their IDs.

test_delete_by_ids
async def test_delete_by_ids() -> None

delete() removes vectors by ID.


TestBed specialized for web applications.

Extends TestEnvironment to add web-specific functionality. Accepts a WebProvider or an Application and exposes a fully functional HTTP client for making requests against the mounted ASGI app.

After setup, the following attributes are available:

  • bed.client: A ~WebTestClient backed by the real ASGI app.
  • bed.web_provider: The WebProvider instance.
  • bed.container: The DI container (for overrides and resolution).
Parameters
ParameterTypeDescription
`provider_or_app`A WebProvider instance or a Lexigram Application.
`name`Name of the test environment (defaults to "web-test-bed").
`raise_server_exceptions`Whether the test client re-raises server errors. Defaults to True. Set to False to inspect 5xx responses as assertions.
__init__
def __init__(
    provider_or_app: WebProvider | Application,
    name: str = 'web-test-bed',
    raise_server_exceptions: bool = True
) -> None
setup
async def setup() -> Any

Boot the application and wire up the test client.

override
def override(
    interface: type,
    implementation: Any
) -> WebTestBed

Override a DI registration for testing (fluent interface).

Must be called before setup. Returns self for chaining.

get
def get(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a GET request via the test client.

post
def post(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a POST request via the test client.

put
def put(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a PUT request via the test client.

delete
def delete(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a DELETE request via the test client.

patch
def patch(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a PATCH request via the test client.


Test client for testing web applications.

Wraps Starlette’s TestClient to provide a seamless testing experience for Lexigram web applications. It automatically extracts the underlying ASGI application from a Lexigram Application instance and provides assertion-aware TestResponse objects.

Note This class was renamed from TestClient to avoid pytest collecting it as a test class.

__init__
def __init__(
    app: Any,
    base_url: str = 'http://testserver',
    raise_server_exceptions: bool = True,
    **kwargs: Any
) -> None

Initialize the WebTestClient.

Parameters
ParameterTypeDescription
`app`AnyA Lexigram Application, WebProvider, or raw ASGI application.
`base_url`strThe base URL for requests.
`raise_server_exceptions`boolWhether to raise exceptions that occur in the ASGI app. **kwargs: Additional arguments passed to the underlying Starlette TestClient.
as_user
def as_user(user: Any) -> WebTestClient

Simulate an authenticated user for subsequent requests.

Parameters
ParameterTypeDescription
`user`AnyThe user object (must have an 'id' attribute or be convertible to str).
Returns
TypeDescription
WebTestClientself for chaining.
get
def get(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a GET request.

options
def options(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform an OPTIONS request.

head
def head(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a HEAD request.

post
def post(
    url: str,
    data: Any = None,
    json: Any = None,
    **kwargs: Any
) -> TestResponse

Perform a POST request.

put
def put(
    url: str,
    data: Any = None,
    json: Any = None,
    **kwargs: Any
) -> TestResponse

Perform a PUT request.

patch
def patch(
    url: str,
    data: Any = None,
    json: Any = None,
    **kwargs: Any
) -> TestResponse

Perform a PATCH request.

delete
def delete(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a DELETE request.

request
def request(
    method: str,
    url: str,
    headers: dict[str, str] | None = None,
    **kwargs: Any
) -> TestResponse

Perform a generic HTTP request.

websocket_connect
def websocket_connect(
    url: str,
    subprotocols: Any = None,
    **kwargs: Any
) -> Any

Perform a WebSocket connection.


Compliance suite for ``WebhookDeliveryStoreProtocol`` implementations.

Subclass and implement create_store to run all compliance tests.

create_store
async def create_store() -> Any

Create the delivery store implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing ``WebhookDeliveryStoreProtocol``.
test_record_attempt_does_not_raise
async def test_record_attempt_does_not_raise() -> None

record_attempt() completes without raising for a valid attempt.

test_get_attempts_returns_recorded
async def test_get_attempts_returns_recorded() -> None

get_attempts() returns attempts that were recorded.

test_get_attempts_filters_by_subscription
async def test_get_attempts_filters_by_subscription() -> None

get_attempts() filters results to the requested subscription_id.

test_get_attempts_returns_empty_list_for_unknown
async def test_get_attempts_returns_empty_list_for_unknown() -> None

get_attempts() returns an empty list for an unknown subscription.

test_count_recent_failures_counts_failed_attempts
async def test_count_recent_failures_counts_failed_attempts() -> None

count_recent_failures() counts FAILED attempts since a given time.

test_count_recent_failures_returns_zero_for_unknown
async def test_count_recent_failures_returns_zero_for_unknown() -> None

count_recent_failures() returns 0 for a subscription with no failures.

test_count_recent_failures_ignores_delivered
async def test_count_recent_failures_ignores_delivered() -> None

count_recent_failures() does not count DELIVERED attempts.

test_get_dead_letters_returns_dead_lettered_attempts
async def test_get_dead_letters_returns_dead_lettered_attempts() -> None

get_dead_letters() returns attempts in DEAD_LETTER status.

test_get_dead_letters_excludes_delivered
async def test_get_dead_letters_excludes_delivered() -> None

get_dead_letters() does not return DELIVERED attempts.

test_get_dead_letters_returns_empty_list_when_none
async def test_get_dead_letters_returns_empty_list_when_none() -> None

get_dead_letters() returns an empty list when no dead letters exist.


Compliance suite for ``WebhookSubscriptionStoreProtocol`` implementations.

Subclass and implement create_store to run all compliance tests.

create_store
async def create_store() -> Any

Create the subscription store implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing ``WebhookSubscriptionStoreProtocol``.
test_create_and_get_round_trip
async def test_create_and_get_round_trip() -> None

create() persists a subscription retrievable by get().

test_get_missing_returns_none
async def test_get_missing_returns_none() -> None

get() returns None for an unknown subscription_id.

test_list_includes_created_subscription
async def test_list_includes_created_subscription() -> None

list() returns subscriptions that were created.

test_list_active_only_excludes_inactive
async def test_list_active_only_excludes_inactive() -> None

list(active_only=True) excludes inactive subscriptions.

test_list_returns_empty_list_when_empty
async def test_list_returns_empty_list_when_empty() -> None

list() returns an empty list when no subscriptions exist.

test_update_persists_changes
async def test_update_persists_changes() -> None

update() persists changes to an existing subscription.

test_delete_removes_subscription
async def test_delete_removes_subscription() -> None

delete() removes a subscription so get() returns None afterwards.

test_delete_nonexistent_does_not_raise
async def test_delete_nonexistent_does_not_raise() -> None

delete() on an unknown ID completes without raising.


assert_all_ok
def assert_all_ok(results: list[Result[T, E]]) -> list[T]
Assert every Result in *results* is Ok and return the unwrapped values.

Example

values = assert_all_ok([service.find(id) for id in ids])
values = assert_all_ok([service.find(id) for id in ids])

assert_err
def assert_err(
    result: Result[Any, E],
    error_type: type[E] | None = None
) -> E
Assert *result* is ``Err``, optionally checking the error type.

Short alias combining assert_result_err and assert_result_err_type. Intended for tests that want concise, idiomatic assertions

error = assert_err(service.find_user("missing"), UserNotFound)
assert error.user_id == "missing"
error = assert_err(service.find_user("missing"), UserNotFound)
assert error.user_id == "missing"
Parameters
ParameterTypeDescription
`result`Result[Any, E]The ``Result`` to inspect.
`error_type`type[E] | NoneWhen provided, asserts that the error is an instance of this type.
Returns
TypeDescription
EThe unwrapped ``Err`` value.
Raises
ExceptionDescription
AssertionErrorIf *result* is ``Ok``, or if *error_type* is given and the error is not an instance of that type.

assert_healthy
def assert_healthy(results: dict[str, HealthCheckResult]) -> None
Assert every health check in *results* passed.

Example

results = await health_checker.run_all()
assert_healthy(results)
results = await health_checker.run_all()
assert_healthy(results)

assert_ok
def assert_ok(result: Result[T, E]) -> T
Assert *result* is ``Ok`` and return the inner value.

Short alias for assert_result_ok. Intended for tests that want concise, idiomatic assertions

user = assert_ok(service.find_user("123"))
assert user.email == "test@example.com"
user = assert_ok(service.find_user("123"))
assert user.email == "test@example.com"
Parameters
ParameterTypeDescription
`result`Result[T, E]The ``Result`` to inspect.
Returns
TypeDescription
TThe unwrapped ``Ok`` value.
Raises
ExceptionDescription
AssertionErrorIf *result* is ``Err``.

assert_result_err
def assert_result_err(result: Result[Any, E]) -> E
Unwrap and return the ``Err`` value; raise ``AssertionError`` if ``Ok``.

Example

error = assert_result_err(service.create_user(bad_data))
assert "email" in str(error)
error = assert_result_err(service.create_user(bad_data))
assert "email" in str(error)

assert_result_err_contains
def assert_result_err_contains(
    result: Result[Any, E],
    substring: str
) -> None
Assert Result is Err and ``str(error)`` contains *substring*.

Example

assert_result_err_contains(result, "not found")
assert_result_err_contains(result, "not found")

assert_result_err_type
def assert_result_err_type(
    result: Result[Any, E],
    error_type: type[E]
) -> None
Assert the result is ``Err`` with an error of the given type.

Example

assert_result_err_type(result, ValidationError)
assert_result_err_type(result, ValidationError)

assert_result_ok
def assert_result_ok(result: Result[T, E]) -> T
Unwrap and return the ``Ok`` value; raise ``AssertionError`` if ``Err``.

Example

user = assert_result_ok(service.create_user(data))
assert user.name == "Jo"
user = assert_result_ok(service.create_user(data))
assert user.name == "Jo"

assert_result_ok_value
def assert_result_ok_value(
    result: Result[T, E],
    expected: T
) -> None
Assert Result is Ok with a specific value.

Example

assert_result_ok_value(result, expected_user)
assert_result_ok_value(result, expected_user)

make_resource_record
def make_resource_record(
    resource_type: str = 'item',
    **fields: Any
) -> dict[str, Any]
Create a minimal resource record dict for testing.

Provides sensible defaults so callers only need to specify the fields relevant to the test.

Parameters
ParameterTypeDescription
`resource_type`strRecord type tag (stored in ``_type``). **fields: Field values to override defaults.
Returns
TypeDescription
dict[str, Any]Dict representing the resource record.

Example

user = make_resource_record("user", name="Alice", email="a@b.com")
# {"id": "test-user-1", "_type": "user", "name": "Alice", ...}
user = make_resource_record("user", name="Alice", email="a@b.com")
# {"id": "test-user-1", "_type": "user", "name": "Alice", ...}

override
def override(
    interface: type,
    implementation: Any
) -> Any
Add a per-test DI override to a test method inside a ``@testbed`` class. > **Note** > When used on a method, this stacks *on top of* any class-level overrides provided to ``@testbed``.
Parameters
ParameterTypeDescription
`interface`typeThe service type (contract/protocol) to replace.
`implementation`AnyThe replacement instance.
Returns
TypeDescription
AnyMethod decorator that records the override mapping.

Example

@testbed("my_app.app:create_app")
class TestPayments:
@override(PaymentGateway, MockPaymentGateway(fail=True))
async def test_payment_failure(self, bed):
resp = await bed.client.post("/pay", json={"amount": 100})
assert resp.status_code == 402
@testbed("my_app.app:create_app")
class TestPayments:
@override(PaymentGateway, MockPaymentGateway(fail=True))
async def test_payment_failure(self, bed):
resp = await bed.client.post("/pay", json={"amount": 100})
assert resp.status_code == 402

Raised when a value does not match the stored snapshot.
__init__
def __init__(
    name: str,
    expected: Any,
    actual: Any
) -> None