API Reference
Protocols
Section titled “Protocols”IdentityResolverProtocol
Section titled “IdentityResolverProtocol”Protocol for resolving external OAuth identities to internal user IDs.
This protocol is used to map OAuth external IDs (like Google’s ‘sub’ claim) to internal UUIDs. This is necessary because OAuth providers use their own ID format which may not be valid UUIDs.
Resolve an external OAuth ID to an internal user UUID.
| Parameter | Type | Description |
|---|---|---|
| `external_id` | str | The external ID from the OAuth provider (e.g., Google's sub claim). |
| `provider` | str | The OAuth provider name (e.g., "google", "apple"). |
| Type | Description |
|---|---|
| str | None | The internal user UUID if found, None otherwise. |
Synchronous version of resolve_user_id.
| Parameter | Type | Description |
|---|---|---|
| `external_id` | str | The external ID from the OAuth provider. |
| `provider` | str | The OAuth provider name. |
| Type | Description |
|---|---|
| str | None | The internal user UUID if found, None otherwise. |
OAuthIdentityStore
Section titled “OAuthIdentityStore”Protocol for OAuth identity storage.
OAuthIdentityStore manages the linking between local user accounts and external OAuth2 identity providers. This enables users to authenticate using social login while maintaining a consistent local user identity.
Example
Using OAuth identity store
store = await container.resolve(OAuthIdentityStore)
# Link OAuth identity to useridentity = await store.create_oauth_identity( user_id="user-123", provider="google", provider_user_id="google-456")
# Find user by OAuth identityidentity = await store.get_oauth_identity("google", "google-456")if identity: user = await store.get_user_by_oauth_identity(identity.id)async def create_oauth_identity( user_id: str, provider: str, provider_user_id: str ) -> OAuthIdentity
Create an OAuth identity link for a user.
| Parameter | Type | Description |
|---|---|---|
| `user_id` | str | The local user ID to link the OAuth identity to. |
| `provider` | str | The OAuth provider name (e.g., "google", "github"). |
| `provider_user_id` | str | The user's ID at the OAuth provider. |
| Type | Description |
|---|---|
| OAuthIdentity | The created OAuthIdentity instance. |
async def get_oauth_identity( provider: str, provider_user_id: str ) -> OAuthIdentity | None
Get OAuth identity by provider and provider user ID.
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | The OAuth provider name. |
| `provider_user_id` | str | The user's ID at the OAuth provider. |
| Type | Description |
|---|---|
| OAuthIdentity | None | The OAuthIdentity if found, None otherwise. |
async def get_oauth_identities_for_user(user_id: str) -> list[OAuthIdentity]
Get all OAuth identities for a user
Delete OAuth identity
Delete all OAuth identities for a user
Get local user_id by OAuth provider and external user ID.
This is the key method for resolving OAuth external IDs to local user IDs. Used when OAuth tokens contain non-UUID user identifiers (like Google’s sub claim).
| Parameter | Type | Description |
|---|---|---|
| `provider` | str | The OAuth provider name (e.g., "google", "github", "apple"). |
| `provider_user_id` | str | The user's ID at the OAuth provider. |
| Type | Description |
|---|---|
| str | None | The local user_id if found, None otherwise. |
Resolve a user_id that may be either a UUID or an OAuth external ID.
This method handles the common issue where OAuth providers (like Google) use non-UUID identifiers (e.g., “101158382316025899191”) which cannot be used directly in database queries expecting UUIDs.
Resolution logic:
- If user_id_or_oauth_id is a valid UUID format, check if user exists
- If not a valid UUID, treat it as an OAuth provider_user_id and look up
| Parameter | Type | Description |
|---|---|---|
| `user_id_or_oauth_id` | str | Either a local user UUID or an OAuth provider's external ID. |
| `provider` | str | The OAuth provider to search in (default: "google"). |
| Type | Description |
|---|---|
| str | None | The resolved local user_id if found, None otherwise. |
TokenValidatorProtocol
Section titled “TokenValidatorProtocol”Validates and decodes authentication tokens.
Validate a token and return its payload, or None if invalid.
Revoke a token, preventing future validation.
Classes
Section titled “Classes”APIKeyAuthenticator
Section titled “APIKeyAuthenticator”Authenticates requests by validating an API key.
Key extraction order:
request_context["headers"][config.header_name]request_context["query_params"][config.query_param]
If the key is absent or the lookup returns None an
Err(AuthenticationError) is returned. On success Ok(User)
is returned.
Example
config = APIKeyConfig( lookup={"sk_live_abc123": admin_user},)authenticator = APIKeyAuthenticator(config)
result = await authenticator.authenticate({ "headers": {"X-API-Key": "sk_live_abc123"}, "query_params": {},})
if result.is_ok(): user = result.unwrap()def __init__(config: APIKeyConfig) -> None
Initialise the authenticator.
| Parameter | Type | Description |
|---|---|---|
| `config` | APIKeyConfig | Key-extraction and lookup configuration. |
async def authenticate(request_context: dict[str, Any]) -> Result[User, AuthenticationError]
Authenticate a request using its API key.
| Parameter | Type | Description |
|---|---|---|
| `request_context` | dict[str, Any] | A mapping that **must** contain at least one of: * ``"headers"``: ``dict[str, str]`` of HTTP request headers. * ``"query_params"``: ``dict[str, str]`` of URL query params. |
| Type | Description |
|---|---|
| Result[User, AuthenticationError] | ``Ok(User)`` when a valid API key is found. ``Err(AuthenticationError)`` when the key is absent, invalid, or the lookup returns ``None``. |
APIKeyConfig
Section titled “APIKeyConfig”Configuration for APIKeyAuthenticator.
Attributes:
header_name: HTTP header to inspect for the API key.
Defaults to "X-API-Key".
query_param: URL query parameter name to fall back to when the header
is absent. Defaults to "api_key".
lookup: Either a dict[str, User] mapping raw key strings to their
owners, or an async/sync callable that accepts the raw key and
returns User | None. This field is required.
AuthAuthenticationFailedHook
Section titled “AuthAuthenticationFailedHook”Payload fired when an authentication attempt fails.
Attributes: method: Authentication method that was attempted. reason: Short description of why authentication failed.
AuthBundleProvider
Section titled “AuthBundleProvider”Composite provider that wires the full Lexigram auth stack.
Composes AuthenticationProvider, TokenProvider, SessionProvider, and AuthorizationProvider so that callers only need to register a single provider:
.. code-block:: python
container.add_provider(AuthBundleProvider(config=auth_config))Dependencies registered by each sub-provider are available in the container after register completes.
| Parameter | Type | Description |
|---|---|---|
| `config` | Shared AuthConfig forwarded to every sub-provider. When ``None``, each sub-provider uses its own defaults. | |
| `initial_roles` | Optional initial RBAC roles forwarded to AuthorizationProvider. | |
| `enable_passkeys` | When ``True``, append PasskeyProvider to the sub-provider list (requires the WebAuthn extra to be installed). | |
| `kwargs` | Extra keyword arguments forwarded to AuthenticationProvider. |
def __init__( config: AuthConfig | None = None, initial_roles: dict[str, Any] | None = None, enable_passkeys: bool = False, **kwargs: Any ) -> None
def from_config( cls, config: AuthConfig, **context: Any ) -> Self
Create provider from config object.
async def register(container: ContainerRegistrarProtocol) -> None
Register all auth sub-providers with the container.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerRegistrarProtocol | The DI container registrar. |
async def boot(container: ContainerResolverProtocol) -> None
Boot all auth sub-providers in registration order.
| Parameter | Type | Description |
|---|---|---|
| `container` | ContainerResolverProtocol | The DI container resolver. |
Shut down all auth sub-providers in reverse registration order.
Aggregate health check across all sub-providers.
Returns DEGRADED if any sub-provider is unhealthy.
| Parameter | Type | Description |
|---|---|---|
| `timeout` | float | Per-provider timeout budget in seconds. |
| Type | Description |
|---|---|
| HealthCheckResult | An aggregated HealthCheckResult. |
AuthConfig
Section titled “AuthConfig”Hierarchical root configuration for Lexigram Auth.
Attributes: name: Configuration name (default: “auth”) enabled: Whether the auth module is enabled users: Initial users to create roles: Role definitions for RBAC rbac: RBAC system configuration token: JWT configuration middleware: Authentication middleware configuration secret_key: Secret key for signing tokens admin_email: Initial admin email admin_password: Initial admin password login_rate_limit: Rate limit for login endpoints oauth2_providers: OAuth2 provider configurations
def validate_security() -> AuthConfig
Ensure secure settings in production.
AuthModule
Section titled “AuthModule”Full authentication and authorization stack (JWT, OAuth2/OIDC, RBAC, policies).
Call configure to configure the auth bundle with an AuthConfig.
Usage
from lexigram.auth.config import AuthConfig
@module( imports=[AuthModule.configure(AuthConfig(secret_key="..."))])class AppModule(Module): passdef configure( cls, config: Any | None = None, initial_roles: dict[str, Any] | None = None, is_global: bool = True ) -> DynamicModule
Create an AuthModule with explicit configuration.
| Parameter | Type | Description |
|---|---|---|
| `config` | Any | None | AuthConfig or ``None`` for framework defaults (development-only ephemeral secrets). |
| `initial_roles` | dict[str, Any] | None | Optional RBAC role seed forwarded to the authorization sub-provider. |
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule descriptor. |
def stub(cls) -> DynamicModule
Return an in-memory AuthModule suitable for unit and integration testing.
Uses ephemeral in-memory storage with a fixed test secret key. No external token services, databases, or OAuth providers are configured.
| Type | Description |
|---|---|
| DynamicModule | A DynamicModule backed by in-memory auth storage. |
AuthResult
Section titled “AuthResult”Result of an authentication attempt.
AuthStatus
Section titled “AuthStatus”Authentication status values.
AuthToken
Section titled “AuthToken”Authentication token response.
AuthTokenIssuedHook
Section titled “AuthTokenIssuedHook”Payload fired when an access or refresh token is issued.
Attributes:
user_id: Identifier of the user the token was issued for.
token_type: Token kind (e.g. "access", "refresh").
AuthTokenRefreshedHook
Section titled “AuthTokenRefreshedHook”Payload fired when an access token is refreshed.
Attributes: user_id: Identifier of the user whose token was refreshed. token_type: Token kind that was refreshed.
AuthTokenRevokedHook
Section titled “AuthTokenRevokedHook”Payload fired when a token is explicitly revoked.
Attributes: user_id: Identifier of the user whose token was revoked. token_type: Token kind that was revoked.
AuthUserAuthenticatedHook
Section titled “AuthUserAuthenticatedHook”Payload fired when a user successfully authenticates.
Attributes:
user_id: Identifier of the authenticated user.
method: Authentication method used (e.g. "password", "oauth2").
AuthenticationFailed
Section titled “AuthenticationFailed”Emitted when an authentication attempt fails.
Attributes: email: Email that was used in the failed attempt. reason: Human-readable reason for the failure. ip: Remote IP address of the request, if available.
AuthenticationProvider
Section titled “AuthenticationProvider”User authentication ONLY (login/logout/validation).
config is the primary parameter and drives all defaults. The optional
keyword arguments are override points for callers that need to supply
custom implementations (e.g., a production-grade UserStoreProtocol backed
by a database, or a pre-built token manager).
| Parameter | Type | Description |
|---|---|---|
| `config` | The resolved AuthConfig. JWT credentials (``config.token.secret_key``, ``config.token.algorithm``) are used to build a JWTTokenManager automatically when no ``token_manager`` override is provided. | |
| `password_policy` | Override the password policy derived from ``config.password``. When *None* the policy is built from config; falls back to ``PasswordPolicy()`` when config is also absent. | |
| `user_store` | Override the user store. Defaults to InMemoryUserStore. | |
| `token_manager` | Supply a fully constructed token manager. When *None* and ``config.token`` is present the manager is built automatically from the JWT configuration. | |
| `cache_service` | Optional cache backend passed to the auto-built token manager for token blacklisting. Ignored when ``token_manager`` is provided directly. | |
| `mfa_service` | Optional MFA manager for multi-factor authentication flows. |
def __init__( config: Annotated[AuthConfig, Inject] | None = None, *, password_policy: PasswordPolicy | None = None, user_store: UserStoreProtocol | None = None, token_manager: Any = None, cache_service: Any = None, mfa_service: MFAManager | None = None ) -> None
property service() -> AuthenticationService
Get or create the authentication service.
Fetch a user by their ID.
Satisfies AuthProviderProtocol.
| Parameter | Type | Description |
|---|---|---|
| `user_id` | str | The unique identifier of the user to retrieve. |
| Type | Description |
|---|---|
| Any | None | The user object, or ``None`` if not found. |
async def register(container: ContainerRegistrarProtocol) -> None
Register authentication services with the container.
Registers both concrete implementations and their corresponding protocols to enable dependency injection across extensions without direct imports.
Initialize authentication provider and register with kernel health registry.
Shutdown authentication provider.
async def verify_token(token: str) -> Result[VerifiedToken, TokenError]
Verify a JWT token and return a Result with the decoded payload.
Satisfies AuthProviderProtocol.
Delegates to token_manager.verify_token() when a token manager
is configured. Returns an Err result if no token manager is set
or if the token is invalid for any expected domain reason.
Infrastructure failures (cache unavailable, network errors) are still raised as exceptions and must be handled by the caller.
| Parameter | Type | Description |
|---|---|---|
| `token` | str | The JWT token string to verify. |
| Type | Description |
|---|---|
| Result[VerifiedToken, TokenError] | ``Ok(VerifiedToken)`` if the token is valid and not revoked, or ``Err(TokenError)`` for expected domain failures. |
Validate a session token and return user information.
Delegates to the underlying authentication service’s
get_user_from_token method. Returns a
Result[VerifiedToken, TokenError] or None when no token
manager is configured.
| Parameter | Type | Description |
|---|---|---|
| `token` | str | The session or JWT token to validate. |
| Type | Description |
|---|---|
| Any | ``Result[VerifiedToken, TokenError]`` on success/failure, or ``None``. |
Check authentication provider health.
AuthenticationService
Section titled “AuthenticationService”Service for core authentication operations.
Handles user login, registration, token creation and validation.
def __init__( password_policy: PasswordPolicy, user_store: UserStoreProtocol, token_manager: JWTTokenManager, lockout_config: LockoutConfig | None = None, event_bus: EventBusProtocol | None = None, tracker: LoginAttemptTracker | None = None, hooks: HookRegistryProtocol | None = None ) -> None
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None
Attach an optional hook registry after provider boot wiring.
async def authenticate_user( email: str, password: str ) -> Result[User, InvalidCredentialsError | AccountLockedError]
Authenticate a user with email and password.
Always performs a password hash verification regardless of whether the user exists. This constant-time behaviour prevents user-enumeration attacks via timing side-channels.
If the account has exceeded LockoutConfig.max_attempts
within the observation window an AccountLockedError is returned
immediately without performing credential verification.
| Type | Description |
|---|---|
| Result[User, InvalidCredentialsError | AccountLockedError] | ``Ok(User)`` if authentication succeeds, ``Err(AccountLockedError)`` if the account is temporarily locked, ``Err(InvalidCredentialsError)`` otherwise. |
async def register_user(request: RegisterRequest) -> Result[User, EmailExistsError | PasswordPolicyError]
Register a new user.
| Parameter | Type | Description |
|---|---|---|
| `request` | RegisterRequest | Registration request with email, name, and password. |
| Type | Description |
|---|---|
| Result[User, EmailExistsError | PasswordPolicyError] | ``Ok(User)`` on success. ``Err(PasswordPolicyError)`` if passwords do not match or the password violates the policy. ``Err(EmailExistsError)`` if the email is already registered. |
Create an authentication token for a user.
async def verify_token(token: str) -> Result[VerifiedToken, TokenError]
Verify and decode an authentication token.
| Type | Description |
|---|---|
| Result[VerifiedToken, TokenError] | ``Ok(VerifiedToken)`` if valid, ``Err(TokenError)`` otherwise. |
async def refresh_token(refresh_token: str) -> Result[AuthToken, TokenError]
Refresh an access token using a refresh token.
| Parameter | Type | Description |
|---|---|---|
| `refresh_token` | str | The refresh token string. |
| Type | Description |
|---|---|
| Result[AuthToken, TokenError] | ``Ok(AuthToken)`` if the refresh succeeds. ``Err(TokenError)`` if the refresh token is invalid or expired. |
async def get_user_from_token(token: str) -> Result[VerifiedToken, TokenError]
Get user information from token.
| Type | Description |
|---|---|
| Result[VerifiedToken, TokenError] | ``Ok(VerifiedToken)`` if valid, ``Err(TokenError)`` otherwise. |
Cancel and await all pending background event tasks.
AuthorizationProvider
Section titled “AuthorizationProvider”Role-based access control and permission management.
def __init__( config: Annotated[AuthConfig, Inject] | None = None, initial_roles: dict[str, Any] | None = None, **kwargs: Any ) -> None
property auth_config() -> AuthConfig | None
async def register(container: ContainerRegistrarProtocol) -> None
Register authorization services with the container.
async def boot(container: ContainerResolverProtocol) -> None
Initialize authorization provider.
Shutdown authorization provider.
Check authorization provider health.
JWTConfig
Section titled “JWTConfig”JWT Configuration
def validate_jwt_security() -> JWTConfig
Ensure secure JWT configuration in production.
JWTTokenManager
Section titled “JWTTokenManager”JWT token management with key rotation support.
The JWTTokenManager handles creation, validation, and renewal of JWT tokens for user authentication. It supports multiple signing keys for seamless key rotation without invalidating existing tokens.
Attributes: current_key_id: ID of the currently active signing key. keys: Dictionary of key_id -> key material. algorithm: JWT signing algorithm (HS256, RS256, etc.). access_expiration_hours: Expiration time for access tokens. refresh_expiration_days: Expiration time for refresh tokens. cache_service: Optional cache backend for token validation caching. rotation_interval: Interval in seconds between key rotations.
Example
Basic token operations
manager = JWTTokenManager( current_key_id="v1", keys={"v1": SecretStr("secure-secret-key")},)
# Create access tokentoken = await manager.create_access_token(user_id="user123")
# Verify and decodepayload = await manager.verify_token(token)Note
In production, secrets should not be hardcoded. Use environment variables or a secrets management service.
def __init__( current_key_id: str, keys: dict[str, str | SecretStr | dict[str, str | SecretStr]] | None = None, algorithm: str = const.DEFAULT_TOKEN_ALGORITHM, access_expiration_hours: int = 24, refresh_expiration_days: int = 30, cache_service: CacheBackendProtocol | None = None, rotation_interval_days: int = 90, grace_period_seconds: int = const.DEFAULT_JWT_KEY_ROTATION_GRACE_PERIOD_SECONDS, logger: Logger | None = None, *, audit_logger: AuditLoggerProtocol | None = None, binding_config: TokenBindingConfig | None = None, required_audience: str | None = None ) -> None
Initialize the JWT token manager.
| Parameter | Type | Description |
|---|---|---|
| `current_key_id` | str | The ID of the currently active signing key. |
| `keys` | dict[str, str | SecretStr | dict[str, str | SecretStr]] | None | Dictionary mapping key IDs to key material. If None, current_key_id is treated as a single secret. |
| `algorithm` | str | JWT signing algorithm (e.g., "HS256", "RS256"). |
| `access_expiration_hours` | int | Hours until access tokens expire. |
| `refresh_expiration_days` | int | Days until refresh tokens expire. |
| `cache_service` | CacheBackendProtocol | None | Optional cache backend for token validation. |
| `rotation_interval_days` | int | Days between key rotations. |
| `grace_period_seconds` | int | Seconds after key rotation during which tokens signed by the outgoing key remain valid. Defaults to 3600 (1 hour). Set to 0 to invalidate all old-key tokens immediately on rotation. |
| `audit_logger` | AuditLoggerProtocol | None | Optional AuditLoggerProtocol used to record token revocation events. When not provided, no audit entries are written. |
| `binding_config` | TokenBindingConfig | None | Optional TokenBindingConfig for opt-in client binding. When set, tokens embed a ``bind`` claim containing a SHA-256 hash of active binding factors (IP, fingerprint). Tokens issued without a ``bind`` claim continue to verify successfully so that binding can be enabled incrementally. |
| `required_audience` | str | None | When set, every call to ``verify_token`` will enforce that the token's ``aud`` claim matches this value. Pass ``allow_missing_audience=True`` to ``verify_token`` to bypass the check on a per-call basis for trusted internal paths. |
Live view of the key material managed by the key store.
The key ID currently used for signing new tokens.
Allow external callers to update the active key ID on the store.
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None
Attach an optional hook registry after provider boot wiring.
Rotate to a new signing key, delegating lifecycle to the key store.
Old keys are retained for grace_period_seconds (constructor
parameter, default 3600 s) so tokens they signed remain verifiable
during the overlap window.
| Parameter | Type | Description |
|---|---|---|
| `new_key_id` | str | ID for new key. |
| `new_secret` | str | dict | New secret key (string for symmetric or dict for asymmetric). |
Return current key metadata (for inspection/operations).
Extract user information from access token.
| Type | Description |
|---|---|
| Result[VerifiedToken, ContractsTokenError] | ``Ok(VerifiedToken)`` if the token is a valid access token, or ``Err(TokenError)`` for expected domain failures. |
LockoutConfig
Section titled “LockoutConfig”Configuration for account lockout on repeated failed login attempts.
Attributes:
max_failed_attempts: Number of failed attempts within the observation
window that triggers a lockout. Defaults to 5.
lockout_duration_seconds: Length of the rolling observation window in
seconds. The lockout is lifted automatically once all recorded
failures fall outside this window. Defaults to 300 (5 minutes).
max_attempts: Canonical alias for max_failed_attempts. Both
fields default to 5 and are kept in sync by __post_init__.
Note
For distributed deployments pass a CacheBackendProtocol to
LoginAttemptTracker instead of relying on the in-process
dict managed by AuthenticationService.
LoginAttemptTracker
Section titled “LoginAttemptTracker”Tracks failed login attempts and enforces account lockout.
Supports both in-process state (no external dependency) and distributed state via an injected CacheBackendProtocol. When a cache backend is provided all attempt records are stored there so that multiple application instances share the same view — preventing an attacker from bypassing lockout by rotating across instances.
| Parameter | Type | Description |
|---|---|---|
| `max_attempts` | Consecutive failures within *lockout_duration_seconds* that trigger a lockout. Defaults to 5. | |
| `lockout_duration_seconds` | Rolling window size **and** cache TTL. Defaults to 900 seconds (15 minutes). | |
| `cache` | Optional cache backend for distributed state. When ``None`` an in-process ``dict`` is used instead. |
Example
tracker = LoginAttemptTracker(max_attempts=5, lockout_duration_seconds=900)
await tracker.record_failure("user@example.com")locked = await tracker.is_locked("user@example.com") # False until 5 failures
await tracker.clear("user@example.com") # reset on successful logindef __init__( max_attempts: int = 5, lockout_duration_seconds: int = 900, cache: CacheBackendProtocol | None = None ) -> None
Return True if identifier has exceeded the failure threshold.
| Parameter | Type | Description |
|---|---|---|
| `identifier` | str | Username, e-mail address, or IP used as the tracking key. |
Record a failed authentication attempt for identifier.
| Parameter | Type | Description |
|---|---|---|
| `identifier` | str | Username, e-mail address, or IP used as the tracking key. |
Remove all recorded failures for identifier (call on success).
| Parameter | Type | Description |
|---|---|---|
| `identifier` | str | Username, e-mail address, or IP used as the tracking key. |
MongoDBOAuthIdentityStore
Section titled “MongoDBOAuthIdentityStore”MongoDB-backed OAuth identity store
async def create_oauth_identity( user_id: str, provider: str, provider_user_id: str ) -> OAuthIdentity
Create OAuth identity link
async def get_oauth_identity( provider: str, provider_user_id: str ) -> OAuthIdentity | None
Get OAuth identity by provider and provider user ID
async def get_oauth_identities_for_user(user_id: str) -> list[OAuthIdentity]
Get all OAuth identities for a user
Delete OAuth identity
Delete all OAuth identities for a user
Get local user_id by OAuth provider and external user ID.
Resolve user_id from either UUID or OAuth external ID.
Resolution logic:
- If user_id_or_oauth_id is a valid UUID format, check if user exists
- If not a valid UUID, treat it as an OAuth provider_user_id and look up
Synchronous resolution is not supported for database-backed store.
OAuthIdentity
Section titled “OAuthIdentity”OAuth identity linking user to provider
PasswordChanged
Section titled “PasswordChanged”Emitted when a user successfully changes their password.
Attributes: user_id: ID of the user whose password was changed.
PasswordHasher
Section titled “PasswordHasher”Bcrypt password hasher implementing the PasswordHasherProtocol.
Provides secure password hashing using bcrypt with UTF-8 aware truncation.
hash and verify are static methods so they can be called directly
on the class (await PasswordHasher.hash(password)) or on an instance.
Hash a password using bcrypt with UTF-8 aware truncation.
Verify a password against its hash asynchronously.
Check if the hash needs to be rehashed.
Rehash the password if needed.
PasswordPolicy
Section titled “PasswordPolicy”Password policy configuration.
Lazy-loads common passwords file only when needed. Implements PasswordPolicyProtocol for dependency injection compatibility.
def __init__( min_length: int = 8, max_length: int = 128, require_uppercase: bool = True, require_lowercase: bool = True, require_digits: bool = False, require_special: bool = False, prevent_common: bool = True, prevent_reuse: bool = False, history_size: int = 5, common_passwords_file: str | None = None, banned_patterns: list[str] | None = None )
def from_config( cls, config: PasswordConfig ) -> PasswordPolicy
Build a PasswordPolicy from a PasswordConfig dataclass.
| Parameter | Type | Description |
|---|---|---|
| `config` | PasswordConfig | Password complexity configuration from ``AuthConfig.password``. |
| Type | Description |
|---|---|
| PasswordPolicy | A configured PasswordPolicy instance. |
Validate password against policy.
| Parameter | Type | Description |
|---|---|---|
| `password` | str | Plain text password to validate. |
| Exception | Description |
|---|---|
| ValueError | If the password violates the policy. |
Return True if the password satisfies the policy without raising.
| Parameter | Type | Description |
|---|---|---|
| `password` | str | Plain text password. |
| Type | Description |
|---|---|
| bool | True if valid, False otherwise. |
PersistentTokenRevocationStore
Section titled “PersistentTokenRevocationStore”Persistent token revocation list backed by a CacheBackendProtocol.
Stores revoked token IDs in the cache so that revocations survive process restarts and are consistent across all nodes in a distributed deployment.
A sentinel value ("1") is written under
lexigram:auth:revoked:{token_id} with the configured TTL. When
expires_at is provided to revoke, the TTL is capped to the
token’s remaining lifetime so stale entries are not kept longer than needed.
| Parameter | Type | Description |
|---|---|---|
| `cache` | Cache backend used to persist revoked token identifiers. | |
| `ttl` | Default expiry in seconds for revocation records. Defaults to ``86400`` (24 h). |
Mark token_id as revoked.
If the token is already expired (expires_at is in the past) the
call is a no-op — there is no point persisting an entry for a token
that the signature validation layer will reject on its own.
| Parameter | Type | Description |
|---|---|---|
| `token_id` | str | Unique identifier of the JWT/opaque token to revoke. |
| `expires_at` | datetime | None | Optional token expiry time. Used to compute a shorter TTL so the cache entry is not held longer than the token's own lifetime. |
Return True if token_id is present in the revocation list.
| Parameter | Type | Description |
|---|---|---|
| `token_id` | str | The token identifier to check. |
| Type | Description |
|---|---|
| bool | ``True`` when the token has been explicitly revoked and the revocation entry has not yet expired, ``False`` otherwise. |
RBACConfig
Section titled “RBACConfig”RBAC system configuration.
RoleDefinition
Section titled “RoleDefinition”Definition of a role with its permissions.
SQLAlchemyOAuthIdentityStore
Section titled “SQLAlchemyOAuthIdentityStore”Database-backed OAuth identity store
async def create_oauth_identity( user_id: str, provider: str, provider_user_id: str ) -> OAuthIdentity
Create OAuth identity link
async def get_oauth_identity( provider: str, provider_user_id: str ) -> OAuthIdentity | None
Get OAuth identity by provider and provider user ID
async def get_oauth_identities_for_user(user_id: str) -> list[OAuthIdentity]
Get all OAuth identities for a user
Delete OAuth identity
Delete all OAuth identities for a user
Get local user_id by OAuth provider and external user ID.
Resolve user_id from either UUID or OAuth external ID.
Resolution logic:
- If user_id_or_oauth_id is a valid UUID format, check if user exists
- If not a valid UUID, treat it as an OAuth provider_user_id and look up
Synchronous resolution is not supported for database-backed store.
This method is required by IdentityResolverProtocol but cannot be implemented safely for an async database provider.
SessionCookieBackend
Section titled “SessionCookieBackend”Session-cookie authentication backend for SSR flows.
Reads session ID from cookie, resolves user from SessionRepositoryProtocol.
Because session records contain only a user_id, resolving the full
AuthenticatedUserProtocol requires
a user_fetcher callable that looks up the concrete user object from
whatever store the application uses (database, cache, etc.). This keeps
the backend decoupled from any specific user-store implementation.
Constructor args:
session_repository: Persistence backend for session records.
user_fetcher: Async callable that accepts a user_id string and
returns the matching AuthenticatedUserProtocol, or
None when no user exists for that ID.
cookie_name: Name of the session cookie (default "session_id").
secure: Whether to set the Secure flag (default True).
http_only: Whether to set the HttpOnly flag (default True).
same_site: The SameSite policy (default "lax").
Example
backend = SessionCookieBackend( session_repository=sql_session_repo, user_fetcher=user_service.get_authenticated_user,)
# In an SSR handler:user = await backend.authenticate(request)if user is None: # redirect to /login ...def __init__( session_repository: SessionRepositoryProtocol, user_fetcher: Callable[[str], Awaitable[AuthenticatedUserProtocol | None]], cookie_name: str = 'session_id', secure: bool = True, http_only: bool = True, same_site: str = 'lax' ) -> None
async def authenticate(request: Any) -> AuthenticatedUserProtocol | None
Extract session cookie, validate, return user or None.
Reads cookie_name from request.cookies, fetches the
corresponding active session record, resolves the user via
user_fetcher, and refreshes the last_active_at timestamp.
| Parameter | Type | Description |
|---|---|---|
| `request` | Any | The incoming request object. Must expose a ``cookies`` mapping attribute (Starlette / ASGI-compatible). |
| Type | Description |
|---|---|
| AuthenticatedUserProtocol | None | The authenticated user if the session is valid, ``None`` otherwise. |
Create a session record and set the session cookie on response.
| Parameter | Type | Description |
|---|---|---|
| `response` | Any | The outgoing response object. Must expose a ``set_cookie`` method with ``key``, ``value``, ``max_age``, ``secure``, ``httponly``, and ``samesite`` kwargs (Starlette / ASGI-compatible). |
| `user_id` | str | Identifier of the user to create a session for. |
| `expires_in` | int | Session lifetime in **seconds** (default 86 400 = 1 day). |
| Type | Description |
|---|---|
| str | The newly created ``session_id``. |
Invalidate the session and clear the cookie.
| Parameter | Type | Description |
|---|---|---|
| `request` | Any | The incoming request. ``request.cookies`` is read to find the current session ID. |
| `response` | Any | The outgoing response. ``delete_cookie`` is called to remove the session cookie from the browser. |
SessionCreated
Section titled “SessionCreated”Emitted when a new session/token is issued.
Attributes: session_id: Identifier for the new session. user_id: ID of the user who owns the session.
SessionRevoked
Section titled “SessionRevoked”Emitted when a session/token is invalidated.
Attributes: session_id: Identifier of the revoked session. user_id: ID of the user who owned the session.
TokenRevoked
Section titled “TokenRevoked”Emitted when a specific token is revoked.
Attributes: token_id: JTI or opaque identifier of the revoked token. user_id: ID of the user who owned the token. reason: Human-readable reason for revocation.
User model representing an authenticated user.
This model serves as the core user representation within the auth package. It implements the AuthenticatedUserProtocol protocol.
Check if user has a specific role.
Check if user has a specific permission.
def with_role(role: str) -> User
Return a new User with the given role added.
def without_role(role: str) -> User
Return a new User with the given role removed.
def with_permission(permission: str) -> User
Return a new User with the given permission added.
def without_permission(permission: str) -> User
Return a new User with the given permission removed.
def record_login() -> User
Return a new User with the current login recorded.
Return a safe public representation of the user.
def from_dict( cls, data: dict[str, Any] ) -> User
Create from dictionary.
UserAuthenticated
Section titled “UserAuthenticated”Emitted when a user successfully authenticates.
Attributes:
user_id: ID of the authenticated user.
method: Authentication method used (e.g. "password").
ip: Remote IP address of the request, if available.
UserLockedOut
Section titled “UserLockedOut”Emitted when a login attempt is rejected because the account is locked.
Attributes: user_id: ID of the locked-out user. email: Email address of the locked-out user.
UserLoggedIn
Section titled “UserLoggedIn”Emitted after a successful password-based login.
Attributes: user_id: ID of the authenticated user. email: Email address used to log in.
UserLoggedOut
Section titled “UserLoggedOut”Emitted when a user explicitly logs out.
Attributes: user_id: ID of the user who logged out.
UserLoginFailed
Section titled “UserLoginFailed”Emitted when a login attempt fails due to bad credentials.
Attributes: email: Email address used in the failed attempt. reason: Human-readable reason for the failure.
UserRegistered
Section titled “UserRegistered”Emitted when a new user account is created.
Attributes: user_id: ID of the newly created user. email: Email address of the newly created user.
UserService
Section titled “UserService”Service for user management operations.
Handles user CRUD, password management, and administrative operations.
def __init__( password_policy: PasswordPolicy, user_store: Any, event_bus: EventBusProtocol | None = None ) -> None
async def create_user( name: str, email: str, password: str, roles: list[str] | None = None ) -> Result[User, EmailExistsError | PasswordPolicyError]
Create a new user.
| Type | Description |
|---|---|
| Result[User, EmailExistsError | PasswordPolicyError] | ``Ok(User)`` on success. ``Err(PasswordPolicyError)`` if the password does not satisfy policy. ``Err(EmailExistsError)`` if the email address is already registered. |
Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.
async def get_user(user_id: str) -> User | None
Get user by ID.
async def update_user(user: User) -> Result[User, UserNotFoundError | ValidationError]
Update user information.
| Type | Description |
|---|---|
| Result[User, UserNotFoundError | ValidationError] | ``Ok(User)`` on success. ``Err(UserNotFoundError)`` if the user does not exist. ``Err(ValidationError)`` if the supplied user data is invalid (e.g. empty email). |
Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.
Delete a user.
| Type | Description |
|---|---|
| Result[None, UserNotFoundError] | ``Ok(None)`` on success. ``Err(UserNotFoundError)`` if the user does not exist. |
| Exception | Description |
|---|---|
| AuthorizationError | If attempting to delete the protected ``admin`` account (security boundary — not a recoverable Result path). |
Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.
Deactivate (lock) a user account.
Sets is_active = False so the user cannot log in until unlocked.
| Type | Description |
|---|---|
| Result[None, UserNotFoundError] | ``Ok(None)`` on success. ``Err(UserNotFoundError)`` if the user does not exist. |
Infrastructure exceptions propagate unchanged.
Reactivate (unlock) a previously locked user account.
Sets is_active = True so the user may log in again.
| Type | Description |
|---|---|
| Result[None, UserNotFoundError] | ``Ok(None)`` on success. ``Err(UserNotFoundError)`` if the user does not exist. |
Infrastructure exceptions propagate unchanged.
async def change_user_password( user_id: str, current_password: str, new_password: str ) -> Result[None, InvalidCredentialsError | PasswordPolicyError]
Change a user’s password (requires current password).
| Type | Description |
|---|---|
| Result[None, InvalidCredentialsError | PasswordPolicyError] | ``Ok(None)`` on success. ``Err(InvalidCredentialsError)`` if the current password is wrong or the user does not exist. ``Err(PasswordPolicyError)`` if the new password violates policy or has been used recently. |
Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.
Set a user’s password (admin operation).
async def list_users( skip: int = 0, limit: int = 100 ) -> list[User]
List users with pagination.
Count total users.
Cancel and await all pending background event tasks.
UserStatus
Section titled “UserStatus”User account status values.
Functions
Section titled “Functions”optional_auth
Section titled “optional_auth”
Decorator that attaches optional auth: user is populated if present, never blocked.
Can be used directly without invocation (unlike require_auth which always
requires parentheses)
@optional_authasync def public_endpoint(request): user = getattr(request.state, "user", None) ...| Parameter | Type | Description |
|---|---|---|
| `func` | Callable | The async route handler to wrap. |
| Type | Description |
|---|---|
| Callable | The wrapped handler which always passes through. |
require_auth
Section titled “require_auth”
def require_auth( roles: list[str] | None = None, permissions: list[str] | None = None, optional: bool = False ) -> Callable[[Callable], Callable]
Decorator that protects a route handler with authentication and RBAC/ABAC checks.
| Parameter | Type | Description |
|---|---|---|
| `roles` | list[str] | None | Required role names. Any single matching role is sufficient. |
| `permissions` | list[str] | None | Required permission strings. All must be satisfied. |
| `optional` | bool | When ``True``, allow unauthenticated requests to proceed (user will simply be ``None`` in ``request.state``). |
| Type | Description |
|---|---|
| Callable[[Callable], Callable] | A decorator that wraps the route handler. |
| Exception | Description |
|---|---|
| ValueError | If no request object can be found in the handler's arguments. |
require_permissions
Section titled “require_permissions”
Shorthand decorator requiring the user to hold all of permissions.
| Parameter | Type | Description |
|---|
| Type | Description |
|---|---|
| Callable[[Callable], Callable] | A decorator equivalent to ``require_auth(permissions=list(permissions))``. |
require_roles
Section titled “require_roles”
Shorthand decorator requiring the user to have at least one of roles.
| Parameter | Type | Description |
|---|
| Type | Description |
|---|---|
| Callable[[Callable], Callable] | A decorator equivalent to ``require_auth(roles=list(roles))``. |
Exceptions
Section titled “Exceptions”AlreadyVerifiedError
Section titled “AlreadyVerifiedError”Account is already verified.
AuthError
Section titled “AuthError”Base exception for all auth errors.
AuthorizationError
Section titled “AuthorizationError”Raised when user lacks required permissions.
InvalidTokenError
Section titled “InvalidTokenError”Raised when a token is malformed or invalid.
TokenAudienceError
Section titled “TokenAudienceError”Token audience claim does not match expected.
TokenBlacklistedError
Section titled “TokenBlacklistedError”Token has been explicitly revoked.
TokenError
Section titled “TokenError”Base exception for token-related errors.
TokenExpiredError
Section titled “TokenExpiredError”Raised when a token has expired.
TokenExpiredVerificationError
Section titled “TokenExpiredVerificationError”Account verification has expired.
TokenInvalidError
Section titled “TokenInvalidError”Token is structurally invalid or has wrong type.
TokenNotFoundError
Section titled “TokenNotFoundError”Token record does not exist.
VerificationError
Section titled “VerificationError”Base class for expected, recoverable account-verification failures.
All subtypes signal situations the caller should handle gracefully (e.g. redirect to a re-verification page or surface a user-facing error).