Skip to content
GitHub

API Reference

Protocol for blob storage operations.

This interface defines the contract for file storage backends (S3, GCS, Azure Blob, local filesystem, etc.).

Example

class S3BlobStore:
    async def upload(self, path: str, data: bytes, **options) -> FileInfo:
        await self._client.put_object(Bucket=self._bucket, Key=path, Body=data)
        return FileInfo(path=path, size=len(data), ...)
upload
async def upload(
    path: str,
    data: bytes | AsyncIterator[bytes],
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to the storage backend.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
`data`bytes | AsyncIterator[bytes]File content as bytes or async iterator.
`content_type`str | NoneMIME type of the content.
`**options`AnyAdditional upload options.
Returns
TypeDescription
FileInfoFileInfo with path, size, and metadata.
download
async def download(path: str) -> bytes

Download file content into memory.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
bytesFile content as bytes.
stream
def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file content (memory efficient).

Parameters
ParameterTypeDescription
`path`strStorage path/key.
`chunk_size`intSize of each chunk in bytes.
Yields
TypeDescription
AsyncIterator[bytes]File content in chunks.
delete
async def delete(path: str) -> None

Delete a file.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
exists
async def exists(path: str) -> bool

Check if file exists.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
boolTrue if file exists.
info
async def info(path: str) -> FileInfo

Get file metadata.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
FileInfoFileInfo with size, content_type, etc.
list
def list(prefix: str = '') -> AsyncIterator[FileInfo]

List files with a given prefix.

Parameters
ParameterTypeDescription
`prefix`strPath prefix to filter by.
Yields
TypeDescription
AsyncIterator[FileInfo]FileInfo for each matching file.
get_url
async def get_url(path: str) -> str

Get public URL (if applicable).

Parameters
ParameterTypeDescription
`path`strStorage path/key.
Returns
TypeDescription
strPublic URL string.
get_presigned_url
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get a temporary secure URL.

Parameters
ParameterTypeDescription
`path`strStorage path/key.
`expires_in`timedeltaURL validity window as a timedelta (default one hour). Pass ``timedelta(minutes=5)`` for secure short-lived downloads or ``timedelta(hours=24)`` for bulk exports.
`method`strHTTP method (GET or PUT).
Returns
TypeDescription
strPresigned URL string.
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check.

Returns
TypeDescription
HealthCheckResultStructured health check result.

Azure Blob Storage driver.

Wraps azure-storage-blob (async variant) to provide fully async Blob Storage operations.

Install the optional dependency with

pip install lexigram-storage[azure]
# i.e. azure-storage-blob>=12.20.0
Parameters
ParameterTypeDescription
`account_name`Azure storage account name.
`account_key`Storage account access key (plain string or ``SecretStr``).
`container`Blob container (equivalent to a bucket).
__init__
def __init__(
    account_name: str,
    account_key: str | SecretStr,
    container: str
) -> None

Initialise the Azure Blob Storage driver.

Parameters
ParameterTypeDescription
`account_name`strAzure storage account name.
`account_key`str | SecretStrStorage account access key.
`container`strBlob container name.
Raises
ExceptionDescription
ImportErrorWhen ``azure-storage-blob`` is not installed.
upload
async def upload(
    path: str,
    data: Uploadable,
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to Azure Blob Storage at path.

Parameters
ParameterTypeDescription
`path`strDestination blob name / key.
`data`UploadableContent to upload.
`content_type`str | NoneOptional MIME type override.
`**options`AnyAdditional upload options.
Returns
TypeDescription
FileInfoFileInfo for the stored blob.
Raises
ExceptionDescription
StorageErrorOn any Azure SDK error.
download
async def download(path: str) -> bytes

Download the blob at path into memory.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
Returns
TypeDescription
bytesRaw file bytes.
Raises
ExceptionDescription
FileNotFoundErrorWhen the blob does not exist.
StorageErrorOn any other Azure SDK error.
stream
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncGenerator[bytes, None]

Yield successive chunks from the Azure blob at path.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
`chunk_size`intBytes per chunk (default 8 KiB).
Yields
TypeDescription
AsyncGenerator[bytes, None]Raw byte chunks.
delete
async def delete(path: str) -> None

Delete the Azure blob at path.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
Raises
ExceptionDescription
FileNotFoundErrorWhen the blob does not exist.
StorageErrorOn any other Azure SDK error.
exists
async def exists(path: str) -> bool

Return True if path exists in the Azure container.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
info
async def info(path: str) -> FileInfo

Return metadata for the Azure blob at path.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
Returns
TypeDescription
FileInfoFileInfo.
Raises
ExceptionDescription
FileNotFoundErrorWhen the blob does not exist.
StorageErrorOn any other Azure SDK error.
list
async def list(prefix: str = '') -> AsyncGenerator[FileInfo, None]

Yield FileInfo for blobs matching prefix.

Parameters
ParameterTypeDescription
`prefix`strBlob name prefix filter (empty string lists all blobs).
Yields
TypeDescription
AsyncGenerator[FileInfo, None]FileInfo entries.
Raises
ExceptionDescription
StorageErrorOn any Azure SDK error.
get_url
async def get_url(path: str) -> str

Return a public URL for the Azure blob at path.

The URL is only accessible when the container has public access enabled. For private containers, use get_presigned_url.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
get_presigned_url
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Return a SAS (Shared Access Signature) URL for the blob at path.

Parameters
ParameterTypeDescription
`path`strBlob name / key.
`expires_in`timedeltaValidity window (default one hour).
`method`strHTTP verb (``"GET"`` for read, ``"PUT"`` for write).
Returns
TypeDescription
strA time-limited SAS URL string.
Raises
ExceptionDescription
StorageErrorWhen SAS token generation fails.
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check against Azure Blob Storage.

Lists at most one blob from the container to verify credentials and container accessibility.

Returns
TypeDescription
HealthCheckResultHealthCheckResult.

File was successfully deleted from storage.

Consumed by: file cleanup tracking, audit logging, quota reclamation.


File was successfully downloaded from storage.

Consumed by: download tracking, audit logging, usage analytics.


Information about a stored file.

File was successfully uploaded to storage.

Consumed by: file tracking, audit logging, quota management.


Google Cloud Storage driver.

Wraps gcloud-aio-storage to provide fully async GCS operations.

Install the optional dependency with

pip install lexigram-storage[gcs]
# i.e. gcloud-aio-storage>=9.0.0
Parameters
ParameterTypeDescription
`bucket`GCS bucket name.
`project_id`Google Cloud project ID (used for logging / context only; the gcloud library resolves the project from ADC when needed).
`credentials_path`Path to a service-account JSON key file. When ``None`` the driver falls back to Application Default Credentials (ADC), which covers Cloud Run, GKE Workload Identity, etc.
__init__
def __init__(
    bucket: str,
    project_id: str | None = None,
    credentials_path: str | None = None
) -> None

Initialise the GCS driver.

Parameters
ParameterTypeDescription
`bucket`strGCS bucket name.
`project_id`str | NoneGoogle Cloud project ID (informational; ADC handles auth).
`credentials_path`str | NonePath to service-account JSON credentials file, or ``None`` to use Application Default Credentials.
Raises
ExceptionDescription
ImportErrorWhen ``gcloud-aio-storage`` is not installed.
upload
async def upload(
    path: str,
    data: Uploadable,
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to the GCS path.

Parameters
ParameterTypeDescription
`path`strDestination object name / key.
`data`UploadableContent to upload.
`content_type`str | NoneOptional MIME type override.
`**options`AnyAdditional upload options.
Returns
TypeDescription
FileInfoFileInfo for the stored object.
Raises
ExceptionDescription
StorageErrorOn any GCS API error.
download
async def download(path: str) -> bytes

Download the object at path into memory.

Parameters
ParameterTypeDescription
`path`strObject name / key.
Returns
TypeDescription
bytesRaw file bytes.
Raises
ExceptionDescription
FileNotFoundErrorWhen the object does not exist.
StorageErrorOn any other GCS API error.
stream
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncGenerator[bytes, None]

Yield successive chunks from the GCS object at path.

Downloads the entire object into memory first, then yields chunks from the in-memory buffer. For objects larger than available RAM, prefer using a streaming-capable GCS library.

Parameters
ParameterTypeDescription
`path`strObject name / key.
`chunk_size`intBytes per chunk (default 8 KiB).
Yields
TypeDescription
AsyncGenerator[bytes, None]Raw byte chunks.
delete
async def delete(path: str) -> None

Delete the GCS object at path.

Parameters
ParameterTypeDescription
`path`strObject name / key.
Raises
ExceptionDescription
FileNotFoundErrorWhen the object does not exist.
StorageErrorOn any other GCS API error.
exists
async def exists(path: str) -> bool

Return True if path exists in the GCS bucket.

Parameters
ParameterTypeDescription
`path`strObject name / key.
info
async def info(path: str) -> FileInfo

Return metadata for the GCS object at path.

Parameters
ParameterTypeDescription
`path`strObject name / key.
Returns
TypeDescription
FileInfoFileInfo.
Raises
ExceptionDescription
FileNotFoundErrorWhen the object does not exist.
StorageErrorOn any other GCS API error.
list
async def list(prefix: str = '') -> AsyncGenerator[FileInfo, None]

Yield FileInfo for objects matching prefix.

Parameters
ParameterTypeDescription
`prefix`strKey prefix filter (empty string lists all objects).
Yields
TypeDescription
AsyncGenerator[FileInfo, None]FileInfo entries.
Raises
ExceptionDescription
StorageErrorOn any GCS API error.
get_url
async def get_url(path: str) -> str

Return a public URL for the GCS object at path.

The URL is only accessible when the object (or bucket) has been made publicly readable. For private objects, use get_presigned_url.

Parameters
ParameterTypeDescription
`path`strObject name / key.
get_presigned_url
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Return a signed URL for the GCS object at path.

Uses the V4 signing API from gcloud-aio-storage when available, falling back to an unsigned public URL for read access.

Parameters
ParameterTypeDescription
`path`strObject name / key.
`expires_in`timedeltaValidity window (default one hour).
`method`strHTTP verb (``"GET"`` or ``"PUT"``).
Returns
TypeDescription
strA signed (or public) URL string.
Raises
ExceptionDescription
StorageErrorWhen URL signing fails.
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform a lightweight connectivity check against GCS.

Lists at most one object from the bucket to verify credentials and bucket accessibility.

Returns
TypeDescription
HealthCheckResultHealthCheckResult.

Local file system storage driver
__init__
def __init__(
    root_dir: str = './storage',
    base_url: str = 'http://localhost:8000/storage'
)
upload
async def upload(
    path: str,
    data: Uploadable,
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to local file system with atomic write guarantee.

Uses temp file + atomic rename to ensure no partial writes on failure. Optionally validates checksum if provided in options.

download
async def download(path: str) -> bytes

Download file content from local file system

stream
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncGenerator[bytes, None]

Stream file content from local file system

delete
async def delete(path: str) -> None

Delete file from local file system

exists
async def exists(path: str) -> bool

Check if file exists in local file system

info
async def info(path: str) -> FileInfo

Get file info from local file system

list
async def list(prefix: str = '') -> AsyncGenerator[FileInfo, None]

List files with prefix from local file system

get_url
async def get_url(path: str) -> str

Get public URL for local file

get_presigned_url
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get pre-signed URL for local file (same as public URL).

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on local storage

copy
async def copy(
    src: str,
    dst: str
) -> FileInfo

Copy a file efficiently within the local filesystem.

move
async def move(
    src: str,
    dst: str
) -> FileInfo

Move a file efficiently within the local filesystem.


In-memory storage driver using a dictionary
__init__
def __init__() -> None
upload
async def upload(
    path: str,
    data: Uploadable,
    content_type: str | None = None,
    **options: Any
) -> FileInfo

Upload data to memory storage

download
async def download(path: str) -> bytes

Download file content from memory

stream
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncGenerator[bytes, None]

Stream file content from memory

delete
async def delete(path: str) -> None

Delete file from memory

exists
async def exists(path: str) -> bool

Check if file exists in memory

info
async def info(path: str) -> FileInfo

Get file info from memory

list
async def list(prefix: str = '') -> AsyncGenerator[FileInfo, None]

List files with prefix from memory

get_url
async def get_url(path: str) -> str

Get URL for memory storage (not applicable)

get_presigned_url
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Raise StorageUnsupportedOperationError — not applicable to in-memory storage.

The in-memory driver has no network endpoint, so presigned URLs cannot be generated. Use a cloud-backed driver (S3, GCS, Azure) for presigned URL support.

Parameters
ParameterTypeDescription
`path`strFile path that would have been signed.
`expires_in`timedeltaIgnored.
`method`strIgnored.
Raises
ExceptionDescription
StorageUnsupportedOperationErrorAlways — presigned URLs are not supported by the in-memory driver.
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on memory storage


Configuration for a single named storage backend.

Used in StorageConfig.backends to declare multiple blob stores that the framework registers as named DI bindings.

Example

backends:
  - name: primary
    driver: s3
    primary: true
    s3:
      bucket: my-app-primary
      region: us-east-1
  - name: avatars
    driver: s3
    s3:
      bucket: my-app-avatars
      region: us-east-1
  - name: local
    driver: local
    local:
      root_dir: ./storage/public
Parameters
ParameterTypeDescription
`name`Unique backend identifier. Used as the Named() DI key.
`primary`Whether this is the primary backend. Primary backends also receive the unnamed BlobStoreProtocol binding.
`driver`Storage driver name. One of: local, s3, gcs, azure, r2, memory.
`local`Local filesystem driver config (when driver='local').
`s3`AWS S3 driver config (when driver='s3').
`gcs`Google Cloud Storage config (when driver='gcs').
`azure`Azure Blob Storage config (when driver='azure').
`r2`Cloudflare R2 config (when driver='r2').
`memory`In-memory storage config (when driver='memory').

Payload fired when a blob object is deleted.

Attributes:
    bucket: Name of the bucket or container the object was deleted from.
    key: Storage key (path) of the deleted object.


Payload fired when a blob object is successfully stored.

Attributes:
    bucket: Name of the bucket or container the object was stored in.
    key: Storage key (path) of the stored object.


AWS S3 storage driver with multipart upload for large files
__init__
def __init__(
    bucket: str,
    region: str | None = None,
    access_key: str | None = None,
    secret_key: str | None = None,
    endpoint_url: str | None = None,
    public_url: str | None = None,
    multipart_threshold: int = DEFAULT_MULTIPART_THRESHOLD,
    multipart_chunk_size: int = DEFAULT_MULTIPART_CHUNK_SIZE,
    encryption: EncryptionConfig | None = None
)

Initialize S3 driver.

Parameters
ParameterTypeDescription
`bucket`strS3 bucket name
`region`str | NoneAWS region
`access_key`str | NoneAWS access key (optional if using IAM roles)
`secret_key`str | NoneAWS secret key (optional if using IAM roles)
`endpoint_url`str | NoneCustom endpoint URL (for MinIO, LocalStack, R2 API, etc.)
`public_url`str | NoneCustom public URL for serving files (e.g., R2 custom domain). When set, get_url() returns this instead of the endpoint_url.
`multipart_threshold`intFile size threshold for multipart upload (bytes)
`multipart_chunk_size`intChunk size for multipart upload (bytes)
`encryption`EncryptionConfig | NoneOptional server-side encryption configuration. When ``enabled=True``, all ``put_object`` and ``create_multipart_upload`` calls will include the appropriate ``ServerSideEncryption`` / ``SSEKMSKeyId`` parameters.
download
async def download(path: str) -> bytes

Download file content from S3 (robust to different body shapes).

Some S3 client implementations return an async context manager for the response body while others provide an object with a read() coroutine method. Handle both shapes defensively and return bytes.

stream
async def stream(
    path: str,
    chunk_size: int = 8192
) -> AsyncGenerator[bytes, None]

Stream file content from S3

delete
async def delete(path: str) -> None

Delete file from S3

exists
async def exists(path: str) -> bool

Check if file exists in S3

info
async def info(path: str) -> FileInfo

Get file info from S3

list
async def list(prefix: str = '') -> AsyncGenerator[FileInfo, None]

List files with prefix from S3

get_url
async def get_url(path: str) -> str

Get public URL for S3 object

get_presigned_url
async def get_presigned_url(
    path: str,
    expires_in: timedelta = timedelta(hours=1),
    method: str = 'GET'
) -> str

Get pre-signed URL for S3 object.

Parameters
ParameterTypeDescription
`path`strStorage path / object key.
`expires_in`timedeltaValidity window (default one hour).
`method`strHTTP verb — ``"GET"`` or ``"PUT"``.
Returns
TypeDescription
strPre-signed URL string.
health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Perform health check on S3 storage.


Hierarchical root configuration for Lexigram Storage.

Attributes:
    name: Configuration name (default: "storage").
    enabled: Whether the storage module is enabled.
    default_driver: Default storage driver (local, s3, gcs, azure, memory).
    drivers: Driver-specific configurations.
    service: Storage operation settings.

Example

# From environment variables (``LEX_STORAGE__*``)
config = StorageConfig()

# Explicit values
config = StorageConfig(default_driver="s3", drivers={"s3": StorageS3Config(...)})
is_production
property is_production() -> bool

Check if running in production environment.

is_development
property is_development() -> bool

Check if running in development environment.

is_test
property is_test() -> bool

Check if running in test environment.

from_named
def from_named(
    cls,
    entry: NamedStorageConfig
) -> StorageConfig

Build a single-backend StorageConfig from a NamedStorageConfig entry.

Used internally by StorageProvider to create per-backend configs from a multi-backend declaration. The resulting config has the driver-specific config placed in the drivers dict under the driver type key, so the existing DriverRegistry can consume it unchanged.

Parameters
ParameterTypeDescription
`entry`NamedStorageConfigThe named backend entry to materialise.
Returns
TypeDescription
StorageConfigA StorageConfig configured for the single named backend.
validate_production_security
def validate_production_security() -> StorageConfig

Block insecure storage configurations in production.

This validator fires when the LEX_ENV environment variable is set to "production" (case-insensitive). It rejects known-weak placeholder credentials ("change-me", "password", etc.) in S3 and Azure driver configs, raising ValueError immediately so the application fails fast at startup rather than leaking credentials at request time.

The environment variable checked is LEX_ENV (default: "development"). Set LEX_ENV=production in your deployment environment to activate production-grade security checks.


Object storage backend (S3, GCS, Azure, R2, local filesystem).

Registers BlobStoreProtocol for constructor injection.

Call configure to configure and register a storage driver, or stub for an isolated in-memory setup with no external service dependencies.

Usage

from lexigram.storage.config import StorageConfig

@module(
    imports=[StorageModule.configure(StorageConfig(default_driver="s3"))]
)
class AppModule(Module):
    pass
configure
def configure(
    cls,
    config: StorageConfig | Any | None = None,
    enable_encryption: bool = False
) -> DynamicModule

Create a StorageModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`StorageConfig | Any | NoneStorageConfig or ``None`` to resolve config from the container at boot time.
`enable_encryption`boolEnable server-side encryption for all stored objects. Defaults to ``False``; configure encryption keys via EncryptionConfig.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
Raises
ExceptionDescription
TypeErrorIf *config* is not a ``StorageConfig`` or ``None``.
stub
def stub(
    cls,
    config: StorageConfig | None = None
) -> DynamicModule

Create a StorageModule suitable for unit and integration testing.

Uses an in-memory backend with no external service dependencies.

Parameters
ParameterTypeDescription
`config`StorageConfig | NoneOptional StorageConfig override. Uses safe in-memory defaults when ``None``.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

DI provider for storage services.

Registers both the DriverRegistry (for extensibility) and the configured BlobStoreProtocol driver singleton into the container.

Usage

# Pass config directly
app.add_provider(StorageProvider(config=StorageConfig()))

# Or let the provider resolve config from the container (requires ConfigProvider)
app.add_provider(StorageProvider())

# Later in a service:
class MyService:
    def __init__(self, store: BlobStoreProtocol) -> None: ...
__init__
def __init__(config: StorageConfig | None = None) -> None
config
property config() -> StorageConfig | None

Return the storage configuration.

config
def config(value: StorageConfig | None) -> None

Set the storage configuration.

from_config
def from_config(
    cls,
    config: StorageConfig,
    **context: Any
) -> Self

Create provider from config object.

register
async def register(container: ContainerRegistrarProtocol) -> None

Register storage services into the DI container.

If no config was supplied at construction time, resolves it from the container via ConfigProtocol. Binds:

  • DriverRegistry — the extensible driver factory (singleton).
  • BlobStoreProtocol — the configured backend driver instance (singleton). In multi-backend mode, also registers named bindings.
Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolDI registrar supplied by the framework.
boot
async def boot(container: ContainerResolverProtocol) -> None

Startup hook — verify storage connectivity after all registrations.

shutdown
async def shutdown() -> None

Shutdown hook — clean up resources owned by the driver.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Health check for storage.


Options for file uploads.