Skip to content
GitHubDiscord

API Reference

Protocol for resolving external OAuth identities to internal user IDs.

This protocol is used to map OAuth external IDs (like Google’s ‘sub’ claim) to internal UUIDs. This is necessary because OAuth providers use their own ID format which may not be valid UUIDs.

async def resolve_user_id(
    external_id: str,
    provider: str = 'google'
) -> str | None

Resolve an external OAuth ID to an internal user UUID.

Parameters
ParameterTypeDescription
`external_id`strThe external ID from the OAuth provider (e.g., Google's sub claim).
`provider`strThe OAuth provider name (e.g., "google", "apple").
Returns
TypeDescription
str | NoneThe internal user UUID if found, None otherwise.
def resolve_user_id_sync(
    external_id: str,
    provider: str = 'google'
) -> str | None

Synchronous version of resolve_user_id.

Parameters
ParameterTypeDescription
`external_id`strThe external ID from the OAuth provider.
`provider`strThe OAuth provider name.
Returns
TypeDescription
str | NoneThe internal user UUID if found, None otherwise.

Protocol for OAuth identity storage.

OAuthIdentityStore manages the linking between local user accounts and external OAuth2 identity providers. This enables users to authenticate using social login while maintaining a consistent local user identity.

Example

Using OAuth identity store

store = await container.resolve(OAuthIdentityStore)
# Link OAuth identity to user
identity = await store.create_oauth_identity(
user_id="user-123",
provider="google",
provider_user_id="google-456"
)
# Find user by OAuth identity
identity = await store.get_oauth_identity("google", "google-456")
if identity:
user = await store.get_user_by_oauth_identity(identity.id)
async def create_oauth_identity(
    user_id: str,
    provider: str,
    provider_user_id: str
) -> OAuthIdentity

Create an OAuth identity link for a user.

Parameters
ParameterTypeDescription
`user_id`strThe local user ID to link the OAuth identity to.
`provider`strThe OAuth provider name (e.g., "google", "github").
`provider_user_id`strThe user's ID at the OAuth provider.
Returns
TypeDescription
OAuthIdentityThe created OAuthIdentity instance.
async def get_oauth_identity(
    provider: str,
    provider_user_id: str
) -> OAuthIdentity | None

Get OAuth identity by provider and provider user ID.

Parameters
ParameterTypeDescription
`provider`strThe OAuth provider name.
`provider_user_id`strThe user's ID at the OAuth provider.
Returns
TypeDescription
OAuthIdentity | NoneThe OAuthIdentity if found, None otherwise.
async def get_oauth_identities_for_user(user_id: str) -> list[OAuthIdentity]

Get all OAuth identities for a user

async def delete_oauth_identity(
    provider: str,
    provider_user_id: str
) -> bool

Delete OAuth identity

async def delete_oauth_identities_for_user(user_id: str) -> int

Delete all OAuth identities for a user

async def get_user_by_oauth_identity(
    provider: str,
    provider_user_id: str
) -> str | None

Get local user_id by OAuth provider and external user ID.

This is the key method for resolving OAuth external IDs to local user IDs. Used when OAuth tokens contain non-UUID user identifiers (like Google’s sub claim).

Parameters
ParameterTypeDescription
`provider`strThe OAuth provider name (e.g., "google", "github", "apple").
`provider_user_id`strThe user's ID at the OAuth provider.
Returns
TypeDescription
str | NoneThe local user_id if found, None otherwise.
async def resolve_user_id(
    user_id_or_oauth_id: str,
    provider: str = 'google'
) -> str | None

Resolve a user_id that may be either a UUID or an OAuth external ID.

This method handles the common issue where OAuth providers (like Google) use non-UUID identifiers (e.g., “101158382316025899191”) which cannot be used directly in database queries expecting UUIDs.

Resolution logic:

  1. If user_id_or_oauth_id is a valid UUID format, check if user exists
  2. If not a valid UUID, treat it as an OAuth provider_user_id and look up
Parameters
ParameterTypeDescription
`user_id_or_oauth_id`strEither a local user UUID or an OAuth provider's external ID.
`provider`strThe OAuth provider to search in (default: "google").
Returns
TypeDescription
str | NoneThe resolved local user_id if found, None otherwise.

Validates and decodes authentication tokens.
async def validate(token: str) -> TokenPayload | None

Validate a token and return its payload, or None if invalid.

async def revoke(token: str) -> bool

Revoke a token, preventing future validation.


Authenticates requests by validating an API key.

Key extraction order:

  1. request_context["headers"][config.header_name]
  2. request_context["query_params"][config.query_param]

If the key is absent or the lookup returns None an Err(AuthenticationError) is returned. On success Ok(User) is returned.

Example

config = APIKeyConfig(
lookup={"sk_live_abc123": admin_user},
)
authenticator = APIKeyAuthenticator(config)
result = await authenticator.authenticate({
"headers": {"X-API-Key": "sk_live_abc123"},
"query_params": {},
})
if result.is_ok():
user = result.unwrap()
def __init__(config: APIKeyConfig) -> None

Initialise the authenticator.

Parameters
ParameterTypeDescription
`config`APIKeyConfigKey-extraction and lookup configuration.
async def authenticate(request_context: dict[str, Any]) -> Result[User, AuthenticationError]

Authenticate a request using its API key.

Parameters
ParameterTypeDescription
`request_context`dict[str, Any]A mapping that **must** contain at least one of: * ``"headers"``: ``dict[str, str]`` of HTTP request headers. * ``"query_params"``: ``dict[str, str]`` of URL query params.
Returns
TypeDescription
Result[User, AuthenticationError]``Ok(User)`` when a valid API key is found. ``Err(AuthenticationError)`` when the key is absent, invalid, or the lookup returns ``None``.

Configuration for APIKeyAuthenticator.

Attributes: header_name: HTTP header to inspect for the API key. Defaults to "X-API-Key". query_param: URL query parameter name to fall back to when the header is absent. Defaults to "api_key". lookup: Either a dict[str, User] mapping raw key strings to their owners, or an async/sync callable that accepts the raw key and returns User | None. This field is required.


Payload fired when an authentication attempt fails.

Attributes: method: Authentication method that was attempted. reason: Short description of why authentication failed.


Composite provider that wires the full Lexigram auth stack.

Composes AuthenticationProvider, TokenProvider, SessionProvider, and AuthorizationProvider so that callers only need to register a single provider:

.. code-block:: python

container.add_provider(AuthBundleProvider(config=auth_config))

Dependencies registered by each sub-provider are available in the container after register completes.

Parameters
ParameterTypeDescription
`config`Shared AuthConfig forwarded to every sub-provider. When ``None``, each sub-provider uses its own defaults.
`initial_roles`Optional initial RBAC roles forwarded to AuthorizationProvider.
`enable_passkeys`When ``True``, append PasskeyProvider to the sub-provider list (requires the WebAuthn extra to be installed).
`kwargs`Extra keyword arguments forwarded to AuthenticationProvider.
def __init__(
    config: AuthConfig | None = None,
    initial_roles: dict[str, Any] | None = None,
    enable_passkeys: bool = False,
    **kwargs: Any
) -> None
def from_config(
    cls,
    config: AuthConfig,
    **context: Any
) -> Self

Create provider from config object.

async def register(container: ContainerRegistrarProtocol) -> None

Register all auth sub-providers with the container.

Parameters
ParameterTypeDescription
`container`ContainerRegistrarProtocolThe DI container registrar.
async def boot(container: ContainerResolverProtocol) -> None

Boot all auth sub-providers in registration order.

Parameters
ParameterTypeDescription
`container`ContainerResolverProtocolThe DI container resolver.
async def shutdown() -> None

Shut down all auth sub-providers in reverse registration order.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Aggregate health check across all sub-providers.

Returns DEGRADED if any sub-provider is unhealthy.

Parameters
ParameterTypeDescription
`timeout`floatPer-provider timeout budget in seconds.
Returns
TypeDescription
HealthCheckResultAn aggregated HealthCheckResult.

Hierarchical root configuration for Lexigram Auth.

Attributes: name: Configuration name (default: “auth”) enabled: Whether the auth module is enabled users: Initial users to create roles: Role definitions for RBAC rbac: RBAC system configuration token: JWT configuration middleware: Authentication middleware configuration secret_key: Secret key for signing tokens admin_email: Initial admin email admin_password: Initial admin password login_rate_limit: Rate limit for login endpoints oauth2_providers: OAuth2 provider configurations

def validate_security() -> AuthConfig

Ensure secure settings in production.


Full authentication and authorization stack (JWT, OAuth2/OIDC, RBAC, policies).

Call configure to configure the auth bundle with an AuthConfig.

Usage

from lexigram.auth.config import AuthConfig
@module(
imports=[AuthModule.configure(AuthConfig(secret_key="..."))]
)
class AppModule(Module):
pass
def configure(
    cls,
    config: Any | None = None,
    initial_roles: dict[str, Any] | None = None,
    is_global: bool = True
) -> DynamicModule

Create an AuthModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`Any | NoneAuthConfig or ``None`` for framework defaults (development-only ephemeral secrets).
`initial_roles`dict[str, Any] | NoneOptional RBAC role seed forwarded to the authorization sub-provider.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
def stub(cls) -> DynamicModule

Return an in-memory AuthModule suitable for unit and integration testing.

Uses ephemeral in-memory storage with a fixed test secret key. No external token services, databases, or OAuth providers are configured.

Returns
TypeDescription
DynamicModuleA DynamicModule backed by in-memory auth storage.

Result of an authentication attempt.

Authentication status values.

Authentication token response.

Payload fired when an access or refresh token is issued.

Attributes: user_id: Identifier of the user the token was issued for. token_type: Token kind (e.g. "access", "refresh").


Payload fired when an access token is refreshed.

Attributes: user_id: Identifier of the user whose token was refreshed. token_type: Token kind that was refreshed.


Payload fired when a token is explicitly revoked.

Attributes: user_id: Identifier of the user whose token was revoked. token_type: Token kind that was revoked.


Payload fired when a user successfully authenticates.

Attributes: user_id: Identifier of the authenticated user. method: Authentication method used (e.g. "password", "oauth2").


Emitted when an authentication attempt fails.

Attributes: email: Email that was used in the failed attempt. reason: Human-readable reason for the failure. ip: Remote IP address of the request, if available.


User authentication ONLY (login/logout/validation).

config is the primary parameter and drives all defaults. The optional keyword arguments are override points for callers that need to supply custom implementations (e.g., a production-grade UserStoreProtocol backed by a database, or a pre-built token manager).

Parameters
ParameterTypeDescription
`config`The resolved AuthConfig. JWT credentials (``config.token.secret_key``, ``config.token.algorithm``) are used to build a JWTTokenManager automatically when no ``token_manager`` override is provided.
`password_policy`Override the password policy derived from ``config.password``. When *None* the policy is built from config; falls back to ``PasswordPolicy()`` when config is also absent.
`user_store`Override the user store. Defaults to InMemoryUserStore.
`token_manager`Supply a fully constructed token manager. When *None* and ``config.token`` is present the manager is built automatically from the JWT configuration.
`cache_service`Optional cache backend passed to the auto-built token manager for token blacklisting. Ignored when ``token_manager`` is provided directly.
`mfa_service`Optional MFA manager for multi-factor authentication flows.
def __init__(
    config: Annotated[AuthConfig, Inject] | None = None,
    *,
    password_policy: PasswordPolicy | None = None,
    user_store: UserStoreProtocol | None = None,
    token_manager: Any = None,
    cache_service: Any = None,
    mfa_service: MFAManager | None = None
) -> None
property service() -> AuthenticationService

Get or create the authentication service.

async def get_user(user_id: str) -> Any | None

Fetch a user by their ID.

Satisfies AuthProviderProtocol.

Parameters
ParameterTypeDescription
`user_id`strThe unique identifier of the user to retrieve.
Returns
TypeDescription
Any | NoneThe user object, or ``None`` if not found.
async def register(container: ContainerRegistrarProtocol) -> None

Register authentication services with the container.

Registers both concrete implementations and their corresponding protocols to enable dependency injection across extensions without direct imports.

async def boot(container: BootContainerProtocol) -> None

Initialize authentication provider and register with kernel health registry.

async def shutdown() -> None

Shutdown authentication provider.

async def verify_token(token: str) -> Result[VerifiedToken, TokenError]

Verify a JWT token and return a Result with the decoded payload.

Satisfies AuthProviderProtocol.

Delegates to token_manager.verify_token() when a token manager is configured. Returns an Err result if no token manager is set or if the token is invalid for any expected domain reason.

Infrastructure failures (cache unavailable, network errors) are still raised as exceptions and must be handled by the caller.

Parameters
ParameterTypeDescription
`token`strThe JWT token string to verify.
Returns
TypeDescription
Result[VerifiedToken, TokenError]``Ok(VerifiedToken)`` if the token is valid and not revoked, or ``Err(TokenError)`` for expected domain failures.
async def validate_session(token: str) -> Any

Validate a session token and return user information.

Delegates to the underlying authentication service’s get_user_from_token method. Returns a Result[VerifiedToken, TokenError] or None when no token manager is configured.

Parameters
ParameterTypeDescription
`token`strThe session or JWT token to validate.
Returns
TypeDescription
Any``Result[VerifiedToken, TokenError]`` on success/failure, or ``None``.
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check authentication provider health.


Service for core authentication operations.

Handles user login, registration, token creation and validation.

def __init__(
    password_policy: PasswordPolicy,
    user_store: UserStoreProtocol,
    token_manager: JWTTokenManager,
    lockout_config: LockoutConfig | None = None,
    event_bus: EventBusProtocol | None = None,
    tracker: LoginAttemptTracker | None = None,
    hooks: HookRegistryProtocol | None = None
) -> None
def set_hook_registry(hooks: HookRegistryProtocol | None) -> None

Attach an optional hook registry after provider boot wiring.

async def authenticate_user(
    email: str,
    password: str
) -> Result[User, InvalidCredentialsError | AccountLockedError]

Authenticate a user with email and password.

Always performs a password hash verification regardless of whether the user exists. This constant-time behaviour prevents user-enumeration attacks via timing side-channels.

If the account has exceeded LockoutConfig.max_attempts within the observation window an AccountLockedError is returned immediately without performing credential verification.

Returns
TypeDescription
Result[User, InvalidCredentialsError | AccountLockedError]``Ok(User)`` if authentication succeeds, ``Err(AccountLockedError)`` if the account is temporarily locked, ``Err(InvalidCredentialsError)`` otherwise.
async def register_user(request: RegisterRequest) -> Result[User, EmailExistsError | PasswordPolicyError]

Register a new user.

Parameters
ParameterTypeDescription
`request`RegisterRequestRegistration request with email, name, and password.
Returns
TypeDescription
Result[User, EmailExistsError | PasswordPolicyError]``Ok(User)`` on success. ``Err(PasswordPolicyError)`` if passwords do not match or the password violates the policy. ``Err(EmailExistsError)`` if the email is already registered.
def create_token(user: User) -> AuthToken

Create an authentication token for a user.

async def verify_token(token: str) -> Result[VerifiedToken, TokenError]

Verify and decode an authentication token.

Returns
TypeDescription
Result[VerifiedToken, TokenError]``Ok(VerifiedToken)`` if valid, ``Err(TokenError)`` otherwise.
async def refresh_token(refresh_token: str) -> Result[AuthToken, TokenError]

Refresh an access token using a refresh token.

Parameters
ParameterTypeDescription
`refresh_token`strThe refresh token string.
Returns
TypeDescription
Result[AuthToken, TokenError]``Ok(AuthToken)`` if the refresh succeeds. ``Err(TokenError)`` if the refresh token is invalid or expired.
async def get_user_from_token(token: str) -> Result[VerifiedToken, TokenError]

Get user information from token.

Returns
TypeDescription
Result[VerifiedToken, TokenError]``Ok(VerifiedToken)`` if valid, ``Err(TokenError)`` otherwise.
async def shutdown() -> None

Cancel and await all pending background event tasks.


Role-based access control and permission management.
def __init__(
    config: Annotated[AuthConfig, Inject] | None = None,
    initial_roles: dict[str, Any] | None = None,
    **kwargs: Any
) -> None
property auth_config() -> AuthConfig | None
async def register(container: ContainerRegistrarProtocol) -> None

Register authorization services with the container.

async def boot(container: ContainerResolverProtocol) -> None

Initialize authorization provider.

async def shutdown() -> None

Shutdown authorization provider.

async def health_check(timeout: float = 5.0) -> HealthCheckResult

Check authorization provider health.


JWT Configuration
def validate_jwt_security() -> JWTConfig

Ensure secure JWT configuration in production.


JWT token management with key rotation support.

The JWTTokenManager handles creation, validation, and renewal of JWT tokens for user authentication. It supports multiple signing keys for seamless key rotation without invalidating existing tokens.

Attributes: current_key_id: ID of the currently active signing key. keys: Dictionary of key_id -> key material. algorithm: JWT signing algorithm (HS256, RS256, etc.). access_expiration_hours: Expiration time for access tokens. refresh_expiration_days: Expiration time for refresh tokens. cache_service: Optional cache backend for token validation caching. rotation_interval: Interval in seconds between key rotations.

Example

Basic token operations

manager = JWTTokenManager(
current_key_id="v1",
keys={"v1": SecretStr("secure-secret-key")},
)
# Create access token
token = await manager.create_access_token(user_id="user123")
# Verify and decode
payload = await manager.verify_token(token)

Note

In production, secrets should not be hardcoded. Use environment variables or a secrets management service.

def __init__(
    current_key_id: str,
    keys: dict[str, str | SecretStr | dict[str, str | SecretStr]] | None = None,
    algorithm: str = const.DEFAULT_TOKEN_ALGORITHM,
    access_expiration_hours: int = 24,
    refresh_expiration_days: int = 30,
    cache_service: CacheBackendProtocol | None = None,
    rotation_interval_days: int = 90,
    grace_period_seconds: int = const.DEFAULT_JWT_KEY_ROTATION_GRACE_PERIOD_SECONDS,
    logger: Logger | None = None,
    *,
    audit_logger: AuditLoggerProtocol | None = None,
    binding_config: TokenBindingConfig | None = None,
    required_audience: str | None = None
) -> None

Initialize the JWT token manager.

Parameters
ParameterTypeDescription
`current_key_id`strThe ID of the currently active signing key.
`keys`dict[str, str | SecretStr | dict[str, str | SecretStr]] | NoneDictionary mapping key IDs to key material. If None, current_key_id is treated as a single secret.
`algorithm`strJWT signing algorithm (e.g., "HS256", "RS256").
`access_expiration_hours`intHours until access tokens expire.
`refresh_expiration_days`intDays until refresh tokens expire.
`cache_service`CacheBackendProtocol | NoneOptional cache backend for token validation.
`rotation_interval_days`intDays between key rotations.
`grace_period_seconds`intSeconds after key rotation during which tokens signed by the outgoing key remain valid. Defaults to 3600 (1 hour). Set to 0 to invalidate all old-key tokens immediately on rotation.
`audit_logger`AuditLoggerProtocol | NoneOptional AuditLoggerProtocol used to record token revocation events. When not provided, no audit entries are written.
`binding_config`TokenBindingConfig | NoneOptional TokenBindingConfig for opt-in client binding. When set, tokens embed a ``bind`` claim containing a SHA-256 hash of active binding factors (IP, fingerprint). Tokens issued without a ``bind`` claim continue to verify successfully so that binding can be enabled incrementally.
`required_audience`str | NoneWhen set, every call to ``verify_token`` will enforce that the token's ``aud`` claim matches this value. Pass ``allow_missing_audience=True`` to ``verify_token`` to bypass the check on a per-call basis for trusted internal paths.
property keys() -> dict[str, Any]

Live view of the key material managed by the key store.

property current_key_id() -> str

The key ID currently used for signing new tokens.

def current_key_id(value: str) -> None

Allow external callers to update the active key ID on the store.

def set_hook_registry(hooks: HookRegistryProtocol | None) -> None

Attach an optional hook registry after provider boot wiring.

async def rotate_key(
    new_key_id: str,
    new_secret: str | dict
) -> None

Rotate to a new signing key, delegating lifecycle to the key store.

Old keys are retained for grace_period_seconds (constructor parameter, default 3600 s) so tokens they signed remain verifiable during the overlap window.

Parameters
ParameterTypeDescription
`new_key_id`strID for new key.
`new_secret`str | dictNew secret key (string for symmetric or dict for asymmetric).
def list_keys() -> dict[str, dict[str, Any]]

Return current key metadata (for inspection/operations).

async def get_user_from_token(token: str) -> Result[VerifiedToken, ContractsTokenError]

Extract user information from access token.

Returns
TypeDescription
Result[VerifiedToken, ContractsTokenError]``Ok(VerifiedToken)`` if the token is a valid access token, or ``Err(TokenError)`` for expected domain failures.

Configuration for account lockout on repeated failed login attempts.

Attributes: max_failed_attempts: Number of failed attempts within the observation window that triggers a lockout. Defaults to 5. lockout_duration_seconds: Length of the rolling observation window in seconds. The lockout is lifted automatically once all recorded failures fall outside this window. Defaults to 300 (5 minutes). max_attempts: Canonical alias for max_failed_attempts. Both fields default to 5 and are kept in sync by __post_init__.

Note

For distributed deployments pass a CacheBackendProtocol to LoginAttemptTracker instead of relying on the in-process dict managed by AuthenticationService.


Tracks failed login attempts and enforces account lockout.

Supports both in-process state (no external dependency) and distributed state via an injected CacheBackendProtocol. When a cache backend is provided all attempt records are stored there so that multiple application instances share the same view — preventing an attacker from bypassing lockout by rotating across instances.

Parameters
ParameterTypeDescription
`max_attempts`Consecutive failures within *lockout_duration_seconds* that trigger a lockout. Defaults to 5.
`lockout_duration_seconds`Rolling window size **and** cache TTL. Defaults to 900 seconds (15 minutes).
`cache`Optional cache backend for distributed state. When ``None`` an in-process ``dict`` is used instead.

Example

tracker = LoginAttemptTracker(max_attempts=5, lockout_duration_seconds=900)
await tracker.record_failure("user@example.com")
locked = await tracker.is_locked("user@example.com") # False until 5 failures
await tracker.clear("user@example.com") # reset on successful login
def __init__(
    max_attempts: int = 5,
    lockout_duration_seconds: int = 900,
    cache: CacheBackendProtocol | None = None
) -> None
async def is_locked(identifier: str) -> bool

Return True if identifier has exceeded the failure threshold.

Parameters
ParameterTypeDescription
`identifier`strUsername, e-mail address, or IP used as the tracking key.
async def record_failure(identifier: str) -> None

Record a failed authentication attempt for identifier.

Parameters
ParameterTypeDescription
`identifier`strUsername, e-mail address, or IP used as the tracking key.
async def clear(identifier: str) -> None

Remove all recorded failures for identifier (call on success).

Parameters
ParameterTypeDescription
`identifier`strUsername, e-mail address, or IP used as the tracking key.

MongoDB-backed OAuth identity store
def __init__(
    db_provider: DatabaseProviderProtocol,
    collection_name: str = 'oauth_identities'
)
async def create_oauth_identity(
    user_id: str,
    provider: str,
    provider_user_id: str
) -> OAuthIdentity

Create OAuth identity link

async def get_oauth_identity(
    provider: str,
    provider_user_id: str
) -> OAuthIdentity | None

Get OAuth identity by provider and provider user ID

async def get_oauth_identities_for_user(user_id: str) -> list[OAuthIdentity]

Get all OAuth identities for a user

async def delete_oauth_identity(
    provider: str,
    provider_user_id: str
) -> bool

Delete OAuth identity

async def delete_oauth_identities_for_user(user_id: str) -> int

Delete all OAuth identities for a user

async def get_user_by_oauth_identity(
    provider: str,
    provider_user_id: str
) -> str | None

Get local user_id by OAuth provider and external user ID.

async def resolve_user_id(
    user_id_or_oauth_id: str,
    provider: str = 'google'
) -> str | None

Resolve user_id from either UUID or OAuth external ID.

Resolution logic:

  1. If user_id_or_oauth_id is a valid UUID format, check if user exists
  2. If not a valid UUID, treat it as an OAuth provider_user_id and look up
def resolve_user_id_sync(
    external_id: str,
    provider: str = 'google'
) -> str | None

Synchronous resolution is not supported for database-backed store.


OAuth identity linking user to provider
def __init__(
    user_id: str,
    provider: str,
    provider_user_id: str,
    created_at: datetime | None = None,
    updated_at: datetime | None = None
)

Emitted when a user successfully changes their password.

Attributes: user_id: ID of the user whose password was changed.


Bcrypt password hasher implementing the PasswordHasherProtocol.

Provides secure password hashing using bcrypt with UTF-8 aware truncation. hash and verify are static methods so they can be called directly on the class (await PasswordHasher.hash(password)) or on an instance.

def __init__(rounds: int = _DEFAULT_BCRYPT_ROUNDS) -> None
async def hash(password: str) -> str

Hash a password using bcrypt with UTF-8 aware truncation.

async def verify(
    password: str,
    hashed_password: str
) -> bool

Verify a password against its hash asynchronously.

def needs_rehash(hashed_password: str) -> bool

Check if the hash needs to be rehashed.

async def rehash_if_needed(
    password: str,
    hashed_password: str | None
) -> str | None

Rehash the password if needed.


Password policy configuration.

Lazy-loads common passwords file only when needed. Implements PasswordPolicyProtocol for dependency injection compatibility.

def __init__(
    min_length: int = 8,
    max_length: int = 128,
    require_uppercase: bool = True,
    require_lowercase: bool = True,
    require_digits: bool = False,
    require_special: bool = False,
    prevent_common: bool = True,
    prevent_reuse: bool = False,
    history_size: int = 5,
    common_passwords_file: str | None = None,
    banned_patterns: list[str] | None = None
)
def from_config(
    cls,
    config: PasswordConfig
) -> PasswordPolicy

Build a PasswordPolicy from a PasswordConfig dataclass.

Parameters
ParameterTypeDescription
`config`PasswordConfigPassword complexity configuration from ``AuthConfig.password``.
Returns
TypeDescription
PasswordPolicyA configured PasswordPolicy instance.
def validate(password: str) -> None

Validate password against policy.

Parameters
ParameterTypeDescription
`password`strPlain text password to validate.
Raises
ExceptionDescription
ValueErrorIf the password violates the policy.
def is_valid(password: str) -> bool

Return True if the password satisfies the policy without raising.

Parameters
ParameterTypeDescription
`password`strPlain text password.
Returns
TypeDescription
boolTrue if valid, False otherwise.

Persistent token revocation list backed by a CacheBackendProtocol.

Stores revoked token IDs in the cache so that revocations survive process restarts and are consistent across all nodes in a distributed deployment.

A sentinel value ("1") is written under lexigram:auth:revoked:{token_id} with the configured TTL. When expires_at is provided to revoke, the TTL is capped to the token’s remaining lifetime so stale entries are not kept longer than needed.

Parameters
ParameterTypeDescription
`cache`Cache backend used to persist revoked token identifiers.
`ttl`Default expiry in seconds for revocation records. Defaults to ``86400`` (24 h).
def __init__(
    cache: CacheBackendProtocol,
    ttl: int = _DEFAULT_TTL
) -> None
async def revoke(
    token_id: str,
    expires_at: datetime | None = None
) -> None

Mark token_id as revoked.

If the token is already expired (expires_at is in the past) the call is a no-op — there is no point persisting an entry for a token that the signature validation layer will reject on its own.

Parameters
ParameterTypeDescription
`token_id`strUnique identifier of the JWT/opaque token to revoke.
`expires_at`datetime | NoneOptional token expiry time. Used to compute a shorter TTL so the cache entry is not held longer than the token's own lifetime.
async def is_revoked(token_id: str) -> bool

Return True if token_id is present in the revocation list.

Parameters
ParameterTypeDescription
`token_id`strThe token identifier to check.
Returns
TypeDescription
bool``True`` when the token has been explicitly revoked and the revocation entry has not yet expired, ``False`` otherwise.

RBAC system configuration.

Definition of a role with its permissions.

Database-backed OAuth identity store
def __init__(db_provider: DatabaseProviderProtocol)
async def create_oauth_identity(
    user_id: str,
    provider: str,
    provider_user_id: str
) -> OAuthIdentity

Create OAuth identity link

async def get_oauth_identity(
    provider: str,
    provider_user_id: str
) -> OAuthIdentity | None

Get OAuth identity by provider and provider user ID

async def get_oauth_identities_for_user(user_id: str) -> list[OAuthIdentity]

Get all OAuth identities for a user

async def delete_oauth_identity(
    provider: str,
    provider_user_id: str
) -> bool

Delete OAuth identity

async def delete_oauth_identities_for_user(user_id: str) -> int

Delete all OAuth identities for a user

async def get_user_by_oauth_identity(
    provider: str,
    provider_user_id: str
) -> str | None

Get local user_id by OAuth provider and external user ID.

async def resolve_user_id(
    user_id_or_oauth_id: str,
    provider: str = 'google'
) -> str | None

Resolve user_id from either UUID or OAuth external ID.

Resolution logic:

  1. If user_id_or_oauth_id is a valid UUID format, check if user exists
  2. If not a valid UUID, treat it as an OAuth provider_user_id and look up
def resolve_user_id_sync(
    external_id: str,
    provider: str = 'google'
) -> str | None

Synchronous resolution is not supported for database-backed store.

This method is required by IdentityResolverProtocol but cannot be implemented safely for an async database provider.


Session-cookie authentication backend for SSR flows.

Reads session ID from cookie, resolves user from SessionRepositoryProtocol.

Because session records contain only a user_id, resolving the full AuthenticatedUserProtocol requires a user_fetcher callable that looks up the concrete user object from whatever store the application uses (database, cache, etc.). This keeps the backend decoupled from any specific user-store implementation.

Constructor args: session_repository: Persistence backend for session records. user_fetcher: Async callable that accepts a user_id string and returns the matching AuthenticatedUserProtocol, or None when no user exists for that ID. cookie_name: Name of the session cookie (default "session_id"). secure: Whether to set the Secure flag (default True). http_only: Whether to set the HttpOnly flag (default True). same_site: The SameSite policy (default "lax").

Example

backend = SessionCookieBackend(
session_repository=sql_session_repo,
user_fetcher=user_service.get_authenticated_user,
)
# In an SSR handler:
user = await backend.authenticate(request)
if user is None:
# redirect to /login
...
def __init__(
    session_repository: SessionRepositoryProtocol,
    user_fetcher: Callable[[str], Awaitable[AuthenticatedUserProtocol | None]],
    cookie_name: str = 'session_id',
    secure: bool = True,
    http_only: bool = True,
    same_site: str = 'lax'
) -> None
async def authenticate(request: Any) -> AuthenticatedUserProtocol | None

Extract session cookie, validate, return user or None.

Reads cookie_name from request.cookies, fetches the corresponding active session record, resolves the user via user_fetcher, and refreshes the last_active_at timestamp.

Parameters
ParameterTypeDescription
`request`AnyThe incoming request object. Must expose a ``cookies`` mapping attribute (Starlette / ASGI-compatible).
Returns
TypeDescription
AuthenticatedUserProtocol | NoneThe authenticated user if the session is valid, ``None`` otherwise.
async def login(
    response: Any,
    user_id: str,
    expires_in: int = 86400
) -> str

Create a session record and set the session cookie on response.

Parameters
ParameterTypeDescription
`response`AnyThe outgoing response object. Must expose a ``set_cookie`` method with ``key``, ``value``, ``max_age``, ``secure``, ``httponly``, and ``samesite`` kwargs (Starlette / ASGI-compatible).
`user_id`strIdentifier of the user to create a session for.
`expires_in`intSession lifetime in **seconds** (default 86 400 = 1 day).
Returns
TypeDescription
strThe newly created ``session_id``.
async def logout(
    request: Any,
    response: Any
) -> None

Invalidate the session and clear the cookie.

Parameters
ParameterTypeDescription
`request`AnyThe incoming request. ``request.cookies`` is read to find the current session ID.
`response`AnyThe outgoing response. ``delete_cookie`` is called to remove the session cookie from the browser.

Emitted when a new session/token is issued.

Attributes: session_id: Identifier for the new session. user_id: ID of the user who owns the session.


Emitted when a session/token is invalidated.

Attributes: session_id: Identifier of the revoked session. user_id: ID of the user who owned the session.


Emitted when a specific token is revoked.

Attributes: token_id: JTI or opaque identifier of the revoked token. user_id: ID of the user who owned the token. reason: Human-readable reason for revocation.


User model representing an authenticated user.

This model serves as the core user representation within the auth package. It implements the AuthenticatedUserProtocol protocol.

def has_role(role: str) -> bool

Check if user has a specific role.

def has_permission(permission: str) -> bool

Check if user has a specific permission.

def with_role(role: str) -> User

Return a new User with the given role added.

def without_role(role: str) -> User

Return a new User with the given role removed.

def with_permission(permission: str) -> User

Return a new User with the given permission added.

def without_permission(permission: str) -> User

Return a new User with the given permission removed.

def record_login() -> User

Return a new User with the current login recorded.

def to_public_dict() -> dict[str, Any]

Return a safe public representation of the user.

def from_dict(
    cls,
    data: dict[str, Any]
) -> User

Create from dictionary.


Emitted when a user successfully authenticates.

Attributes: user_id: ID of the authenticated user. method: Authentication method used (e.g. "password"). ip: Remote IP address of the request, if available.


Emitted when a login attempt is rejected because the account is locked.

Attributes: user_id: ID of the locked-out user. email: Email address of the locked-out user.


Emitted after a successful password-based login.

Attributes: user_id: ID of the authenticated user. email: Email address used to log in.


Emitted when a user explicitly logs out.

Attributes: user_id: ID of the user who logged out.


Emitted when a login attempt fails due to bad credentials.

Attributes: email: Email address used in the failed attempt. reason: Human-readable reason for the failure.


Emitted when a new user account is created.

Attributes: user_id: ID of the newly created user. email: Email address of the newly created user.


Service for user management operations.

Handles user CRUD, password management, and administrative operations.

def __init__(
    password_policy: PasswordPolicy,
    user_store: Any,
    event_bus: EventBusProtocol | None = None
) -> None
async def create_user(
    name: str,
    email: str,
    password: str,
    roles: list[str] | None = None
) -> Result[User, EmailExistsError | PasswordPolicyError]

Create a new user.

Returns
TypeDescription
Result[User, EmailExistsError | PasswordPolicyError]``Ok(User)`` on success. ``Err(PasswordPolicyError)`` if the password does not satisfy policy. ``Err(EmailExistsError)`` if the email address is already registered.

Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.

async def get_user(user_id: str) -> User | None

Get user by ID.

async def update_user(user: User) -> Result[User, UserNotFoundError | ValidationError]

Update user information.

Returns
TypeDescription
Result[User, UserNotFoundError | ValidationError]``Ok(User)`` on success. ``Err(UserNotFoundError)`` if the user does not exist. ``Err(ValidationError)`` if the supplied user data is invalid (e.g. empty email).

Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.

async def delete_user(user_id: str) -> Result[None, UserNotFoundError]

Delete a user.

Returns
TypeDescription
Result[None, UserNotFoundError]``Ok(None)`` on success. ``Err(UserNotFoundError)`` if the user does not exist.
Raises
ExceptionDescription
AuthorizationErrorIf attempting to delete the protected ``admin`` account (security boundary — not a recoverable Result path).

Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.

async def lock_user(user_id: str) -> Result[None, UserNotFoundError]

Deactivate (lock) a user account.

Sets is_active = False so the user cannot log in until unlocked.

Returns
TypeDescription
Result[None, UserNotFoundError]``Ok(None)`` on success. ``Err(UserNotFoundError)`` if the user does not exist.

Infrastructure exceptions propagate unchanged.

async def unlock_user(user_id: str) -> Result[None, UserNotFoundError]

Reactivate (unlock) a previously locked user account.

Sets is_active = True so the user may log in again.

Returns
TypeDescription
Result[None, UserNotFoundError]``Ok(None)`` on success. ``Err(UserNotFoundError)`` if the user does not exist.

Infrastructure exceptions propagate unchanged.

async def change_user_password(
    user_id: str,
    current_password: str,
    new_password: str
) -> Result[None, InvalidCredentialsError | PasswordPolicyError]

Change a user’s password (requires current password).

Returns
TypeDescription
Result[None, InvalidCredentialsError | PasswordPolicyError]``Ok(None)`` on success. ``Err(InvalidCredentialsError)`` if the current password is wrong or the user does not exist. ``Err(PasswordPolicyError)`` if the new password violates policy or has been used recently.

Infrastructure exceptions (e.g. DB connectivity) propagate unchanged.

async def set_user_password(
    user_id: str,
    new_password: str,
    force: bool = False
) -> None

Set a user’s password (admin operation).

async def list_users(
    skip: int = 0,
    limit: int = 100
) -> list[User]

List users with pagination.

async def count_users() -> int

Count total users.

async def shutdown() -> None

Cancel and await all pending background event tasks.


User account status values.

def optional_auth(func: Callable) -> Callable

Decorator that attaches optional auth: user is populated if present, never blocked.

Can be used directly without invocation (unlike require_auth which always requires parentheses)

@optional_auth
async def public_endpoint(request):
user = getattr(request.state, "user", None)
...
Parameters
ParameterTypeDescription
`func`CallableThe async route handler to wrap.
Returns
TypeDescription
CallableThe wrapped handler which always passes through.

def require_auth(
    roles: list[str] | None = None,
    permissions: list[str] | None = None,
    optional: bool = False
) -> Callable[[Callable], Callable]

Decorator that protects a route handler with authentication and RBAC/ABAC checks.

Parameters
ParameterTypeDescription
`roles`list[str] | NoneRequired role names. Any single matching role is sufficient.
`permissions`list[str] | NoneRequired permission strings. All must be satisfied.
`optional`boolWhen ``True``, allow unauthenticated requests to proceed (user will simply be ``None`` in ``request.state``).
Returns
TypeDescription
Callable[[Callable], Callable]A decorator that wraps the route handler.
Raises
ExceptionDescription
ValueErrorIf no request object can be found in the handler's arguments.

def require_permissions(*permissions: str) -> Callable[[Callable], Callable]

Shorthand decorator requiring the user to hold all of permissions.

Parameters
ParameterTypeDescription
Returns
TypeDescription
Callable[[Callable], Callable]A decorator equivalent to ``require_auth(permissions=list(permissions))``.

def require_roles(*roles: str) -> Callable[[Callable], Callable]

Shorthand decorator requiring the user to have at least one of roles.

Parameters
ParameterTypeDescription
Returns
TypeDescription
Callable[[Callable], Callable]A decorator equivalent to ``require_auth(roles=list(roles))``.

Account is already verified.
def __init__(
    message: str = 'Account is already verified',
    user_id: str | None = None,
    **kwargs: Any
) -> None

Base exception for all auth errors.

Raised when user lacks required permissions.

Raised when a token is malformed or invalid.

Token audience claim does not match expected.
def __init__(
    message: str = 'Token audience mismatch',
    expected: str | None = None,
    actual: str | None = None,
    **kwargs: Any
) -> None

Token has been explicitly revoked.
def __init__(
    message: str = 'Token has been revoked',
    **kwargs: Any
) -> None

Base exception for token-related errors.

Raised when a token has expired.
def __init__(
    message: str = 'Token has expired',
    expiration_time: str | None = None,
    **kwargs: Any
) -> None

Account verification has expired.
def __init__(
    message: str = 'Verification has expired',
    user_id: str | None = None,
    **kwargs: Any
) -> None

Token is structurally invalid or has wrong type.
def __init__(
    message: str = 'Token is invalid',
    reason: str | None = None,
    **kwargs: Any
) -> None

Token record does not exist.
def __init__(
    message: str = 'Token not found',
    token_id: str | None = None,
    **kwargs: Any
) -> None

Base class for expected, recoverable account-verification failures.

All subtypes signal situations the caller should handle gracefully (e.g. redirect to a re-verification page or surface a user-facing error).

def __init__(
    message: str = 'Account verification error',
    **kwargs: Any
) -> None