API Reference

Protocol for HTTP client implementations.
async def start() -> None

Start the HTTP client and its underlying connection pool.

async def stop() -> None

Stop the HTTP client and release all connection resources.

async def request(
    method: str,
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform an arbitrary HTTP request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `method` | str | HTTP method (GET, POST, PUT, …). |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options (headers, data, json, params, …). |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

async def get(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform GET request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

async def post(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform POST request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options (e.g. ``json=``, ``data=``). |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

async def put(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform PUT request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

async def delete(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform DELETE request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

async def patch(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform PATCH request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

async def head(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform HEAD request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Request URL. |
| `**kwargs` | Any | Additional options. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

Protocol for interceptor chain management.

Manages a collection of interceptors and orchestrates their execution in sequence. Each interceptor in the chain processes the request/response before passing control to the next interceptor.

Example

class InterceptorChain:
    def __init__(self, interceptors: list[InterceptorProtocol]):
        self._interceptors = interceptors

    async def execute_request(self, context: Any) -> Any:
        # Request interceptors run in registration order.
        for interceptor in self._interceptors:
            context = await interceptor.intercept_request(context)
        return context

    async def execute_response(self, response: Any) -> Any:
        # Response interceptors run in reverse order.
        for interceptor in reversed(self._interceptors):
            response = await interceptor.intercept_response(response)
        return response

def add_interceptor(interceptor: InterceptorProtocol) -> None

Add an interceptor to the chain.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `interceptor` | InterceptorProtocol | The interceptor to add. |

def remove_interceptor(interceptor: InterceptorProtocol) -> None

Remove an interceptor from the chain.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `interceptor` | InterceptorProtocol | The interceptor to remove. |

async def process_request(context: Any) -> Any

Process request through the interceptor chain.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `context` | Any | Request context to process. |

Returns

| Type | Description |
| --- | --- |
| Any | Modified request context after all interceptors. |

async def process_response(response: Any) -> Any

Process response through the interceptor chain.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `response` | Any | Response to process. |

Returns

| Type | Description |
| --- | --- |
| Any | Modified response after all interceptors. |
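
The ordering can be checked with a small offline sketch. `TagInterceptor` and `SimpleChain` are hypothetical stand-ins, not framework classes, and the reverse order for responses is an assumption carried over from the `InterceptorChain` example earlier in this section. The context is a plain list so that the call order is visible.

```python
import asyncio
from typing import Any


class TagInterceptor:
    """Hypothetical interceptor that records when it runs."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def intercept_request(self, context: Any) -> Any:
        context.append(f"req:{self.name}")
        return context

    async def intercept_response(self, response: Any) -> Any:
        response.append(f"resp:{self.name}")
        return response


class SimpleChain:
    """Hypothetical chain exposing the four protocol methods."""

    def __init__(self) -> None:
        self._interceptors: list[TagInterceptor] = []

    def add_interceptor(self, interceptor: TagInterceptor) -> None:
        self._interceptors.append(interceptor)

    def remove_interceptor(self, interceptor: TagInterceptor) -> None:
        self._interceptors.remove(interceptor)

    async def process_request(self, context: Any) -> Any:
        # Requests flow through interceptors in registration order.
        for interceptor in self._interceptors:
            context = await interceptor.intercept_request(context)
        return context

    async def process_response(self, response: Any) -> Any:
        # Responses unwind in reverse order (assumed, see lead-in).
        for interceptor in reversed(self._interceptors):
            response = await interceptor.intercept_response(response)
        return response


async def demo() -> tuple[list[str], list[str]]:
    chain = SimpleChain()
    chain.add_interceptor(TagInterceptor("auth"))
    chain.add_interceptor(TagInterceptor("log"))
    return await chain.process_request([]), await chain.process_response([])
```

Running `asyncio.run(demo())` shows `auth` before `log` on the request path and the opposite order on the response path.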

Protocol for request/response interceptors.

Interceptors are applied to every request/response cycle in HTTPClient. Implementations receive typed RequestContext objects and may modify them before the request is dispatched, or inspect / annotate the raw response after it arrives.

Example

class LoggingInterceptor:
    async def intercept_request(self, context: Any) -> Any:
        logger.info("outbound_request", method=context.method, url=context.url)
        return context

    async def intercept_response(self, response: Any) -> Any:
        logger.info("inbound_response", status=response.status)
        return response

async def intercept_request(context: Any) -> Any

Called before a request is dispatched.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `context` | Any | RequestContext for the outbound request. Implementations may mutate and return it. |

Returns

| Type | Description |
| --- | --- |
| Any | The (possibly modified) request context. |

async def intercept_response(response: Any) -> Any

Called after a response is received from the server.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `response` | Any | Raw response object from the underlying HTTP library. Implementations may annotate or replace it. |

Returns

| Type | Description |
| --- | --- |
| Any | The (possibly modified) response. |
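
As an illustration of an interceptor that mutates the request context, here is a minimal sketch that adds a request ID header. `FakeRequestContext` is a hypothetical stand-in for the framework's RequestContext (only `method`, `url` and `headers` are modelled), and the `X-Request-ID` header name is an assumption chosen for the example.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeRequestContext:
    """Stand-in for the framework's RequestContext (assumed shape)."""

    method: str
    url: str
    headers: dict[str, str] = field(default_factory=dict)


class RequestIdInterceptor:
    """Adds an X-Request-ID header unless the caller already set one."""

    async def intercept_request(self, context: Any) -> Any:
        # setdefault keeps a caller-supplied ID intact.
        context.headers.setdefault("X-Request-ID", uuid.uuid4().hex)
        return context

    async def intercept_response(self, response: Any) -> Any:
        # Nothing to annotate; pass the response through unchanged.
        return response
```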

Async HTTP client with base-URL resolution and context-manager lifecycle.

Wraps HTTPClient and adds:

  • base_url prepended to every relative request path.
  • Default headers merged into every request.
  • Direct HttpResponse returns from verb methods (raises on infrastructure failures rather than returning Result).
  • stream async context manager for line-oriented streaming responses (SSE, NDJSON).
  • Lazy initialisation — no explicit start() call required; the underlying connection pool is created on first request and torn down by close.
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `base_url` | str | Base URL prepended to every relative path. |
| `headers` | dict[str, str] \| None | Default request headers merged with per-request headers. |
| `timeout` | float | Request timeout in seconds (default 300). |
| `name` | str | Logical name used in log messages. |
| `retry` | RetryPolicyProtocol \| None | Optional retry policy injected into the underlying HTTPClient. |
| `circuit_breaker` | CircuitBreakerProtocol \| None | Optional circuit breaker injected into the underlying HTTPClient. |
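
The base-URL and default-header behaviour can be pictured with the following sketch. Both helpers are hypothetical and are not the client's actual code. The slash handling, and the rule that per-request headers override defaults, are assumptions.

```python
from __future__ import annotations


def resolve_url(base_url: str, url: str) -> str:
    """Prefix relative paths with base_url; leave absolute URLs untouched."""
    if url.startswith(("http://", "https://")) or not base_url:
        return url
    # Normalise the slash between base and path (assumed behaviour).
    return base_url.rstrip("/") + "/" + url.lstrip("/")


def merge_request_headers(
    defaults: dict[str, str] | None,
    per_request: dict[str, str] | None,
) -> dict[str, str]:
    """Per-request headers override defaults with the same name (assumed).

    The comparison here is case-sensitive, which a real client may not be.
    """
    return {**(defaults or {}), **(per_request or {})}
```

With `base_url="https://api.example.com/v1"`, a call such as `resolve_url(base_url, "/completions")` yields `https://api.example.com/v1/completions`, while an absolute URL passes through unchanged.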

Example

async with BaseURLHTTPClient(
    base_url="https://api.example.com/v1",
    headers={"Authorization": "Bearer token"},
    timeout=60.0,
) as client:
    resp = await client.post("/completions", json=payload)
    resp.raise_for_status()

def __init__(
    base_url: str = '',
    headers: dict[str, str] | None = None,
    timeout: float = 300.0,
    name: str = 'base-url-http-client',
    retry: RetryPolicyProtocol | None = None,
    circuit_breaker: CircuitBreakerProtocol | None = None
) -> None
async def close() -> None

Stop the underlying HTTP client and release all connections.

async def stop() -> None

Alias for close.

async def request(
    method: str,
    url: str,
    **kwargs: Any
) -> HttpResponse

Make an HTTP request, returning HttpResponse directly.

Infrastructure failures are raised as exceptions (not wrapped in Result). HTTP 4xx/5xx responses are returned as-is; call raise_for_status to convert them to exceptions when desired.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `method` | str | HTTP method (GET, POST, …). |
| `url` | str | Relative path or absolute URL. |
| `**kwargs` | Any | Forwarded to aiohttp (``json=``, ``data=``, ``headers=``, …). |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework HttpResponse. |

async def get(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform a GET request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Relative path or absolute URL. |
| `**kwargs` | Any | Forwarded to aiohttp. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework HttpResponse. |

async def post(
    url: str,
    data: Any = None,
    **kwargs: Any
) -> HttpResponse

Perform a POST request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Relative path or absolute URL. |
| `data` | Any | Optional body. ``dict`` is sent as JSON; other types as raw ``data``. Use ``json=`` kwarg for explicit JSON. |
| `**kwargs` | Any | Forwarded to aiohttp. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework HttpResponse. |

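
The `data` dispatch rule for POST can be sketched as a small helper. `build_post_kwargs` is a hypothetical name used only for illustration. It shows how a `dict` body might be routed to `json=` and any other body to `data=` before the call is forwarded.

```python
from typing import Any


def build_post_kwargs(data: Any = None, **kwargs: Any) -> dict[str, Any]:
    """Route ``data`` to ``json=`` for dicts and to ``data=`` otherwise."""
    out = dict(kwargs)
    if isinstance(data, dict):
        out["json"] = data  # dict bodies are serialised as JSON
    elif data is not None:
        out["data"] = data  # bytes, str, etc. are sent as-is
    return out
```

For example, `build_post_kwargs({"a": 1})` produces `{"json": {"a": 1}}`, while `build_post_kwargs(b"raw")` produces `{"data": b"raw"}`.
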
async def put(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform a PUT request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Relative path or absolute URL. |
| `**kwargs` | Any | Forwarded to aiohttp. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework HttpResponse. |

async def delete(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform a DELETE request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Relative path or absolute URL. |
| `**kwargs` | Any | Forwarded to aiohttp. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework HttpResponse. |

async def patch(
    url: str,
    **kwargs: Any
) -> HttpResponse

Perform a PATCH request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Relative path or absolute URL. |
| `**kwargs` | Any | Forwarded to aiohttp. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework HttpResponse. |

async def stream(
    method: str,
    url: str,
    **kwargs: Any
) -> AsyncIterator[StreamContext]

Async context manager for line-oriented streaming responses.

Opens a persistent HTTP connection and wraps the response in a StreamContext that exposes aiter_lines for consuming SSE / NDJSON streams — the dominant format used by LLM provider streaming APIs.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `method` | str | HTTP method (typically ``"POST"`` for LLM completions). |
| `url` | str | Relative path or absolute URL. |
| `**kwargs` | Any | Forwarded to aiohttp (``json=``, ``data=``, ``headers=``, …). |

Yields

| Type | Description |
| --- | --- |
| AsyncIterator[StreamContext] | StreamContext with status, raise_for_status, and aiter_lines. |

Example

async with client.stream("POST", "/chat/completions", json=payload) as resp:
    resp.raise_for_status()
    async for line in resp.aiter_lines():
        if line.startswith("data: "):
            ...
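
One way to fill in the loop body is sketched below, assuming each `data: ` line carries a JSON object. That holds for many streaming APIs but is not stated by this reference. `fake_lines` is a hypothetical stand-in for `resp.aiter_lines()` so that the sketch runs offline, and the `delta` key is invented for the example.

```python
import asyncio
import json
from collections.abc import AsyncIterator
from typing import Any


async def iter_data_payloads(lines: AsyncIterator[str]) -> AsyncIterator[Any]:
    """Yield the JSON payload of every ``data: `` line; skip everything else."""
    async for line in lines:
        if not line.startswith("data: "):
            continue  # blank separators, comments, other SSE fields
        yield json.loads(line[len("data: "):])


async def fake_lines() -> AsyncIterator[str]:
    """Stand-in for resp.aiter_lines()."""
    for line in ['data: {"delta": "Hel"}', "", ": ping", 'data: {"delta": "lo"}']:
        yield line


async def collect() -> str:
    # Concatenate the hypothetical "delta" field of each payload.
    return "".join([p["delta"] async for p in iter_data_payloads(fake_lines())])
```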

Async HTTP connection pool backed by ``aiohttp``.
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `max_connections` | int | Maximum total concurrent connections (default: 10). |
| `max_keepalive_connections` | int | Keep-alive connections per host (default: 5). |
| `timeout` | float | Request timeout in seconds (default: 30.0). |
| `ttl_dns_cache` | int | DNS cache TTL in seconds (default: 300). |
| `force_close` | bool | Force-close connections after each use (default: False). |
| `verify_ssl` | bool | Verify SSL certificates (default: True). Set to ``False`` only in trusted internal networks, never in production for external endpoints. |

Example

pool = ConnectionPool(max_connections=100, timeout=60.0)
await pool.start()
response = await pool._session.get("https://api.example.com")
await pool.stop()
def __init__(
    max_connections: int = 10,
    max_keepalive_connections: int = 5,
    max_connections_per_host: int = 10,
    timeout: float = 30.0,
    ttl_dns_cache: int = 300,
    force_close: bool = False,
    verify_ssl: bool = True,
    proxy: str | None = None,
    trust_env: bool = True,
    cookie_jar: bool = True
) -> None
async def start() -> None

Initialise the connection pool and create the aiohttp session.

async def stop() -> None

Close the connection pool and release all resources.

def is_started() -> bool

Return True if the pool session is active.

async def warm_up(urls: list[str]) -> None

Pre-establish connections to known endpoints.

Fires a lightweight HEAD (falling back to GET) request to each URL so that the TCP and TLS handshake has already been completed before the first real request arrives. Errors are swallowed silently — this is a best-effort optimisation, not a connectivity check.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `urls` | list[str] | Base URLs to pre-connect to (e.g. ``["https://api.example.com"]``). |

Example

await pool.start()
await pool.warm_up(["https://api.example.com", "https://cdn.example.com"])
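
The best-effort pattern behind warm-up can be sketched independently of aiohttp. Here `fetch` is a hypothetical injected coroutine taking `(method, url)`, and the fallback to GET is assumed to happen when the HEAD attempt raises. The real method returns None; this sketch returns a list of booleans only so that the outcome is observable.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical transport signature: (method, url) -> status code.
Fetch = Callable[[str, str], Awaitable[int]]


async def warm_one(fetch: Fetch, url: str) -> bool:
    """Try HEAD, then GET; report success instead of raising."""
    for method in ("HEAD", "GET"):
        try:
            await fetch(method, url)
            return True
        except Exception:
            continue  # best effort: fall through to the next method
    return False


async def warm_up(fetch: Fetch, urls: list[str]) -> list[bool]:
    """Warm all URLs concurrently; failures never propagate to the caller."""
    return list(await asyncio.gather(*(warm_one(fetch, u) for u in urls)))
```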

Configuration for the HTTP connection pool.

Attributes

  • max_connections: Maximum total concurrent connections (default: 10).
  • max_keepalive_connections: Keep-alive connections per host (default: 5).
  • max_connections_per_host: Maximum connections per individual host (default: 10).
  • timeout: Request timeout in seconds (default: 30.0).
  • ttl_dns_cache: DNS cache TTL in seconds (default: 300).
  • force_close: Force-close connections after use (default: False).


Async HTTP client with connection pooling and optional resilience.

All dependencies are provided at construction time (constructor injection). When used through HTTPProvider, retry_policy and circuit_breaker are resolved from the container and injected here. For standalone usage, pass them directly or accept the defaults.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | HTTPClientConfig \| None | Pool and client configuration. Defaults to HTTPClientConfig with framework defaults. |
| `pool` | ConnectionPool \| None | Optional pre-configured ConnectionPool. Created from *config* when not provided. |
| `retry_policy` | RetryPolicyProtocol \| None | Retry policy for transient failures. Defaults to a RetryPolicy with framework defaults. |
| `circuit_breaker` | CircuitBreakerProtocol \| None | Optional circuit breaker; disabled when ``None``. |
| `interceptors` | Iterable[InterceptorProtocol] | Zero or more interceptors applied to every request. |
| `resilience` | ResiliencePipelineProtocol \| None | Optional ResiliencePipelineProtocol that combines retry, circuit-breaker, bulkhead and timeout into a single composable pipeline. When provided, it takes precedence over the individual ``retry_policy`` and ``circuit_breaker`` parameters. |
| `metrics` | MetricsRecorderProtocol \| None | Optional MetricsRecorderProtocol for emitting ``http.request.duration`` (histogram) and ``http.request.status`` (counter) per-request metrics. |

Example

config = HTTPClientConfig()
async with HTTPClient.session_context(config) as client:
    response = await client.get("https://api.example.com/data")

def __init__(
    config: HTTPClientConfig | None = None,
    pool: ConnectionPool | None = None,
    retry_policy: RetryPolicyProtocol | None = None,
    circuit_breaker: CircuitBreakerProtocol | None = None,
    interceptors: Iterable[InterceptorProtocol] = (),
    resilience: ResiliencePipelineProtocol | None = None,
    metrics: MetricsRecorderProtocol | None = None
) -> None
property config() -> HTTPClientConfig

Return the client configuration.

property pool() -> ConnectionPool

Return the underlying connection pool.

def pool(value: ConnectionPool) -> None

Replace the connection pool (mainly for testing).

async def start() -> None

Start the HTTP client and its connection pool.

async def stop() -> None

Stop the HTTP client and close the connection pool.

async def request(
    method: str,
    url: str,
    **kwargs: Any
) -> HttpResponse

Make an HTTP request with retry and optional circuit-breaker protection.

Note: This is an infrastructure-level method. It raises exceptions for all failures (circuit open, retries exhausted, connection errors). In application or domain code, prefer the high-level verb methods (get, post, etc.); they return Result and avoid exception-based control flow for expected failures.

Summary of the two-layer strategy:

  • ``request()``: infrastructure layer, raises exceptions.
  • ``get()``, ``post()``, etc.: domain layer, return ``Result``.
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `method` | str | HTTP method (``GET``, ``POST``, etc.). |
| `url` | str | Target URL. |
| `**kwargs` | Any | Additional keyword arguments forwarded to ``aiohttp``. |

Returns

| Type | Description |
| --- | --- |
| HttpResponse | Framework-owned HttpResponse. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |
| HTTPConnectionError | When the connection cannot be established. |
| HTTPTimeoutError | When the request times out. |

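
The two-layer strategy can be illustrated with a self-contained sketch. `Ok`, `Err`, `FakeStatusError`, `FakeTimeoutError` and `low_level_request` are all hypothetical stand-ins. The framework's real Result type and error classes may expose a different API, so this only shows the shape of the pattern.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


Result = Union[Ok[T], Err[E]]


class FakeStatusError(Exception):
    """Stand-in for a 4xx/5xx status error."""


class FakeTimeoutError(Exception):
    """Stand-in for a transport timeout."""


async def low_level_request(url: str) -> int:
    """Infrastructure layer: returns a status code or raises."""
    if url.endswith("/slow"):
        raise FakeTimeoutError(url)
    return 404 if url.endswith("/missing") else 200


async def get(url: str) -> Result[int, Exception]:
    """Domain layer: expected failures become Err values."""
    try:
        status = await low_level_request(url)
    except FakeTimeoutError as exc:
        return Err(exc)
    if status >= 400:
        return Err(FakeStatusError(status))
    return Ok(status)
```

Callers of the domain layer can then branch on `isinstance(result, Ok)` instead of wrapping each call in `try`/`except`.
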
async def get(
    url: str,
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Perform a GET request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `**kwargs` | Any | Forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(HTTPStatusError)`` on 4xx/5xx; ``Err(HTTPConnectionError \| HTTPTimeoutError)`` on transport failure. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |

async def post(
    url: str,
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Perform a POST request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `**kwargs` | Any | Forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(HTTPStatusError)`` on 4xx/5xx; ``Err(HTTPConnectionError \| HTTPTimeoutError)`` on transport failure. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |

async def put(
    url: str,
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Perform a PUT request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `**kwargs` | Any | Forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(HTTPStatusError)`` on 4xx/5xx; ``Err(HTTPConnectionError \| HTTPTimeoutError)`` on transport failure. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |

async def delete(
    url: str,
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Perform a DELETE request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `**kwargs` | Any | Forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(HTTPStatusError)`` on 4xx/5xx; ``Err(HTTPConnectionError \| HTTPTimeoutError)`` on transport failure. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |

async def patch(
    url: str,
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Perform a PATCH request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `**kwargs` | Any | Forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(HTTPStatusError)`` on 4xx/5xx; ``Err(HTTPConnectionError \| HTTPTimeoutError)`` on transport failure. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |

async def head(
    url: str,
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Perform a HEAD request.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `**kwargs` | Any | Forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(HTTPStatusError)`` on 4xx/5xx; ``Err(HTTPConnectionError \| HTTPTimeoutError)`` on transport failure. |

Raises

| Exception | Description |
| --- | --- |
| HTTPCircuitOpenError | When the circuit breaker is open. |
| HTTPRetryExhaustedError | When all retry attempts are exhausted. |

async def post_multipart(
    url: str,
    fields: dict[str, str | bytes | tuple[str, bytes, str]],
    **kwargs: Any
) -> Result[HttpResponse, HTTPClientError]

Upload multipart/form-data.

Builds an aiohttp.FormData payload from fields and POSTs it. Each value can be:

  • str — sent as a plain text field
  • bytes — sent as a file-like binary field (filename inferred from the field name)
  • (filename, data, content_type) — full control over the part headers
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Target URL. |
| `fields` | dict[str, str \| bytes \| tuple[str, bytes, str]] | Mapping of field names to values. See above for supported value types. |
| `**kwargs` | Any | Additional keyword arguments forwarded to request. |

Returns

| Type | Description |
| --- | --- |
| Result[HttpResponse, HTTPClientError] | ``Ok(HttpResponse)`` on 2xx; ``Err(...)`` on failure. |

Example

result = await client.post_multipart(
    "https://api.example.com/upload",
    fields={
        "description": "My file",
        "file": ("report.pdf", pdf_bytes, "application/pdf"),
    },
)

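
The three accepted value shapes can be normalised before a form payload is built, as in the sketch below. It deliberately stops short of calling aiohttp. `Part` and `normalize_fields` are hypothetical names, and the `application/octet-stream` default for bare `bytes` is an assumption.

```python
from typing import NamedTuple, Optional, Union

FieldValue = Union[str, bytes, tuple[str, bytes, str]]


class Part(NamedTuple):
    name: str
    data: Union[str, bytes]
    filename: Optional[str]
    content_type: Optional[str]


def normalize_fields(fields: dict[str, FieldValue]) -> list[Part]:
    """Turn each field value into one uniform Part record."""
    parts: list[Part] = []
    for name, value in fields.items():
        if isinstance(value, str):
            # Plain text field: no filename, no explicit content type.
            parts.append(Part(name, value, None, None))
        elif isinstance(value, bytes):
            # Binary field: filename inferred from the field name.
            parts.append(Part(name, value, name, "application/octet-stream"))
        else:
            # (filename, data, content_type) tuple: full control.
            filename, data, content_type = value
            parts.append(Part(name, data, filename, content_type))
    return parts
```
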
async def stream(
    method: str,
    url: str,
    **kwargs: Any
) -> AsyncIterator[AsyncIterator[bytes]]

Async context manager that streams the response body as raw bytes chunks.

Unlike request, this method does not buffer the entire response body in memory — suitable for large file downloads or chunked responses.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `method` | str | HTTP method (``GET``, ``POST``, etc.). |
| `url` | str | Target URL. |
| `**kwargs` | Any | Additional keyword arguments forwarded to ``aiohttp``. |

Yields

| Type | Description |
| --- | --- |
| AsyncIterator[AsyncIterator[bytes]] | An AsyncIterator of ``bytes`` chunks. |

Raises

| Exception | Description |
| --- | --- |
| HTTPConnectionError | When the connection cannot be established. |
| HTTPTimeoutError | When the request times out. |

Example

async with client.stream("GET", large_file_url) as chunks:
    async for chunk in chunks:
        await file.write(chunk)

async def sse(
    url: str,
    **kwargs: Any
) -> AsyncIterator[AsyncIterator[ServerSentEvent]]

Async context manager that consumes a Server-Sent Events (SSE) stream.

Opens a persistent GET connection and parses the text/event-stream response into ServerSentEvent objects.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | SSE endpoint URL. |
| `**kwargs` | Any | Additional keyword arguments forwarded to ``aiohttp``. |

Yields

| Type | Description |
| --- | --- |
| AsyncIterator[AsyncIterator[ServerSentEvent]] | An AsyncIterator of ServerSentEvent objects. |

Raises

| Exception | Description |
| --- | --- |
| HTTPConnectionError | When the connection cannot be established. |

Example

from lexigram.logging import get_logger

logger = get_logger(__name__)

async with client.sse("https://api.example.com/events") as events:
    async for event in events:
        logger.info("sse_event", event_type=event.event, data=event.data)

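
For readers unfamiliar with the text/event-stream format, the sketch below shows how lines might be grouped into events. `Event` is a hypothetical stand-in for ServerSentEvent that models only the `event` and `data` attributes used above. The parser ignores the `id` and `retry` fields and is not the library's implementation.

```python
from collections.abc import Iterable, Iterator
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for ServerSentEvent (only two attributes modelled)."""

    event: str = "message"
    data: str = ""


def parse_sse(lines: Iterable[str]) -> Iterator[Event]:
    """Group already-decoded lines into events, dispatching on blank lines."""
    event, data = "message", []
    for line in lines:
        if line == "":
            if data:  # events with an empty data buffer are not dispatched
                yield Event(event, "\n".join(data))
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            name, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if name == "event":
                event = value
            elif name == "data":
                data.append(value)
```

Multiple `data:` lines within one event are joined with newlines, and an event without an `event:` field defaults to the type `message`.
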
async def session_context(
    cls,
    config: HTTPClientConfig | None = None,
    retry_policy: RetryPolicyProtocol | None = None,
    circuit_breaker: CircuitBreakerProtocol | None = None,
    interceptors: Iterable[InterceptorProtocol] = ()
) -> AsyncIterator[HTTPClient]

Async context manager that starts and stops the client automatically.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | HTTPClientConfig \| None | Optional HTTPClientConfig; framework defaults apply. |
| `retry_policy` | RetryPolicyProtocol \| None | Optional retry policy; framework default when ``None``. |
| `circuit_breaker` | CircuitBreakerProtocol \| None | Optional circuit breaker; disabled when ``None``. |
| `interceptors` | Iterable[InterceptorProtocol] | Zero or more interceptors. |

Yields

| Type | Description |
| --- | --- |
| AsyncIterator[HTTPClient] | A started HTTPClient instance. |

Example

async with HTTPClient.session_context() as client:
    response = await client.get("https://api.example.com")


Top-level configuration for HTTPClient.

Attributes

  • pool: Connection pool settings.
  • proxy: Optional HTTP/HTTPS proxy URL (e.g. "http://proxy.example.com:8080"). Applied to all requests made by the client.
  • trust_env: Whether to read proxy settings from environment variables (HTTP_PROXY, HTTPS_PROXY, NO_PROXY). Defaults to True; set to False to ignore environment proxies.
  • cookie_jar: Whether to enable an in-memory cookie jar that persists cookies across requests within a single HTTPClient session. Defaults to True.


Async outbound HTTP client backed by aiohttp with connection pooling.

Use configure to set up the HTTP client with custom settings and optional resilience integration (retry, circuit breaker).

Usage (defaults)

@module(imports=[HTTPModule.configure()])
class AppModule(Module):
    pass

Usage (configured)

from lexigram.http.config import HTTPClientConfig

@module(
    imports=[HTTPModule.configure(HTTPClientConfig(timeout=30))]
)
class AppModule(Module):
    pass

def configure(
    cls,
    config: Any | None = None
) -> DynamicModule

Create an HTTPModule with explicit configuration.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | Any \| None | HTTPClientConfig or ``None`` for framework defaults. |

Returns

| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule descriptor. |

def stub(cls) -> DynamicModule

Return a no-op HTTPModule for unit testing.

Registers an HTTP client with default configuration. No real outbound connections are made unless the client is explicitly used to send a request.

Returns

| Type | Description |
| --- | --- |
| DynamicModule | A DynamicModule with default HTTP client configuration. |

Framework provider for the HTTP client.

Registers HTTPClient (and its HTTPClientProtocol binding) as a container-scoped singleton. Resilience dependencies are resolved from the container during boot and injected into the client — no service locator is used.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `config` | HTTPClientConfig \| None | Optional HTTP client configuration. Framework defaults apply when not provided. |

def __init__(config: HTTPClientConfig | None = None) -> None
def from_config(
    cls,
    config: HTTPClientConfig,
    **context: Any
) -> Self

Create provider from config object.

async def register(container: ContainerRegistrarProtocol) -> None

Register HTTPClient bindings with the container.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `container` | ContainerRegistrarProtocol | The DI registrar. |

async def boot(container: ContainerResolverProtocol) -> None

Construct and start the HTTP client with injected resilience.

Resolves RetryPolicyProtocol and optionally CircuitBreakerRegistryProtocol from the container. Falls back to framework defaults when the container has no binding for them.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `container` | ContainerResolverProtocol | The DI resolver. |

async def shutdown() -> None

Stop and dispose the HTTP client.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Return the health of the HTTP provider.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float | Unused; retained for interface compatibility. |

Returns

| Type | Description |
| --- | --- |
| HealthCheckResult | HealthCheckResult reflecting whether the client is active. |

Payload fired when an outbound HTTP request is dispatched.

Attributes

  • method: HTTP verb in upper-case (e.g. "GET").
  • url: Full URL of the outbound request.


Payload fired when an outbound HTTP response is received.

Attributes

  • method: HTTP verb of the originating request.
  • url: URL that was requested.
  • status_code: HTTP status code of the received response.


HTTP request completed successfully.

Consumed by: request metrics, audit logging, performance analysis.


Context information for an outbound HTTP request.

Used by interceptors to inspect and modify the request before it is sent.

Attributes

  • method: HTTP method (upper-cased).
  • url: Target URL.
  • headers: Request headers as a plain dict.
  • service_name: Logical service name if known (service-mesh use cases).
  • attempt: Current retry attempt number (0-indexed).
  • start_time: Timestamp when the request was initiated.


HTTP request exhausted all retry attempts.

Consumed by: retry management, circuit breaker logic, error reporting.


HTTP request timed out before completion.

Consumed by: timeout tracking, performance monitoring, alerting.


Context information for a received HTTP response.

Used by interceptors to inspect response metadata.

Attributes

  • status: HTTP status code.
  • headers: Response headers.
  • content_length: Optional response body length in bytes.
  • duration: Round-trip duration in seconds.
  • success: True when status < 400.
  • error: Human-readable error message when the request failed.


Wraps an aiohttp response for line-oriented streaming.

Exposes status, raise_for_status(), and aiter_lines() without leaking the underlying aiohttp type to callers.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `resp` | aiohttp.ClientResponse | The raw aiohttp ClientResponse, owned by the enclosing context manager for the duration of the stream. |

def __init__(resp: aiohttp.ClientResponse) -> None
property status() -> int

HTTP status code of the response.

def raise_for_status() -> None

Raise HttpStatusError for 4xx/5xx.

Raises

| Exception | Description |
| --- | --- |
| HttpStatusError | When the status code is 400 or higher. |

async def aiter_lines() -> AsyncIterator[str]

Yield decoded text lines from the streaming response body.

Yields

| Type | Description |
| --- | --- |
| AsyncIterator[str] | Each line decoded from UTF-8 with trailing CR/LF stripped. |

def build_url(
    base: str,
    path: str = '',
    params: dict[str, Any] | None = None
) -> str

Build a complete URL from base, path, and query parameters.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `base` | str | Base URL (e.g. ``"https://api.example.com"``). |
| `path` | str | URL path to append (e.g. ``"/users/123"``). |
| `params` | dict[str, Any] \| None | Optional query parameters; ``None`` values are omitted. |

Returns

| Type | Description |
| --- | --- |
| str | Complete URL with path and encoded query string. |

Example

>>> build_url("https://api.example.com", "/users", {"page": 1})
'https://api.example.com/users?page=1'
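
A function with this behaviour could be written with the standard library as follows. `build_url_sketch` is a hypothetical reimplementation for illustration only. The slash normalisation and the use of `urlencode` are assumptions, so the encoding of special characters may differ from the real function.

```python
from __future__ import annotations

from typing import Any
from urllib.parse import urlencode


def build_url_sketch(
    base: str, path: str = "", params: dict[str, Any] | None = None
) -> str:
    """Join base and path, then append non-None query parameters."""
    url = base.rstrip("/")
    if path:
        url += "/" + path.lstrip("/")
    # Drop parameters whose value is None before encoding.
    query = {k: v for k, v in (params or {}).items() if v is not None}
    if query:
        url += "?" + urlencode(query)
    return url
```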


def extract_json_type(content_type: str) -> str | None

Extract the JSON MIME type from a Content-Type header value.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `content_type` | str | Raw ``Content-Type`` header value. |

Returns

| Type | Description |
| --- | --- |
| str \| None | The base MIME type if it contains ``"json"``, otherwise ``None``. |

Example

>>> extract_json_type("application/json; charset=utf-8")
'application/json'
>>> extract_json_type("text/html")
None
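
The rule of taking the base MIME type and keeping it only if it mentions JSON can be sketched in a few lines. `extract_json_type_sketch` is a hypothetical name, and lowercasing the result is an assumption rather than documented behaviour.

```python
from __future__ import annotations


def extract_json_type_sketch(content_type: str) -> str | None:
    """Return the MIME type without parameters if it contains 'json'."""
    # Everything after the first ';' is a parameter such as charset.
    base = content_type.split(";", 1)[0].strip().lower()
    return base if "json" in base else None
```

Structured-syntax suffix types such as `application/problem+json` also match under this rule.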


def format_timeout(timeout: float | None) -> str

Format a timeout value for human-readable display.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float \| None | Timeout in seconds, or ``None`` for no timeout. |

Returns

| Type | Description |
| --- | --- |
| str | Formatted string such as ``"30.0s"`` or ``"no timeout"``. |
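
A minimal sketch that produces the two documented output shapes is shown below. `format_timeout_sketch` is a hypothetical name, and the real function may format fractional values differently.

```python
from __future__ import annotations


def format_timeout_sketch(timeout: float | None) -> str:
    """Render None as 'no timeout' and numbers as '<seconds>s'."""
    if timeout is None:
        return "no timeout"
    # float() makes integer inputs print with a decimal point, e.g. 30 -> 30.0.
    return f"{float(timeout)}s"
```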

def merge_headers(
    *header_dicts: Mapping[str, Any],
    normalize: bool = True
) -> dict[str, str]

Merge multiple header mappings; later entries win on duplicate keys.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `normalize` | bool | When ``True``, lowercase keys and strip values (default). |

Returns

| Type | Description |
| --- | --- |
| dict[str, str] | Merged ``dict[str, str]``. |
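
The "later entries win" rule, combined with optional normalisation, can be sketched as follows. `merge_headers_sketch` is a hypothetical reimplementation, and converting values with `str()` is an assumption.

```python
from collections.abc import Mapping
from typing import Any


def merge_headers_sketch(
    *header_dicts: Mapping[str, Any], normalize: bool = True
) -> dict[str, str]:
    """Merge mappings left to right; later values replace earlier ones."""
    merged: dict[str, str] = {}
    for headers in header_dicts:
        for key, value in headers.items():
            text = str(value)
            if normalize:
                # Lowercasing makes "Accept" and "accept" collide on purpose.
                key, text = key.lower(), text.strip()
            merged[key] = text
    return merged
```

With normalisation disabled, keys that differ only in case are kept as separate entries.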

def parse_headers(headers: Mapping[str, Any]) -> dict[str, str]

Normalise HTTP headers: lowercase keys, strip whitespace from values.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `headers` | Mapping[str, Any] | Any mapping of header values. |

Returns

| Type | Description |
| --- | --- |
| dict[str, str] | Normalised ``dict[str, str]``. |

Example

>>> parse_headers({"Content-Type": " application/json "})
{'content-type': 'application/json'}


def parse_url_parts(url: str) -> dict[str, Any]

Parse a URL into its component parts.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | Full URL to parse. |

Returns

| Type | Description |
| --- | --- |
| dict[str, Any] | Dictionary with keys ``scheme``, ``host``, ``port``, ``path``, ``params``, ``fragment``. |
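
One plausible way to produce a dictionary with these keys uses `urllib.parse`. `parse_url_parts_sketch` is hypothetical. In particular, treating `params` as the parsed query string (a dict of lists) is an assumption, and the real function may return a different shape for that key.

```python
from typing import Any
from urllib.parse import parse_qs, urlsplit


def parse_url_parts_sketch(url: str) -> dict[str, Any]:
    """Split a URL into the six documented keys."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,  # lowercased by urlsplit; None if absent
        "port": parts.port,      # None when the URL has no explicit port
        "path": parts.path,
        "params": parse_qs(parts.query),  # assumed: query string as dict of lists
        "fragment": parts.fragment,
    }
```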

def validate_host(host: str) -> None

Validate a hostname or IPv4 address.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `host` | str | Hostname or IP address to validate. |

Raises

| Exception | Description |
| --- | --- |
| HTTPError | If the host is empty or has an invalid format. |

def validate_port(port: int) -> None

Validate a TCP port number.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `port` | int | Port number (must be 1–65535). |

Raises

| Exception | Description |
| --- | --- |
| HTTPError | If the port is not an integer or is out of range. |
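
The validators in this module share a "return None or raise" contract, which the sketch below illustrates for ports. `HTTPErrorSketch` is a stand-in for the library's HTTPError, and rejecting `bool` values is an assumption, since `bool` is a subclass of `int` in Python.

```python
class HTTPErrorSketch(Exception):
    """Stand-in for the library's HTTPError."""


def validate_port_sketch(port: int) -> None:
    """Raise unless port is an int in the range 1..65535."""
    # bool is a subclass of int, so reject it explicitly (an assumption).
    if isinstance(port, bool) or not isinstance(port, int):
        raise HTTPErrorSketch(f"port must be an integer, got {port!r}")
    if not 1 <= port <= 65535:
        raise HTTPErrorSketch(f"port out of range: {port}")
```

Callers typically invoke such a validator at configuration time so that a bad value fails early rather than at the first request.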

def validate_positive_int(
    value: int,
    field: str = 'value'
) -> None

Validate that value is a positive integer.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `value` | int | Integer to validate. |
| `field` | str | Field name used in the error message. |

Raises

| Exception | Description |
| --- | --- |
| HTTPError | If ``value`` is not a positive integer. |

def validate_timeout(timeout: float | None) -> None

Validate a timeout value.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | float \| None | Timeout in seconds, or ``None`` for no timeout. |

Raises

| Exception | Description |
| --- | --- |
| HTTPError | If ``timeout`` is not a positive number. |

def validate_url(
    url: str,
    require_scheme: bool = True
) -> None

Validate a URL string.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `url` | str | URL to validate. |
| `require_scheme` | bool | When ``True``, ``http``/``https`` scheme is required. |

Raises

| Exception | Description |
| --- | --- |
| HTTPError | If the URL is missing, malformed, or missing a required scheme. |

Raised when the circuit breaker is open and requests are rejected.

Base exception for all HTTP client errors.

Raised when a connection to a remote host fails.

Raised when a request/response interceptor fails.

Raised when all retry attempts for a request have been exhausted.

Raised when a connection or request times out.