Skip to content
GitHub · Discord

API Reference

Protocol for time sources — allows swapping real/fake clocks.
def now() -> datetime
def monotonic() -> float
def time() -> float

def __init__(
    name: str = 'ai-test',
    **kwargs
) -> Any

Test client that provides ergonomic helpers for exercising AI service boundaries.

Wraps a TestEnvironment (or AITestBed) and provides recording wrappers for common AI operations — LLM completions, vector searches, and ML predictions — so tests can make assertions about what the system under test invoked.

Parameters
ParameterTypeDescription
`test_bed`The test environment that owns AI providers and test data.
`max_tokens_per_run`Maximum token budget for all LLM operations in this test run. Set to ``0`` to disable enforcement. Defaults to 10,000.
def __init__(
    test_bed: object,
    *,
    max_tokens_per_run: int = 10000
) -> None
property test_data() -> AITestData

Return the AITestData instance owned by the test bed.

property tokens_used() -> int

Total tokens consumed by LLM completions in this run.

property token_budget() -> int

Configured maximum tokens per run (0 = unlimited).

def reset_token_budget() -> None

Reset the token usage counter to zero.

async def complete_with_llm(
    prompt: str,
    **kwargs: object
) -> dict

Simulate an LLM completion, enforce the token budget, and record it.

Token cost is read from the mock response’s "tokens" field if present; otherwise estimated as ceil(len(prompt.split()) * 1.3).

Parameters
ParameterTypeDescription
`prompt`strThe prompt text sent to the (mock) LLM.
`**kwargs`objectExtra fields included in the recorded entry.
Returns
TypeDescription
dictThe recorded completion dict.
Raises
ExceptionDescription
TokenBudgetExceededErrorWhen the token budget is set and would be exceeded.
async def search_vector_store(
    query: str,
    **kwargs: object
) -> dict

Simulate a vector store search and record it in test_data.

Returns the mock response keyed "vector" if configured, otherwise an empty results structure.

async def predict_with_ml(
    input_data: object,
    **kwargs: object
) -> dict

Simulate an ML model prediction and record it in test_data.

Returns the mock response keyed "ml" if configured, otherwise a default confidence structure.

def assert_llm_completions_count(expected: int) -> None

Assert that exactly expected LLM completions were recorded.

def assert_vector_searches_count(expected: int) -> None

Assert that exactly expected vector searches were recorded.

def assert_ml_predictions_count(expected: int) -> None

Assert that exactly expected ML predictions were recorded.


Container for AI-related test data and mock responses.

Tracks AI operations (LLM completions, vector searches, ML predictions, etc.) performed during a test and provides mock response configuration.

Parameters
ParameterTypeDescription
`prefix`Prefix used when generating unique identifiers for test data.
def __init__(prefix: str = 'test') -> None
def add_llm_completion(completion: dict) -> None

Record an LLM completion operation.

def add_vector_search(search: dict) -> None

Record a vector store search operation.

def add_ml_prediction(prediction: dict) -> None

Record an ML model prediction operation.

def add_pipeline_operation(operation: dict) -> None

Record an AI pipeline stage operation.

def add_rag_operation(operation: dict) -> None

Record a retrieval-augmented generation operation.

def add_agent_operation(operation: dict) -> None

Record an AI agent action/operation.

def record_time_advance(seconds: float) -> None

Record a simulated time advance (for time-dependent AI logic).

def set_mock_response(
    key: str,
    response: object
) -> None

Register a canned response for a given AI service key.

def get_mock_response(key: str) -> object | None

Retrieve the canned response for key, or None if not configured.

def clear() -> None

Clear all recorded operations and mock responses.


Lightweight wrapper around a raw response for admin assertions.

Attributes: status_code: HTTP status code. headers: Response headers dict. text: Response body as text. json_data: Parsed JSON body (None if not JSON).

def is_htmx_redirect() -> bool

Return True if the response contains HX-Redirect header.

def htmx_redirect_url() -> str | None

Return the HX-Redirect URL if present.

def htmx_trigger() -> str | None

Return the HX-Trigger header value if present.


Test client with helpers for lexigram-admin endpoints.

Wraps a Starlette TestClient (or any httpx-compatible sync client) and provides assertion helpers and convenience methods for CRUD operations.

Parameters
ParameterTypeDescription
`client`An httpx-compatible test client (e.g. Starlette ``TestClient``).
`prefix`Admin URL prefix (default ``"/admin"``).
`default_headers`Headers added to every request.
def __init__(
    client: Any,
    prefix: str = '/admin',
    default_headers: dict[str, str] | None = None
) -> None
def url(
    resource: str,
    *parts: str
) -> str

Build an admin resource URL.

Parameters
ParameterTypeDescription
`resource`strResource name (e.g. ``"user"``).
`*parts`strAdditional path segments.
Returns
TypeDescription
strFull URL string, e.g. ``"/admin/user/123/edit"``.
def list(
    resource: str,
    params: dict[str, str] | None = None
) -> AdminResponse

GET the list page for a resource.

def detail(
    resource: str,
    record_id: str
) -> AdminResponse

GET the detail page for a record.

def create(
    resource: str,
    data: dict[str, Any]
) -> AdminResponse

POST to create a new record.

def update(
    resource: str,
    record_id: str,
    data: dict[str, Any]
) -> AdminResponse

POST to update an existing record.

def delete(
    resource: str,
    record_id: str
) -> AdminResponse

POST (or DELETE) to delete a record.

def bulk_action(
    resource: str,
    action: str,
    ids: list[str]
) -> AdminResponse

POST a bulk action.

def search(
    resource: str,
    query: str
) -> AdminResponse

GET the list page with a search query.

def assert_status(
    resp: AdminResponse,
    expected: int
) -> None

Assert response has expected HTTP status code.

def assert_ok(resp: AdminResponse) -> None

Assert response is 200 OK.

def assert_redirect(resp: AdminResponse) -> None

Assert response is an HTMX or standard redirect.

def assert_unprocessable(resp: AdminResponse) -> None

Assert response is HTTP 422 (validation error).

def assert_contains(
    resp: AdminResponse,
    text: str
) -> None

Assert response body contains text.

def assert_htmx_trigger(
    resp: AdminResponse,
    event: str
) -> None

Assert HX-Trigger header contains event.


Test harness that boots the application with optional DI overrides.

Attributes: app: The booted Application. container: The DI container for direct service resolution. client: HTTP test client backed by the real ASGI app.

def __init__() -> None
async def from_factory(
    cls,
    factory: str | Any,
    overrides: dict[type, Any] | None = None
) -> AsyncIterator[AppTestBed]

Create a test bed from an application factory.

Parameters
ParameterTypeDescription
`factory`str | AnyEither a dotted import string (``"my_app.app:create_app"``) or a callable that returns an Application.
`overrides`dict[type, Any] | NoneOptional dict mapping service types to replacement instances. Applied after providers register but before boot.
Yields
TypeDescription
AsyncIterator[AppTestBed]A fully booted AppTestBed.

Example

async with AppTestBed.from_factory(
"my_app.app:create_app",
overrides={EmailService: MockEmailService()},
) as bed:
resp = await bed.client.get("/health")
assert resp.status_code == 200
async def from_app(
    cls,
    app: Any,
    overrides: dict[type, Any] | None = None
) -> AsyncIterator[AppTestBed]

Create a test bed from an already-constructed Application.

Parameters
ParameterTypeDescription
`app`AnyAn Application instance.
`overrides`dict[type, Any] | NoneOptional DI overrides (same as from_factory).
Yields
TypeDescription
AsyncIterator[AppTestBed]A fully booted AppTestBed.

Helper utilities for async testing.
async def wait_for_condition(
    condition_func: Callable[[], bool],
    timeout: float = 5.0,
    interval: float = 0.1
) -> bool
async def collect_async_results(coros: list[Coroutine[Any, Any, Any]]) -> list[Any]
async def run_with_timeout(
    coro: Coroutine[Any, Any, Any],
    timeout: float
) -> Any

Compliance suite for AuditLoggerProtocol implementations.

Subclass and implement create_logger() to run all compliance tests.

async def create_logger() -> Any

Create the AuditLoggerProtocol implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing AuditLoggerProtocol.
async def test_log_does_not_raise() -> None

log() completes without raising for a valid entry.

async def test_log_creates_queryable_entry() -> None

log() creates an entry that is returned by query().

async def test_log_with_metadata() -> None

log() stores extra metadata fields.

async def test_query_by_actor() -> None

query() filters by actor_id.

async def test_query_by_action() -> None

query() filters by action name.

async def test_query_returns_list() -> None

query() always returns a list, even when empty.


Compliance suite for AuditStoreProtocol implementations.

Subclass and implement create_store() to run all compliance tests.

async def create_store() -> Any

Create the AuditStoreProtocol implementation under test.

Returns
TypeDescription
AnyA fresh instance implementing AuditStoreProtocol.
async def test_append_does_not_raise() -> None

append() completes without raising for a valid entry.

async def test_append_and_query_round_trip() -> None

append() and query() round-trip an audit entry.

async def test_count_reflects_appended() -> None

count() returns correct count after appends.

async def test_query_by_actor() -> None

query() filters by actor_id.

async def test_query_empty_returns_list() -> None

query() returns an empty list for unknown actor.


Reusable test suite for any ``BlobStoreProtocol`` implementation.

Subclass and implement create_store:

.. code-block:: python

class TestMyBlobStore(BlobStoreCompliance):
async def create_store(self):
return LocalDriver(root="/tmp/test")
async def create_store() -> Any

Return a ready-to-use, empty BlobStoreProtocol under test.

async def test_upload_and_download() -> None

upload then download returns the same bytes.

async def test_exists_after_upload() -> None

exists returns True after a file is uploaded.

async def test_not_exists_for_missing_path() -> None

exists returns False for a path that has never been uploaded.

async def test_delete_removes_file() -> None

delete removes the file; exists returns False afterwards.

async def test_list_returns_uploaded_paths() -> None

list with a prefix returns paths of uploaded objects.

async def test_get_url_returns_string() -> None

get_url returns a non-empty string.

async def test_health_check_returns_result() -> None

health_check returns a HealthCheckResult.


Reusable test suite for any ``CacheBackendProtocol`` implementation.

Subclass and implement create_backend:

.. code-block:: python

class TestRedisCache(CacheBackendCompliance):
async def create_backend(self):
return RedisCacheBackend("redis://localhost")
async def create_backend() -> Any

Return a fresh, empty instance of the backend under test.

async def test_set_and_get() -> None

set then get returns the stored value.

async def test_get_missing_returns_default() -> None

get on a missing key returns the default.

async def test_get_missing_returns_none_without_default() -> None

get on a missing key returns None when no default is given.

async def test_delete_existing_key() -> None

delete returns True and removes the key.

async def test_delete_missing_key() -> None

delete on a missing key returns False.

async def test_clear() -> None

clear removes all stored entries.

async def test_overwrite_existing_key() -> None

set on an existing key overwrites the value.

async def test_ttl_expiry() -> None

Values with TTL expire and are no longer returned.

async def test_no_ttl_does_not_expire() -> None

Values without TTL persist and are accessible.

async def test_stores_various_value_types() -> None

The backend accepts strings, ints, dicts, and lists.


Factory for creating test containers with common provider registrations.

This factory provides a standardized way to create DI containers for testing that automatically register common protocols and mock implementations.

Example

factory = ContainerFactory()
container = factory.create_test_container()

Container has mock providers registered for common protocols

Section titled “Container has mock providers registered for common protocols”
def __init__() -> Any
def create_test_container() -> Any

Create a test container with common protocol registrations.

Returns
TypeDescription
AnyConfigured container with mock implementations for common protocols.

Isolated DI container scoped to a single test.

Wraps LexigramContainerHarness with automatic disposal and convenience helpers for registering overrides.

Intended to be used through the test_container pytest fixture so each test receives a fully isolated container with fresh mock registrations.

Typical usage (via pytest fixture)

async def test_my_service(test_container: ContainerTestFixture) -> None:
test_container.mock(UserRepository, FakeUserRepository())
service = await test_container.get(UserService)
result = await service.create(email="a@b.com")
assert result.is_ok()

Direct usage (as async context manager)

async with ContainerTestFixture() as fixture:
fixture.mock(CacheBackendProtocol, FakeCache())
svc = await fixture.get(MyService)
def __init__(
    *,
    register_mocks: bool = False
) -> None
async def get(interface: type[T]) -> T

Resolve interface from the container.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to resolve.
Returns
TypeDescription
TThe resolved instance.
Raises
ExceptionDescription
RuntimeErrorIf the fixture has already been disposed.
async def get_optional(interface: type[T]) -> T | None

Resolve interface, returning None if not registered.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to resolve.
Returns
TypeDescription
T | NoneThe resolved instance, or ``None`` if not found.
def mock(
    interface: type[T],
    mock_obj: Any
) -> None

Register mock_obj as the singleton for interface.

This is the primary way to wire test doubles into the container.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to override.
`mock_obj`AnyThe mock/fake/stub to register.
async def override(
    interface: type[T],
    mock_obj: Any
) -> AsyncGenerator[None, None]

Register mock_obj for interface for the duration of the block.

Because each test receives a fresh container, this is equivalent to calling mock for the lifetime of the scope. It exists purely for readability in tests that want to be explicit about scope.

Parameters
ParameterTypeDescription
`interface`type[T]The protocol or class to override.
`mock_obj`AnyThe mock/fake/stub to register.
Yields
TypeDescription
AsyncGenerator[None, None]Nothing; the override is active for the body of the block.
async def dispose() -> None

Dispose the container and release all resources.

Idempotent — safe to call multiple times.

property container() -> LexigramContainerHarness

Return the underlying LexigramContainerHarness.

Use this for advanced scenarios that require direct container access. Prefer get and mock for normal test usage.


Reusable test suite for any ``DatabaseProviderProtocol`` implementation.

Subclass and implement create_provider:

.. code-block:: python

class TestMyProvider(DatabaseProviderCompliance):
async def create_provider(self):
return MyDatabaseProvider(dsn="sqlite+aiosqlite:///:memory:")
async def create_provider() -> Any

Return a ready-to-use provider instance under test.

async def test_health_check_returns_result() -> None

health_check returns a HealthCheckResult.

async def test_scoped_context_provides_connection() -> None

get_scoped_connection returns a connection within a scoped context.

async def test_scoped_context_is_reentrant() -> None

Nested scoped_context blocks do not raise.

async def test_execute_ddl() -> None

execute can run DDL statements without error.

async def test_execute_and_fetch() -> None

Data inserted via execute is retrievable via fetch_one.

async def test_transaction_rollback_discards_writes() -> None

Data written inside a rolled-back transaction is not persisted.


def __init__(
    name: str = 'db-test-bed',
    connection_string: str = ':memory:',
    auto_cleanup: bool = True
)
async def create_test_table(
    name: str,
    schema: str
) -> None

Create a test table with the given name and SQL schema string.

async def seed_test_data(
    table: str,
    data: list[dict] | dict
) -> None

Seed test data into a table.

async def clear_test_data(table: str) -> None

Clear all rows from a test table.


def __init__(
    connection_string: str = ':memory:',
    auto_cleanup: bool = True
)
async def cleanup() -> None

Drop tracked tables and clear table data.

async def connect() -> None
async def disconnect() -> None
async def execute_query(
    query: str,
    params: list | None = None
) -> list[dict]
async def execute(
    query: str,
    params: list | None = None,
    fetch: bool = True
) -> list[dict] | int
async def create_table(
    name: str,
    schema: dict | str
) -> None
async def drop_table(name: str) -> None
async def clear_table(name: str) -> None
async def get_table_count(table_name: str) -> int
async def insert_data(
    table: str,
    data: dict | list[dict]
) -> int
async def insert(
    table: str,
    data: dict | list[dict]
) -> int

Compliance suite for DistributedLockProtocol implementations.

Subclass and implement create_lock() to run all compliance tests.

async def create_lock(key: str) -> Any

Create a DistributedLockProtocol implementation under test.

Parameters
ParameterTypeDescription
`key`strUnique resource key for the lock.
Returns
TypeDescription
AnyA fresh DistributedLockProtocol instance.
async def test_acquire_returns_true() -> None

acquire() returns True when the lock is free.

async def test_release_returns_true_when_held() -> None

release() returns True when the lock is currently held.

async def test_is_held_after_acquire() -> None

is_held() returns True after acquire().

async def test_is_not_held_after_release() -> None

is_held() returns False after release().

async def test_acquire_exclusive() -> None

Second acquire() on the same key returns False while held.

async def test_acquire_after_release() -> None

acquire() returns True again after lock is released.

async def test_context_manager_acquires_and_releases() -> None

Context manager acquires on entry and releases on exit.

async def test_extend_returns_true_when_held() -> None

extend() returns True when the lock is currently held.

async def test_extend_returns_false_when_not_held() -> None

extend() returns False when the lock is not held.

async def test_acquire_blocking_succeeds() -> None

acquire_blocking() returns True when the lock is free.


Reusable test suite for any ``EventBusProtocol`` implementation.

Subclass and implement create_bus and create_event:

.. code-block:: python

class TestMyEventBus(EventBusCompliance):
async def create_bus(self):
return MyEventBus()
def create_event(self):
return MyEvent(id="1")
async def create_bus() -> Any

Return a fresh EventBusProtocol instance.

def create_event() -> Any

Return a new event instance for testing.

async def test_subscribe_and_publish() -> None

Subscribing a handler causes it to be called on publish.

async def test_unsubscribe_stops_delivery() -> None

Unsubscribing a handler prevents further delivery.

async def test_multiple_handlers_all_called() -> None

All subscribers for an event type are invoked.

async def test_publish_unrelated_event_type_not_delivered() -> None

A handler subscribed to one type does not receive another type.

async def test_publish_without_subscribers_is_silent() -> None

Publishing when there are no subscribers does not raise.


In-memory fake satisfying ``CacheBackendProtocol`` / ``CacheProtocol``.

Supports TTL expiry using time.monotonic.

Example

cache = FakeCache()
await cache.set("key", "value", ttl=60)
cache.assert_has_key("key")
def __init__() -> None
async def get(
    key: str,
    default: Any = None
) -> Any

Return the stored value for key, or default if absent/expired.

async def set(
    key: str,
    value: Any,
    ttl: float | None = None
) -> None

Store value under key, optionally expiring after ttl seconds.

async def delete(key: str) -> bool

Delete key; return True if it existed.

async def clear() -> None

Remove all entries.

property stored_keys() -> list[str]

Return a snapshot of currently stored keys.

def assert_has_key(key: str) -> None

Assert key is present in the cache.

def assert_value(
    key: str,
    expected: Any
) -> None

Assert key maps to expected.


Controllable clock for deterministic time-based testing.

Example

clock = FakeClock(datetime(2026, 1, 1, tzinfo=UTC))
assert clock.now().year == 2026
clock.advance(3600)
assert clock.now().hour == 1
def __init__(now: datetime | None = None) -> None
def now() -> datetime

Return the current fake time.

def monotonic() -> float

Return a monotonic counter (starts at 0).

def time() -> float

Return Unix timestamp of the current fake time.

def advance(seconds: float) -> None

Move time forward by seconds.

def freeze(at: datetime) -> None

Set the clock to a specific point in time.

def tick() -> None

Advance by exactly 1 second.


Records dispatched commands for test assertions.

Satisfies a CommandBusProtocol interface: register() + dispatch().

Example

bus = FakeCommandBus()
await bus.dispatch(CreateUser(email="a@example.com"))
bus.assert_dispatched(CreateUser, count=1)
def __init__() -> None
def register(
    command_type: type,
    handler: Any
) -> None

Register handler for command_type.

async def dispatch(command: Any) -> Any

Record command and invoke any registered handler.

property dispatched() -> list[Any]

All dispatched commands.

def dispatched_of_type(command_type: type[T]) -> list[T]

Return dispatched commands of command_type.

def assert_dispatched(
    command_type: type[T],
    count: int | None = None
) -> None

Assert command_type was dispatched, optionally count times.

def assert_not_dispatched(command_type: type[T]) -> None

Assert command_type was NOT dispatched.

def clear() -> None

Reset all recorded dispatches.


In-memory config satisfying a ``ConfigProtocol``-like interface.

Supports dot-notation key access ("database.url") and optional section retrieval with model instantiation.

Example

config = FakeConfig({"database": {"url": "sqlite:///:memory:"}})
assert config.get("database.url") == "sqlite:///:memory:"
config.set("cache.backend", "memory")
def __init__(data: dict[str, Any] | None = None) -> None
property environment() -> Any

The active deployment environment (returns test environment).

property is_debug() -> bool

Return False for test environments.

property is_production() -> bool

Return False for test environments.

property is_development() -> bool

Return whether this is development.

property is_testing() -> bool

Return whether this is testing.

property is_staging() -> bool

Return whether this is staging.

def has_section(name: str) -> bool

Check whether a configuration section exists.

def get(
    key: str,
    default: Any = None
) -> Any

Retrieve key (dot-separated path) from config, returning default if absent.

def get_section(
    name: str,
    model_cls: type | None = None
) -> Any

Return the config section name, optionally instantiated as model_cls.

def set(
    key: str,
    value: Any
) -> None

Set key (dot-separated path) to value — test-only helper.


Records all published events for test assertions.

Satisfies the EventBusProtocol protocol from lexigram.contracts.events.

Example

bus = FakeEventBus()
await bus.publish(UserCreated(user_id="abc"))
bus.assert_published(UserCreated, count=1, user_id="abc")
def __init__() -> None
def subscribe(
    event_type: type,
    handler: Any,
    priority: int = 0
) -> None

Register handler for event_type.

def unsubscribe(
    event_type: type,
    handler: Any
) -> None

Remove handler for event_type.

async def publish(event: Any) -> None

Record event and dispatch to any registered handlers.

property published() -> list[DomainEvent]

All published events.

def published_of_type(event_type: type[T]) -> list[T]

Return published events matching event_type.

def assert_published(
    event_type: type[T],
    count: int | None = None,
    **attrs: Any
) -> None

Assert that event_type was published.

Optionally verify count and attribute values.

def assert_not_published(event_type: type[T]) -> None

Assert that event_type was NOT published.

def assert_published_once(
    event_type: type,
    **attrs: Any
) -> None

Assert exactly one event_type was published.

def assert_events_in_order(*event_types: type) -> None

Assert that events were published in the given positional order.

Checks that position i in the published event stream matches event_types[i]. Use assert_published to verify events that may appear anywhere in the stream.

Parameters
ParameterTypeDescription
`*event_types`typeExpected event types, in the positional order they should appear in the published stream.
Raises
ExceptionDescription
AssertionErrorIf any position does not match or the stream is shorter than the expected sequence.

Example

await bus.publish(UserRegistered(user_id="1"))
await bus.publish(EmailSent(user_id="1"))
bus.assert_events_in_order(UserRegistered, EmailSent)
def clear() -> None

Reset the recorded events list.


Captures log entries for test assertions.

Satisfies LoggerProtocol from lexigram.contracts.core.logging.

Example

logger = FakeLogger()
logger.info("user_created", user_id="abc")
logger.assert_logged("info", "user_created")
def __init__(bound_context: dict[str, Any] | None = None) -> None
def debug(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture a DEBUG-level log entry.

def info(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture an INFO-level log entry.

def warning(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture a WARNING-level log entry.

def error(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture an ERROR-level log entry.

def critical(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture a CRITICAL-level log entry.

def exception(
    msg: str,
    *args: Any,
    **kwargs: Any
) -> None

Capture an EXCEPTION-level log entry.

def bind(**kwargs: Any) -> FakeLogger

Return a new FakeLogger sharing entries with merged context.

def unbind(*keys: str) -> FakeLogger

Return a new FakeLogger with specified context keys removed.

property entries() -> list[LogEntry]

All captured log entries.

def assert_logged(
    level: str,
    msg_contains: str
) -> None

Assert that a log entry with level containing msg_contains exists.

def assert_not_logged(
    level: str,
    msg_contains: str | None = None
) -> None

Assert no matching log entry exists.

def clear() -> None

Reset all captured entries.


Records counter, gauge, and histogram observations for test assertions.

Example

metrics = FakeMetricsCollector()
metrics.increment("requests.total")
metrics.assert_counter_incremented("requests.total")
def __init__() -> None
def increment(
    name: str,
    value: float = 1.0,
    tags: dict[str, str] | None = None
) -> None

Increment counter name by value.

def gauge(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Record gauge name = value.

def histogram(
    name: str,
    value: float,
    tags: dict[str, str] | None = None
) -> None

Record a histogram observation for name.

def counter(name: str) -> float

Return the current value of counter name (0 if never incremented).

def assert_counter(
    name: str,
    expected: float
) -> None

Assert counter name equals expected.

def assert_counter_incremented(name: str) -> None

Assert counter name was incremented at least once.

def assert_gauge(
    name: str,
    expected: float
) -> None

Assert gauge name equals expected.

def clear() -> None

Reset all recorded observations.


Records executed queries and returns configured canned results.

Example

bus = FakeQueryBus()
bus.when(GetUser, return_value=user)
result = await bus.execute(GetUser(user_id="123"))
assert result == user
def __init__() -> None
def register(
    query_type: type,
    handler: Any
) -> None

Register handler for query_type.

def when(
    query_type: type,
    return_value: Any
) -> None

Configure a canned response to return for query_type.

async def execute(query: Any) -> Any

Record and execute query, returning canned results first.

property executed() -> list[Any]

All executed queries.

def executed_of_type(query_type: type[T]) -> list[T]

Return executed queries of query_type.

def clear() -> None

Reset all recorded executions.


In-memory fake satisfying ``StateStoreProtocol`` protocol.

Example

store = FakeStateStore()
await store.set("session:abc", {"user_id": "u1"})
value = await store.get("session:abc")
def __init__() -> None
async def get(key: str) -> Any | None

Return the value for key, or None if absent.

async def set(
    key: str,
    value: Any
) -> None

Store value under key.

async def delete(key: str) -> bool

Delete key; return True if it existed.

async def exists(key: str) -> bool

Return True if key has a stored value.

def clear() -> None

Remove all stored entries.


Tracks entity changes and events without persistence.

Optionally integrates with FakeEventBus to dispatch collected events on commit.

Example

async with FakeUnitOfWork() as uow:
uow.register_new(user)
uow.register_event(UserCreated(user_id=user.user_id))
assert uow.committed
def __init__(event_bus: FakeEventBus | None = None) -> None
def register_new(entity: Any) -> None

Mark entity as newly created.

def register_dirty(entity: Any) -> None

Mark entity as modified.

def register_deleted(entity: Any) -> None

Mark entity as deleted.

def register_event(event: Any) -> None

Queue event for dispatch on commit.

def collect_events() -> list[Any]

Return all queued events without consuming them.

async def commit() -> None

Mark as committed and optionally publish queued events.

async def rollback() -> None

Mark as rolled back and clear all tracked changes.


Reusable compliance suite for any ``FlagProvider`` implementation.

Subclass and implement create_provider. Override enabled_flag_name / disabled_flag_name to specify which flags the provider will return for enabled/disabled test cases.

The provider created by create_provider must have at least:

  • one flag named enabled_flag_name that evaluates to True
  • one flag named disabled_flag_name that evaluates to False
async def create_provider() -> Any

Return a fully initialised FlagProvider for testing.

def enabled_flag_name() -> str

Name of a flag that the provider reports as enabled.

def disabled_flag_name() -> str

Name of a flag that the provider reports as disabled.

def unknown_flag_name() -> str

Name of a flag that does not exist in the provider.

async def test_get_flag_found() -> None

get_flag returns a bool for a flag that exists in the provider.

async def test_get_flag_not_found() -> None

get_flag on an unknown flag returns the default (or raises FlagNotFoundError).

async def test_evaluate_flag_enabled() -> None

An enabled flag evaluates to True.

async def test_evaluate_flag_disabled() -> None

A disabled flag evaluates to False.

async def test_async_evaluate() -> None

Async evaluation via get_flag is awaitable and returns a bool.

async def test_get_flag_respects_default() -> None

default parameter is honoured when the flag is absent.


A TestEnvironment pre-configured for integration tests.

Inherits the full TestEnvironment API (use_provider, fake, override, resolve, async context-manager) and adds:

  • with_database — boots a real database provider (defaults to SQLite in-memory so tests don’t need an external process).
  • with_cache — boots a real cache provider (defaults to the in-memory cache backend so tests can run without Redis).
  • with_all — combines database + cache in one call.
  • Config override shortcut via config= constructor argument.

None of the factory methods import the provider packages at class definition time — they use lazy imports so the test can collect and skip cleanly when a package isn’t installed.

def __init__(
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> None
def with_database(
    cls,
    url: str = 'sqlite+aiosqlite:///:memory:',
    *,
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> IntegrationEnvironment

Return an environment wired with a real database provider.

Parameters
ParameterTypeDescription
`url`strDatabase URL. Defaults to an in-memory SQLite database so tests run without any external process.
`name`strEnvironment name shown in log output.
`config`dict[str, Any] | NoneAdditional config overrides (merged with the database URL).
Returns
TypeDescription
IntegrationEnvironmentAn ``IntegrationEnvironment`` with the database provider registered.
Raises
ExceptionDescription
ImportErrorIf ``lexigram-sql`` is not installed.
def with_cache(
    cls,
    backend: str = 'memory',
    *,
    url: str | None = None,
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> IntegrationEnvironment

Return an environment wired with a real cache provider.

Parameters
ParameterTypeDescription
`backend`strBackend type — ``"memory"`` (default), ``"redis"``, or ``"memcached"``.
`url`str | NoneBackend connection URL (required for Redis / Memcached; omit for the in-memory backend).
`name`strEnvironment name.
`config`dict[str, Any] | NoneAdditional config overrides.
Returns
TypeDescription
IntegrationEnvironmentAn ``IntegrationEnvironment`` with the cache provider registered.
Raises
ExceptionDescription
ImportErrorIf ``lexigram-cache`` is not installed.
def with_all(
    cls,
    *,
    database_url: str = 'sqlite+aiosqlite:///:memory:',
    cache_backend: str = 'memory',
    cache_url: str | None = None,
    name: str = 'integration-test',
    config: dict[str, Any] | None = None
) -> IntegrationEnvironment

Return an environment with both database and cache providers.

This is a convenience factory combining with_database and with_cache. Both providers are registered before the environment is returned.

Parameters
ParameterTypeDescription
`database_url`strPassed to with_database.
`cache_backend`strPassed to with_cache.
`cache_url`str | NonePassed to with_cache.
`name`strEnvironment name.
`config`dict[str, Any] | NoneAdditional config overrides.
Raises
ExceptionDescription
ImportErrorIf any required extension package is missing.
async def setup() -> Any

Set up the environment, injecting FakeConfig for config overrides.

If config was passed to the constructor, it is registered as an override before the application boots so that it lands in the container before it is frozen. The override is keyed by ConfigProtocol when available, falling back to the FakeConfig class itself.


Configuration for integration tests.

All values read from environment variables with Docker Compose defaults.

Attributes: postgres_dsn: SQLAlchemy-style async DSN for PostgreSQL. postgres_dsn_raw: Plain asyncpg DSN (no +asyncpg driver prefix). redis_url: Redis connection URL. kafka_bootstrap: Kafka bootstrap servers. minio_endpoint: MinIO endpoint (host:port). minio_access_key: MinIO access key. minio_secret_key: MinIO secret key. elasticsearch_url: Elasticsearch base URL. mongodb_dsn: MongoDB connection string. qdrant_url: Qdrant HTTP URL. neo4j_url: Neo4j bolt URL. neo4j_auth: Neo4j auth string (user/password).

def from_env(cls) -> IntegrationTestConfig

Create config from current environment variables.

Returns
TypeDescription
IntegrationTestConfigIntegrationTestConfig populated from environment with Docker Compose defaults.

Standardized DI container for isolated component testing.

This container automatically registers common mock providers and mock component implementations (LockStore, PubSubProtocol, etc.) to provide a stable baseline for unit and integration tests.

Usage

container = LexigramContainerHarness()
# Common mocks are already registered
# Add your concrete provider for testing
container.register(MyProvider())
# Override specific mocks if needed
container.singleton(DatabaseProviderProtocol, MockDB())
service = await container.resolve(MyService)
def __init__(register_mocks: bool = True) -> None
def override(
    interface: type[T],
    implementation: Any
) -> Any

Context manager to temporarily override a dependency.

Example

with container.override(UserService, MockUserService()): …

def mock(
    interface: type[T],
    mock_obj: Any
) -> None

Register a mock implementation directly as a singleton.

async def try_resolve(interface: type[T]) -> Result[T, Exception]

Resolve interface from the container, wrapping the outcome in Result.

This is the Result-returning counterpart to resolve. Use it when you need to assert that a dependency fails to resolve — for example to verify that a required binding is absent or mis-configured.

Returns
TypeDescription
Result[T, Exception]``Ok(instance)`` on success, ``Err(exception)`` if resolution raises for any reason (missing binding, circular dependency, etc.).

Example

result = await container.try_resolve(MyService)
assert result.is_err()
assert isinstance(result.unwrap_err(), ResolutionError)
def bind(
    protocol: type[T],
    implementation: Any
) -> None

Register implementation as protocol with runtime protocol validation.

If protocol is decorated with @runtime_checkable, this method verifies that implementation is an instance of protocol before registering it. Non-runtime-checkable protocols are accepted as-is (mypy / pyright catch mismatches at type-check time).

Parameters
ParameterTypeDescription
`protocol`type[T]The contract / protocol type to register.
`implementation`AnyThe concrete object implementing *protocol*.
Raises
ExceptionDescription
TypeErrorWhen *protocol* is ``@runtime_checkable`` and *implementation* does not satisfy ``isinstance`` check.

Example

container.bind(CacheBackendProtocol, FakeCache())

A single captured log entry.

Reusable compliance suite for any ``MiddlewareProtocol`` implementation.

Subclass and implement create_middleware. All tests exercise the async __call__(context, next) contract defined by MiddlewareProtocol.

The before / after / error terminology maps to the three observable phases of a __call__-style middleware:

  • before — code that runs prior to invoking next
  • after — code that runs after next returns
  • error — behaviour when the downstream next raises an exception
async def create_middleware() -> Any

Return a fresh instance of the middleware under test.

def make_context() -> dict[str, Any]

Return a minimal request context suitable for the middleware.

async def test_before_called() -> None

Middleware is invoked before the downstream handler is called.

Verified by asserting that __call__ is awaitable and executes without error when a valid context and next are provided.

async def test_after_called() -> None

Middleware receives and can observe the result from the downstream handler.

The result returned by __call__ must equal the value returned by the inner next handler (pass-through behaviour is the minimum requirement; middleware may wrap or transform but must not discard).

async def test_error_flow() -> None

An exception raised by the downstream handler propagates through the middleware.

A middleware that does not explicitly handle errors must let the exception bubble up. If the middleware catches and re-raises the exception that is also acceptable, provided the original exception type is preserved (or wrapped in a framework-specific exception).

async def test_context_passed_to_next() -> None

The context object passed to next is the same (or an enriched) version.

The middleware must not silently drop the context before forwarding it to the inner handler.


Base class for mock providers in testing.
def __init__() -> None
async def register(container: Container) -> None

No-op register for mock providers.

async def boot(container: Container) -> None
async def shutdown() -> None
property started() -> bool
property stopped() -> bool

Mock vector store for testing.

Provides simple in-memory storage with simulated similarity search. Does not require actual embeddings — uses text similarity instead.

Example

store = MockVectorStore() await store.add([ … Document(text="Python is a programming language", id="1"), … Document(text="JavaScript is for web development", id="2") … ]) results = await store.search( … query_vector=[], # Ignored in mock … top_k=1 … )

def __init__(
    config: Any | None = None,
    **kwargs: Any
)

Initialize mock vector store.

Parameters
ParameterTypeDescription
`config`Any | NoneOptional vector store configuration **kwargs: Dependencies, including: - similarity_threshold: Minimum similarity score for results - dimension_size: Dimension size of embeddings to validate
async def add(documents: list[Document]) -> Result[list[str], VectorStoreError]

Add documents to store.

Parameters
ParameterTypeDescription
`documents`list[Document]Documents to add
Returns
TypeDescription
Result[list[str], VectorStoreError]``Ok(list[str])`` with document IDs on success.
async def batch_upsert(
    documents: list[Document],
    batch_size: int = 100
) -> Result[int, VectorStoreError]

Upsert documents in batches.

Parameters
ParameterTypeDescription
`documents`list[Document]List of documents to upsert.
`batch_size`intNumber of documents per batch. Defaults to 100.
Returns
TypeDescription
Result[int, VectorStoreError]``Ok(int)`` with the total count of documents upserted, or ``Err(VectorStoreError)`` on failure.
async def search(
    query_vector: list[float] | None = None,
    query: list[float] | str | None = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict | None = None,
    filters: dict[str, Any] | None = None,
    filter_: dict | None = None,
    **kwargs: Any
) -> Result[list[SearchResult], VectorStoreError]

Search for similar documents.

Uses simple text matching instead of vector similarity. This is sufficient for testing most RAG workflows.

Parameters
ParameterTypeDescription
`query_vector`list[float] | NoneQuery embedding (ignored in mock) k/top_k: Number of results filters/filter_: Metadata filters
Returns
TypeDescription
Result[list[SearchResult], VectorStoreError]``Ok(list[SearchResult])`` on success.
async def delete(ids: list[str]) -> Result[int, VectorStoreError]

Delete documents by ID.

Parameters
ParameterTypeDescription
`ids`list[str]Document IDs to delete
Returns
TypeDescription
Result[int, VectorStoreError]``Ok(int)`` with the count of deleted documents.
def get_document_count() -> int

Get total number of documents.

Returns
TypeDescription
intDocument count
def clear() -> None

Clear all documents.

async def close() -> None

Close store.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
TypeDescription
HealthCheckResultStructured health check result.
async def add_texts(
    texts: list[str],
    embeddings: list[list[float]] | None = None,
    metadatas: list[dict[str, Any]] | None = None,
    collection_name: str | None = None
) -> list[str]

Convenience wrapper to add raw texts with optional embeddings/metadata.

Raises
ExceptionDescription
VectorStoreErrorIf adding documents fails (unwraps the Result).

Mock vector store that can simulate errors.

Useful for testing error handling.

Example

store = MockVectorStoreWithErrors(fail_on_search=True) await store.search([0.1, 0.2], top_k=5) # Returns Err(VectorStoreError)

def __init__(
    fail_on_add: bool = False,
    fail_on_search: bool = False,
    fail_on_delete: bool = False,
    error_rate: float | None = None,
    error_message: str = 'Mock vector store error'
)

Initialize error-simulating mock.

Parameters
ParameterTypeDescription
`fail_on_add`boolWhether to fail on add()
`fail_on_search`boolWhether to fail on search()
`fail_on_delete`boolWhether to fail on delete()
`error_rate`float | NoneProbabilistic error rate (0-1), compatible with fixtures
`error_message`strError message to raise
async def add(documents: list[Document]) -> Result[list[str], VectorStoreError]

Add with possible error.

async def search(
    query_vector: list[float] | None = None,
    query: list[float] | str | None = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict | None = None,
    filters: dict[str, Any] | None = None,
    filter_: dict | None = None,
    **kwargs: Any
) -> Result[list[SearchResult], VectorStoreError]

Search with possible error.

async def delete(ids: list[str]) -> Result[int, VectorStoreError]

Delete with possible error.


Mock vector store with actual similarity calculation.

Uses cosine similarity on provided embeddings if available, falls back to text matching otherwise.

Example

store = MockVectorStoreWithSimilarity() await store.add([ … Document( … text="Python programming", … embedding=[0.1, 0.2, 0.3], … id="1" … ) … ]) results = await store.search( … query_vector=[0.1, 0.2, 0.3], … top_k=1 … ) results[0].score # High similarity 1.0

async def search(
    query_vector: list[float] | None = None,
    query: list[float] | str | None = None,
    k: int | None = None,
    top_k: int | None = None,
    filter: dict | None = None,
    filters: dict[str, Any] | None = None,
    filter_: dict | None = None,
    **kwargs: Any
) -> Result[list[SearchResult], VectorStoreError]

Search with actual similarity calculation.

Parameters
ParameterTypeDescription
`query_vector`list[float] | NoneQuery embedding vector k/top_k: Number of results filters/filter_: Metadata filters
Returns
TypeDescription
Result[list[SearchResult], VectorStoreError]``Ok(list[SearchResult])`` sorted by similarity.

Compliance suite for queue backend implementations.

Subclass and implement create_backend() to run all compliance tests.

async def create_backend(queue_name: str = 'test-queue') -> Any

Create the queue backend implementation under test.

Parameters
ParameterTypeDescription
`queue_name`strName of the queue to use for testing.
Returns
TypeDescription
AnyA fresh queue backend instance.
async def test_enqueue_returns_message_id() -> None

enqueue() returns a message ID string.

async def test_dequeue_returns_enqueued_message() -> None

dequeue() returns the message that was enqueued.

async def test_dequeue_empty_returns_none() -> None

dequeue() returns None when no messages are available.

async def test_ack_removes_message() -> None

ack() completes without error for an in-flight message.

async def test_nack_requeues_message() -> None

nack() with requeue=True makes the message available again.

async def test_nack_discards_message() -> None

nack() with requeue=False permanently discards the message.


Reusable test suite for any ``RepositoryProtocol[T]`` implementation.

Verifies that the repository satisfies the standard persistence contract: save, get, delete, list.

Subclass and implement create_repository and create_entity:

.. code-block:: python

class TestInMemoryUserRepo(RepositoryCompliance[User]):
async def create_repository(self):
return InMemoryUserRepository()
def create_entity(self, **overrides):
return User(id=str(uuid4()), name="Alice")
async def create_repository() -> Any

Return a fresh, empty repository instance.

def create_entity(**overrides: Any) -> T

Return a new entity instance suitable for persistence.

async def test_save_and_get() -> None

save then get returns the entity.

async def test_get_missing_returns_none() -> None

get on a non-existent id returns None.

async def test_delete_existing() -> None

delete removes the entity and returns True.

async def test_delete_missing_returns_false() -> None

delete on a non-existent id returns False.

async def test_list_returns_all_saved() -> None

list returns all saved entities.

async def test_save_overwrites_existing() -> None

Saving an entity with the same id replaces the old version.


Reusable test suite for any ``SearchEngineProtocol`` implementation.

Subclass and implement create_engine:

.. code-block:: python

class TestMySearch(SearchEngineCompliance):
async def create_engine(self):
return MySearchEngine(url="http://localhost:9200")
async def create_engine() -> Any

Return a ready-to-use search engine under test.

async def test_create_and_delete_index() -> None

create_index and delete_index do not raise errors.

async def test_index_single_document() -> None

index_document succeeds and the document is searchable.

async def test_index_many_documents() -> None

index_many successfully indexes multiple documents.

async def test_delete_document() -> None

delete_document removes an indexed document without error.

async def test_search_returns_results_object() -> None

search returns an object with a hits or results attribute.

async def test_health_check_returns_result() -> None

health_check returns a HealthCheckResult.


Async probes for common external services.

Each method attempts a real network connection and returns True if the service is reachable, False otherwise. All probes suppress all exceptions — they are purely availability indicators, not functional tests.

Example

@pytest.fixture
async def redis_client():
if not await ServiceProbe.check_redis():
pytest.skip("Redis not available")
# ... connect and return client
async def check_redis(url: str = 'redis://localhost:6379') -> bool

Return True if a Redis server is reachable at url.

async def check_postgres(dsn: str = 'postgresql://localhost/test') -> bool

Return True if a PostgreSQL server is reachable at dsn.

async def check_elasticsearch(url: str = 'http://localhost:9200') -> bool

Return True if an Elasticsearch HTTP endpoint is reachable.

async def check_rabbitmq(url: str = 'amqp://guest:guest@localhost:5672/') -> bool

Return True if a RabbitMQ AMQP broker is reachable.

async def check_meilisearch(url: str = 'http://localhost:7700') -> bool

Return True if a Meilisearch HTTP endpoint is reachable.

async def check_smtp(
    host: str = 'localhost',
    port: int = 25
) -> bool

Return True if an SMTP server is accepting connections.

async def check_kafka(bootstrap: str = 'localhost:19092') -> bool

Return True if a Kafka broker is reachable.

Parameters
ParameterTypeDescription
`bootstrap`strKafka bootstrap server address.
Returns
TypeDescription
boolTrue if a broker connection succeeds.
async def check_minio(endpoint: str = 'localhost:19000') -> bool

Return True if a MinIO endpoint is reachable.

Parameters
ParameterTypeDescription
`endpoint`strMinIO endpoint in host:port format.
Returns
TypeDescription
boolTrue if the HTTP endpoint responds.
async def check_mongodb(dsn: str = 'mongodb://localhost:17017') -> bool

Return True if a MongoDB server is reachable.

Parameters
ParameterTypeDescription
`dsn`strMongoDB connection string.
Returns
TypeDescription
boolTrue if the server responds to a ping.
async def check_qdrant(url: str = 'http://localhost:16333') -> bool

Return True if a Qdrant vector store is reachable.

Parameters
ParameterTypeDescription
`url`strQdrant HTTP URL.
Returns
TypeDescription
boolTrue if the healthz endpoint responds with 200.
async def check_neo4j(url: str = 'bolt://localhost:17687') -> bool

Return True if a Neo4j graph database is reachable.

Parameters
ParameterTypeDescription
`url`strNeo4j bolt URL.
Returns
TypeDescription
boolTrue if a TCP connection to the bolt port succeeds.

Assert that a value matches a stored snapshot.

On first run (no snapshot file exists), the current value is persisted to disk and the test is skipped via pytest.skip. On subsequent runs the value is compared to the stored snapshot and the test fails if they differ.

Parameters
ParameterTypeDescription
`snapshot_dir`Directory where snapshot files are stored. Defaults to a ``__snapshots__`` subdirectory of the current working directory.

Example

asserter = SnapshotAsserter(Path("tests/__snapshots__"))
asserter.assert_match("create_user_response", response_body)
def __init__(snapshot_dir: Path | None = None) -> None
def assert_match(
    name: str,
    value: Any
) -> None

Assert value matches the stored snapshot for name.

Creates the snapshot and skips the test on first call.

Parameters
ParameterTypeDescription
`name`strUnique snapshot identifier (used as the filename stem).
`value`AnyThe value to compare. Must be JSON-serialisable.
Raises
ExceptionDescription
SnapshotMismatchErrorIf the value does not match the stored snapshot.
def update(
    name: str,
    value: Any
) -> Path

Persist value as the stored snapshot for name.

Parameters
ParameterTypeDescription
`name`strUnique snapshot identifier.
`value`AnyThe value to store.
Returns
TypeDescription
PathPath to the written snapshot file.
def delete(name: str) -> bool

Delete the snapshot file for name.

Returns
TypeDescription
bool``True`` if the file existed and was deleted; ``False`` otherwise.
def exists(name: str) -> bool

Return True if a snapshot for name exists on disk.


Real system clock satisfying the Clock protocol.
def now() -> datetime

Return the current UTC time.

def monotonic() -> float

Return the monotonic clock value.

def time() -> float

Return the current Unix timestamp.


Reusable test suite for any ``TaskQueueProtocol`` implementation.

Subclass and implement create_queue:

.. code-block:: python

class TestMyQueue(TaskQueueCompliance):
async def create_queue(self):
return MemoryTaskQueue()
async def create_queue() -> Any

Return a fresh, empty instance of the queue under test.

async def test_enqueue_returns_task_id() -> None

enqueue returns the task id.

async def test_dequeue_returns_enqueued_task() -> None

dequeue returns the task that was enqueued.

async def test_dequeue_empty_returns_none() -> None

dequeue on an empty queue returns None.

async def test_ack_does_not_raise() -> None

ack on a known in-flight task completes without error.

async def test_ack_unknown_id_is_noop() -> None

ack on an unknown task id is a safe no-op.

async def test_nack_requeue_returns_task_to_queue() -> None

nack with requeue=True makes the task available for dequeue again.

async def test_nack_discard_does_not_requeue() -> None

nack with requeue=False permanently discards the task.

async def test_nack_unknown_id_is_noop() -> None

nack on an unknown task id is a safe no-op.

async def test_get_task_count_reflects_pending() -> None

get_task_count returns the number of tasks waiting to be dequeued.

async def test_clear_empties_the_queue() -> None

clear removes all pending tasks.


Test environment pre-wired with mock task queue and executor.

Unlike the generic TestEnvironment, TaskTestBed overrides setup and teardown to manage the mock components directly without starting a full Application.

Attributes: mock_queue: In-memory MockTaskQueue instance created during setup. mock_executor: MockTaskExecutor instance created during setup.

Example

bed = TaskTestBed()
await bed.setup()
client = TaskTestClient(bed)
task_ids = await client.enqueue_test_tasks()
...
await bed.teardown()
def __init__(config: Any = None) -> None
async def setup() -> None

Initialise mock components and pre-populate the queue.

Creates a fresh MockTaskQueue and MockTaskExecutor, then enqueues the three sample tasks returned by sample_tasks. Does not start an Application or DI container.

async def teardown() -> None

Clear the mock queue and reset state.

Safe to call even if setup was never called.

def get_enqueued_tasks() -> list[Any]

Return all tasks currently waiting in the mock queue.

Returns
TypeDescription
list[Any]A list of JobProtocol objects, in enqueue order.
Raises
ExceptionDescription
RuntimeErrorIf called before setup.

High-level task-system test helper.

Wraps a TaskTestBed and provides convenience methods for testing the task lifecycle without a running application or external queue backend.

Attributes: test_bed: The underlying TaskTestBed that owns the mocks. provider: The currently active MockTasksProvider, or None when no provider is running.

Example

bed = TaskTestBed()
await bed.setup()
client = TaskTestClient(bed)
async with client.task_context() as provider:
task_ids = await client.enqueue_test_tasks()
...
await bed.teardown()
def __init__(test_bed: TaskTestBed) -> None
async def start_provider() -> MockTasksProvider

Create and start a MockTasksProvider bound to the test bed.

Sets self.provider and returns the new provider so callers can assert on its identity.

Returns
TypeDescription
MockTasksProviderThe newly created MockTasksProvider.
async def stop_provider() -> None

Shut down the active provider and clear self.provider.

No-op if no provider is running.

async def enqueue_test_tasks() -> list[str]

Enqueue the three standard sample tasks into the mock queue.

Creates JobProtocol instances from sample_tasks and enqueues them.

Returns
TypeDescription
list[str]A list of three task-id strings in enqueue order.
async def execute_test_task(task: Any) -> MockTaskResult[dict[str, Any]]

Execute task via the mock executor.

Parameters
ParameterTypeDescription
`task`AnyA JobProtocol (or any object compatible with MockTaskExecutor).
Returns
TypeDescription
MockTaskResult[dict[str, Any]]A MockTaskResult describing the outcome.
async def task_context() -> AsyncGenerator[MockTasksProvider, None]

Async context manager that starts a provider and stops it on exit.

Yields
TypeDescription
AsyncGenerator[MockTasksProvider, None]The active MockTasksProvider.

Example

async with client.task_context() as provider:
assert provider is not None
assert client.provider is None

Static collection of canned task/job fixtures for testing.

All methods return plain dict objects so that callers can use them without importing task-specific model classes. The dicts are intentionally simple and cover the same task names that MockTaskExecutor recognises.

def sample_tasks() -> list[dict[str, Any]]

Return three representative task definitions.

Returns
TypeDescription
list[dict[str, Any]]A list of three task dicts covering email_notification, data_processing, and cleanup_job.
def sample_jobs() -> list[dict[str, Any]]

Return two representative background-job definitions.

Returns
TypeDescription
list[dict[str, Any]]A list with batch_import and maintenance dicts.
def sample_scheduled_jobs() -> list[dict[str, Any]]

Return two scheduled-job definitions.

Returns
TypeDescription
list[dict[str, Any]]A list with daily_backup and hourly_cleanup dicts.

Custom assertion helpers for testing.
def assert_eventually_true(
    condition_func: Callable[[], bool],
    timeout: float = 5.0,
    message: str = 'Condition never became true'
) -> None
def assert_dict_contains_subset(
    subset: dict[str, Any],
    superset: dict[str, Any],
    message: str | None = None
) -> None
async def assert_async_raises(
    exc_class: type[Exception],
    coro: Awaitable[Any],
    message: str | None = None
) -> None
def assert_metrics_contain(
    metrics: dict[str, Any],
    key: str,
    min_value: float | None = None
) -> None

Factory for generating test data.
def create_user(
    user_id: str | None = None,
    **kwargs: Any
) -> dict[str, Any]
def create_task(
    task_id: str | None = None,
    **kwargs: Any
) -> dict[str, Any]
def create_message(
    topic: str = 'test',
    **kwargs: Any
) -> dict[str, Any]
def create_request(
    method: str = 'GET',
    path: str = '/',
    **kwargs: Any
) -> dict[str, Any]

Isolated test environment for provider testing.

test = False

The TestEnvironment provides a controlled testing environment for Lexigram providers and applications. It manages the application lifecycle, dependency injection container, and provider registration for tests.

Attributes: name: Name of the test environment. app: The Application instance, if created. container: The DI container for service resolution. providers: Dictionary of registered providers. mock_providers: Dictionary of registered mock providers.

Example

Creating an environment

env = TestEnvironment("my-test")
# Add providers
env.use_provider(MyProvider())
# Add mock providers
env.use_mock_provider(MockDatabaseProvider())
# Override services
env.override_service(DbService, MockDbService)
# Run the test
async with env.run():
service = await env.container.resolve(MyService)

Using with pytest fixtures

@pytest.fixture
async def test_env():
env = TestEnvironment("test")
async with env.run():
yield env
def __init__(name_or_app: str | Application = 'test-bed') -> None
def use_provider(provider: Provider) -> TestEnvironment

Add a provider to the test bed.

def use_mock_provider(provider: MockProvider) -> TestEnvironment

Add a mock provider to the test bed.

def override(
    interface: type,
    implementation: Any
) -> TestEnvironment

Override a service registration.

def add_fixture(
    name: str,
    fixture_func: Callable
) -> TestEnvironment

Add a test fixture.

def fake(contract: type) -> TestEnvironment

Register the well-known fake for contract and return self.

The fake-registry maps core protocol types to their in-memory doubles from lexigram.testing.fakes. Raises :exc:`ValueError` if no fake is registered for the given contract.

Example

env.fake(FakeEventBus).fake(FakeLogger)
def get_fake(contract: type) -> Any

Return the fake instance registered for contract.

Raises :exc:`ValueError` if contract has no registered fake. Use this in tests to inspect recorded interactions after the code under test has run.

Example

bus = env.get_fake(FakeEventBus)
bus.assert_published(UserCreated)
async def setup() -> Application

Setup the test environment.

If called on an already-running environment, automatically tears down and restarts if new providers have been added since the last setup — enabling the pattern of incremental provider registration followed by a second setup() call.

async def teardown() -> None

Teardown the test environment.

async def teardown_providers() -> None

Hook for subclasses to tear down providers (no-op default).

async def context() -> AsyncGenerator[TestEnvironment, None]

Context manager for test bed lifecycle.

def get_provider(name: str) -> Provider | None

Get a provider by name.

def get_mock_provider(name: str) -> MockProvider | None

Get a mock provider by name.

def resolve_sync(service_type: type) -> Any

Resolve a service from the container synchronously.

async def resolve(service_type: type) -> Any

Resolve a service from the container asynchronously.

async def health_check() -> dict[str, dict[str, Any]]

Get health status of all providers.

def create_mock(
    provider_class: type[MockProvider],
    **kwargs: Any
) -> MockProvider

Create a mock provider instance.


Testing utilities and fixtures module.

Provides test doubles, container overrides, and lifecycle trackers for use in tests.

Usage

@module(imports=[TestingModule.configure()])
class TestAppModule(Module):
pass
def configure(
    cls,
    **kwargs: Any
) -> DynamicModule

Create a TestingModule for use in test suites.

Parameters
ParameterTypeDescription
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Compliance suite for VectorStoreProtocol implementations.

Subclass and implement create_store() to run all compliance tests. The store is connected before tests and disconnected after.

async def create_store() -> Any

Create a connected VectorStoreProtocol implementation under test.

Returns
TypeDescription
AnyA connected VectorStoreProtocol instance.
async def test_health_check_passes() -> None

health_check() returns a healthy result.

async def test_list_collections_returns_list() -> None

list_collections() returns a list.

async def test_create_and_check_collection() -> None

create_collection() creates a collection visible via collection_exists().

async def test_delete_collection_removes_it() -> None

delete_collection() removes the collection.

async def test_collection_not_exists_for_unknown() -> None

collection_exists() returns False for a non-existent collection.

async def test_upsert_and_count() -> None

upsert() increases count in the collection.

async def test_search_returns_results() -> None

search() returns results after upsert.

async def test_get_by_ids() -> None

get() retrieves vectors by their IDs.

async def test_delete_by_ids() -> None

delete() removes vectors by ID.


TestBed specialized for web applications.

Extends TestEnvironment to add web-specific functionality. Accepts a WebProvider or an Application and exposes a fully functional HTTP client for making requests against the mounted ASGI app.

After setup, the following attributes are available:

  • bed.client: A ~WebTestClient backed by the real ASGI app.
  • bed.web_provider: The WebProvider instance.
  • bed.container: The DI container (for overrides and resolution).
Parameters
ParameterTypeDescription
`provider_or_app`A WebProvider instance or a Lexigram Application.
`name`Name of the test environment (defaults to "web-test-bed").
`raise_server_exceptions`Whether the test client re-raises server errors. Defaults to True. Set to False to inspect 5xx responses as assertions.
def __init__(
    provider_or_app: WebProvider | Application,
    name: str = 'web-test-bed',
    raise_server_exceptions: bool = True
) -> None
async def setup() -> Any

Boot the application and wire up the test client.

def override(
    interface: type,
    implementation: Any
) -> WebTestBed

Override a DI registration for testing (fluent interface).

Must be called before setup. Returns self for chaining.

def get(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a GET request via the test client.

def post(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a POST request via the test client.

def put(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a PUT request via the test client.

def delete(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a DELETE request via the test client.

def patch(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a PATCH request via the test client.


Test client for testing web applications.

Wraps Starlette’s TestClient to provide a seamless testing experience for Lexigram web applications. It automatically extracts the underlying ASGI application from a Lexigram Application instance and provides assertion-aware TestResponse objects.

Note This class was renamed from TestClient to avoid pytest collecting it as a test class.

def __init__(
    app: Any,
    base_url: str = 'http://testserver',
    raise_server_exceptions: bool = True,
    **kwargs: Any
) -> None

Initialize the WebTestClient.

Parameters
ParameterTypeDescription
`app`AnyA Lexigram Application, WebProvider, or raw ASGI application.
`base_url`strThe base URL for requests.
`raise_server_exceptions`boolWhether to raise exceptions that occur in the ASGI app. **kwargs: Additional arguments passed to the underlying Starlette TestClient.
def as_user(user: Any) -> WebTestClient

Simulate an authenticated user for subsequent requests.

Parameters
ParameterTypeDescription
`user`AnyThe user object (must have an 'id' attribute or be convertible to str).
Returns
TypeDescription
WebTestClientself for chaining.
def get(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a GET request.

def options(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform an OPTIONS request.

def head(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a HEAD request.

def post(
    url: str,
    data: Any = None,
    json: Any = None,
    **kwargs: Any
) -> TestResponse

Perform a POST request.

def put(
    url: str,
    data: Any = None,
    json: Any = None,
    **kwargs: Any
) -> TestResponse

Perform a PUT request.

def patch(
    url: str,
    data: Any = None,
    json: Any = None,
    **kwargs: Any
) -> TestResponse

Perform a PATCH request.

def delete(
    url: str,
    **kwargs: Any
) -> TestResponse

Perform a DELETE request.

def request(
    method: str,
    url: str,
    headers: dict[str, str] | None = None,
    **kwargs: Any
) -> TestResponse

Perform a generic HTTP request.

def websocket_connect(
    url: str,
    subprotocols: Any = None,
    **kwargs: Any
) -> Any

Perform a WebSocket connection.


def assert_all_ok(results: list[Result[T, E]]) -> list[T]

Assert every Result in results is Ok and return the unwrapped values.

Example

values = assert_all_ok([service.find(id) for id in ids])

def assert_err(
    result: Result[Any, E],
    error_type: type[E] | None = None
) -> E

Assert result is Err, optionally checking the error type.

Short alias combining assert_result_err and assert_result_err_type. Intended for tests that want concise, idiomatic assertions:

error = assert_err(service.find_user("missing"), UserNotFound)
assert error.user_id == "missing"
Parameters
ParameterTypeDescription
`result`Result[Any, E]The ``Result`` to inspect.
`error_type`type[E] | NoneWhen provided, asserts that the error is an instance of this type.
Returns
TypeDescription
EThe unwrapped ``Err`` value.
Raises
ExceptionDescription
AssertionErrorIf *result* is ``Ok``, or if *error_type* is given and the error is not an instance of that type.

def assert_healthy(results: dict[str, HealthCheckResult]) -> None

Assert every health check in results passed.

Example

results = await health_checker.run_all()
assert_healthy(results)

def assert_ok(result: Result[T, E]) -> T

Assert result is Ok and return the inner value.

Short alias for assert_result_ok. Intended for tests that want concise, idiomatic assertions:

user = assert_ok(service.find_user("123"))
assert user.email == "test@example.com"
Parameters
ParameterTypeDescription
`result`Result[T, E]The ``Result`` to inspect.
Returns
TypeDescription
TThe unwrapped ``Ok`` value.
Raises
ExceptionDescription
AssertionErrorIf *result* is ``Err``.

def assert_result_err(result: Result[Any, E]) -> E

Unwrap and return the Err value; raise AssertionError if Ok.

Example

error = assert_result_err(service.create_user(bad_data))
assert "email" in str(error)

def assert_result_err_contains(
    result: Result[Any, E],
    substring: str
) -> None

Assert Result is Err and str(error) contains substring.

Example

assert_result_err_contains(result, "not found")

def assert_result_err_type(
    result: Result[Any, E],
    error_type: type[E]
) -> None

Assert the result is Err with an error of the given type.

Example

assert_result_err_type(result, ValidationError)

def assert_result_maps_to(
    result: Result[T, E],
    mapper: Callable[[T], U],
    expected: U
) -> None

Assert the Ok value transforms to expected via mapper.

Example

assert_result_maps_to(result, lambda u: u.email, "user@example.com")

def assert_result_ok(result: Result[T, E]) -> T

Unwrap and return the Ok value; raise AssertionError if Err.

Example

user = assert_result_ok(service.create_user(data))
assert user.name == "Jo"

def assert_result_ok_value(
    result: Result[T, E],
    expected: T
) -> None

Assert Result is Ok with a specific value.

Example

assert_result_ok_value(result, expected_user)

def make_resource_record(
    resource_type: str = 'item',
    **fields: Any
) -> dict[str, Any]

Create a minimal resource record dict for testing.

Provides sensible defaults so callers only need to specify the fields relevant to the test.

Parameters
ParameterTypeDescription
`resource_type`strRecord type tag (stored in ``_type``). **fields: Field values to override defaults.
Returns
TypeDescription
dict[str, Any]Dict representing the resource record.

Example

user = make_resource_record("user", name="Alice", email="a@b.com")
# {"id": "test-user-1", "_type": "user", "name": "Alice", ...}

def override(
    container: Container,
    interface: type[T],
    implementation: Any
) -> Iterator[None]

Context manager to temporarily override a dependency in the container.

Example

with override(container, UserService, MockUserService()):
    # UserService resolves to MockUserService here
    ...


Raised when a value does not match the stored snapshot.
def __init__(
    name: str,
    expected: Any,
    actual: Any
) -> None