
API Reference

Protocol for blob storage operations.

This interface defines the contract for file storage backends (S3, GCS, Azure Blob, local filesystem, etc.).

Example

class S3BlobStore:
    async def upload(self, path: str, data: bytes, **options) -> FileInfo:
        await self._client.put_object(Bucket=self._bucket, Key=path, Body=data)
        return FileInfo(path=path, size=len(data), ...)
async def upload(
    path: str,
    data: bytes | AsyncIterator[bytes],
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to the storage backend.

Parameters
- `path` (str): Storage path/key.
- `data` (bytes | AsyncIterator[bytes]): File content as bytes or async iterator.
- `content_type` (str | None): MIME type of the content.
- `**options` (Any): Additional upload options.
Returns
- `FileInfo`: FileInfo with path, size, and metadata.
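To illustrate the two accepted data shapes, here is a minimal sketch of calling upload with plain bytes and with an async iterator. `InMemoryStore` and this `FileInfo` are simplified stand-ins written for the example, not the library's own classes.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FileInfo:  # simplified stand-in for the library's FileInfo
    path: str
    size: int

class InMemoryStore:  # illustrative implementation of the upload contract
    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    async def upload(self, path: str, data, content_type=None, **options) -> FileInfo:
        if isinstance(data, bytes):
            body = data
        else:
            # data is an async iterator of bytes chunks
            body = b"".join([chunk async for chunk in data])
        self._blobs[path] = body
        return FileInfo(path=path, size=len(body))

async def chunks():
    yield b"hello "
    yield b"world"

async def main() -> None:
    store = InMemoryStore()
    info = await store.upload("greeting.txt", b"hi")
    print(info.size)  # 2
    info = await store.upload("big.txt", chunks())
    print(info.size)  # 11

asyncio.run(main())
```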
async def download(path: str) -> bytes

Download file content into memory.

Parameters
- `path` (str): Storage path/key.
Returns
- `bytes`: File content as bytes.
def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file content (memory efficient).

Parameters
- `path` (str): Storage path/key.
- `chunk_size` (int): Size of each chunk in bytes.
Yields
- `AsyncIterator[bytes]`: File content in chunks.
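A sketch of consuming stream in fixed-size chunks. The `InMemoryStore` below is a stand-in written for this example, but the `async for` loop is how any driver's stream is consumed.

```python
import asyncio

class InMemoryStore:  # illustrative stand-in, not the library's memory driver
    def __init__(self, blobs: dict[str, bytes]) -> None:
        self._blobs = blobs

    async def stream(self, path: str, chunk_size: int = 8192):
        data = self._blobs[path]
        # Yield successive slices instead of returning one large buffer.
        for i in range(0, len(data), chunk_size):
            yield data[i : i + chunk_size]

async def main() -> None:
    store = InMemoryStore({"log.txt": b"x" * 20000})
    total = 0
    async for chunk in store.stream("log.txt", chunk_size=8192):
        total += len(chunk)
    print(total)  # 20000

asyncio.run(main())
```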
async def delete(path: str) -> None

Delete a file.

Parameters
- `path` (str): Storage path/key.
async def exists(path: str) -> bool

Check if file exists.

Parameters
- `path` (str): Storage path/key.
Returns
- `bool`: True if the file exists.
async def info(path: str) -> FileInfo

Get file metadata.

Parameters
- `path` (str): Storage path/key.
Returns
- `FileInfo`: FileInfo with size, content_type, etc.
def list(prefix: str = '') -> AsyncIterator[FileInfo]

List files with a given prefix.

Parameters
- `prefix` (str): Path prefix to filter by.
Yields
- `AsyncIterator[FileInfo]`: FileInfo for each matching file.
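A sketch of iterating list with a prefix filter; again `InMemoryStore` and `FileInfo` are simplified stand-ins written for this example.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FileInfo:  # simplified stand-in
    path: str
    size: int

class InMemoryStore:  # illustrative stand-in
    def __init__(self, blobs: dict[str, bytes]) -> None:
        self._blobs = blobs

    async def list(self, prefix: str = ""):
        # Yield metadata only for keys matching the prefix.
        for path, data in self._blobs.items():
            if path.startswith(prefix):
                yield FileInfo(path=path, size=len(data))

async def main() -> None:
    store = InMemoryStore({"avatars/a.png": b"123", "logs/x.txt": b"4567"})
    async for info in store.list(prefix="avatars/"):
        print(info.path, info.size)  # avatars/a.png 3

asyncio.run(main())
```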
async def get_url(path: str) -> str

Get public URL (if applicable).

Parameters
- `path` (str): Storage path/key.
Returns
- `str`: Public URL string.
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get a temporary secure URL.

Parameters
- `path` (str): Storage path/key.
- `expires_in` (timedelta): URL validity window (default one hour). Pass ``timedelta(minutes=5)`` for secure short-lived downloads or ``timedelta(hours=24)`` for bulk exports.
- `method` (str): HTTP method (GET or PUT).
Returns
- `str`: Presigned URL string.
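Real drivers delegate signing to their cloud SDKs, but the essence of a presigned URL is an expiry timestamp plus an HMAC signature over the method, path, and expiry. A toy illustration; the secret, host, and query-string shape are invented for this example:

```python
import hashlib
import hmac
from datetime import datetime, timedelta, timezone

SECRET = b"example-signing-key"  # hypothetical; real drivers use SDK credentials

def presign(path: str, expires_in: timedelta = timedelta(hours=1), method: str = "GET") -> str:
    # Absolute expiry time, as a Unix timestamp.
    expires_at = int((datetime.now(timezone.utc) + expires_in).timestamp())
    # Sign method + path + expiry so none of them can be tampered with.
    payload = f"{method}\n{path}\n{expires_at}".encode()
    sig = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    return f"https://storage.example.com/{path}?expires={expires_at}&sig={sig}"

url = presign("reports/q1.pdf", expires_in=timedelta(minutes=5))
print("expires=" in url and "sig=" in url)  # True
```

The server side recomputes the HMAC and rejects the request when the signature mismatches or the timestamp has passed.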
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
- `HealthCheckResult`: Structured health check result.

Azure Blob Storage driver.

Wraps azure-storage-blob (async variant) to provide fully async Blob Storage operations.

Install the optional dependency with

pip install lexigram-storage[azure]
# i.e. azure-storage-blob>=12.20.0
Parameters
- `account_name`: Azure storage account name.
- `account_key`: Storage account access key (plain string or ``SecretStr``).
- `container`: Blob container (equivalent to a bucket).
def __init__(
    account_name: str,
    account_key: str | SecretStr,
    container: str
) -> None

Initialise the Azure Blob Storage driver.

Parameters
- `account_name` (str): Azure storage account name.
- `account_key` (str | SecretStr): Storage account access key.
- `container` (str): Blob container name.
Raises
- `ImportError`: When ``azure-storage-blob`` is not installed.
async def upload(
    path: str,
    data: Uploadable,
    options: UploadOptions | None = None
) -> FileInfo

Upload data to Azure Blob Storage at path.

Parameters
- `path` (str): Destination blob name / key.
- `data` (Uploadable): Content to upload.
- `options` (UploadOptions | None): Optional ``UploadOptions`` (content-type, visibility, metadata).
Returns
- `FileInfo`: FileInfo for the stored blob.
Raises
- `StorageError`: On any Azure SDK error.
async def download(path: str) -> bytes

Download the blob at path into memory.

Parameters
- `path` (str): Blob name / key.
Returns
- `bytes`: Raw file bytes.
Raises
- `FileNotFoundError`: When the blob does not exist.
- `StorageError`: On any other Azure SDK error.
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Yield successive chunks from the Azure blob at path.

Parameters
- `path` (str): Blob name / key.
- `chunk_size` (int): Bytes per chunk (default 8 KiB).
Yields
- `AsyncIterator[bytes]`: Raw byte chunks.
async def delete(path: str) -> None

Delete the Azure blob at path.

Parameters
- `path` (str): Blob name / key.
Raises
- `FileNotFoundError`: When the blob does not exist.
- `StorageError`: On any other Azure SDK error.
async def exists(path: str) -> bool

Return True if path exists in the Azure container.

Parameters
- `path` (str): Blob name / key.
async def info(path: str) -> FileInfo

Return metadata for the Azure blob at path.

Parameters
- `path` (str): Blob name / key.
Returns
- `FileInfo`: Metadata for the blob.
Raises
- `FileNotFoundError`: When the blob does not exist.
- `StorageError`: On any other Azure SDK error.
async def list(prefix: str = '') -> AsyncIterator[FileInfo]

Yield FileInfo for blobs matching prefix.

Parameters
- `prefix` (str): Blob name prefix filter (empty string lists all blobs).
Yields
- `AsyncIterator[FileInfo]`: FileInfo entries.
Raises
- `StorageError`: On any Azure SDK error.
async def get_url(path: str) -> str

Return a public URL for the Azure blob at path.

The URL is only accessible when the container has public access enabled. For private containers, use get_presigned_url.

Parameters
- `path` (str): Blob name / key.
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Return a SAS (Shared Access Signature) URL for the blob at path.

Parameters
- `path` (str): Blob name / key.
- `expires_in` (timedelta): Validity window (default one hour).
- `method` (str): HTTP verb (``"GET"`` for read, ``"PUT"`` for write).
Returns
- `str`: A time-limited SAS URL string.
Raises
- `StorageError`: When SAS token generation fails.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check against Azure Blob Storage.

Lists at most one blob from the container to verify credentials and container accessibility.

Returns
- `HealthCheckResult`: Structured health check result.

File was successfully deleted from storage.

Consumed by: file cleanup tracking, audit logging, quota reclamation.


File was successfully downloaded from storage.

Consumed by: download tracking, audit logging, usage analytics.


Information about a stored file.

File was successfully uploaded to storage.

Consumed by: file tracking, audit logging, quota management.


Google Cloud Storage driver.

Wraps gcloud-aio-storage to provide fully async GCS operations.

Install the optional dependency with

pip install lexigram-storage[gcs]
# i.e. gcloud-aio-storage>=9.0.0
Parameters
- `bucket`: GCS bucket name.
- `project_id`: Google Cloud project ID (used for logging / context only; the gcloud library resolves the project from ADC when needed).
- `credentials_path`: Path to a service-account JSON key file. When ``None`` the driver falls back to Application Default Credentials (ADC), which covers Cloud Run, GKE Workload Identity, etc.
def __init__(
    bucket: str,
    project_id: str | None = None,
    credentials_path: str | None = None
) -> None

Initialise the GCS driver.

Parameters
- `bucket` (str): GCS bucket name.
- `project_id` (str | None): Google Cloud project ID (informational; ADC handles auth).
- `credentials_path` (str | None): Path to service-account JSON credentials file, or ``None`` to use Application Default Credentials.
Raises
- `ImportError`: When ``gcloud-aio-storage`` is not installed.
async def upload(
    path: str,
    data: Uploadable,
    options: UploadOptions | None = None
) -> FileInfo

Upload data to the GCS path.

Parameters
- `path` (str): Destination object name / key.
- `data` (Uploadable): Content to upload.
- `options` (UploadOptions | None): Optional ``UploadOptions`` (content-type, visibility, metadata).
Returns
- `FileInfo`: FileInfo for the stored object.
Raises
- `StorageError`: On any GCS API error.
async def download(path: str) -> bytes

Download the object at path into memory.

Parameters
- `path` (str): Object name / key.
Returns
- `bytes`: Raw file bytes.
Raises
- `FileNotFoundError`: When the object does not exist.
- `StorageError`: On any other GCS API error.
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Yield successive chunks from the GCS object at path.

Downloads the entire object into memory first, then yields chunks from the in-memory buffer. For objects larger than available RAM, prefer using a streaming-capable GCS library.

Parameters
- `path` (str): Object name / key.
- `chunk_size` (int): Bytes per chunk (default 8 KiB).
Yields
- `AsyncIterator[bytes]`: Raw byte chunks.
async def delete(path: str) -> None

Delete the GCS object at path.

Parameters
- `path` (str): Object name / key.
Raises
- `FileNotFoundError`: When the object does not exist.
- `StorageError`: On any other GCS API error.
async def exists(path: str) -> bool

Return True if path exists in the GCS bucket.

Parameters
- `path` (str): Object name / key.
async def info(path: str) -> FileInfo

Return metadata for the GCS object at path.

Parameters
- `path` (str): Object name / key.
Returns
- `FileInfo`: Metadata for the object.
Raises
- `FileNotFoundError`: When the object does not exist.
- `StorageError`: On any other GCS API error.
async def list(prefix: str = '') -> AsyncIterator[FileInfo]

Yield FileInfo for objects matching prefix.

Parameters
- `prefix` (str): Key prefix filter (empty string lists all objects).
Yields
- `AsyncIterator[FileInfo]`: FileInfo entries.
Raises
- `StorageError`: On any GCS API error.
async def get_url(path: str) -> str

Return a public URL for the GCS object at path.

The URL is only accessible when the object (or bucket) has been made publicly readable. For private objects, use get_presigned_url.

Parameters
- `path` (str): Object name / key.
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Return a signed URL for the GCS object at path.

Uses the V4 signing API from gcloud-aio-storage when available, falling back to an unsigned public URL for read access.

Parameters
- `path` (str): Object name / key.
- `expires_in` (timedelta): Validity window (default one hour).
- `method` (str): HTTP verb (``"GET"`` or ``"PUT"``).
Returns
- `str`: A signed (or public) URL string.
Raises
- `StorageError`: When URL signing fails.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check against GCS.

Lists at most one object from the bucket to verify credentials and bucket accessibility.

Returns
- `HealthCheckResult`: Structured health check result.

Local file system storage driver.
def __init__(
    root_dir: str = './storage',
    base_url: str = 'http://localhost:8000/storage'
)
async def upload(
    path: str,
    data: Uploadable,
    options: UploadOptions | None = None
) -> FileInfo

Upload data to local file system with atomic write guarantee.

Uses temp file + atomic rename to ensure no partial writes on failure. Optionally validates checksum if provided in options.
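The temp-file-plus-atomic-rename pattern the driver describes can be sketched as follows. `os.replace` is atomic on POSIX filesystems, so readers observe either the old file or the complete new one, never a partial write; the helper below is written for this example, not taken from the driver's source.

```python
import os
import tempfile

def atomic_write(dest: str, data: bytes) -> None:
    directory = os.path.dirname(dest) or "."
    # Temp file must live on the same filesystem for the rename to be atomic.
    fd, tmp = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # data reaches disk before the rename
        os.replace(tmp, dest)  # atomic swap into place
    except BaseException:
        os.unlink(tmp)  # leave no partial temp file behind
        raise

dest = os.path.join(tempfile.mkdtemp(), "example.bin")
atomic_write(dest, b"payload")
print(os.path.getsize(dest))  # 7
```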

async def download(path: str) -> bytes

Download file content from local file system

async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file content from local file system

async def delete(path: str) -> None

Delete file from local file system

async def exists(path: str) -> bool

Check if file exists in local file system

async def info(path: str) -> FileInfo

Get file info from local file system

async def list(prefix: str = '') -> AsyncIterator[FileInfo]

List files with prefix from local file system

async def get_url(path: str) -> str

Get public URL for local file

async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get pre-signed URL for local file (same as public URL).

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on local storage

async def copy(
    src: str,
    dst: str
) -> FileInfo

Copy a file efficiently within the local filesystem.

async def move(
    src: str,
    dst: str
) -> FileInfo

Move a file efficiently within the local filesystem.


In-memory storage driver using a dictionary.
def __init__() -> None
async def upload(
    path: str,
    data: Uploadable,
    options: UploadOptions | None = None
) -> FileInfo

Upload data to memory storage

async def download(path: str) -> bytes

Download file content from memory

async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file content from memory

async def delete(path: str) -> None

Delete file from memory

async def exists(path: str) -> bool

Check if file exists in memory

async def info(path: str) -> FileInfo

Get file info from memory

async def list(prefix: str = '') -> AsyncIterator[FileInfo]

List files with prefix from memory

async def get_url(path: str) -> str

Get URL for memory storage (not applicable)

async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Raise StorageUnsupportedOperationError — not applicable to in-memory storage.

The in-memory driver has no network endpoint, so presigned URLs cannot be generated. Use a cloud-backed driver (S3, GCS, Azure) for presigned URL support.

Parameters
- `path` (str): File path that would have been signed.
- `expires_in` (timedelta): Ignored.
- `method` (str): Ignored.
Raises
- `StorageUnsupportedOperationError`: Always; presigned URLs are not supported by the in-memory driver.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on memory storage


Configuration for a single named storage backend.

Used in StorageConfig.backends to declare multiple blob stores that the framework registers as named DI bindings.

Example

backends:
  - name: primary
    driver: s3
    primary: true
    s3:
      bucket: my-app-primary
      region: us-east-1
  - name: avatars
    driver: s3
    s3:
      bucket: my-app-avatars
      region: us-east-1
  - name: local
    driver: local
    local:
      root_dir: ./storage/public
Parameters
- `name`: Unique backend identifier. Used as the Named() DI key.
- `primary`: Whether this is the primary backend. Primary backends also receive the unnamed BlobStoreProtocol binding.
- `driver`: Storage driver name. One of: local, s3, gcs, azure, r2, memory.
- `local`: Local filesystem driver config (when driver='local').
- `s3`: AWS S3 driver config (when driver='s3').
- `gcs`: Google Cloud Storage config (when driver='gcs').
- `azure`: Azure Blob Storage config (when driver='azure').
- `r2`: Cloudflare R2 config (when driver='r2').
- `memory`: In-memory storage config (when driver='memory').

Payload fired when a blob object is deleted.

Attributes
- `bucket`: Name of the bucket or container the object was deleted from.
- `key`: Storage key (path) of the deleted object.


Payload fired when a blob object is successfully stored.

Attributes
- `bucket`: Name of the bucket or container the object was stored in.
- `key`: Storage key (path) of the stored object.


AWS S3 storage driver with multipart upload for large files.
def __init__(
    bucket: str,
    region: str | None = None,
    access_key: str | None = None,
    secret_key: str | None = None,
    endpoint_url: str | None = None,
    multipart_threshold: int = DEFAULT_MULTIPART_THRESHOLD,
    multipart_chunk_size: int = DEFAULT_MULTIPART_CHUNK_SIZE,
    encryption: EncryptionConfig | None = None
)

Initialize S3 driver.

Parameters
- `bucket` (str): S3 bucket name.
- `region` (str | None): AWS region.
- `access_key` (str | None): AWS access key (optional if using IAM roles).
- `secret_key` (str | None): AWS secret key (optional if using IAM roles).
- `endpoint_url` (str | None): Custom endpoint URL (for MinIO, LocalStack, etc.).
- `multipart_threshold` (int): File size threshold for multipart upload (bytes).
- `multipart_chunk_size` (int): Chunk size for multipart upload (bytes).
- `encryption` (EncryptionConfig | None): Optional server-side encryption configuration. When ``enabled=True``, all ``put_object`` and ``create_multipart_upload`` calls include the appropriate ``ServerSideEncryption`` / ``SSEKMSKeyId`` parameters.
async def download(path: str) -> bytes

Download file content from S3 (robust to different body shapes).

Some S3 client implementations return an async context manager for the response body while others provide an object with a read() coroutine method. Handle both shapes defensively and return bytes.
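The defensive handling described above can be sketched like this: accept either an object exposing an async read() or an async context manager wrapping one. The stub body classes are invented for this example; they only mimic the two shapes.

```python
import asyncio

async def read_body(body) -> bytes:
    # Shape 1: async context manager wrapping a readable stream (aiobotocore-style).
    if hasattr(body, "__aenter__"):
        async with body as stream:
            return await stream.read()
    # Shape 2: plain object with an async read() coroutine method.
    return await body.read()

class PlainBody:  # stub for shape 2
    async def read(self) -> bytes:
        return b"plain"

class CtxBody:  # stub for shape 1
    async def __aenter__(self):
        return PlainBody()
    async def __aexit__(self, *exc):
        return False

async def main() -> None:
    print(await read_body(PlainBody()))  # b'plain'
    print(await read_body(CtxBody()))  # b'plain'

asyncio.run(main())
```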

async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file content from S3

async def delete(path: str) -> None

Delete file from S3

async def exists(path: str) -> bool

Check if file exists in S3

async def info(path: str) -> FileInfo

Get file info from S3

async def list(prefix: str = '') -> AsyncIterator[FileInfo]

List files with prefix from S3

async def get_url(path: str) -> str

Get public URL for S3 object

async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get pre-signed URL for S3 object.

Parameters
- `path` (str): Storage path / object key.
- `expires_in` (timedelta): Validity window (default one hour).
- `method` (str): HTTP verb, ``"GET"`` or ``"PUT"``.
Returns
- `str`: Pre-signed URL string.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on S3 storage.


Hierarchical root configuration for Lexigram Storage.

Attributes
- `name`: Configuration name (default: "storage").
- `enabled`: Whether the storage module is enabled.
- `default_driver`: Default storage driver (local, s3, gcs, azure, memory).
- `drivers`: Driver-specific configurations.
- `service`: Storage operation settings.

Example

# From environment variables (``LEX_STORAGE__*``)
config = StorageConfig()
# Explicit values
config = StorageConfig(default_driver="s3", drivers={"s3": StorageS3Config(...)})
def from_named(
    cls,
    entry: NamedStorageConfig
) -> StorageConfig

Build a single-backend StorageConfig from a NamedStorageConfig entry.

Used internally by StorageProvider to create per-backend configs from a multi-backend declaration. The resulting config has the driver-specific config placed in the drivers dict under the driver type key, so the existing DriverRegistry can consume it unchanged.

Parameters
- `entry` (NamedStorageConfig): The named backend entry to materialise.
Returns
- `StorageConfig`: A StorageConfig configured for the single named backend.
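A toy sketch of the transformation from_named performs; the dataclasses below are simplified stand-ins for the library's config models, keeping only the fields needed to show the drivers-dict re-keying.

```python
from dataclasses import dataclass, field

@dataclass
class NamedStorageConfig:  # simplified stand-in
    name: str
    driver: str
    config: dict

@dataclass
class StorageConfig:  # simplified stand-in
    default_driver: str = "local"
    drivers: dict = field(default_factory=dict)

def from_named(entry: NamedStorageConfig) -> StorageConfig:
    # Place the driver-specific config under the driver-type key so a
    # driver registry keyed by driver name can consume it unchanged.
    return StorageConfig(
        default_driver=entry.driver,
        drivers={entry.driver: entry.config},
    )

cfg = from_named(
    NamedStorageConfig(name="avatars", driver="s3", config={"bucket": "my-app-avatars"})
)
print(cfg.drivers["s3"]["bucket"])  # my-app-avatars
```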
def validate_production_security() -> StorageConfig

Block insecure storage configurations in production.

This validator fires when the LEX_ENV environment variable is set to "production" (case-insensitive). It rejects known-weak placeholder credentials ("change-me", "password", etc.) in S3 and Azure driver configs, raising ValueError immediately so the application fails fast at startup rather than leaking credentials at request time.

The environment variable checked is LEX_ENV (default: "development"). Set LEX_ENV=production in your deployment environment to activate production-grade security checks.


Object storage backend (S3, GCS, Azure, R2, local filesystem).

Registers BlobStoreProtocol for constructor injection.

Call configure to configure and register a storage driver, or stub for an isolated in-memory setup with no external service dependencies.

Usage

from lexigram.storage.config import StorageConfig

@module(
    imports=[StorageModule.configure(StorageConfig(default_driver="s3"))]
)
class AppModule(Module):
    pass
def configure(
    cls,
    config: StorageConfig | Any | None = None,
    enable_encryption: bool = False
) -> DynamicModule

Create a StorageModule with explicit configuration.

Parameters
- `config` (StorageConfig | Any | None): StorageConfig, or ``None`` to resolve config from the container at boot time.
- `enable_encryption` (bool): Enable server-side encryption for all stored objects. Defaults to ``False``; configure encryption keys via EncryptionConfig.
Returns
- `DynamicModule`: A DynamicModule descriptor.
Raises
- `TypeError`: If *config* is not a ``StorageConfig`` or ``None``.
def stub(
    cls,
    config: StorageConfig | None = None
) -> DynamicModule

Create a StorageModule suitable for unit and integration testing.

Uses an in-memory backend with no external service dependencies.

Parameters
- `config` (StorageConfig | None): Optional StorageConfig override. Uses safe in-memory defaults when ``None``.
Returns
- `DynamicModule`: A DynamicModule descriptor.

DI provider for storage services.

Registers both the DriverRegistry (for extensibility) and the configured BlobStoreProtocol driver singleton into the container.

Usage

# Pass config directly
app.add_provider(StorageProvider(config=StorageConfig()))

# Or let the provider resolve config from the container (requires ConfigProvider)
app.add_provider(StorageProvider())

# Later in a service:
class MyService:
    def __init__(self, store: BlobStoreProtocol) -> None: ...
def __init__(config: StorageConfig | None = None) -> None
property config() -> StorageConfig | None

Return the storage configuration.

def from_config(
    cls,
    config: StorageConfig,
    **context: Any
) -> Self

Create provider from config object.

async def register(container: ContainerRegistrarProtocol) -> None

Register storage services into the DI container.

If no config was supplied at construction time, resolves it from the container via ConfigProtocol. Binds:

  • DriverRegistry — the extensible driver factory (singleton).
  • BlobStoreProtocol — the configured backend driver instance (singleton). In multi-backend mode, also registers named bindings.
Parameters
- `container` (ContainerRegistrarProtocol): DI registrar supplied by the framework.
async def boot(container: ContainerResolverProtocol) -> None

Startup hook — verify storage connectivity after all registrations.

async def shutdown() -> None

Shutdown hook: clean up resources owned by the driver.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check for storage.


Options for file uploads.