API Reference
Protocols
BlobStoreProtocol
Protocol for blob storage operations.
This interface defines the contract for file storage backends (S3, GCS, Azure Blob, local filesystem, etc.).
Example

```python
class S3BlobStore:
    async def upload(self, path: str, data: bytes, **options) -> FileInfo:
        await self._client.put_object(Bucket=self._bucket, Key=path, Body=data)
        return FileInfo(path=path, size=len(data), ...)
```

async def upload( path: str, data: bytes | AsyncIterator[bytes], content_type: str | None = None, **options: Any ) -> FileInfo
Upload data to the storage backend.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| `data` | bytes \| AsyncIterator[bytes] | File content as bytes or an async iterator. |
| `content_type` | str \| None | MIME type of the content. |
| `**options` | Any | Additional upload options. |
| Type | Description |
|---|---|
| FileInfo | FileInfo with path, size, and metadata. |
Download file content into memory.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| Type | Description |
|---|---|
| bytes | File content as bytes. |
Stream file content (memory efficient).
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| `chunk_size` | int | Size of each chunk in bytes. |
| Type | Description |
|---|---|
| AsyncIterator[bytes] | File content in chunks. |
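The chunking contract can be sketched with a plain async generator. `stream_bytes` below is a hypothetical stand-in, not part of the library: it slices an in-memory buffer the way a driver would slice a response body.

```python
import asyncio
from typing import AsyncIterator


async def stream_bytes(data: bytes, chunk_size: int = 8192) -> AsyncIterator[bytes]:
    # Yield successive fixed-size slices; a real driver would read from the
    # storage backend instead of an in-memory bytes object.
    for offset in range(0, len(data), chunk_size):
        yield data[offset : offset + chunk_size]


async def main() -> list[int]:
    chunks = [chunk async for chunk in stream_bytes(b"x" * 20_000)]
    return [len(chunk) for chunk in chunks]


print(asyncio.run(main()))  # [8192, 8192, 3616]
```

Every chunk except possibly the last has exactly `chunk_size` bytes, which is the behaviour consumers of `stream` should assume.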
Delete a file.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
Check if file exists.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| Type | Description |
|---|---|
| bool | True if file exists. |
async def info(path: str) -> FileInfo
Get file metadata.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| Type | Description |
|---|---|
| FileInfo | FileInfo with size, content_type, etc. |
def list(prefix: str = '') -> AsyncIterator[FileInfo]
List files with a given prefix.
| Parameter | Type | Description |
|---|---|---|
| `prefix` | str | Path prefix to filter by. |
| Type | Description |
|---|---|
| AsyncIterator[FileInfo] | FileInfo for each matching file. |
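The prefix semantics can be illustrated with a small async generator over plain strings; `list_with_prefix` is a sketch, not the driver implementation.

```python
import asyncio
from typing import AsyncIterator


async def list_with_prefix(keys: list[str], prefix: str = "") -> AsyncIterator[str]:
    # An empty prefix matches every key, mirroring list('').
    for key in keys:
        if key.startswith(prefix):
            yield key


async def main() -> list[str]:
    keys = ["avatars/1.png", "avatars/2.png", "reports/q1.pdf"]
    return [key async for key in list_with_prefix(keys, "avatars/")]


print(asyncio.run(main()))  # ['avatars/1.png', 'avatars/2.png']
```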
Get public URL (if applicable).
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| Type | Description |
|---|---|
| str | Public URL string. |
async def get_presigned_url( path: str, expires_in: timedelta = timedelta(hours=1), method: str = 'GET' ) -> str
Get a temporary secure URL.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path/key. |
| `expires_in` | timedelta | URL validity window as a timedelta (default one hour). Pass ``timedelta(minutes=5)`` for secure short-lived downloads or ``timedelta(hours=24)`` for bulk exports. |
| `method` | str | HTTP method (GET or PUT). |
| Type | Description |
|---|---|
| str | Presigned URL string. |
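The general shape of a presigned URL can be sketched with HMAC signing over the method, path, and expiry. The `SECRET` key and `HOST` below are illustrative assumptions; real drivers delegate to the cloud SDK (SigV4 for S3, SAS for Azure).

```python
import hashlib
import hmac
import time
from datetime import timedelta
from urllib.parse import urlencode

# Hypothetical signing key and host -- not part of the library's API.
SECRET = b"example-signing-key"
HOST = "https://storage.example.com"


def presign(path: str, expires_in: timedelta = timedelta(hours=1), method: str = "GET") -> str:
    # Sign the method, path, and absolute expiry timestamp; a verifier
    # recomputes the MAC and rejects requests after `expires` has passed.
    expires = int(time.time() + expires_in.total_seconds())
    payload = f"{method}\n{path}\n{expires}".encode()
    signature = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    return f"{HOST}/{path}?{urlencode({'expires': expires, 'signature': signature})}"


url = presign("reports/q1.pdf", expires_in=timedelta(minutes=5))
```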
Perform health check.
| Type | Description |
|---|---|
| HealthCheckResult | Structured health check result. |
Classes
AzureDriver
Azure Blob Storage driver.
Wraps azure-storage-blob (async variant) to provide fully async
Blob Storage operations.
Install the optional dependency with

```shell
pip install "lexigram-storage[azure]"  # i.e. azure-storage-blob>=12.20.0
```

| Parameter | Type | Description |
|---|---|---|
| `account_name` | str | Azure storage account name. |
| `account_key` | str \| SecretStr | Storage account access key (plain string or ``SecretStr``). |
| `container` | str | Blob container (equivalent to a bucket). |
Initialise the Azure Blob Storage driver.
| Parameter | Type | Description |
|---|---|---|
| `account_name` | str | Azure storage account name. |
| `account_key` | str \| SecretStr | Storage account access key. |
| `container` | str | Blob container name. |
| Exception | Description |
|---|---|
| ImportError | When ``azure-storage-blob`` is not installed. |
async def upload( path: str, data: Uploadable, options: UploadOptions | None = None ) -> FileInfo
Upload data to Azure Blob Storage at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Destination blob name / key. |
| `data` | Uploadable | Content to upload. |
| `options` | UploadOptions \| None | Optional ``UploadOptions`` (content-type, visibility, metadata). |
| Exception | Description |
|---|---|
| StorageError | On any Azure SDK error. |
Download the blob at path into memory.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
| Type | Description |
|---|---|
| bytes | Raw file bytes. |
| Exception | Description |
|---|---|
| FileNotFoundError | When the blob does not exist. |
| StorageError | On any other Azure SDK error. |
Yield successive chunks from the Azure blob at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
| `chunk_size` | int | Bytes per chunk (default 8 KiB). |
| Type | Description |
|---|---|
| AsyncIterator[bytes] | Raw byte chunks. |
Delete the Azure blob at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
| Exception | Description |
|---|---|
| FileNotFoundError | When the blob does not exist. |
| StorageError | On any other Azure SDK error. |
Return True if path exists in the Azure container.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
async def info(path: str) -> FileInfo
Return metadata for the Azure blob at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
| Exception | Description |
|---|---|
| FileNotFoundError | When the blob does not exist. |
| StorageError | On any other Azure SDK error. |
async def list(prefix: str = '') -> AsyncIterator[FileInfo]
Yield FileInfo for blobs matching prefix.
| Parameter | Type | Description |
|---|---|---|
| `prefix` | str | Blob name prefix filter (empty string lists all blobs). |
| Exception | Description |
|---|---|
| StorageError | On any Azure SDK error. |
Return a public URL for the Azure blob at path.
The URL is only accessible when the container has public access enabled. For private containers, use get_presigned_url.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
async def get_presigned_url( path: str, expires_in: timedelta = timedelta(hours=1), method: str = 'GET' ) -> str
Return a SAS (Shared Access Signature) URL for the blob at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Blob name / key. |
| `expires_in` | timedelta | Validity window (default one hour). |
| `method` | str | HTTP verb (``"GET"`` for read, ``"PUT"`` for write). |
| Type | Description |
|---|---|
| str | A time-limited SAS URL string. |
| Exception | Description |
|---|---|
| StorageError | When SAS token generation fails. |
Perform a lightweight connectivity check against Azure Blob Storage.
Lists at most one blob from the container to verify credentials and container accessibility.
| Type | Description |
|---|---|
| HealthCheckResult | Structured health check result. |
FileDeletedEvent
File was successfully deleted from storage.
Consumed by: file cleanup tracking, audit logging, quota reclamation.
FileDownloadedEvent
File was successfully downloaded from storage.
Consumed by: download tracking, audit logging, usage analytics.
FileInfo
Information about a stored file.
FileUploadedEvent
File was successfully uploaded to storage.
Consumed by: file tracking, audit logging, quota management.
GCSDriver
Google Cloud Storage driver.
Wraps gcloud-aio-storage to provide fully async GCS operations.
Install the optional dependency with

```shell
pip install "lexigram-storage[gcs]"  # i.e. gcloud-aio-storage>=9.0.0
```

| Parameter | Type | Description |
|---|---|---|
| `bucket` | str | GCS bucket name. |
| `project_id` | str \| None | Google Cloud project ID (used for logging / context only; the gcloud library resolves the project from ADC when needed). |
| `credentials_path` | str \| None | Path to a service-account JSON key file. When ``None`` the driver falls back to Application Default Credentials (ADC), which covers Cloud Run, GKE Workload Identity, etc. |
def __init__( bucket: str, project_id: str | None = None, credentials_path: str | None = None ) -> None
Initialise the GCS driver.
| Parameter | Type | Description |
|---|---|---|
| `bucket` | str | GCS bucket name. |
| `project_id` | str \| None | Google Cloud project ID (informational; ADC handles auth). |
| `credentials_path` | str \| None | Path to service-account JSON credentials file, or ``None`` to use Application Default Credentials. |
| Exception | Description |
|---|---|
| ImportError | When ``gcloud-aio-storage`` is not installed. |
async def upload( path: str, data: Uploadable, options: UploadOptions | None = None ) -> FileInfo
Upload data to the GCS path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Destination object name / key. |
| `data` | Uploadable | Content to upload. |
| `options` | UploadOptions \| None | Optional ``UploadOptions`` (content-type, visibility, metadata). |
| Exception | Description |
|---|---|
| StorageError | On any GCS API error. |
Download the object at path into memory.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
| Type | Description |
|---|---|
| bytes | Raw file bytes. |
| Exception | Description |
|---|---|
| FileNotFoundError | When the object does not exist. |
| StorageError | On any other GCS API error. |
Yield successive chunks from the GCS object at path.
Downloads the entire object into memory first, then yields chunks from the in-memory buffer. For objects larger than available RAM, prefer using a streaming-capable GCS library.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
| `chunk_size` | int | Bytes per chunk (default 8 KiB). |
| Type | Description |
|---|---|
| AsyncIterator[bytes] | Raw byte chunks. |
Delete the GCS object at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
| Exception | Description |
|---|---|
| FileNotFoundError | When the object does not exist. |
| StorageError | On any other GCS API error. |
Return True if path exists in the GCS bucket.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
async def info(path: str) -> FileInfo
Return metadata for the GCS object at path.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
| Exception | Description |
|---|---|
| FileNotFoundError | When the object does not exist. |
| StorageError | On any other GCS API error. |
async def list(prefix: str = '') -> AsyncIterator[FileInfo]
Yield FileInfo for objects matching prefix.
| Parameter | Type | Description |
|---|---|---|
| `prefix` | str | Key prefix filter (empty string lists all objects). |
| Exception | Description |
|---|---|
| StorageError | On any GCS API error. |
Return a public URL for the GCS object at path.
The URL is only accessible when the object (or bucket) has been made publicly readable. For private objects, use get_presigned_url.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
async def get_presigned_url( path: str, expires_in: timedelta = timedelta(hours=1), method: str = 'GET' ) -> str
Return a signed URL for the GCS object at path.
Uses the V4 signing API from gcloud-aio-storage when available,
falling back to an unsigned public URL for read access.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Object name / key. |
| `expires_in` | timedelta | Validity window (default one hour). |
| `method` | str | HTTP verb (``"GET"`` or ``"PUT"``). |
| Type | Description |
|---|---|
| str | A signed (or public) URL string. |
| Exception | Description |
|---|---|
| StorageError | When URL signing fails. |
Perform a lightweight connectivity check against GCS.
Lists at most one object from the bucket to verify credentials and bucket accessibility.
| Type | Description |
|---|---|
| HealthCheckResult | Structured health check result. |
LocalDriver
Local file system storage driver.
async def upload( path: str, data: Uploadable, options: UploadOptions | None = None ) -> FileInfo
Upload data to local file system with atomic write guarantee.
Uses temp file + atomic rename to ensure no partial writes on failure. Optionally validates checksum if provided in options.
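The temp-file-plus-rename pattern can be sketched in a few lines; `atomic_write` is an illustrative helper, not the driver's actual code.

```python
import os
import tempfile


def atomic_write(dest: str, data: bytes) -> None:
    # Write to a temp file in the destination directory, then rename over the
    # target: os.replace is atomic, so readers never observe a partial file.
    directory = os.path.dirname(dest) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(data)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, dest)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

Creating the temp file in the same directory matters: `os.replace` is only atomic within a single filesystem.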
Download file content from the local file system.
Stream file content from the local file system.
Delete a file from the local file system.
Check if a file exists in the local file system.
async def info(path: str) -> FileInfo
Get file info from the local file system.
async def list(prefix: str = '') -> AsyncIterator[FileInfo]
List files with a given prefix from the local file system.
Get the public URL for a local file.
async def get_presigned_url( path: str, expires_in: timedelta = timedelta(hours=1), method: str = 'GET' ) -> str
Get pre-signed URL for local file (same as public URL).
Perform a health check on local storage.
async def copy( src: str, dst: str ) -> FileInfo
Copy a file efficiently within the local filesystem.
async def move( src: str, dst: str ) -> FileInfo
Move a file efficiently within the local filesystem.
MemoryDriver
In-memory storage driver backed by a dictionary.
async def upload( path: str, data: Uploadable, options: UploadOptions | None = None ) -> FileInfo
Upload data to memory storage.
Download file content from memory.
Stream file content from memory.
Delete a file from memory.
Check if a file exists in memory.
async def info(path: str) -> FileInfo
Get file info from memory.
async def list(prefix: str = '') -> AsyncIterator[FileInfo]
List files with a given prefix from memory.
Get a URL for memory storage (not applicable).
async def get_presigned_url( path: str, expires_in: timedelta = timedelta(hours=1), method: str = 'GET' ) -> str
Raise StorageUnsupportedOperationError — not applicable to in-memory storage.
The in-memory driver has no network endpoint, so presigned URLs cannot be generated. Use a cloud-backed driver (S3, GCS, Azure) for presigned URL support.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | File path that would have been signed. |
| `expires_in` | timedelta | Ignored. |
| `method` | str | Ignored. |
| Exception | Description |
|---|---|
| StorageUnsupportedOperationError | Always — presigned URLs are not supported by the in-memory driver. |
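The core of a dict-backed store fits in a short sketch. `TinyMemoryStore` is a hypothetical minimal class for illustration, not the library's `MemoryDriver`.

```python
import asyncio


class TinyMemoryStore:
    """Dict-backed sketch of the blob-store contract (not the real class)."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    async def upload(self, path: str, data: bytes) -> None:
        self._blobs[path] = data

    async def download(self, path: str) -> bytes:
        try:
            return self._blobs[path]
        except KeyError:
            # Missing keys surface as FileNotFoundError, matching the drivers.
            raise FileNotFoundError(path) from None

    async def exists(self, path: str) -> bool:
        return path in self._blobs


async def main() -> None:
    store = TinyMemoryStore()
    await store.upload("a/b.txt", b"hello")
    assert await store.exists("a/b.txt")
    assert await store.download("a/b.txt") == b"hello"


asyncio.run(main())
```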
Perform a health check on memory storage.
NamedStorageConfig
Configuration for a single named storage backend.
Used in StorageConfig.backends to declare multiple blob stores that the framework registers as named DI bindings.
Example

```yaml
backends:
  - name: primary
    driver: s3
    primary: true
    s3:
      bucket: my-app-primary
      region: us-east-1
  - name: avatars
    driver: s3
    s3:
      bucket: my-app-avatars
      region: us-east-1
  - name: local
    driver: local
    local:
      root_dir: ./storage/public
```
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique backend identifier. Used as the Named() DI key. |
| `primary` | bool | Whether this is the primary backend. Primary backends also receive the unnamed BlobStoreProtocol binding. |
| `driver` | str | Storage driver name. One of: local, s3, gcs, azure, r2, memory. |
| `local` | | Local filesystem driver config (when driver='local'). |
| `s3` | | AWS S3 driver config (when driver='s3'). |
| `gcs` | | Google Cloud Storage config (when driver='gcs'). |
| `azure` | | Azure Blob Storage config (when driver='azure'). |
| `r2` | | Cloudflare R2 config (when driver='r2'). |
| `memory` | | In-memory storage config (when driver='memory'). |
ObjectDeletedHook
Payload fired when a blob object is deleted.
| Attribute | Description |
|---|---|
| `bucket` | Name of the bucket or container the object was deleted from. |
| `key` | Storage key (path) of the deleted object. |
ObjectStoredHook
Payload fired when a blob object is successfully stored.
| Attribute | Description |
|---|---|
| `bucket` | Name of the bucket or container the object was stored in. |
| `key` | Storage key (path) of the stored object. |
S3Driver
AWS S3 storage driver with multipart upload for large files.
def __init__( bucket: str, region: str | None = None, access_key: str | None = None, secret_key: str | None = None, endpoint_url: str | None = None, multipart_threshold: int = DEFAULT_MULTIPART_THRESHOLD, multipart_chunk_size: int = DEFAULT_MULTIPART_CHUNK_SIZE, encryption: EncryptionConfig | None = None )
Initialize S3 driver.
| Parameter | Type | Description |
|---|---|---|
| `bucket` | str | S3 bucket name. |
| `region` | str \| None | AWS region. |
| `access_key` | str \| None | AWS access key (optional if using IAM roles). |
| `secret_key` | str \| None | AWS secret key (optional if using IAM roles). |
| `endpoint_url` | str \| None | Custom endpoint URL (for MinIO, LocalStack, etc.). |
| `multipart_threshold` | int | File size threshold for multipart upload (bytes). |
| `multipart_chunk_size` | int | Chunk size for multipart upload (bytes). |
| `encryption` | EncryptionConfig \| None | Optional server-side encryption configuration. When ``enabled=True``, all ``put_object`` and ``create_multipart_upload`` calls will include the appropriate ``ServerSideEncryption`` / ``SSEKMSKeyId`` parameters. |
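The threshold/chunk-size interplay can be sketched as a pure planning function. The 8 MiB values below are assumptions for illustration; the real defaults come from `DEFAULT_MULTIPART_THRESHOLD` and `DEFAULT_MULTIPART_CHUNK_SIZE`.

```python
MIB = 1024 * 1024
# Hypothetical defaults for illustration only.
THRESHOLD = 8 * MIB
CHUNK_SIZE = 8 * MIB


def plan_upload(size: int, threshold: int = THRESHOLD,
                chunk_size: int = CHUNK_SIZE) -> list[tuple[int, int]]:
    # At or below the threshold a single put_object suffices; above it the
    # payload is split into (offset, length) parts for a multipart upload.
    if size <= threshold:
        return [(0, size)]
    return [(offset, min(chunk_size, size - offset))
            for offset in range(0, size, chunk_size)]


print(len(plan_upload(5 * MIB)))   # 1
print(len(plan_upload(20 * MIB)))  # 3
```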
Download file content from S3 (robust to different body shapes).
Some S3 client implementations return an async context manager for the
response body while others provide an object with a read() coroutine
method. Handle both shapes defensively and return bytes.
Stream file content from S3.
Delete a file from S3.
Check if a file exists in S3.
async def info(path: str) -> FileInfo
Get file info from S3.
async def list(prefix: str = '') -> AsyncIterator[FileInfo]
List files with a given prefix from S3.
Get the public URL for an S3 object.
async def get_presigned_url( path: str, expires_in: timedelta = timedelta(hours=1), method: str = 'GET' ) -> str
Get pre-signed URL for S3 object.
| Parameter | Type | Description |
|---|---|---|
| `path` | str | Storage path / object key. |
| `expires_in` | timedelta | Validity window (default one hour). |
| `method` | str | HTTP verb — ``"GET"`` or ``"PUT"``. |
| Type | Description |
|---|---|
| str | Pre-signed URL string. |
Perform health check on S3 storage.
StorageConfig
Hierarchical root configuration for Lexigram Storage.
| Attribute | Description |
|---|---|
| `name` | Configuration name (default: "storage"). |
| `enabled` | Whether the storage module is enabled. |
| `default_driver` | Default storage driver (local, s3, gcs, azure, memory). |
| `drivers` | Driver-specific configurations. |
| `service` | Storage operation settings. |
Example

```python
# From environment variables (``LEX_STORAGE__*``)
config = StorageConfig()

# Explicit values
config = StorageConfig(default_driver="s3", drivers={"s3": StorageS3Config(...)})
```

def from_named( cls, entry: NamedStorageConfig ) -> StorageConfig
Build a single-backend StorageConfig from a NamedStorageConfig entry.
Used internally by StorageProvider to create per-backend configs from a multi-backend declaration. The resulting config has the driver-specific config placed in the drivers dict under the driver type key, so the existing DriverRegistry can consume it unchanged.
| Parameter | Type | Description |
|---|---|---|
| `entry` | NamedStorageConfig | The named backend entry to materialise. |
| Type | Description |
|---|---|
| StorageConfig | A StorageConfig configured for the single named backend. |
def validate_production_security() -> StorageConfig
Block insecure storage configurations in production.
This validator fires when the LEX_ENV environment variable is
set to "production" (case-insensitive). It rejects known-weak
placeholder credentials ("change-me", "password", etc.) in S3
and Azure driver configs, raising ValueError immediately so the
application fails fast at startup rather than leaking credentials at
request time.
The environment variable checked is LEX_ENV (default:
"development"). Set LEX_ENV=production in your deployment
environment to activate production-grade security checks.
StorageModule
Object storage backend (S3, GCS, Azure, R2, local filesystem).
Registers BlobStoreProtocol for constructor injection.
Call configure to register a storage driver explicitly, or stub for an isolated in-memory setup with no external service dependencies.
Usage

```python
from lexigram.storage.config import StorageConfig

@module(imports=[StorageModule.configure(StorageConfig(default_driver="s3"))])
class AppModule(Module):
    pass
```

def configure( cls, config: StorageConfig | Any | None = None, enable_encryption: bool = False ) -> DynamicModule
Create a StorageModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | StorageConfig \| Any \| None | StorageConfig or ``None`` to resolve config from the container at boot time. |
| `enable_encryption` | bool | Enable server-side encryption for all stored objects. Defaults to ``False``; configure encryption keys via EncryptionConfig. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
| Exception | Description |
|---|---|
| TypeError | If *config* is not a ``StorageConfig`` or ``None``. |
def stub( cls, config: StorageConfig | None = None ) -> DynamicModule
Create a StorageModule suitable for unit and integration testing.
Uses an in-memory backend with no external service dependencies.
| Parameter | Type | Description |
|---|---|---|
| `config` | StorageConfig \| None | Optional StorageConfig override. Uses safe in-memory defaults when ``None``. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
StorageProvider
DI provider for storage services.
Registers both the DriverRegistry (for extensibility) and the configured BlobStoreProtocol driver singleton into the container.
Usage

```python
# Pass config directly
app.add_provider(StorageProvider(config=StorageConfig()))

# Or let the provider resolve config from the container (requires ConfigProvider)
app.add_provider(StorageProvider())

# Later in a service:
class MyService:
    def __init__(self, store: BlobStoreProtocol) -> None: ...
```

def __init__(config: StorageConfig | None = None) -> None
property config() -> StorageConfig | None
Return the storage configuration.
def from_config( cls, config: StorageConfig, **context: Any ) -> Self
Create provider from config object.
async def register(container: ContainerRegistrarProtocol) -> None
Register storage services into the DI container.
If no config was supplied at construction time, resolves it from the container via ConfigProtocol. Binds:
- DriverRegistry — the extensible driver factory (singleton).
- BlobStoreProtocol — the configured backend driver instance (singleton). In multi-backend mode, also registers named bindings.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | DI registrar supplied by the framework. |
async def boot(container: ContainerResolverProtocol) -> None
Startup hook — verify storage connectivity after all registrations.
Shutdown hook that cleans up resources owned by the driver.
Health check for storage.
UploadOptions
Options for file uploads.