Lexigram-Auth API
init.py
Section titled “init.py”lexigram-auth — Authentication and authorisation for the Lexigram platform.
Section titled “lexigram-auth — Authentication and authorisation for the Lexigram platform.”-
def getattr(name: str) -> Any
Lazy load attributes to avoid circular imports.
Section titled “Lazy load attributes to avoid circular imports.” -
def dir() -> list[str]
List available attributes for IDE support.
Section titled “List available attributes for IDE support.”
admin/contributor.py
Section titled “admin/contributor.py”Admin contributor for lexigram-auth — surfaces session, token, and login
Section titled “Admin contributor for lexigram-auth — surfaces session, token, and login”- class AuthAdminContributor:
init(self) -> None
Admin contributor for the lexigram-auth package.
Section titled “Admin contributor for the lexigram-auth package.”- def init(self) -> None
Initialize the contributor with no arguments.
Section titled “Initialize the contributor with no arguments.” - async def on_admin_boot(self, container: ContainerResolverProtocol) -> None
Resolve auth DI dependencies from the container.
Section titled “Resolve auth DI dependencies from the container.” - def get_dashboard_widgets(self) -> Sequence[DashboardWidgetDefinition]
Return the dashboard widget definitions for this contributor.
Section titled “Return the dashboard widget definitions for this contributor.” - def get_navigation_items(self) -> Sequence[NavigationContribution]
Return the navigation items for this contributor.
Section titled “Return the navigation items for this contributor.” - def get_health_definitions(self) -> Sequence[AdminHealthDefinition]
Return the health check definitions for this contributor.
Section titled “Return the health check definitions for this contributor.” - def get_actions(self) -> Sequence[AdminActionDefinition]
Return the action definitions for this contributor.
Section titled “Return the action definitions for this contributor.” - async def render_widget(self, widget_name: str, params: WidgetParams) -> Result[WidgetViewModel, AdminError]
Render a widget HTML via registry dispatch.
Section titled “Render a widget HTML via registry dispatch.”
- def init(self) -> None
admin/handlers/active_sessions.py
Section titled “admin/handlers/active_sessions.py”Active sessions widget handler.
Section titled “Active sessions widget handler.”- class ActiveSessionsWidgetHandler:
init(self, session_manager: SessionManagerProtocol) -> None
Handler for the active sessions widget.
Section titled “Handler for the active sessions widget.”- def init(self, session_manager: SessionManagerProtocol) -> None
Initialize handler with session manager.
Section titled “Initialize handler with session manager.” - async def get_data(self, params: WidgetParams) -> Result[ActiveSessionsViewModel, AdminError]
Fetch active sessions data.
Section titled “Fetch active sessions data.”
- def init(self, session_manager: SessionManagerProtocol) -> None
admin/handlers/failed_logins.py
Section titled “admin/handlers/failed_logins.py”Failed logins widget handler.
Section titled “Failed logins widget handler.”- class FailedLoginsWidgetHandler:
init(self, session_manager: SessionManagerProtocol) -> None
Handler for the failed logins widget.
Section titled “Handler for the failed logins widget.”- def init(self, session_manager: SessionManagerProtocol) -> None
Initialize handler with session manager.
Section titled “Initialize handler with session manager.” - async def get_data(self, params: WidgetParams) -> Result[FailedLoginsViewModel, AdminError]
Fetch failed login statistics.
Section titled “Fetch failed login statistics.”
- def init(self, session_manager: SessionManagerProtocol) -> None
admin/handlers/token_refresh_rate.py
Section titled “admin/handlers/token_refresh_rate.py”Token refresh rate widget handler.
Section titled “Token refresh rate widget handler.”- class TokenRefreshRateWidgetHandler:
init(self, session_manager: SessionManagerProtocol) -> None
Handler for the token refresh rate widget.
Section titled “Handler for the token refresh rate widget.”- def init(self, session_manager: SessionManagerProtocol) -> None
Initialize handler with session manager.
Section titled “Initialize handler with session manager.” - async def get_data(self, params: WidgetParams) -> Result[TokenRefreshRateViewModel, AdminError]
Fetch token refresh rate data.
Section titled “Fetch token refresh rate data.”
- def init(self, session_manager: SessionManagerProtocol) -> None
admin/renderer.py
Section titled “admin/renderer.py”Jinja2 widget renderer for auth admin widgets.
Section titled “Jinja2 widget renderer for auth admin widgets.”- class PackageWidgetRenderer:
init(self) -> None
Renders admin widget templates for lexigram-auth.
Section titled “Renders admin widget templates for lexigram-auth.”- def init(self) -> None
Initialize renderer with the package templates directory.
Section titled “Initialize renderer with the package templates directory.” - def render(self, template_name: str, context: dict[str, Any]) -> str
Render a template with context.
Section titled “Render a template with context.”
- def init(self) -> None
admin/viewmodels.py
Section titled “admin/viewmodels.py”Frozen viewmodels for auth admin widgets.
Section titled “Frozen viewmodels for auth admin widgets.”-
@dataclass(frozen=True)
-
class ActiveSessionsViewModel:
Data model for active sessions widget.
Section titled “Data model for active sessions widget.” -
@dataclass(frozen=True)
-
class TokenRefreshRateViewModel:
Data model for token refresh rate widget.
Section titled “Data model for token refresh rate widget.” -
@dataclass(frozen=True)
-
class FailedLoginsViewModel:
Data model for failed logins widget.
Section titled “Data model for failed logins widget.”
authn/_binding.py
Section titled “authn/_binding.py”JWT token binding configuration and helpers.
Section titled “JWT token binding configuration and helpers.”-
@dataclass
-
class TokenBindingConfig:
Configuration for opt-in JWT client binding.
Section titled “Configuration for opt-in JWT client binding.”bind_to_ip: bool = False bind_to_fingerprint: bool = False fingerprint_header: str = ‘X-Client-Fingerprint’
-
def compute_binding_hash(config: TokenBindingConfig, ctx: dict[str, str]) -> str | None
Compute a binding hash from the provided context.
Section titled “Compute a binding hash from the provided context.” -
def verify_binding(config: TokenBindingConfig, claims: dict[str, object], ctx: dict[str, str]) -> bool
Verify that the token binding hash matches the current context.
Section titled “Verify that the token binding hash matches the current context.”
authn/_jwt_creation.py
Section titled “authn/_jwt_creation.py”Token creation mixin for :class:~lexigram.auth.authn.jwt.JWTTokenManager.
Section titled “Token creation mixin for :class:~lexigram.auth.authn.jwt.JWTTokenManager.”- class _JWTCreationMixin:
init(self, ids: IdGeneratorProtocol | None = None) -> None
Mixin providing JWT token-creation methods for :class:
Section titled “Mixin providing JWT token-creation methods for :class:JWTTokenManager.”JWTTokenManager.- def init(self, ids: IdGeneratorProtocol | None = None) -> None
Initialize the JWT creation mixin.
Section titled “Initialize the JWT creation mixin.” - @property
- def current_key_id(self) -> str
Active signing key ID — provided by JWTTokenManager.
Section titled “Active signing key ID — provided by JWTTokenManager.”
- def _get_signing_key(self) -> str
Return raw signing key — provided by JWTTokenManager.
Section titled “Return raw signing key — provided by JWTTokenManager.”
- def create_access_token(self, user: User, additional_claims: dict[str, Any] | None = None, binding_context: dict[str, str] | None = None) -> str
Create a JWT access token for a user with current key.
Section titled “Create a JWT access token for a user with current key.” - def create_refresh_token(self, user: User, binding_context: dict[str, str] | None = None) -> str
Create a JWT refresh token for a user.
Section titled “Create a JWT refresh token for a user.” - def create_token_pair(self, user: User, additional_claims: dict[str, Any] | None = None, binding_context: dict[str, str] | None = None) -> AuthToken
Create both access and refresh tokens.
Section titled “Create both access and refresh tokens.” - def create_token(self, user: Any, additional_claims: dict[str, Any] | None = None, binding_context: dict[str, str] | None = None) -> AuthToken
Create auth tokens for a user, satisfying the TokenManagerProtocol protocol.
Section titled “Create auth tokens for a user, satisfying the TokenManagerProtocol protocol.”
- def init(self, ids: IdGeneratorProtocol | None = None) -> None
authn/_jwt_lifecycle.py
Section titled “authn/_jwt_lifecycle.py”Token verification and lifecycle mixin for :class:~lexigram.auth.authn.jwt.JWTTokenManager.
Section titled “Token verification and lifecycle mixin for :class:~lexigram.auth.authn.jwt.JWTTokenManager.”- class _JWTLifecycleMixin:
Mixin providing JWT token verification and lifecycle methods for :class:
Section titled “Mixin providing JWT token verification and lifecycle methods for :class:JWTTokenManager.”JWTTokenManager.- @property
- def keys(self) -> dict[str, Any]
Live key material — provided by JWTTokenManager.
Section titled “Live key material — provided by JWTTokenManager.” - @property
- def current_key_id(self) -> str
Active signing key ID — provided by JWTTokenManager.
Section titled “Active signing key ID — provided by JWTTokenManager.”
- def _get_verification_key(self, kid: str) -> str
Return raw verification key — provided by JWTTokenManager.
Section titled “Return raw verification key — provided by JWTTokenManager.”
- def create_token_pair(self, user: User, additional_claims: dict[str, Any] | None = None, binding_context: dict[str, str] | None = None) -> AuthToken
Create token pair — provided by _JWTCreationMixin.
Section titled “Create token pair — provided by _JWTCreationMixin.”
- async def _emit_action(self, hook_name: str, payload: object) -> None
Emit a token lifecycle hook when a registry is available.
Section titled “Emit a token lifecycle hook when a registry is available.”
- async def refresh_token(self, refresh_token: str) -> Result[AuthToken, ContractsTokenError]
Refresh an access token using a refresh token.
Section titled “Refresh an access token using a refresh token.” - async def refresh_with_rotation(self, refresh_token: str) -> Result[TokenPair, ContractsTokenError]
Rotate a refresh token and return a new access + refresh token pair.
Section titled “Rotate a refresh token and return a new access + refresh token pair.” - async def verify_token(self, token: str, token_type: str = ‘access’, expected_audience: str | None = None, required_scope: str | None = None, binding_context: dict[str, str] | None = None, allow_missing_audience: bool) -> Result[VerifiedToken, ContractsTokenError]
Verify and decode a JWT token with support for multiple keys.
Section titled “Verify and decode a JWT token with support for multiple keys.” - async def logout(self, token: str) -> Result[None, ContractsTokenError]
Invalidate a token by adding it to the blacklist.
Section titled “Invalidate a token by adding it to the blacklist.” - async def logout_all_user_tokens(self, user_id: str) -> Result[None, ContractsTokenError]
Invalidate all tokens for a user by writing a user-level blacklist entry.
Section titled “Invalidate all tokens for a user by writing a user-level blacklist entry.”
- async def _is_token_blacklisted(self, token: str) -> bool
Check if a token is blacklisted.
Section titled “Check if a token is blacklisted.”
- async def refresh_access_token(self, refresh_token: str) -> AuthToken
Create new access token from refresh token with rotation.
Section titled “Create new access token from refresh token with rotation.” - async def get_user_from_token(self, token: str) -> Result[VerifiedToken, ContractsTokenError]
Extract user information from access token.
Section titled “Extract user information from access token.”
authn/_key_utils.py
Section titled “authn/_key_utils.py”JWT key normalization utilities.
Section titled “JWT key normalization utilities.”- def normalize_jwt_keys(keys: dict[str, Any]) -> dict[str, SecretStr | dict[str, SecretStr]]
Coerce all JWT key material to :class:
Section titled “Coerce all JWT key material to :class:~lexigram.validation.SecretStr.”~lexigram.validation.SecretStr.
authn/account_verification.py
Section titled “authn/account_verification.py”Account verification service for Lexigram Auth.
Section titled “Account verification service for Lexigram Auth.”-
class AccountVerificationError:
Error during account verification operations.
Section titled “Error during account verification operations.” -
@inject
-
class AccountVerificationService:
Service for handling account verification flows.
Section titled “Service for handling account verification flows.”init(self, user_store: UserStoreProtocol, token_ttl_days: int = TOKEN_EXPIRY_DAYS) -> None
- def init(self, user_store: UserStoreProtocol, token_ttl_days: int = TOKEN_EXPIRY_DAYS) -> None
- def generate_verification_token(self) -> tuple[str, datetime]
Generate a secure random verification token.
Section titled “Generate a secure random verification token.” - async def send_verification(self, user_id: str) -> Result[tuple[str, datetime], AlreadyVerifiedError | UserNotFoundError]
Send verification email for a user.
Section titled “Send verification email for a user.” - async def verify(self, token: str) -> Result[None, ContractsVerificationError]
Verify user account with token.
Section titled “Verify user account with token.”
- async def _find_user_by_token(self, token: str) -> Any | None
Find user by verification token.
Section titled “Find user by verification token.”
- async def resend_verification(self, email: str) -> Result[tuple[str, datetime], AlreadyVerifiedError | UserNotFoundError]
Resend verification token.
Section titled “Resend verification token.” - def repr(self) -> str
authn/blacklist.py
Section titled “authn/blacklist.py”JWT token blacklist management.
Section titled “JWT token blacklist management.”- class JWTBlacklist:
init(self, cache: CacheBackendProtocol | None, algorithm: str, current_key_id_fn: Any, access_expiration_hours: int, refresh_expiration_days: int, audit_logger: AuditLoggerProtocol | None = None) -> None
Hash-based JWT blacklist supporting both cache and in-process storage.
Section titled “Hash-based JWT blacklist supporting both cache and in-process storage.”- def init(self, cache: CacheBackendProtocol | None, algorithm: str, current_key_id_fn: Any, access_expiration_hours: int, refresh_expiration_days: int, audit_logger: AuditLoggerProtocol | None = None) -> None
- async def revoke(self, token: str) -> Result[None, ContractsTokenError]
Add token to the blacklist.
Section titled “Add token to the blacklist.” - async def revoke_all_for_user(self, user_id: str) -> Result[None, ContractsTokenError]
Blacklist all tokens for user_id by writing a user-level sentinel.
Section titled “Blacklist all tokens for user_id by writing a user-level sentinel.” - async def is_blacklisted(self, token: str) -> bool
Return
Section titled “Return True if token has been revoked.”Trueif token has been revoked.
authn/google_oauth.py
Section titled “authn/google_oauth.py”Google OAuth verification and claim normalization helpers.
Section titled “Google OAuth verification and claim normalization helpers.”-
class GoogleOAuthService:
Verify Google OAuth tokens and normalize the verified claims.
Section titled “Verify Google OAuth tokens and normalize the verified claims.”init(self, client_id: str, http_client: HTTPClientProtocol | None, jwks_url: str, tokeninfo_url: str, userinfo_url: str, allowed_issuers: tuple[str, …], jwks_cache_ttl_seconds: int) -> None
- def init(self, client_id: str, http_client: HTTPClientProtocol | None, jwks_url: str, tokeninfo_url: str, userinfo_url: str, allowed_issuers: tuple[str, …], jwks_cache_ttl_seconds: int) -> None
- async def verify_token(self, token: str) -> VerifiedIdentityClaims
Verify a Google token, preferring ID-token JWKS validation.
Section titled “Verify a Google token, preferring ID-token JWKS validation.” - async def verify_id_token(self, token: str) -> VerifiedIdentityClaims
Verify a Google ID token against Google’s JWKS.
Section titled “Verify a Google ID token against Google’s JWKS.” - async def verify_userinfo_token(self, token: str) -> VerifiedIdentityClaims
Verify a Google access token via the userinfo endpoint.
Section titled “Verify a Google access token via the userinfo endpoint.” - async def verify_tokeninfo(self, token: str) -> VerifiedIdentityClaims
Verify a Google token using the tokeninfo endpoint fallback.
Section titled “Verify a Google token using the tokeninfo endpoint fallback.”
- async def _get_jwks(self) -> dict[str, Any]
Fetch Google’s JWKS, caching it for a short TTL.
Section titled “Fetch Google’s JWKS, caching it for a short TTL.” - def _select_jwk(self, jwks: dict[str, Any], kid: str | None) -> dict[str, Any]
Select the matching JWK for a token header.
Section titled “Select the matching JWK for a token header.” - async def _request_json(self, method: str, url: str, **kwargs: Any) -> dict[str, Any]
Fetch JSON via the injected HTTP client or a temporary httpx client.
Section titled “Fetch JSON via the injected HTTP client or a temporary httpx client.” - async def _response_json(self, response: HttpResponse | Any) -> dict[str, Any]
Normalise framework HTTP responses and test doubles to JSON dicts.
Section titled “Normalise framework HTTP responses and test doubles to JSON dicts.” - def _claims_from_payload(self, payload: dict[str, Any], issuer: str | None, audience: str | None) -> VerifiedIdentityClaims
Convert Google payloads into normalized verified identity claims.
Section titled “Convert Google payloads into normalized verified identity claims.” - def _timestamp_to_datetime(self, value: Any) -> datetime | None
Convert a numeric UNIX timestamp to UTC datetime.
Section titled “Convert a numeric UNIX timestamp to UTC datetime.” - @staticmethod
- def _looks_like_jwt(token: str) -> bool
Return True when the token resembles a JWT.
Section titled “Return True when the token resembles a JWT.”
-
global GOOGLE_ISSUERS: tuple[str, str]
-
global GOOGLE_JWKS_URL
-
global GOOGLE_TOKENINFO_URL
-
global GOOGLE_USERINFO_URL
authn/jwt.py
Section titled “authn/jwt.py”JWT token management for authentication.
Section titled “JWT token management for authentication.”- class JWTTokenManager:
init(self, current_key_id: str, keys: dict[str, str | SecretStr | dict[str, str | SecretStr]] | None = None, algorithm: str = const.DEFAULT_TOKEN_ALGORITHM, access_expiration_hours: int = 24, refresh_expiration_days: int = 30, cache_service: CacheBackendProtocol | None = None, rotation_interval_days: int = 90, grace_period_seconds: int = const.DEFAULT_JWT_KEY_ROTATION_GRACE_PERIOD_SECONDS, logger: Logger | None = None, audit_logger: AuditLoggerProtocol | None, binding_config: TokenBindingConfig | None, required_audience: str | None, ids: IdGeneratorProtocol | None, allow_unverified_dev: bool) -> None
JWT token management with key rotation support.
Section titled “JWT token management with key rotation support.”- def init(self, current_key_id: str, keys: dict[str, str | SecretStr | dict[str, str | SecretStr]] | None = None, algorithm: str = const.DEFAULT_TOKEN_ALGORITHM, access_expiration_hours: int = 24, refresh_expiration_days: int = 30, cache_service: CacheBackendProtocol | None = None, rotation_interval_days: int = 90, grace_period_seconds: int = const.DEFAULT_JWT_KEY_ROTATION_GRACE_PERIOD_SECONDS, logger: Logger | None = None, audit_logger: AuditLoggerProtocol | None, binding_config: TokenBindingConfig | None, required_audience: str | None, ids: IdGeneratorProtocol | None, allow_unverified_dev: bool) -> None
Initialize the JWT token manager.
Section titled “Initialize the JWT token manager.” - @property
- def keys(self) -> dict[str, Any]
Live view of the key material managed by the key store.
Section titled “Live view of the key material managed by the key store.”
- @property
- def _key_meta(self) -> dict[str, Any]
Live view of the key metadata managed by the key store.
Section titled “Live view of the key metadata managed by the key store.”
- @property
- def current_key_id(self) -> str
The key ID currently used for signing new tokens.
Section titled “The key ID currently used for signing new tokens.” - @setter
- def current_key_id(self, value: str) -> None
Allow external callers to update the active key ID on the store.
Section titled “Allow external callers to update the active key ID on the store.” - def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.” - def set_hook_registry(self, hooks: HookRegistryProtocol | None) -> None
Attach an optional hook registry after provider boot wiring.
Section titled “Attach an optional hook registry after provider boot wiring.” - async def rotate_key(self, new_key_id: str, new_secret: str | dict) -> None
Rotate to a new signing key, delegating lifecycle to the key store.
Section titled “Rotate to a new signing key, delegating lifecycle to the key store.”
- async def _cleanup_old_keys(self) -> None
Delegate old-key cleanup to the key store.
Section titled “Delegate old-key cleanup to the key store.”
- def list_keys(self) -> dict[str, dict[str, Any]]
Return current key metadata (for inspection/operations).
Section titled “Return current key metadata (for inspection/operations).”
- def _get_signing_key(self) -> str
Return the raw signing key string for the current key ID.
Section titled “Return the raw signing key string for the current key ID.” - def _get_verification_key(self, kid: str) -> str
Return the raw verification key string for kid.
Section titled “Return the raw verification key string for kid.”
- async def get_user_from_token(self, token: str) -> Result[VerifiedToken, ContractsTokenError]
Extract user information from access token.
Section titled “Extract user information from access token.”
- def init(self, current_key_id: str, keys: dict[str, str | SecretStr | dict[str, str | SecretStr]] | None = None, algorithm: str = const.DEFAULT_TOKEN_ALGORITHM, access_expiration_hours: int = 24, refresh_expiration_days: int = 30, cache_service: CacheBackendProtocol | None = None, rotation_interval_days: int = 90, grace_period_seconds: int = const.DEFAULT_JWT_KEY_ROTATION_GRACE_PERIOD_SECONDS, logger: Logger | None = None, audit_logger: AuditLoggerProtocol | None, binding_config: TokenBindingConfig | None, required_audience: str | None, ids: IdGeneratorProtocol | None, allow_unverified_dev: bool) -> None
authn/key_rotation.py
Section titled “authn/key_rotation.py”JWT key rotation helpers.
Section titled “JWT key rotation helpers.”- class JWTKeyStore:
init(self, current_key_id: str, keys: dict[str, Any] | None = None, grace_period_seconds: float = 3600.0) -> None
Manages JWT signing keys with support for seamless rotation.
Section titled “Manages JWT signing keys with support for seamless rotation.”- def init(self, current_key_id: str, keys: dict[str, Any] | None = None, grace_period_seconds: float = 3600.0) -> None
- async def rotate(self, new_key_id: str, new_secret: str | dict) -> None
Switch to a new signing key, retaining the old one for the grace period.
Section titled “Switch to a new signing key, retaining the old one for the grace period.”
- async def _cleanup_old_keys(self) -> None
Remove keys that have been retired beyond the grace period.
Section titled “Remove keys that have been retired beyond the grace period.”
- def get_signing_key(self) -> str
Return the raw signing key string for the current key ID.
Section titled “Return the raw signing key string for the current key ID.” - def get_verification_key(self, kid: str) -> str | None
Return the raw verification key string for kid, or
Section titled “Return the raw verification key string for kid, or None.”None. - def list_keys(self) -> dict[str, dict[str, Any]]
Return a copy of the key metadata dict (for inspection/auditing).
Section titled “Return a copy of the key metadata dict (for inspection/auditing).”
authn/ldap.py
Section titled “authn/ldap.py”LDAP authentication manager implementation.
Section titled “LDAP authentication manager implementation.”-
@dataclass
-
class LDAPProvider:
LDAP provider configuration.
Section titled “LDAP provider configuration.”name: str server_url: str bind_dn: str | None = None bind_password: SecretStr | None = None user_search_base: str = ” user_search_filter: str = ‘(sAMAccountName={username})’ user_dn_attribute: str = ‘distinguishedName’ group_search_base: str | None = None group_search_filter: str | None = None require_group_membership: str | None = None tls_ca_cert_file: str | None = None tls_cert_file: str | None = None tls_key_file: str | None = None timeout: int = 30 max_connections: int = 10
-
class LDAPManager:
Manager for LDAP/Active Directory authentication operations.
Section titled “Manager for LDAP/Active Directory authentication operations.”init(self, providers: dict[str, Any], http_client: HTTPClientProtocol | None = None) -> None
- def init(self, providers: dict[str, Any], http_client: HTTPClientProtocol | None = None) ->
Initialize LDAP manager.
Section titled “Initialize LDAP manager.” - def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.” - async def authenticate_user(self, provider_name: str, username: str, password: str) -> dict[str, Any] | None
Authenticate a user against LDAP.
Section titled “Authenticate a user against LDAP.” - async def get_user_info(self, provider_name: str, username: str) -> dict[str, Any] | None
Get user information from LDAP without authentication.
Section titled “Get user information from LDAP without authentication.” - async def check_group_membership(self, provider_name: str, username: str, group_name: str) -> bool
Check if user is a member of the specified group.
Section titled “Check if user is a member of the specified group.”
- async def _get_user_dn(self, provider: LDAPProvider, username: str) -> str | None
Get the DN for a username by searching LDAP.
Section titled “Get the DN for a username by searching LDAP.” - async def _get_group_dn(self, provider: LDAPProvider, group_name: str) -> str | None
Get the DN for a group by searching LDAP.
Section titled “Get the DN for a group by searching LDAP.” - async def _bind_with_credentials(self, provider: LDAPProvider, user_dn: str, password: str) -> bool
Attempt to bind to LDAP with user credentials.
Section titled “Attempt to bind to LDAP with user credentials.” - async def _get_user_attributes(self, provider: LDAPProvider, user_dn: str) -> dict[str, Any]
Get user attributes from LDAP.
Section titled “Get user attributes from LDAP.” - async def _get_connection(self, provider: LDAPProvider) -> Connection
Get a connection from the pool or create a new one.
Section titled “Get a connection from the pool or create a new one.” - def _return_connection(self, provider: LDAPProvider, conn: Connection) -> None
Return a connection to the pool.
Section titled “Return a connection to the pool.”
- def init(self, providers: dict[str, Any], http_client: HTTPClientProtocol | None = None) ->
authn/mfa.py
Section titled “authn/mfa.py”MFA utilities - TOTP (RFC 6238) and backup codes
Section titled “MFA utilities - TOTP (RFC 6238) and backup codes”- def _int_to_bytes(i: int) -> bytes
- def generate_totp_secret(length: int = 20) -> str
Generate a base32-encoded secret for TOTP.
Section titled “Generate a base32-encoded secret for TOTP.”
-
def _hotp(secret: str, counter: int, digits: int = DEFAULT_TOTP_DIGITS) -> str
-
def _normalize_base32(secret: str) -> str
Normalize base32 secret by adding padding if necessary
Section titled “Normalize base32 secret by adding padding if necessary”
-
def generate_totp_code(secret: str, for_time: int | None = None, period: int = DEFAULT_TOTP_PERIOD, digits: int = DEFAULT_TOTP_DIGITS) -> str
Generate TOTP code for given secret and time (unix seconds).
Section titled “Generate TOTP code for given secret and time (unix seconds).” -
def verify_totp(secret: str, code: str, window: int = 1, period: int = DEFAULT_TOTP_PERIOD) -> bool
Verify a TOTP code allowing a +/- window of time steps.
Section titled “Verify a TOTP code allowing a +/- window of time steps.” -
def get_provisioning_uri(secret: str, username: str, issuer: str) -> str
Return an
Section titled “Return an otpauth:// provisioning URI suitable for authenticator apps.”otpauth://provisioning URI suitable for authenticator apps. -
def generate_backup_codes(count: int = 10, length: int = 8) -> list[str]
Generate a list of human-friendly backup codes (plain strings).
Section titled “Generate a list of human-friendly backup codes (plain strings).” -
def hash_backup_codes(codes: Iterable[str]) -> list[str]
Return digest hashes for backup codes (store these, compare with digest).
Section titled “Return digest hashes for backup codes (store these, compare with digest).” -
global DEFAULT_TOTP_DIGITS
-
global DEFAULT_TOTP_ALGORITHM
-
global DEFAULT_TOTP_PERIOD
authn/oauth2.py
Section titled “authn/oauth2.py”OAuth2 authentication flows.
Section titled “OAuth2 authentication flows.”-
class LexigramConnectSession:
Session adapter for authlib that delegates to
Section titled “Session adapter for authlib that delegates to HTTPClientProtocol.”HTTPClientProtocol.init(self, http_client: HTTPClientProtocol | None) -> None
- def init(self, http_client: HTTPClientProtocol | None) ->
- async def request(self, method: str, url: str, **kwargs: Any) -> LexigramConnectResponse
Make a request using the injected
Section titled “Make a request using the injected HTTPClientProtocol.”HTTPClientProtocol. - async def get(self, url: str, **kwargs: Any) -> LexigramConnectResponse
- async def post(self, url: str, **kwargs: Any) -> LexigramConnectResponse
- async def put(self, url: str, **kwargs: Any) -> LexigramConnectResponse
- async def delete(self, url: str, **kwargs: Any) -> LexigramConnectResponse
- async def patch(self, url: str, **kwargs: Any) -> LexigramConnectResponse
- async def head(self, url: str, **kwargs: Any) -> LexigramConnectResponse
-
class LexigramConnectResponse:
Response adapter that makes any HTTP response compatible with authlib.
Section titled “Response adapter that makes any HTTP response compatible with authlib.”init(self, response: Any) -> None
- def init(self, response: Any) ->
- @property
- def status_code(self) -> int
- @property
- def headers(self) -> dict[str, str]
- async def json(self) -> dict[str, Any]
- async def text(self) -> str
- def raise_for_status(self) -> None
- async def close(self) -> None
Close the underlying response if the client supports it.
Section titled “Close the underlying response if the client supports it.” - async def aenter(self) -> Self
- async def aexit(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None
-
class OAuth2IdentityProvider:
OAuth2 identity provider configuration.
Section titled “OAuth2 identity provider configuration.”init(self, name: str, client_id: str, client_secret: str, authorize_url: str, access_token_url: str, userinfo_url: str, scope: str = ‘openid email profile’, redirect_uri: str | None = None, require_pkce: bool = True) -> None
- def init(self, name: str, client_id: str, client_secret: str, authorize_url: str, access_token_url: str, userinfo_url: str, scope: str = ‘openid email profile’, redirect_uri: str | None = None, require_pkce: bool = True) ->
-
class OAuth2Manager:
OAuth2 authentication manager
Section titled “OAuth2 authentication manager”init(self, providers: dict[str, OAuth2IdentityProvider], http_client: HTTPClientProtocol | None = None) -> None
- def init(self, providers: dict[str, OAuth2IdentityProvider], http_client: HTTPClientProtocol | None = None) -> None
- def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.” - async def get_authorization_url(self, provider_name: str, state: str | None = None) -> tuple[str, str, str]
Get OAuth2 authorization URL, PKCE code_verifier (S256), and state.
Section titled “Get OAuth2 authorization URL, PKCE code_verifier (S256), and state.” - async def exchange_code_for_token(self, provider_name: str, code: str, code_verifier: str | None = None) -> dict[str, Any]
Exchange authorization code for access token, optionally with PKCE code_verifier
Section titled “Exchange authorization code for access token, optionally with PKCE code_verifier” - async def get_user_info(self, provider_name: str, token: dict[str, Any]) -> dict[str, Any]
Get user information from OAuth2 provider
Section titled “Get user information from OAuth2 provider”
-
class OAuth2AuthProvider:
Complete OAuth2 provider with user provisioning.
Section titled “Complete OAuth2 provider with user provisioning.”init(self, oauth2_manager: OAuth2Manager, user_store: Any, oauth_identity_store: Any | None = None) -> None
- def init(self, oauth2_manager: OAuth2Manager, user_store: Any, oauth_identity_store: Any | None = None) -> None
- async def authenticate_with_oauth2(self, provider_name: str, code: str, code_verifier: str | None = None) -> User
Authenticate user via OAuth2 and provision if needed.
Section titled “Authenticate user via OAuth2 and provision if needed.”
- async def _find_or_create_oauth_user(self, oauth_user: OAuth2UserInfo) -> User
Find existing user or provision new one.
Section titled “Find existing user or provision new one.” - async def _find_by_oauth_identity(self, provider: str, provider_user_id: str) -> User | None
Find user by OAuth identity.
Section titled “Find user by OAuth identity.” - async def _link_oauth_identity(self, user_id: str, provider: str, provider_user_id: str) -> None
Link OAuth identity to user.
Section titled “Link OAuth identity to user.”
authn/passkeys.py
Section titled “authn/passkeys.py”Minimal WebAuthn / Passkeys helper (server-side operations)
Section titled “Minimal WebAuthn / Passkeys helper (server-side operations)”- class _PendingStore:
init(self, cache: CacheBackendProtocol | None = None) -> None
Abstract pending store with optional async cache backend.
Section titled “Abstract pending store with optional async cache backend.”- def init(self, cache: CacheBackendProtocol | None = None) -> None
- async def set(self, key: str, value: dict, ttl: int = 300) -> None
- async def get(self, key: str) -> dict | None
- async def delete(self, key: str) -> None
- @inject
- class PasskeyService:
init(self, user_store: UserStoreProtocol, cache_service: CacheBackendProtocol | None = None, rp_id: str | None, allowed_origins: set[str] | None) -> None
Manage passkey registration and authentication flows.
Section titled “Manage passkey registration and authentication flows.”- def init(self, user_store: UserStoreProtocol, cache_service: CacheBackendProtocol | None = None, rp_id: str | None, allowed_origins: set[str] | None) -> None
- def _gen_challenge(self) -> str
- def _validate_origin(self, origin: str | None) -> bool
Return False if origin is required but invalid or missing.
Section titled “Return False if origin is required but invalid or missing.”
- async def start_registration(self, user_id: str, name: str | None = None) -> tuple[str, str]
Start registration: return (registration_id, challenge).
Section titled “Start registration: return (registration_id, challenge).” - async def finish_registration(self, registration_id: str, credential_id: str, public_key_pem: str, actor_user_id: str | None = None, origin: str | None) -> bool
Finish registration by storing the passkey on the user’s profile.
Section titled “Finish registration by storing the passkey on the user’s profile.” - async def start_authentication(self, user_id: str) -> tuple[str, str, list[str]]
Start authn: return (auth_id, challenge, allowed_credential_ids).
Section titled “Start authn: return (auth_id, challenge, allowed_credential_ids).” - async def finish_authentication(self, auth_id: str, credential_id: str, signature: bytes, origin: str | None, new_sign_count: int | None) -> bool
Finish authentication by verifying the signature over the challenge.
Section titled “Finish authentication by verifying the signature over the challenge.”
authn/password_hasher.py
Section titled “authn/password_hasher.py”Argon2id-based password hashing implementations.
Section titled “Argon2id-based password hashing implementations.”-
class Argon2idKeyDerivation:
Argon2id key derivation implementation.
Section titled “Argon2id key derivation implementation.”init(self, config: PasswordConfig | None = None) -> None
- def init(self, config: PasswordConfig | None = None) -> None
- async def derive(self, secret: str, salt: bytes | None) -> str
Derive a key from a secret using Argon2id.
Section titled “Derive a key from a secret using Argon2id.” - async def verify(self, secret: str, encoded: str) -> bool
Verify a secret against an Argon2id hash.
Section titled “Verify a secret against an Argon2id hash.” - async def hash(self, secret: str, salt: bytes | None) -> str
Backward-compatible alias for derive.
Section titled “Backward-compatible alias for derive.”
-
class Argon2idPasswordHasher:
Auth-domain password hasher using Argon2id.
Section titled “Auth-domain password hasher using Argon2id.”init(self, kdf: KeyDerivationProtocol) -> None
- def init(self, kdf: KeyDerivationProtocol) -> None
- async def hash(self, password: str) -> str
Hash a password using Argon2id.
Section titled “Hash a password using Argon2id.” - async def verify(self, password: str, hashed_password: str) -> bool
Verify a password against its hash.
Section titled “Verify a password against its hash.”
authn/password_reset.py
Section titled “authn/password_reset.py”Password reset service for Lexigram Auth.
Section titled “Password reset service for Lexigram Auth.”-
class PasswordResetTokenError:
Error during password reset token operations.
Section titled “Error during password reset token operations.” -
@inject
-
class PasswordResetService:
Service for handling password reset flows.
Section titled “Service for handling password reset flows.”init(self, user_store: UserStoreProtocol, token_ttl_hours: int = TOKEN_EXPIRY_HOURS) -> None
- def init(self, user_store: UserStoreProtocol, token_ttl_hours: int = TOKEN_EXPIRY_HOURS) -> None
- def generate_reset_token(self) -> tuple[str, datetime]
Generate a secure random reset token.
Section titled “Generate a secure random reset token.” - async def request_reset(self, email: str) -> Result[tuple[str, datetime] | None, PasswordResetTokenError]
Request a password reset for a user email.
Section titled “Request a password reset for a user email.” - async def confirm_reset(self, token: str, new_password: str) -> Result[None, PasswordResetTokenError]
Confirm password reset with new password.
Section titled “Confirm password reset with new password.”
- async def _find_user_by_token(self, token: str) -> Any | None
Find user by reset token.
Section titled “Find user by reset token.”
- async def invalidate_token(self, user_id: str) -> None
Invalidate any pending reset token for a user.
Section titled “Invalidate any pending reset token for a user.” - def repr(self) -> str
authn/revocation.py
Section titled “authn/revocation.py”Persistent token revocation store backed by a CacheBackendProtocol.
Section titled “Persistent token revocation store backed by a CacheBackendProtocol.”- class PersistentTokenRevocationStore:
init(self, cache: CacheBackendProtocol, ttl: int = _DEFAULT_TTL) -> None
Persistent token revocation list backed by a :class:
Section titled “Persistent token revocation list backed by a :class:CacheBackendProtocol.”CacheBackendProtocol.- def init(self, cache: CacheBackendProtocol, ttl: int = _DEFAULT_TTL) -> None
- def _make_key(self, token_id: str) -> str
Return the cache key for a token ID.
Section titled “Return the cache key for a token ID.”
- async def revoke(self, token_id: str, expires_at: datetime | None = None) -> None
Mark token_id as revoked.
Section titled “Mark token_id as revoked.” - async def is_revoked(self, token_id: str) -> bool
Return
Section titled “Return True if token_id is present in the revocation list.”Trueif token_id is present in the revocation list.
authn/saml.py
Section titled “authn/saml.py”SAML 2.0 authentication flows
Section titled “SAML 2.0 authentication flows”-
class SAMLAttributeMapper:
Protocol for SAML attribute mappers.
Section titled “Protocol for SAML attribute mappers.”- def map_attribute(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None …
-
class EmailAttributeMapper:
Maps SAML email attribute to user info.
Section titled “Maps SAML email attribute to user info.”- def map_attribute(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None
-
class NameAttributeMapper:
Maps SAML name attributes to user info.
Section titled “Maps SAML name attributes to user info.”- def map_attribute(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None
-
class FirstNameAttributeMapper:
Maps SAML firstName/givenName attribute to user info.
Section titled “Maps SAML firstName/givenName attribute to user info.”- def map_attribute(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None
-
class LastNameAttributeMapper:
Maps SAML lastName/surname attribute to user info.
Section titled “Maps SAML lastName/surname attribute to user info.”- def map_attribute(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None
-
class GroupsAttributeMapper:
Maps SAML groups attribute to user info.
Section titled “Maps SAML groups attribute to user info.”- def map_attribute(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None
-
class SAMLAttributeMapperRegistry:
Registry for SAML attribute mappers.
Section titled “Registry for SAML attribute mappers.”init(self) -> None
- def init(self) -> None
- @classmethod
- def with_defaults(cls) -> SAMLAttributeMapperRegistry
Create a registry pre-loaded with the standard attribute mappers.
Section titled “Create a registry pre-loaded with the standard attribute mappers.”
- def _register_default_mappers(self) -> None
Register the default attribute mappers.
Section titled “Register the default attribute mappers.”
- def register_mapper(self, mapper: SAMLAttributeMapper) -> None
Register a custom attribute mapper.
Section titled “Register a custom attribute mapper.” - def clear_mappers(self) -> None
Clear all registered mappers.
Section titled “Clear all registered mappers.” - def get_mappers(self) -> list[SAMLAttributeMapper]
Get all registered mappers.
Section titled “Get all registered mappers.” - def map_attributes(self, ava: dict[str, list[str]], user_info: dict[str, Any]) -> None
Map all SAML attributes using registered mappers.
Section titled “Map all SAML attributes using registered mappers.”
-
class SAMLProvider:
SAML identity provider configuration
Section titled “SAML identity provider configuration”init(self, name: str, entity_id: str, sso_url: str, slo_url: str | None = None, x509_cert: str | None = None, name_id_format: str = NAMEID_FORMAT_EMAILADDRESS, want_assertions_signed: bool = True, want_response_signed: bool = True, want_logout_response_signed: bool = False, want_logout_request_signed: bool = False) -> None
- def init(self, name: str, entity_id: str, sso_url: str, slo_url: str | None = None, x509_cert: str | None = None, name_id_format: str = NAMEID_FORMAT_EMAILADDRESS, want_assertions_signed: bool = True, want_response_signed: bool = True, want_logout_response_signed: bool = False, want_logout_request_signed: bool = False) ->
-
class SAMLManager:
SAML 2.0 authentication manager
Section titled “SAML 2.0 authentication manager”init(self, providers: dict[str, SAMLProvider], http_client: Any | None = None) -> None
- def init(self, providers: dict[str, SAMLProvider], http_client: Any | None = None) -> None
- def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.”
- def _create_saml_client(self, provider: SAMLProvider) -> Saml2Client
Create SAML client for a provider
Section titled “Create SAML client for a provider”
- async def get_login_url(self, provider_name: str, relay_state: str | None = None) -> str
Get SAML login URL for the given provider
Section titled “Get SAML login URL for the given provider” - async def process_assertion(self, provider_name: str, saml_response: str, relay_state: str | None = None) -> dict[str, Any]
Process SAML assertion response
Section titled “Process SAML assertion response” - async def get_logout_url(self, provider_name: str, name_id: str, session_index: str | None = None) -> str | None
Get SAML logout URL for the given provider
Section titled “Get SAML logout URL for the given provider” - async def process_logout_response(self, provider_name: str, saml_response: str) -> bool
Process SAML logout response
Section titled “Process SAML logout response”
authn/schemas/requests.py
Section titled “authn/schemas/requests.py”-
@dataclass(init=False)
-
class LoginRequest: email: str = Field(…) password: str = Field(…) remember_me: bool = Field(…)
-
@dataclass(init=False)
-
class RegisterRequest: name: str = Field(…) email: str = Field(…) password: str = Field(…) confirm_password: str = Field(…) profile: dict = Field(default_factory=…)
-
@dataclass(init=False)
-
class RefreshTokenRequest: refresh_token: str = Field(…)
-
@dataclass(init=False)
-
class PasswordResetRequest: email: str = Field(…)
-
@dataclass(init=False)
-
class PasswordResetConfirm: token: str = Field(…) new_password: str = Field(…) confirm_password: str = Field(…)
-
@dataclass(init=False)
-
class OAuth2AuthorizeRequest: response_type: str = Field(…) client_id: str = Field(…) redirect_uri: str | None = Field(…) scope: str = Field(…) state: str | None = Field(…)
-
@dataclass(init=False)
-
class OAuth2TokenRequest: grant_type: str = Field(…) code: str | None = Field(…) redirect_uri: str | None = Field(…) client_id: str | None = Field(…) client_secret: str | None = Field(…) refresh_token: str | None = Field(…)
authn/schemas/responses.py
Section titled “authn/schemas/responses.py”- @dataclass(init=False)
- class TokenResponse: access_token: str = Field(…) token_type: str = Field(…) expires_in: int = Field(…) refresh_token: str | None = Field(…) user: dict = Field(…)
authn/schemas/user.py
Section titled “authn/schemas/user.py”- @dataclass(init=False)
- class UserProfile: user_id: str name: str email: str created_at: datetime is_active: bool = True is_verified: bool = False updated_at: datetime | None = None last_login_at: datetime | None = None login_count: int = 0 roles: list[str] = Field(default_factory=…) permissions: list[str] = Field(default_factory=…) profile: dict = Field(default_factory=…)
authn/security.py
Section titled “authn/security.py”Password security utilities using Passlib with a lightweight fallback.
Section titled “Password security utilities using Passlib with a lightweight fallback.”-
class PasswordHasher:
Bcrypt password hasher implementing the PasswordHasherProtocol.
Section titled “Bcrypt password hasher implementing the PasswordHasherProtocol.”init(self, rounds: int = _DEFAULT_BCRYPT_ROUNDS) -> None
- def init(self, rounds: int = _DEFAULT_BCRYPT_ROUNDS) -> None
- @staticmethod
- async def hash(password: str) -> str
Hash a password using bcrypt with UTF-8 aware truncation.
Section titled “Hash a password using bcrypt with UTF-8 aware truncation.” - @staticmethod
- async def verify(password: str, hashed_password: str) -> bool
Verify a password against its hash asynchronously.
Section titled “Verify a password against its hash asynchronously.” - def needs_rehash(self, hashed_password: str) -> bool
Check if the hash needs to be rehashed.
Section titled “Check if the hash needs to be rehashed.” - async def rehash_if_needed(self, password: str, hashed_password: str | None) -> str | None
Rehash the password if needed.
Section titled “Rehash the password if needed.”
-
class PasswordPolicy:
Password policy configuration.
Section titled “Password policy configuration.”init(self, min_length: int = 8, max_length: int = 128, require_uppercase: bool = True, require_lowercase: bool = True, require_digits: bool = False, require_special: bool = False, prevent_common: bool = True, prevent_reuse: bool = False, history_size: int = 5, common_passwords_file: str | None = None, banned_patterns: list[str] | None = None) -> None
- def init(self, min_length: int = 8, max_length: int = 128, require_uppercase: bool = True, require_lowercase: bool = True, require_digits: bool = False, require_special: bool = False, prevent_common: bool = True, prevent_reuse: bool = False, history_size: int = 5, common_passwords_file: str | None = None, banned_patterns: list[str] | None = None) ->
- @classmethod
- def from_config(cls, config: PasswordConfig) -> PasswordPolicy
Build a
Section titled “Build a PasswordPolicy from a PasswordConfig dataclass.”PasswordPolicyfrom aPasswordConfigdataclass.
- def _load_common_passwords(self, file_path: str | None) -> set[str]
Load common passwords from file (lazy-loaded).
Section titled “Load common passwords from file (lazy-loaded).” - def _get_common_passwords(self) -> set[str]
Return common passwords set, loading lazily on first access.
Section titled “Return common passwords set, loading lazily on first access.”
- def validate(self, password: str) -> None
Validate password against policy.
Section titled “Validate password against policy.” - def is_valid(self, password: str) -> bool
Return True if the password satisfies the policy without raising.
Section titled “Return True if the password satisfies the policy without raising.”
-
def _prehash(password: str) -> str
Pre-hash password with SHA-256 to avoid bcrypt’s 72-byte truncation.
Section titled “Pre-hash password with SHA-256 to avoid bcrypt’s 72-byte truncation.” -
def _prepare_password_bytes(password: str) -> bytes
Return the bytes that bcrypt should hash for password.
Section titled “Return the bytes that bcrypt should hash for password.”
- global DUMMY_PASSWORD_HASH
authn/services.py
Section titled “authn/services.py”Authentication services for user login, registration, and token management.
Section titled “Authentication services for user login, registration, and token management.”-
@dataclass
-
class LockoutConfig:
Configuration for account lockout on repeated failed login attempts.
Section titled “Configuration for account lockout on repeated failed login attempts.”max_failed_attempts: int = 5 lockout_duration_seconds: int = 300 max_attempts: int
-
class LoginAttemptTracker:
Tracks failed login attempts and enforces account lockout.
Section titled “Tracks failed login attempts and enforces account lockout.”init(self, max_attempts: int = 5, lockout_duration_seconds: int = 900, cache: CacheBackendProtocol | None = None) -> None
- def init(self, max_attempts: int = 5, lockout_duration_seconds: int = 900, cache: CacheBackendProtocol | None = None) -> None
- async def is_locked(self, identifier: str) -> bool
Return
Section titled “Return True if identifier has exceeded the failure threshold.”Trueif identifier has exceeded the failure threshold. - async def record_failure(self, identifier: str) -> None
Record a failed authentication attempt for identifier.
Section titled “Record a failed authentication attempt for identifier.” - async def clear(self, identifier: str) -> None
Remove all recorded failures for identifier (call on success).
Section titled “Remove all recorded failures for identifier (call on success).”
- async def _is_locked_cache(self, identifier: str) -> bool
- async def _record_failure_cache(self, identifier: str) -> None
- def _is_locked_local(self, identifier: str) -> bool
- def _record_failure_local(self, identifier: str) -> None
-
@inject
-
class AuthenticationService:
Service for core authentication operations.
Section titled “Service for core authentication operations.”init(self, password_policy: PasswordPolicy, user_store: UserStoreProtocol, token_manager: JWTTokenManager, lockout_config: LockoutConfig | None = None, event_bus: EventBusProtocol | None = None, tracker: LoginAttemptTracker | None = None, hooks: HookRegistryProtocol | None = None) -> None
- def init(self, password_policy: PasswordPolicy, user_store: UserStoreProtocol, token_manager: JWTTokenManager, lockout_config: LockoutConfig | None = None, event_bus: EventBusProtocol | None = None, tracker: LoginAttemptTracker | None = None, hooks: HookRegistryProtocol | None = None) -> None
- def repr(self) -> str
Return a string representation of this service.
Section titled “Return a string representation of this service.”
- def _emit(self, event: object) -> None
Fire-and-forget event publication.
Section titled “Fire-and-forget event publication.”
- def set_hook_registry(self, hooks: HookRegistryProtocol | None) -> None
Attach an optional hook registry after provider boot wiring.
Section titled “Attach an optional hook registry after provider boot wiring.”
- async def _emit_action(self, hook_name: str, payload: object) -> None
Emit an auth action hook when a registry is available.
Section titled “Emit an auth action hook when a registry is available.”
- async def authenticate_user(self, email: str, password: str) -> Result[User, InvalidCredentialsError | AccountLockedError]
Authenticate a user with email and password.
Section titled “Authenticate a user with email and password.”
- async def _rehash_password_if_needed(self, user: User, password: str, creds: UserCredentials | None) -> None
Rehash password in background if needed.
Section titled “Rehash password in background if needed.”
- async def register_user(self, request: RegisterRequest) -> Result[User, EmailExistsError | PasswordPolicyError]
Register a new user.
Section titled “Register a new user.” - def create_token(self, user: User) -> AuthToken
Create an authentication token for a user.
Section titled “Create an authentication token for a user.” - async def verify_token(self, token: str) -> Result[VerifiedToken, TokenError]
Verify and decode an authentication token.
Section titled “Verify and decode an authentication token.” - async def refresh_token(self, refresh_token: str) -> Result[AuthToken, TokenError]
Refresh an access token using a refresh token.
Section titled “Refresh an access token using a refresh token.” - async def get_user_from_token(self, token: str) -> Result[VerifiedToken, TokenError]
Get user information from token.
Section titled “Get user information from token.” - async def shutdown(self) -> None
Cancel and await all pending background event tasks.
Section titled “Cancel and await all pending background event tasks.”
authn/user_service.py
Section titled “authn/user_service.py”User management services for CRUD operations.
Section titled “User management services for CRUD operations.”- @inject
- class UserService:
init(self, password_policy: PasswordPolicy, user_store: Any, event_bus: EventBusProtocol | None = None) -> None
Service for user management operations.
Section titled “Service for user management operations.”- def init(self, password_policy: PasswordPolicy, user_store: Any, event_bus: EventBusProtocol | None = None) -> None
- def _emit(self, event: object) -> None
Fire-and-forget event publication.
Section titled “Fire-and-forget event publication.”
- async def create_user(self, name: str, email: str, password: str, roles: list[str] | None = None) -> Result[User, EmailExistsError | PasswordPolicyError]
Create a new user.
Section titled “Create a new user.” - async def get_user(self, user_id: str) -> User | None
Get user by ID.
Section titled “Get user by ID.” - async def update_user(self, user: User) -> Result[User, UserNotFoundError | ValidationError]
Update user information.
Section titled “Update user information.” - async def delete_user(self, user_id: str) -> Result[None, UserNotFoundError]
Delete a user.
Section titled “Delete a user.” - async def lock_user(self, user_id: str) -> Result[None, UserNotFoundError]
Deactivate (lock) a user account.
Section titled “Deactivate (lock) a user account.” - async def unlock_user(self, user_id: str) -> Result[None, UserNotFoundError]
Reactivate (unlock) a previously locked user account.
Section titled “Reactivate (unlock) a previously locked user account.” - async def change_user_password(self, user_id: str, current_password: str, new_password: str) -> Result[None, InvalidCredentialsError | PasswordPolicyError]
Change a user’s password (requires current password).
Section titled “Change a user’s password (requires current password).” - async def set_user_password(self, user_id: str, new_password: str, force: bool = False) -> None
Set a user’s password (admin operation).
Section titled “Set a user’s password (admin operation).” - async def list_users(self, skip: int = 0, limit: int = 100) -> list[User]
List users with pagination.
Section titled “List users with pagination.” - async def count_users(self) -> int
Count total users.
Section titled “Count total users.” - def repr(self) -> str
Return a string representation of this service.
Section titled “Return a string representation of this service.” - async def shutdown(self) -> None
Cancel and await all pending background event tasks.
Section titled “Cancel and await all pending background event tasks.”
authz/init.py
Section titled “authz/init.py”Authorization (AuthZ) - Permissions and access control
Section titled “Authorization (AuthZ) - Permissions and access control”-
def getattr(name: str) -> Any
-
def dir() -> list[str]
authz/_check_mixin.py
Section titled “authz/_check_mixin.py”Authorization check mixin for AuthorizationService.
Section titled “Authorization check mixin for AuthorizationService.”- class _AuthCheckMixin:
Mixin providing authorization check methods for AuthorizationService.
Section titled “Mixin providing authorization check methods for AuthorizationService.”- def _parse_list(self, val: Any) -> list[str]
Parse a value into a list of strings using the registry.
Section titled “Parse a value into a list of strings using the registry.”
- async def check_access(self, user: Any, allowed_roles: set[str], resource: str | None = None, action: str | None = None) -> bool
Core access check combining roles, inheritance, and permission patterns.
Section titled “Core access check combining roles, inheritance, and permission patterns.” - def has_any_role(self, user: Any, roles: list[str]) -> bool
Check if user has any of the given roles.
Section titled “Check if user has any of the given roles.” - def has_any_permission(self, user: Any, permissions: list[str]) -> bool
Return True if user has at least one of the given permissions.
Section titled “Return True if user has at least one of the given permissions.” - async def can(self, user: Any, action: str, resource: str) -> bool
Convenience alias: return True if user can perform action on resource.
Section titled “Convenience alias: return True if user can perform action on resource.” - async def authorize(self, user: Any, action: str, resource: Any) -> Result[bool, AuthorizationError]
Check whether user is allowed to perform action on resource.
Section titled “Check whether user is allowed to perform action on resource.”
- def _get_effective_roles(self, user_roles: set[str]) -> set[str]
Resolve all roles including inherited ones.
Section titled “Resolve all roles including inherited ones.” - def _get_user_permissions(self, effective_roles: set[str]) -> set[str]
Flatten all permissions from effective roles.
Section titled “Flatten all permissions from effective roles.” - def _has_permission(self, user_permissions: set[str], required: str) -> bool
Check if required permission matches any user permission patterns.
Section titled “Check if required permission matches any user permission patterns.” - def _flatten_roles(self, role_names: set[str]) -> set[str]
Recursively collect all parent roles for the given set of role names.
Section titled “Recursively collect all parent roles for the given set of role names.”
- def _parse_list(self, val: Any) -> list[str]
authz/_parsers.py
Section titled “authz/_parsers.py”Value parser classes for authorization service.
Section titled “Value parser classes for authorization service.”-
class ValueParser:
Protocol for value parsers.
Section titled “Protocol for value parsers.”- def can_parse(self, val: Any) -> bool …
- def parse(self, val: Any) -> list[str] …
-
class StringValueParser:
Parser for string values, including JSON-encoded lists.
Section titled “Parser for string values, including JSON-encoded lists.”- def can_parse(self, val: Any) -> bool
- def parse(self, val: Any) -> list[str]
-
class ListValueParser:
Parser for list values.
Section titled “Parser for list values.”- def can_parse(self, val: Any) -> bool
- def parse(self, val: Any) -> list[str]
-
class NoneValueParser:
Parser for None values.
Section titled “Parser for None values.”- def can_parse(self, val: Any) -> bool
- def parse(self, val: Any) -> list[str]
-
class ValueParserRegistry:
Registry for value parsers.
Section titled “Registry for value parsers.”init(self) -> None
- def init(self) -> None
- def _register_default_parsers(self) -> None
Register the default value parsers.
Section titled “Register the default value parsers.”
- def register_parser(self, parser: ValueParser) -> None
Register a custom value parser.
Section titled “Register a custom value parser.” - def parse(self, val: Any) -> list[str]
Parse the value using registered parsers.
Section titled “Parse the value using registered parsers.”
authz/guards.py
Section titled “authz/guards.py”Authorization guards and route protection decorators.
Section titled “Authorization guards and route protection decorators.”-
class AuthorizationGuard:
GuardProtocol that checks user roles and/or permissions.
Section titled “GuardProtocol that checks user roles and/or permissions.”init(self, roles: list[str] | None = None, permissions: list[str] | None = None, auth_service: AuthorizationService | None = None) -> None
- def init(self, roles: list[str] | None = None, permissions: list[str] | None = None, auth_service: AuthorizationService | None = None) -> None
- async def check_authorization(self, user: User | None) -> bool
Return
Section titled “Return True if user satisfies all role and permission requirements.”Trueif user satisfies all role and permission requirements. - def get_error_message(self) -> str
Build a human-readable description of the requirements.
Section titled “Build a human-readable description of the requirements.”
-
class RouteGuard:
Wraps an :class:
Section titled “Wraps an :class:AuthorizationGuard and provides HTTP response helpers.”AuthorizationGuardand provides HTTP response helpers.init(self, guard: AuthorizationGuard) -> None
- def init(self, guard: AuthorizationGuard) -> None
- async def check_access(self, user: User | None) -> bool
Delegate to the underlying guard’s
Section titled “Delegate to the underlying guard’s check_authorization.”check_authorization. - async def get_deny_response(self) -> JSONResponse
Build a 403 JSON response describing what was required.
Section titled “Build a 403 JSON response describing what was required.”
- def _find_request(args: tuple, kwargs: dict) -> Any
Extract the first Starlette-like request from positional or keyword args.
Section titled “Extract the first Starlette-like request from positional or keyword args.”
-
def require_auth(roles: list[str] | None = None, permissions: list[str] | None = None, optional: bool = False) -> Callable[, Callable]
Decorator that protects a route handler with authentication and RBAC/ABAC checks.
Section titled “Decorator that protects a route handler with authentication and RBAC/ABAC checks.” -
def require_roles(*roles: str) -> Callable[, Callable]
Shorthand decorator requiring the user to have at least one of roles.
Section titled “Shorthand decorator requiring the user to have at least one of roles.” -
def require_permissions(*permissions: str) -> Callable[, Callable]
Shorthand decorator requiring the user to hold all of permissions.
Section titled “Shorthand decorator requiring the user to hold all of permissions.” -
def optional_auth(func: Callable) -> Callable
Decorator that attaches optional auth: user is populated if present, never blocked.
Section titled “Decorator that attaches optional auth: user is populated if present, never blocked.”
authz/scopes.py
Section titled “authz/scopes.py”OAuth2 scopes and scope management
Section titled “OAuth2 scopes and scope management”-
class OAuthScope:
Standard OAuth2 scopes
Section titled “Standard OAuth2 scopes”OPENID = ‘openid’ EMAIL = ‘email’ PROFILE = ‘profile’ ADDRESS = ‘address’ PHONE = ‘phone’ READ = ‘read’ WRITE = ‘write’ DELETE = ‘delete’ ADMIN = ‘admin’
-
class ScopeManager:
Manages OAuth2 scopes and their mappings.
Section titled “Manages OAuth2 scopes and their mappings.”init(self) -> None
- def init(self) -> None
- def get_scope_permissions(self, scope: str) -> set[str]
Get permissions associated with a scope.
Section titled “Get permissions associated with a scope.” - def get_scopes_for_permissions(self, permissions: list[str]) -> set[str]
Get minimum scopes required for permissions.
Section titled “Get minimum scopes required for permissions.” - def validate_scopes(self, requested_scopes: list[str], allowed_scopes: list[str]) -> list[str]
Validate requested scopes against allowed scopes.
Section titled “Validate requested scopes against allowed scopes.” - def expand_scope_permissions(self, scopes: list[str]) -> set[str]
Expand scopes to their associated permissions.
Section titled “Expand scopes to their associated permissions.”
authz/service.py
Section titled “authz/service.py”Unified Authorization Service for Lexigram.
Section titled “Unified Authorization Service for Lexigram.”-
@runtime_checkable
-
class UserProtocol:
-
class AuthorizationService:
Central service for all authorization checks.
Section titled “Central service for all authorization checks.”init(self, permission_cache_ttl: float = 300.0, max_cache_entries: int, audit_logger: AuditLoggerProtocol | None) -> None
- def init(self, permission_cache_ttl: float = 300.0, max_cache_entries: int, audit_logger: AuditLoggerProtocol | None) -> None
Initialize a new authorization service instance.
Section titled “Initialize a new authorization service instance.” - def set_policies(self, policies: list[Any]) -> None
Set the ABAC policies and initialize the engine.
Section titled “Set the ABAC policies and initialize the engine.” - def set_roles(self, roles: dict[str, RoleDefinition | dict[str, Any]]) -> None
Set the global role definitions (usually from config/seed).
Section titled “Set the global role definitions (usually from config/seed).” - def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.” - def register_role(self, name: str, role: RoleDefinition | dict[str, Any]) -> None
Register a role definition.
Section titled “Register a role definition.” - def get_role(self, name: str) -> Any | None
- async def sync_from_db(self, container: Any) -> None
Load roles from database and merge with existing (YAML) roles.
Section titled “Load roles from database and merge with existing (YAML) roles.” - def create_role(self, name: str, permissions: list[str] | None = None, inherits: list[str] | None = None) -> None
Create or update a role definition.
Section titled “Create or update a role definition.” - def add_role_permission(self, role_name: str, permission: str) -> None
Add a permission to an existing role.
Section titled “Add a permission to an existing role.” - def get_role_permissions(self, role: str) -> set[str]
Get all permissions for a role, including inherited ones.
Section titled “Get all permissions for a role, including inherited ones.” - def invalidate_user(self, user_id: str) -> None
Invalidate the permission cache for a specific user.
Section titled “Invalidate the permission cache for a specific user.”
- def init(self, permission_cache_ttl: float = 300.0, max_cache_entries: int, audit_logger: AuditLoggerProtocol | None) -> None
- def getattr(name: str) -> Any
cli/checks.py
Section titled “cli/checks.py”CLI health checks for lexigram-auth.
Section titled “CLI health checks for lexigram-auth.”- async def check_auth_service(container: ContainerResolverProtocol) -> dict[str, object]
Verify auth service and JWT manager are operational.
Section titled “Verify auth service and JWT manager are operational.”
cli/commands.py
Section titled “cli/commands.py”Auth CLI command group factory.
Section titled “Auth CLI command group factory.”- def create_auth_app() -> typer.Typer
Create the
Section titled “Create the lexigram auth command group.”lexigram authcommand group.
cli/contributor.py
Section titled “cli/contributor.py”Auth CLI contributor definitions.
Section titled “Auth CLI contributor definitions.”- class AuthCliContributor:
CLI contributor for the lexigram-auth package.
Section titled “CLI contributor for the lexigram-auth package.”- @property
- def contributor_id(self) -> str
Return the contributor identifier.
Section titled “Return the contributor identifier.” - def get_generators(self) -> list[GeneratorDefinition]
Return generator definitions for auth.
Section titled “Return generator definitions for auth.” - def get_commands(self) -> list[CommandContribution]
Return the contributed
Section titled “Return the contributed auth command group.”authcommand group. - def get_health_checks(self) -> list[HealthCheckContribution]
Return auth service health check.
Section titled “Return auth service health check.” - def get_doctor_checks(self) -> list[DoctorCheckContribution]
Return auth configuration doctor checks.
Section titled “Return auth configuration doctor checks.” - def get_shell_context(self) -> list[ShellContextContribution]
Return auth shell context.
Section titled “Return auth shell context.” - def get_hooks(self) -> list[HookContribution]
Return auth lifecycle hooks.
Section titled “Return auth lifecycle hooks.”
cli/doctor.py
Section titled “cli/doctor.py”CLI doctor checks for lexigram-auth.
Section titled “CLI doctor checks for lexigram-auth.”-
def check_jwt_secret() -> dict[str, object]
Check JWT_SECRET env var or auth.jwt.secret config.
Section titled “Check JWT_SECRET env var or auth.jwt.secret config.” -
def check_auth_config() -> dict[str, object]
Validate auth section in application.yaml.
Section titled “Validate auth section in application.yaml.”
cli/generators/auth_guard.py
Section titled “cli/generators/auth_guard.py”Auth guard generator.
Section titled “Auth guard generator.”- class AuthGuardGenerator:
Generator for authentication/authorization guards.
Section titled “Generator for authentication/authorization guards.”- def generate(self, context: dict[str, object]) -> list[object]
Generate auth guard files.
Section titled “Generate auth guard files.”
- def generate(self, context: dict[str, object]) -> list[object]
cli/generators/auth_policy.py
Section titled “cli/generators/auth_policy.py”Auth policy generator.
Section titled “Auth policy generator.”- class AuthPolicyGenerator:
Generator for authorization policies.
Section titled “Generator for authorization policies.”- def generate(self, context: dict[str, object]) -> list[object]
Generate auth policy files.
Section titled “Generate auth policy files.”
- def generate(self, context: dict[str, object]) -> list[object]
cli/generators/guard.py
Section titled “cli/generators/guard.py”- class AuthGuardGenerator:
- def generate(self, name: str, **options: Any) -> GenerationResult
cli/hooks.py
Section titled “cli/hooks.py”CLI lifecycle hooks for lexigram-auth.
Section titled “CLI lifecycle hooks for lexigram-auth.”- def log_auth_command(ctx: object) -> None
Audit-log auth-sensitive CLI commands.
Section titled “Audit-log auth-sensitive CLI commands.”
cli/shell.py
Section titled “cli/shell.py”CLI shell context factories for lexigram-auth.
Section titled “CLI shell context factories for lexigram-auth.”- async def provide_auth(container: ContainerResolverProtocol) -> AuthenticationService
Provide AuthenticationService for interactive shell use.
Section titled “Provide AuthenticationService for interactive shell use.”
config.py
Section titled “config.py”Configuration models for Lexigram Auth.
Section titled “Configuration models for Lexigram Auth.”-
@dataclass(init=False)
-
class AuthUserConfig:
Single user configuration for bootstrapping.
Section titled “Single user configuration for bootstrapping.”- @model_validator(mode=‘before’)
- def _handle_username(self, values: dict[str, Any]) -> dict[str, Any]
-
@dataclass(init=False)
-
class AuthRoleConfig:
Role configuration with permissions and inheritance.
Section titled “Role configuration with permissions and inheritance.” -
@dataclass(init=False)
-
class RBACConfig:
RBAC system configuration.
Section titled “RBAC system configuration.” -
@dataclass(init=False)
-
class JWTConfig:
JWT Configuration
Section titled “JWT Configuration”- @model_validator(mode=‘after’)
- def validate_jwt_security(self) -> JWTConfig
Enforce verified-only JWT policy based on deployment environment.
Section titled “Enforce verified-only JWT policy based on deployment environment.”
-
@dataclass(init=False)
-
class PasswordConfig:
Password complexity and validation configuration.
Section titled “Password complexity and validation configuration.” -
@dataclass(init=False)
-
class AuthMiddlewareConfig:
Configuration for authentication middleware.
Section titled “Configuration for authentication middleware.” -
@dataclass(init=False)
-
class AuthConfig:
Hierarchical root configuration for Lexigram Auth.
Section titled “Hierarchical root configuration for Lexigram Auth.”- @model_validator(mode=‘after’)
- def validate_security(self) -> AuthConfig
Ensure secure settings in production.
Section titled “Ensure secure settings in production.”
constants.py
Section titled “constants.py”Constants for lexigram-auth.
Section titled “Constants for lexigram-auth.”-
global ENV_PREFIX: str = ‘LEX_AUTH__’
-
global ENV_NESTED_DELIMITER: str = ’__’
-
global DEFAULT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
-
global DEFAULT_REFRESH_TOKEN_EXPIRE_DAYS: int = 7
-
global DEFAULT_TOKEN_ALGORITHM: str = ‘HS256’
-
global DEFAULT_TOKEN_TYPE: str = ‘Bearer’
-
global DEFAULT_JWT_KEY_ROTATION_GRACE_PERIOD_SECONDS: int = 3600
-
global DEFAULT_MIN_PASSWORD_LENGTH: int = 8
-
global DEFAULT_MAX_PASSWORD_LENGTH: int = 128
-
global DEFAULT_PASSWORD_HASH_ROUNDS: int = 12
-
global DEFAULT_SESSION_TIMEOUT_MINUTES: int = 60
-
global DEFAULT_SESSION_COOKIE_NAME: str = ‘session’
-
global DEFAULT_SESSION_COOKIE_SECURE: bool = True
-
global DEFAULT_SESSION_COOKIE_HTTPONLY: bool = True
-
global DEFAULT_TOTP_DIGITS: int = 6
-
global DEFAULT_TOTP_INTERVAL: int = 30
-
global DEFAULT_TOTP_VALID_WINDOW: int = 1
decorators.py
Section titled “decorators.py”- def _extract_request(*args: Any, **kwargs: Any) -> Any
Extract the request-like object from positional or keyword args.
Section titled “Extract the request-like object from positional or keyword args.”
-
def require_auth(fn: F) -> F
Require a valid authenticated identity on the request context.
Section titled “Require a valid authenticated identity on the request context.” -
def require_roles(*roles: str) -> Callable[, F]
Require that the authenticated identity holds at least one of the given roles.
Section titled “Require that the authenticated identity holds at least one of the given roles.” -
global F
di/bundle_provider.py
Section titled “di/bundle_provider.py”Convenience provider that registers the full authentication + authorisation stack.
Section titled “Convenience provider that registers the full authentication + authorisation stack.”- class AuthBundleProvider:
init(self, config: AuthConfig | None = None, initial_roles: dict[str, Any] | None = None, enable_passkeys: bool = False, **kwargs: Any) -> None
Composite provider that wires the full Lexigram auth stack.
Section titled “Composite provider that wires the full Lexigram auth stack.”- def init(self, config: AuthConfig | None = None, initial_roles: dict[str, Any] | None = None, enable_passkeys: bool = False, **kwargs: Any) -> None
- @classmethod
- def from_config(cls, config: AuthConfig, **context: Any) -> Self
Create provider from config object.
Section titled “Create provider from config object.” - async def register(self, container: ContainerRegistrarProtocol) -> None
Register all auth sub-providers with the container.
Section titled “Register all auth sub-providers with the container.” - async def boot(self, container: BootContainerProtocol) -> None
Boot all auth sub-providers in registration order.
Section titled “Boot all auth sub-providers in registration order.” - async def shutdown(self) -> None
Shut down all auth sub-providers in reverse registration order.
Section titled “Shut down all auth sub-providers in reverse registration order.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Aggregate health check across all sub-providers.
Section titled “Aggregate health check across all sub-providers.”
di/provider.py
Section titled “di/provider.py”DI provider for lexigram-auth.
Section titled “DI provider for lexigram-auth.”- class AuthProvider:
init(self, config: AuthConfig | None = None) -> None
Authentication provider for Lexigram Framework.
Section titled “Authentication provider for Lexigram Framework.”- def init(self, config: AuthConfig | None = None) -> None
- @classmethod
- def from_config(cls, config: AuthConfig, **context: Any) -> AuthProvider
- async def register(self, container: ContainerRegistrarProtocol) -> None
Register auth services with the DI container.
Section titled “Register auth services with the DI container.” - async def boot(self, container: ContainerResolverProtocol) -> None
Boot the auth provider.
Section titled “Boot the auth provider.” - async def shutdown(self) -> None
Shutdown the auth provider.
Section titled “Shutdown the auth provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Return health status of the auth provider.
Section titled “Return health status of the auth provider.”
di/sub_providers/admin_provider.py
Section titled “di/sub_providers/admin_provider.py”Admin DI provider: registers AuthAdminContributor.
Section titled “Admin DI provider: registers AuthAdminContributor.”- class AuthAdminProvider:
init(self, config: AuthConfig | None = None) -> None
Registers AuthAdminContributor for admin panel integration.
Section titled “Registers AuthAdminContributor for admin panel integration.”- def init(self, config: AuthConfig | None = None) -> None
- async def register(self, container: ContainerRegistrarProtocol) -> None
Bind admin contributor singleton.
Section titled “Bind admin contributor singleton.” - async def boot(self, container: ContainerResolverProtocol) -> None
Boot the auth admin contributor.
Section titled “Boot the auth admin contributor.”
di/sub_providers/authentication_provider.py
Section titled “di/sub_providers/authentication_provider.py”Authentication provider - handles user authentication only.
Section titled “Authentication provider - handles user authentication only.”- @inject
- class AuthenticationProvider:
init(self, config: Annotated[AuthConfig, Inject] | None = None, password_policy: PasswordPolicy | None, user_store: UserStoreProtocol | None, token_manager: Any, cache_service: Any, mfa_service: MFAManager | None) -> None
User authentication ONLY (login/logout/validation).
Section titled “User authentication ONLY (login/logout/validation).”- def init(self, config: Annotated[AuthConfig, Inject] | None = None, password_policy: PasswordPolicy | None, user_store: UserStoreProtocol | None, token_manager: Any, cache_service: Any, mfa_service: MFAManager | None) -> None
- @property
- def service(self) -> AuthenticationService
Get or create the authentication service.
Section titled “Get or create the authentication service.” - async def get_user(self, user_id: str) -> Any | None
Fetch a user by their ID.
Section titled “Fetch a user by their ID.” - async def register(self, container: ContainerRegistrarProtocol) -> None
Register authentication services with the container.
Section titled “Register authentication services with the container.” - async def boot(self, container: BootContainerProtocol) -> None
Initialize authentication provider and register with kernel health registry.
Section titled “Initialize authentication provider and register with kernel health registry.” - async def shutdown(self) -> None
Shutdown authentication provider.
Section titled “Shutdown authentication provider.” - async def verify_token(self, token: str) -> Result[VerifiedToken, TokenError]
Verify a JWT token and return a
Section titled “Verify a JWT token and return a Result with the decoded payload.”Resultwith the decoded payload. - async def validate_session(self, token: str) -> Any
Validate a session token and return user information.
Section titled “Validate a session token and return user information.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check authentication provider health.
Section titled “Check authentication provider health.”
- def _build_token_manager(secret_key: str | None, jwt_algorithm: str, cache_service: Any, required_audience: str | None = None) -> Any
Build a JWTTokenManager from the supplied credentials.
Section titled “Build a JWTTokenManager from the supplied credentials.”
di/sub_providers/authorization_provider.py
Section titled “di/sub_providers/authorization_provider.py”Authorization provider - handles role-based access control and permissions.
Section titled “Authorization provider - handles role-based access control and permissions.”- @inject
- class AuthorizationProvider:
init(self, config: Annotated[AuthConfig, Inject] | None = None, initial_roles: dict[str, Any] | None = None, **kwargs: Any) -> None
Role-based access control and permission management.
Section titled “Role-based access control and permission management.”- def init(self, config: Annotated[AuthConfig, Inject] | None = None, initial_roles: dict[str, Any] | None = None, **kwargs: Any) -> None
- @property
- def auth_config(self) -> AuthConfig | None
- async def register(self, container: ContainerRegistrarProtocol) -> None
Register authorization services with the container.
Section titled “Register authorization services with the container.” - async def boot(self, container: ContainerResolverProtocol) -> None
Initialize authorization provider.
Section titled “Initialize authorization provider.” - async def shutdown(self) -> None
Shutdown authorization provider.
Section titled “Shutdown authorization provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check authorization provider health.
Section titled “Check authorization provider health.”
di/sub_providers/google_oauth_provider.py
Section titled “di/sub_providers/google_oauth_provider.py”Google OAuth provider — first-class Google token verification support.
Section titled “Google OAuth provider — first-class Google token verification support.”- @inject
- class GoogleOAuthProvider:
init(self, config: Annotated[AuthConfig, Inject] | None = None, google_oauth: dict[str, str] | None = None, http_client: Any | None = None, **kwargs: Any) -> None
Registers a first-class Google OAuth verification service.
Section titled “Registers a first-class Google OAuth verification service.”- def init(self, config: Annotated[AuthConfig, Inject] | None = None, google_oauth: dict[str, str] | None = None, http_client: Any | None = None, **kwargs: Any) -> None
- @property
- def service(self) -> GoogleOAuthService | None
Return the registered Google OAuth service, if any.
Section titled “Return the registered Google OAuth service, if any.” - async def register(self, container: ContainerRegistrarProtocol) -> None
Register the Google OAuth verifier service.
Section titled “Register the Google OAuth verifier service.” - async def boot(self, container: ContainerResolverProtocol) -> None
Initialize Google OAuth support.
Section titled “Initialize Google OAuth support.” - async def shutdown(self) -> None
Shutdown Google OAuth support.
Section titled “Shutdown Google OAuth support.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check Google OAuth provider health.
Section titled “Check Google OAuth provider health.”
di/sub_providers/mfa_provider.py
Section titled “di/sub_providers/mfa_provider.py”MFA provider - handles multi-factor authentication only.
Section titled “MFA provider - handles multi-factor authentication only.”- @inject
- class MFAProvider:
init(self, **kwargs: Any) -> None
Multi-factor authentication ONLY.
Section titled “Multi-factor authentication ONLY.”- def init(self, **kwargs: Any) -> None
- async def register(self, container: ContainerRegistrarProtocol) -> None
Register MFA services with the container.
Section titled “Register MFA services with the container.” - async def boot(self, container: ContainerResolverProtocol) -> None
Initialize MFA provider.
Section titled “Initialize MFA provider.” - async def shutdown(self) -> None
Shutdown MFA provider.
Section titled “Shutdown MFA provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check MFA provider health.
Section titled “Check MFA provider health.”
di/sub_providers/oauth2_provider.py
Section titled “di/sub_providers/oauth2_provider.py”OAuth2 provider - handles OAuth2/OIDC integration only.
Section titled “OAuth2 provider - handles OAuth2/OIDC integration only.”- @inject
- class OAuth2Provider:
init(self, config: Annotated[AuthConfig, Inject] | None = None, oauth2_providers: dict[str, dict[str, str]] | None = None, oauth_identity_store: OAuthIdentityStore | None = None, http_client: Any | None = None, **kwargs: Any) -> None
OAuth2/OIDC integration ONLY.
Section titled “OAuth2/OIDC integration ONLY.”- def init(self, config: Annotated[AuthConfig, Inject] | None = None, oauth2_providers: dict[str, dict[str, str]] | None = None, oauth_identity_store: OAuthIdentityStore | None = None, http_client: Any | None = None, **kwargs: Any) -> None
- @property
- def identity_resolver(self) -> OAuthIdentityStore | None
Return the OAuth identity store for resolving external IDs to internal UUIDs.
Section titled “Return the OAuth identity store for resolving external IDs to internal UUIDs.” - async def register(self, container: ContainerRegistrarProtocol) -> None
Register OAuth2 services with the container.
Section titled “Register OAuth2 services with the container.” - async def boot(self, container: ContainerResolverProtocol) -> None
Initialize OAuth2 provider.
Section titled “Initialize OAuth2 provider.” - async def shutdown(self) -> None
Shutdown OAuth2 provider.
Section titled “Shutdown OAuth2 provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check OAuth2 provider health.
Section titled “Check OAuth2 provider health.”
di/sub_providers/oauth_provider.py
Section titled “di/sub_providers/oauth_provider.py”OAuth provider config helpers for provider presets.
Section titled “OAuth provider config helpers for provider presets.”- def detect_oauth_providers_from_config(oauth2_providers: dict[str, dict[str, str]]) -> dict[str, dict[str, str]]
Return valid OAuth2 providers from a pre-loaded config mapping.
Section titled “Return valid OAuth2 providers from a pre-loaded config mapping.”
di/sub_providers/passkey_provider.py
Section titled “di/sub_providers/passkey_provider.py”Passkey provider - handles WebAuthn/Passkey support only.
Section titled “Passkey provider - handles WebAuthn/Passkey support only.”- @inject
- class PasskeyProvider:
init(self, **kwargs: Any) -> None
WebAuthn/Passkey support ONLY.
Section titled “WebAuthn/Passkey support ONLY.”- def init(self, **kwargs: Any) -> None
- async def register(self, container: ContainerRegistrarProtocol) -> None
Register passkey services with the container.
Section titled “Register passkey services with the container.” - async def boot(self, container: ContainerResolverProtocol) -> None
Initialize passkey provider.
Section titled “Initialize passkey provider.” - async def shutdown(self) -> None
Shutdown passkey provider.
Section titled “Shutdown passkey provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check passkey provider health.
Section titled “Check passkey provider health.”
di/sub_providers/session_provider.py
Section titled “di/sub_providers/session_provider.py”Session provider — registers SessionManagerImpl in the DI container.
Section titled “Session provider — registers SessionManagerImpl in the DI container.”- class SessionProvider:
init(self, config: Any = None, **kwargs: Any) -> None
Registers :class:
Section titled “Registers :class:~lexigram.auth.session.manager.SessionManagerImpl in the container.”~lexigram.auth.session.manager.SessionManagerImplin the container.- def init(self, config: Any = None, **kwargs: Any) -> None
Initialise the session provider.
Section titled “Initialise the session provider.” - async def register(self, container: ContainerRegistrarProtocol) -> None
Register :class:
Section titled “Register :class:SessionManagerImpl as a singleton.”SessionManagerImplas a singleton. - async def boot(self, container: ContainerResolverProtocol) -> None
Boot the session provider.
Section titled “Boot the session provider.” - async def shutdown(self) -> None
Shut down the session provider.
Section titled “Shut down the session provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Return the health of the session subsystem.
Section titled “Return the health of the session subsystem.”
- def init(self, config: Any = None, **kwargs: Any) -> None
di/sub_providers/token_provider.py
Section titled “di/sub_providers/token_provider.py”Token management provider - handles JWT tokens only.
Section titled “Token management provider - handles JWT tokens only.”- @inject
- class TokenProvider:
init(self, config: Annotated[AuthConfig, Inject] | None = None, secret_key: str | None = None, jwt_algorithm: str | None = None, jwt_access_expiration_hours: int | None = None, jwt_refresh_expiration_days: int | None = None, **kwargs: Any) -> None
JWT token management ONLY.
Section titled “JWT token management ONLY.”- def init(self, config: Annotated[AuthConfig, Inject] | None = None, secret_key: str | None = None, jwt_algorithm: str | None = None, jwt_access_expiration_hours: int | None = None, jwt_refresh_expiration_days: int | None = None, **kwargs: Any) -> None
- async def register(self, container: ContainerRegistrarProtocol) -> None
Register token services with the container.
Section titled “Register token services with the container.” - async def boot(self, container: ContainerResolverProtocol) -> None
Initialize token provider.
Section titled “Initialize token provider.” - async def shutdown(self) -> None
Shutdown token provider.
Section titled “Shutdown token provider.” - async def health_check(self, timeout: float = 5.0) -> HealthCheckResult
Check token provider health.
Section titled “Check token provider health.”
events.py
Section titled “events.py”Auth domain events emitted by key authentication operations.
Section titled “Auth domain events emitted by key authentication operations.”-
@dataclass(frozen=True, init=False)
-
class UserAuthenticated:
Emitted when a user successfully authenticates.
Section titled “Emitted when a user successfully authenticates.” -
@dataclass(frozen=True, init=False)
-
class AuthenticationFailed:
Emitted when an authentication attempt fails.
Section titled “Emitted when an authentication attempt fails.” -
@dataclass(frozen=True, init=False)
-
class UserRegistered:
Emitted when a new user account is created.
Section titled “Emitted when a new user account is created.” -
@dataclass(frozen=True, init=False)
-
class PasswordChanged:
Emitted when a user successfully changes their password.
Section titled “Emitted when a user successfully changes their password.” -
@dataclass(frozen=True, init=False)
-
class SessionCreated:
Emitted when a new session/token is issued.
Section titled “Emitted when a new session/token is issued.” -
@dataclass(frozen=True, init=False)
-
class SessionRevoked:
Emitted when a session/token is invalidated.
Section titled “Emitted when a session/token is invalidated.” -
@dataclass(frozen=True, init=False)
-
class UserLoggedIn:
Emitted after a successful password-based login.
Section titled “Emitted after a successful password-based login.” -
@dataclass(frozen=True, init=False)
-
class UserLoginFailed:
Emitted when a login attempt fails due to bad credentials.
Section titled “Emitted when a login attempt fails due to bad credentials.” -
@dataclass(frozen=True, init=False)
-
class UserLockedOut:
Emitted when a login attempt is rejected because the account is locked.
Section titled “Emitted when a login attempt is rejected because the account is locked.” -
@dataclass(frozen=True, init=False)
-
class UserLoggedOut:
Emitted when a user explicitly logs out.
Section titled “Emitted when a user explicitly logs out.” -
@dataclass(frozen=True, init=False)
-
class TokenRevoked:
Emitted when a specific token is revoked.
Section titled “Emitted when a specific token is revoked.”
exceptions.py
Section titled “exceptions.py”Exception hierarchy for Lexigram Auth.
Section titled “Exception hierarchy for Lexigram Auth.”-
class AuthError:
Base exception for all auth errors.
Section titled “Base exception for all auth errors.” -
class AuthenticationError:
Raised when authentication fails.
Section titled “Raised when authentication fails.” -
class AuthorizationError:
Raised when user lacks required permissions.
Section titled “Raised when user lacks required permissions.” -
class InvalidCredentialsError:
Raised when credentials are invalid.
Section titled “Raised when credentials are invalid.”init(self, message: str = ‘Invalid credentials’, **kwargs: Any) -> None
- def init(self, message: str = ‘Invalid credentials’, **kwargs: Any) ->
-
class AccountLockedError:
Raised when an account is locked due to too many failed login attempts.
Section titled “Raised when an account is locked due to too many failed login attempts.”init(self, email: str = ”, **kwargs: Any) -> None
- def init(self, email: str = ”, **kwargs: Any) ->
-
class UserNotFoundError:
Raised when user is not found.
Section titled “Raised when user is not found.”init(self, identifier: str, **kwargs: Any) -> None
- def init(self, identifier: str, **kwargs: Any) ->
-
class TokenError:
Base exception for token-related errors.
Section titled “Base exception for token-related errors.” -
class InvalidTokenError:
Raised when a token is malformed or invalid.
Section titled “Raised when a token is malformed or invalid.” -
class TokenExpiredError:
Raised when a token has expired.
Section titled “Raised when a token has expired.”init(self, message: str = ‘Token has expired’, expiration_time: str | None = None, **kwargs: Any) -> None
- def init(self, message: str = ‘Token has expired’, expiration_time: str | None = None, **kwargs: Any) -> None
-
class TokenBlacklistedError:
Token has been explicitly revoked.
Section titled “Token has been explicitly revoked.”init(self, message: str = ‘Token has been revoked’, **kwargs: Any) -> None
- def init(self, message: str = ‘Token has been revoked’, **kwargs: Any) -> None
-
class TokenInvalidError:
Token is structurally invalid or has wrong type.
Section titled “Token is structurally invalid or has wrong type.”init(self, message: str = ‘Token is invalid’, reason: str | None = None, **kwargs: Any) -> None
- def init(self, message: str = ‘Token is invalid’, reason: str | None = None, **kwargs: Any) -> None
-
class TokenAudienceError:
Token audience claim does not match expected.
Section titled “Token audience claim does not match expected.”init(self, message: str = ‘Token audience mismatch’, expected: str | None = None, actual: str | None = None, **kwargs: Any) -> None
- def init(self, message: str = ‘Token audience mismatch’, expected: str | None = None, actual: str | None = None, **kwargs: Any) -> None
-
class TokenNotFoundError:
Token record does not exist.
Section titled “Token record does not exist.”init(self, message: str = ‘Token not found’, token_id: str | None = None, **kwargs: Any) -> None
- def init(self, message: str = ‘Token not found’, token_id: str | None = None, **kwargs: Any) -> None
-
class InvalidAudienceError:
Raised when a token audience is invalid.
Section titled “Raised when a token audience is invalid.” -
class InvalidScopeError:
Raised when a token lacks required scope.
Section titled “Raised when a token lacks required scope.” -
class BlacklistedTokenError:
Raised when a token has been blacklisted.
Section titled “Raised when a token has been blacklisted.” -
class TokenExpiredVerificationError:
Account verification has expired.
Section titled “Account verification has expired.”init(self, message: str = ‘Verification has expired’, user_id: str | None = None, **kwargs: Any) -> None
- def init(self, message: str = ‘Verification has expired’, user_id: str | None = None, **kwargs: Any) -> None
-
class AlreadyVerifiedError:
Account is already verified.
Section titled “Account is already verified.”init(self, message: str = ‘Account is already verified’, user_id: str | None = None, **kwargs: Any) -> None
- def init(self, message: str = ‘Account is already verified’, user_id: str | None = None, **kwargs: Any) -> None
-
class EmailExistsError:
Raised when email is already taken.
Section titled “Raised when email is already taken.” -
class UsernameExistsError:
Raised when username is already taken.
Section titled “Raised when username is already taken.” -
class PasswordPolicyError:
Raised when password doesn’t meet requirements.
Section titled “Raised when password doesn’t meet requirements.” -
class OAuth2Error:
Base exception for OAuth2 errors.
Section titled “Base exception for OAuth2 errors.” -
class SessionNotFoundError:
Raised when a session cannot be found in the store.
Section titled “Raised when a session cannot be found in the store.”init(self, session_id: str, **kwargs: Any) -> None
- def init(self, session_id: str, **kwargs: Any) ->
hooks.py
Section titled “hooks.py”Root hook payload surface for lexigram-auth.
Section titled “Root hook payload surface for lexigram-auth.”-
@dataclass(frozen=True, kw_only=True)
-
class AuthUserAuthenticatedHook:
Payload fired when a user successfully authenticates.
Section titled “Payload fired when a user successfully authenticates.” -
@dataclass(frozen=True, kw_only=True)
-
class AuthAuthenticationFailedHook:
Payload fired when an authentication attempt fails.
Section titled “Payload fired when an authentication attempt fails.” -
@dataclass(frozen=True, kw_only=True)
-
class AuthTokenIssuedHook:
Payload fired when an access or refresh token is issued.
Section titled “Payload fired when an access or refresh token is issued.” -
@dataclass(frozen=True, kw_only=True)
-
class AuthTokenRevokedHook:
Payload fired when a token is explicitly revoked.
Section titled “Payload fired when a token is explicitly revoked.” -
@dataclass(frozen=True, kw_only=True)
-
class AuthTokenRefreshedHook:
Payload fired when an access token is refreshed.
Section titled “Payload fired when an access token is refreshed.”
mfa/hooks.py
Section titled “mfa/hooks.py”Lifecycle hooks for auth/mfa — intercepted when MFA operations occur.
Section titled “Lifecycle hooks for auth/mfa — intercepted when MFA operations occur.”-
@dataclass(frozen=True, kw_only=True)
-
class MFAChallengeIssuedHook:
Payload fired when an MFA challenge is issued to a user.
Section titled “Payload fired when an MFA challenge is issued to a user.” -
@dataclass(frozen=True, kw_only=True)
-
class MFAVerifiedHook:
Payload fired when MFA verification succeeds.
Section titled “Payload fired when MFA verification succeeds.” -
@dataclass(frozen=True, kw_only=True)
-
class MFAFailedHook:
Payload fired when MFA verification fails.
Section titled “Payload fired when MFA verification fails.”
mfa/manager.py
Section titled “mfa/manager.py”MFA Manager — consolidated TOTP and backup-code management.
Section titled “MFA Manager — consolidated TOTP and backup-code management.”- @inject
- class MFAManager:
init(self, user_store: UserStoreProtocol) -> None
Manages Multi-Factor Authentication (TOTP + backup codes) for users.
Section titled “Manages Multi-Factor Authentication (TOTP + backup codes) for users.”- def init(self, user_store: UserStoreProtocol) -> None
- async def enable_totp(self, user_id: str, issuer: str = ‘lexigram’) -> tuple[str, str, list[str]]
Enable TOTP for a user and return enrollment credentials.
Section titled “Enable TOTP for a user and return enrollment credentials.” - async def verify_totp(self, user_id: str, code: str) -> bool
Verify a TOTP or backup code for a user.
Section titled “Verify a TOTP or backup code for a user.” - async def disable_totp(self, user_id: str) -> bool
Disable TOTP for a user.
Section titled “Disable TOTP for a user.” - def repr(self) -> str
Return a string representation of this MFAManager.
Section titled “Return a string representation of this MFAManager.”
mfa/totp_vectors.py
Section titled “mfa/totp_vectors.py”RFC 6238 TOTP test vectors for testing without pyotp.
Section titled “RFC 6238 TOTP test vectors for testing without pyotp.”-
@dataclass(frozen=True)
-
class TOTPTestVector:
A TOTP test vector from RFC 6238.
Section titled “A TOTP test vector from RFC 6238.” -
class TOTPTestVectors:
RFC 6238 test vectors for TOTP verification.
Section titled “RFC 6238 test vectors for TOTP verification.”- @classmethod
- def get_all(cls) -> list[TOTPTestVector]
Get all test vectors.
Section titled “Get all test vectors.” - @classmethod
- def get_by_algorithm(cls, algorithm: str) -> list[TOTPTestVector]
Get test vectors by algorithm.
Section titled “Get test vectors by algorithm.” - @classmethod
- def get_by_digits(cls, digits: int) -> list[TOTPTestVector]
Get test vectors by digit count.
Section titled “Get test vectors by digit count.”
-
def generate_test_vector(secret: str, time: int, time_step: int = 30, digits: int = 6, algorithm: str = ‘SHA1’) -> str
Generate a TOTP for testing purposes.
Section titled “Generate a TOTP for testing purposes.”
models/mfa.py
Section titled “models/mfa.py”MFA model for Lexigram Auth.
Section titled “MFA model for Lexigram Auth.”- @dataclass(frozen=True)
- class UserMFA:
Represents MFA configuration for a user.
Section titled “Represents MFA configuration for a user.”
models/session.py
Section titled “models/session.py”Session model for Lexigram Auth.
Section titled “Session model for Lexigram Auth.”- @dataclass(frozen=True)
- class UserSession:
User session data model.
Section titled “User session data model.”- def is_expired(self) -> bool
Return
Section titled “Return True if this session has passed its expiry time.”Trueif this session has passed its expiry time. - def is_valid(self) -> bool
Return
Section titled “Return True if the session is active and has not expired.”Trueif the session is active and has not expired. - def remaining_ttl(self) -> timedelta | None
Return the time remaining until this session expires.
Section titled “Return the time remaining until this session expires.” - def refresh(self, expires_at: datetime, last_active_at: datetime | None = None) -> UserSession
Return a new :class:
Section titled “Return a new :class:UserSession with an updated expiry time.”UserSessionwith an updated expiry time.
- def is_expired(self) -> bool
models/token.py
Section titled “models/token.py”Authentication token models.
Section titled “Authentication token models.”- @dataclass(init=False)
- class AuthToken:
token: str = Field(…) expires_at: datetime = Field(…) refresh_token: str | None = None refresh_expires_at: datetime | None = None token_type: str = Field(…)
Authentication token response.
Section titled “Authentication token response.”
models/user.py
Section titled “models/user.py”User model.
Section titled “User model.”-
@dataclass(init=False)
-
class UserCredentials:
Sensitive credential data for a user.
Section titled “Sensitive credential data for a user.”user_id: str = Field(…) hashed_password: str | None = None previous_hashes: list[str] = Field(default_factory=…)
-
@dataclass(init=False)
-
class User:
User model representing an authenticated user.
Section titled “User model representing an authenticated user.”user_id: str = Field(default_factory=…) email: str = ” name: str | None = None is_active: bool = True is_verified: bool = False is_superuser: bool = False roles: list[str] = Field(default_factory=…) permissions: list[str] = Field(default_factory=…) profile: dict[str, Any] = Field(default_factory=…) created_at: datetime | None = Field(default_factory=…) updated_at: datetime | None = Field(default_factory=…) last_login_at: datetime | None = None login_count: int = 0 delegations: list[Any] = Field(default_factory=…) _request_metadata: dict[str, Any] = Field(default_factory=…)
module.py
Section titled “module.py”Authentication and authorization module for dependency injection.
Section titled “Authentication and authorization module for dependency injection.”- @module(is_global=True)
- class AuthModule:
Full authentication and authorization stack (JWT, OAuth2/OIDC, RBAC, policies).
Section titled “Full authentication and authorization stack (JWT, OAuth2/OIDC, RBAC, policies).”- @classmethod
- def configure(cls, config: Any | None = None, initial_roles: dict[str, Any] | None = None, is_global: bool = True) -> DynamicModule
Create an AuthModule with explicit configuration.
Section titled “Create an AuthModule with explicit configuration.” - @classmethod
- def stub(cls, config: Any = None) -> DynamicModule
Return an in-memory AuthModule suitable for unit and integration testing.
Section titled “Return an in-memory AuthModule suitable for unit and integration testing.”
policies/engine.py
Section titled “policies/engine.py”Policy Engine for ABAC evaluation.
Section titled “Policy Engine for ABAC evaluation.”-
class PatternMatcher:
Protocol for pattern matchers.
Section titled “Protocol for pattern matchers.”- def can_match(self, pattern: str) -> bool …
- def matches(self, pattern: str, target: str) -> bool …
-
class ExactPatternMatcher:
Matches exact string patterns without wildcards.
Section titled “Matches exact string patterns without wildcards.”- def can_match(self, pattern: str) -> bool
- def matches(self, pattern: str, target: str) -> bool
-
class WildcardPatternMatcher:
Matches wildcard patterns using regex.
Section titled “Matches wildcard patterns using regex.”- def can_match(self, pattern: str) -> bool
- def matches(self, pattern: str, target: str) -> bool
-
class GlobPatternMatcher:
Matches glob-style patterns (e.g., ‘user.*’ matches ‘user.read’).
Section titled “Matches glob-style patterns (e.g., ‘user.*’ matches ‘user.read’).”- def can_match(self, pattern: str) -> bool
- def matches(self, pattern: str, target: str) -> bool
-
class PatternMatcherRegistry:
Registry for pattern matchers.
Section titled “Registry for pattern matchers.”init(self) -> None
- def init(self) -> None
- @classmethod
- def with_defaults(cls) -> PatternMatcherRegistry
Create a registry pre-loaded with the standard pattern matchers.
Section titled “Create a registry pre-loaded with the standard pattern matchers.”
- def _register_default_matchers(self) -> None
Register the default pattern matchers.
Section titled “Register the default pattern matchers.”
- def register_matcher(self, matcher: PatternMatcher) -> None
Register a custom pattern matcher.
Section titled “Register a custom pattern matcher.” - def matches(self, pattern: str, target: str) -> bool
Check if the target matches the pattern using registered matchers.
Section titled “Check if the target matches the pattern using registered matchers.”
-
class PolicyEngine:
Evaluates authorization requests against a collection of policies.
Section titled “Evaluates authorization requests against a collection of policies.”init(self, policies: list[Policy] | None = None, store: PolicyStoreProtocol | None, strategy: Literal[‘deny_first’, ‘allow_first’, ‘unanimous’]) -> None
- def init(self, policies: list[Policy] | None = None, store: PolicyStoreProtocol | None, strategy: Literal[‘deny_first’, ‘allow_first’, ‘unanimous’]) -> None
Initialise the policy engine with a static list and/or a store.
Section titled “Initialise the policy engine with a static list and/or a store.” - def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.”
- def _rebuild_resource_index(self) -> None
Rebuild the resource-pattern index from
Section titled “Rebuild the resource-pattern index from self.policies.”self.policies.
- async def load_from_store(self) -> None
Load policies from the configured :class:
Section titled “Load policies from the configured :class:PolicyStoreProtocol.”PolicyStoreProtocol. - async def save_policy(self, policy: Policy) -> None
Persist a policy to the store and add it to the in-memory list.
Section titled “Persist a policy to the store and add it to the in-memory list.” - def evaluate(self, request: AuthorizationRequest) -> AuthorizationDecision
Evaluate an authorization request against loaded policies.
Section titled “Evaluate an authorization request against loaded policies.”
- def _matches(self, policy: Policy, request: AuthorizationRequest) -> bool
Check if a policy applies to the given request.
Section titled “Check if a policy applies to the given request.” - def _pattern_match(self, patterns: list[str], target: str) -> bool
Check if target matches any of the patterns using the registry.
Section titled “Check if target matches any of the patterns using the registry.”
- def init(self, policies: list[Policy] | None = None, store: PolicyStoreProtocol | None, strategy: Literal[‘deny_first’, ‘allow_first’, ‘unanimous’]) -> None
policies/evaluator.py
Section titled “policies/evaluator.py”Condition evaluators for ABAC Policy Engine.
Section titled “Condition evaluators for ABAC Policy Engine.”-
class OperatorHandlerProtocol:
Protocol for operator handlers.
Section titled “Protocol for operator handlers.”- def compare(self, actual: Any, expected: Any) -> bool …
-
class EqualsOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class NotEqualsOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class ContainsOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class NotContainsOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class InOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class NotInOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class MatchesOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class GreaterThanOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class LessThanOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class GreaterThanOrEqualsOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class LessThanOrEqualsOperator:
- def compare(self, actual: Any, expected: Any) -> bool
-
class OperatorRegistry:
Registry for condition operators.
Section titled “Registry for condition operators.”init(self) -> None
- def init(self) -> None
- @classmethod
- def with_defaults(cls) -> OperatorRegistry
Create a registry pre-loaded with the standard operator handlers.
Section titled “Create a registry pre-loaded with the standard operator handlers.”
- def _register_default_handlers(self) -> None
- def register_handler(self, operator: str, handler: OperatorHandlerProtocol) -> None
Register a custom operator handler.
Section titled “Register a custom operator handler.” - def compare(self, actual: Any, operator: str, expected: Any) -> bool
Compare using the registered handler for the operator.
Section titled “Compare using the registered handler for the operator.”
-
class ConditionEvaluator:
Evaluates individual policy conditions against a request context.
Section titled “Evaluates individual policy conditions against a request context.”init(self) -> None
- def init(self) -> None
- @staticmethod
- def _compile_path(path: str) -> Callable[, Any]
Compile an attribute path string into a reusable accessor function.
Section titled “Compile an attribute path string into a reusable accessor function.”
- def evaluate(self, condition: Condition, context: dict[str, Any]) -> bool
Evaluate a single condition against the context.
Section titled “Evaluate a single condition against the context.”
- def _resolve_attribute(self, path: str, context: dict[str, Any]) -> Any
Resolve a nested attribute from the context using a cached accessor.
Section titled “Resolve a nested attribute from the context using a cached accessor.”
policies/in_memory_store.py
Section titled “policies/in_memory_store.py”In-memory policy store for development and testing.
Section titled “In-memory policy store for development and testing.”- class InMemoryPolicyStore:
init(self) -> None
In-memory policy storage for development and testing.
Section titled “In-memory policy storage for development and testing.”- def init(self) -> None
- async def get_all(self) -> list[Policy]
Get all policies.
Section titled “Get all policies.” - async def get_by_id(self, policy_id: str) -> Policy | None
Get a policy by ID.
Section titled “Get a policy by ID.” - async def get_by_name(self, name: str) -> Policy | None
Get a policy by name.
Section titled “Get a policy by name.” - async def save(self, policy: Policy) -> None
Save a policy.
Section titled “Save a policy.” - async def delete(self, policy_id: str) -> bool
Delete a policy.
Section titled “Delete a policy.” - async def exists(self, policy_id: str) -> bool
Check if a policy exists.
Section titled “Check if a policy exists.” - def clear(self) -> None
Clear all policies.
Section titled “Clear all policies.”
policies/store.py
Section titled “policies/store.py”Policy store protocols for ABAC policy persistence.
Section titled “Policy store protocols for ABAC policy persistence.”-
@runtime_checkable
-
class PolicyStoreProtocol:
Protocol for policy storage backends.
Section titled “Protocol for policy storage backends.” -
@runtime_checkable
-
class PolicyLoader:
Protocol for loading policies from various sources.
Section titled “Protocol for loading policies from various sources.”
policies/types.py
Section titled “policies/types.py”Core types for Lexigram ABAC Policy Engine.
Section titled “Core types for Lexigram ABAC Policy Engine.”-
class PolicyEffect: ALLOW = ‘allow’ DENY = ‘deny’
-
class DecisionOutcome: ALLOW = ‘allow’ DENY = ‘deny’ INDETERMINATE = ‘indeterminate’
-
@dataclass(frozen=True)
-
class Condition:
A condition that must be met for a policy to apply.
Section titled “A condition that must be met for a policy to apply.” -
@dataclass(frozen=True)
-
class Policy:
A granular authorization policy.
Section titled “A granular authorization policy.” -
@dataclass(frozen=True)
-
class AuthorizationRequest:
A request for authorization evaluation.
Section titled “A request for authorization evaluation.” -
@dataclass(frozen=True)
-
class AuthorizationDecision:
The result of an authorization evaluation.
Section titled “The result of an authorization evaluation.”
protocols.py
Section titled “protocols.py”-
@runtime_checkable
-
class TokenValidatorProtocol:
Validates and decodes authentication tokens.
Section titled “Validates and decodes authentication tokens.” -
@runtime_checkable
-
class IdentityResolverProtocol:
Resolves a token payload to a concrete user identity.
Section titled “Resolves a token payload to a concrete user identity.”
services/result_pattern_service.py
Section titled “services/result_pattern_service.py”Authentication service using Result pattern for error handling.
Section titled “Authentication service using Result pattern for error handling.”- class AuthServiceWithResultPattern:
init(self, cache: CacheBackendProtocol | None = None, event_bus: EventBusProtocol | None = None) -> None
Authentication service using Result pattern.
Section titled “Authentication service using Result pattern.”- def init(self, cache: CacheBackendProtocol | None = None, event_bus: EventBusProtocol | None = None) -> None
Initialize the auth service with optional dependencies.
Section titled “Initialize the auth service with optional dependencies.” - async def validate_password(self, password: str, min_length: int = 8, require_uppercase: bool = True, require_digits: bool = True) -> Result[None, AuthError]
Validate password against policy requirements.
Section titled “Validate password against policy requirements.” - async def verify_credentials(self, username: str, password: str, stored_hash: str | None = None) -> Result[bool, AuthError]
Verify user credentials against stored hash.
Section titled “Verify user credentials against stored hash.” - async def get_cached_token(self, token_id: str) -> Result[AuthToken | None, AuthError]
Get cached auth token.
Section titled “Get cached auth token.” - async def cache_token(self, token_id: str, token: AuthToken, ttl: int = 3600) -> Result[None, AuthError]
Cache auth token with TTL.
Section titled “Cache auth token with TTL.” - async def invalidate_token(self, token_id: str) -> Result[None, AuthError]
Invalidate a cached token.
Section titled “Invalidate a cached token.”
- def init(self, cache: CacheBackendProtocol | None = None, event_bus: EventBusProtocol | None = None) -> None
session/cookie_backend.py
Section titled “session/cookie_backend.py”Session-cookie authentication backend for SSR admin flows.
Section titled “Session-cookie authentication backend for SSR admin flows.”- class SessionCookieBackend:
init(self, session_repository: SessionRepositoryProtocol, user_fetcher: Callable[, Awaitable[AuthenticatedUserProtocol | None]], ids: IdGeneratorProtocol | None = None, cookie_name: str = ‘session_id’, secure: bool = True, http_only: bool = True, same_site: str = ‘lax’) -> None
Session-cookie authentication backend for SSR flows.
Section titled “Session-cookie authentication backend for SSR flows.”- def init(self, session_repository: SessionRepositoryProtocol, user_fetcher: Callable[, Awaitable[AuthenticatedUserProtocol | None]], ids: IdGeneratorProtocol | None = None, cookie_name: str = ‘session_id’, secure: bool = True, http_only: bool = True, same_site: str = ‘lax’) -> None
- async def authenticate(self, request: Any) -> AuthenticatedUserProtocol | None
Extract session cookie, validate, return user or
Section titled “Extract session cookie, validate, return user or None.”None. - async def login(self, response: Any, user_id: str, expires_in: int = 86400) -> str
Create a session record and set the session cookie on response.
Section titled “Create a session record and set the session cookie on response.” - async def logout(self, request: Any, response: Any) -> None
Invalidate the session and clear the cookie.
Section titled “Invalidate the session and clear the cookie.”
session/fingerprint.py
Section titled “session/fingerprint.py”Device fingerprinting for Lexigram Auth.
Section titled “Device fingerprinting for Lexigram Auth.”- def generate_device_id(fingerprint_data: dict[str, Any]) -> str
Generate a stable device ID from fingerprint data.
Section titled “Generate a stable device ID from fingerprint data.”
session/hooks.py
Section titled “session/hooks.py”Lifecycle hooks for auth/session — intercepted when session operations occur.
Section titled “Lifecycle hooks for auth/session — intercepted when session operations occur.”-
@dataclass(frozen=True, kw_only=True)
-
class SessionCreatedHook:
Payload fired when a new session is created.
Section titled “Payload fired when a new session is created.” -
@dataclass(frozen=True, kw_only=True)
-
class SessionRefreshedHook:
Payload fired when a session token is refreshed.
Section titled “Payload fired when a session token is refreshed.” -
@dataclass(frozen=True, kw_only=True)
-
class SessionExpiredHook:
Payload fired when a session expires.
Section titled “Payload fired when a session expires.” -
@dataclass(frozen=True, kw_only=True)
-
class SessionRevokedHook:
Payload fired when a session is explicitly revoked.
Section titled “Payload fired when a session is explicitly revoked.”
session/manager.py
Section titled “session/manager.py”Session Management for Lexigram Auth.
Section titled “Session Management for Lexigram Auth.”- @inject
- class SessionManagerImpl:
init(self, session_store: SessionStore | None = None, ids: IdGeneratorProtocol | None = None, audit_logger: AuditLoggerProtocol | None, revoke_token: Callable[, Awaitable[None]] | None, max_sessions_per_user: int | None) -> None
Manages persistent user sessions with device awareness.
Section titled “Manages persistent user sessions with device awareness.”- def init(self, session_store: SessionStore | None = None, ids: IdGeneratorProtocol | None = None, audit_logger: AuditLoggerProtocol | None, revoke_token: Callable[, Awaitable[None]] | None, max_sessions_per_user: int | None) -> None
Initialise the session manager.
Section titled “Initialise the session manager.” - def repr(self) -> str
Return developer-friendly string representation.
Section titled “Return developer-friendly string representation.” - async def create_session(self, user_id: str, fingerprint_data: dict[str, Any], ip_address: str | None = None, user_agent: str | None = None, expires_days: int = DEFAULT_EXPIRY_DAYS) -> UserSession
Create a new session for a user.
Section titled “Create a new session for a user.” - async def validate_session(self, session_id: str) -> Result[UserSession, AuthenticationError | SessionNotFoundError | TokenExpiredError]
Validate a session and update its activity.
Section titled “Validate a session and update its activity.”
- def _prune_activity_write_cache(self, now: float) -> None
Remove stale entries from the activity-write tracker.
Section titled “Remove stale entries from the activity-write tracker.”
- async def revoke_session(self, session_id: str) -> bool
Revoke a specific session.
Section titled “Revoke a specific session.” - async def verify_mfa(self, session_id: str) -> bool
Mark the session as MFA verified.
Section titled “Mark the session as MFA verified.” - async def revoke_all_sessions(self, user_id: str) -> bool
Revoke all active sessions for a user.
Section titled “Revoke all active sessions for a user.” - async def get_active_sessions(self, user_id: str) -> list[UserSession]
List all active sessions for a user.
Section titled “List all active sessions for a user.”
- def _generate_fallback_id(self) -> str
Generate session ID when IdGenerator is not injected.
Section titled “Generate session ID when IdGenerator is not injected.”
- def init(self, session_store: SessionStore | None = None, ids: IdGeneratorProtocol | None = None, audit_logger: AuditLoggerProtocol | None, revoke_token: Callable[, Awaitable[None]] | None, max_sessions_per_user: int | None) -> None
storage/_mongo_store.py
Section titled “storage/_mongo_store.py”MongoDB-backed user store implementation.
Section titled “MongoDB-backed user store implementation.”- @inject
- class MongoDBUserStore:
init(self, document_store: DocumentStoreProtocol, collection_name: str = ‘users’) -> None
MongoDB-based user store using
Section titled “MongoDB-based user store using DocumentStoreProtocol.”DocumentStoreProtocol.- def init(self, document_store: DocumentStoreProtocol, collection_name: str = ‘users’) ->
- @property
- def _col(self) -> CollectionProtocol
Lazily initialize the collection handle.
Section titled “Lazily initialize the collection handle.” - async def _ensure_collection(self) -> None
Ensure user collection exists.
Section titled “Ensure user collection exists.” - async def _user_from_doc(self, doc: dict[str, Any]) -> User
Convert MongoDB document to User object.
Section titled “Convert MongoDB document to User object.” - async def _doc_from_user(self, user: User) -> dict[str, Any]
Convert User object to MongoDB document (non-credential fields).
Section titled “Convert User object to MongoDB document (non-credential fields).”
- async def create_user(self, name: str, email: str, hashed_password: str | None, roles: list[str] | None = None, permissions: list[str] | None = None, profile: dict[str, Any] | None = None, **kwargs: Any) -> User
Create a new user.
Section titled “Create a new user.” - async def get_user_by_id(self, user_id: str) -> User | None
Get user by ID.
Section titled “Get user by ID.” - async def get_user_by_email(self, email: str) -> User | None
Get user by email.
Section titled “Get user by email.” - async def update_user(self, user: User) -> None
Update non-credential user information.
Section titled “Update non-credential user information.” - async def delete_user(self, user_id: str) -> None
Delete a user.
Section titled “Delete a user.” - async def list_users(self, skip: int = 0, limit: int = 100) -> list[User]
List users with pagination.
Section titled “List users with pagination.” - async def count_users(self) -> int
Count total users.
Section titled “Count total users.” - async def get_credentials(self, user_id: str) -> UserCredentials | None
Return credential data for user_id.
Section titled “Return credential data for user_id.” - async def update_credentials(self, creds: UserCredentials) -> None
Persist updated credentials for the user.
Section titled “Persist updated credentials for the user.”
storage/_sql_store.py
Section titled “storage/_sql_store.py”SQL-backed user store implementation.
Section titled “SQL-backed user store implementation.”- @inject
- class SQLUserStore:
init(self, db_provider: DatabaseProviderProtocol) -> None
Database-based user store (SQL-backed implementation)
Section titled “Database-based user store (SQL-backed implementation)”- def init(self, db_provider: DatabaseProviderProtocol) ->
- async def _ensure_tables(self) -> None
Ensure user table exists (Managed by Alembic migrations).
Section titled “Ensure user table exists (Managed by Alembic migrations).” - async def _user_from_row(self, row: Any) -> User
Convert database row to User object
Section titled “Convert database row to User object” - def _credentials_from_row(self, row: Any) -> UserCredentials
Extract credential data from a database row.
Section titled “Extract credential data from a database row.”
- async def create_user(self, name: str, email: str, hashed_password: str, roles: list[str] | None = None, permissions: list[str] | None = None, profile: dict[str, Any] | None = None, **kwargs: Any) -> User
Create a new user using the canonical DatabaseProvider API.
Section titled “Create a new user using the canonical DatabaseProvider API.” - async def get_user_by_id(self, user_id: str) -> User | None
Get user by ID
Section titled “Get user by ID” - async def get_user_by_email(self, email: str) -> User | None
Get user by email
Section titled “Get user by email” - async def update_user(self, user: User) -> None
Update non-credential user information.
Section titled “Update non-credential user information.” - async def delete_user(self, user_id: str) -> None
Delete a user
Section titled “Delete a user” - async def list_users(self, skip: int = 0, limit: int = 100) -> list[User]
List users with pagination
Section titled “List users with pagination” - async def count_users(self) -> int
Count total users
Section titled “Count total users” - async def get_credentials(self, user_id: str) -> UserCredentials | None
Return credential data for user_id from the database.
Section titled “Return credential data for user_id from the database.” - async def update_credentials(self, creds: UserCredentials) -> None
Persist updated credential data for
Section titled “Persist updated credential data for creds.user_id.”creds.user_id.
storage/cached_user_store.py
Section titled “storage/cached_user_store.py”Cached user store implementation for performance optimization
Section titled “Cached user store implementation for performance optimization”-
class CachedUserStore:
User store with multi-layer caching for high-performance user lookups.
Section titled “User store with multi-layer caching for high-performance user lookups.”init(self, user_store: UserStoreProtocol, cache_service: CacheBackendProtocol, cache_ttl: int = 300, memory_cache_ttl: int = 60, pubsub: PubSubProtocol | None) -> None
- def init(self, user_store: UserStoreProtocol, cache_service: CacheBackendProtocol, cache_ttl: int = 300, memory_cache_ttl: int = 60, pubsub: PubSubProtocol | None) ->
Initialize cached user store.
Section titled “Initialize cached user store.” - async def subscribe_to_invalidations(self) -> None
Subscribe to cross-instance cache invalidation events.
Section titled “Subscribe to cross-instance cache invalidation events.”
- async def _handle_invalidation(self, data: Any) -> None
Handle a
Section titled “Handle a user.cache.invalidate event from another instance.”user.cache.invalidateevent from another instance.
- async def get_user_by_id(self, user_id: str) -> User | None
Get user by ID with multi-layer caching.
Section titled “Get user by ID with multi-layer caching.” - async def get_user_by_email(self, email: str) -> User | None
Get user by email with caching.
Section titled “Get user by email with caching.” - async def update_user(self, user: User) -> None
Update user and invalidate all related caches.
Section titled “Update user and invalidate all related caches.” - async def delete_user(self, user_id: str) -> None
Delete user and invalidate caches.
Section titled “Delete user and invalidate caches.” - async def create_user(self, name: str, email: str, hashed_password: str | None, roles: list[str] | None = None, permissions: list[str] | None = None, profile: dict[str, Any] | None = None, **kwargs: Any) -> User
Create user (no caching needed for new users).
Section titled “Create user (no caching needed for new users).” - async def list_users(self, skip: int = 0, limit: int = 100) -> list[User]
List users (not cached for simplicity).
Section titled “List users (not cached for simplicity).” - async def count_users(self) -> int
Count users (not cached for simplicity).
Section titled “Count users (not cached for simplicity).” - async def get_credentials(self, user_id: str) -> UserCredentials | None
Return credential data from the underlying store.
Section titled “Return credential data from the underlying store.” - async def update_credentials(self, creds: UserCredentials) -> None
Persist updated credentials and invalidate the user cache.
Section titled “Persist updated credentials and invalidate the user cache.”
- def _serialize_user(self, user: User) -> dict[str, Any]
Serialize user for caching (credential fields excluded).
Section titled “Serialize user for caching (credential fields excluded).” - def _deserialize_user(self, data: dict[str, Any]) -> User
Deserialize user from cache (credential fields excluded).
Section titled “Deserialize user from cache (credential fields excluded).”
- def init(self, user_store: UserStoreProtocol, cache_service: CacheBackendProtocol, cache_ttl: int = 300, memory_cache_ttl: int = 60, pubsub: PubSubProtocol | None) ->
-
global CACHE_INVALIDATION_TOPIC
storage/db_stores.py
Section titled “storage/db_stores.py”Database-backed user store implementations.
Section titled “Database-backed user store implementations.”- @inject
- class RedisUserStore:
init(self, db_provider: DatabaseProviderProtocol, prefix: str = ‘user:’, ttl: int | None = None) -> None
Redis-backed user cache (read-through cache, not primary storage).
Section titled “Redis-backed user cache (read-through cache, not primary storage).”- def init(self, db_provider: DatabaseProviderProtocol, prefix: str = ‘user:’, ttl: int | None = None) -> None
- async def _user_key(self, user_id: str) -> str
Generate Redis key for user
Section titled “Generate Redis key for user” - async def _user_from_data(self, data: dict[str, Any]) -> User
Convert Redis hash data to a User object (credentials excluded).
Section titled “Convert Redis hash data to a User object (credentials excluded).” - async def _data_from_user(self, user: User) -> dict[str, str]
Convert User object to Redis hash fields (non-credential fields only).
Section titled “Convert User object to Redis hash fields (non-credential fields only).”
- async def create_user(self, name: str, email: str, hashed_password: str | None, roles: list[str] | None = None, permissions: list[str] | None = None, profile: dict[str, Any] | None = None, **kwargs: Any) -> User
Cache a new user entry.
Section titled “Cache a new user entry.” - async def get_user_by_id(self, user_id: str) -> User | None
Get user by ID
Section titled “Get user by ID” - async def get_user_by_email(self, email: str) -> User | None
Not supported — Redis does not index by email efficiently.
Section titled “Not supported — Redis does not index by email efficiently.” - async def update_user(self, user: User) -> None
Update non-credential fields for the cached user entry.
Section titled “Update non-credential fields for the cached user entry.” - async def delete_user(self, user_id: str) -> None
Delete a user
Section titled “Delete a user” - async def list_users(self, skip: int = 0, limit: int = 100) -> list[User]
Not supported — Redis is not designed for full user enumeration.
Section titled “Not supported — Redis is not designed for full user enumeration.” - async def count_users(self) -> int
Not supported — Redis is not designed for counting all users.
Section titled “Not supported — Redis is not designed for counting all users.” - async def get_credentials(self, user_id: str) -> UserCredentials | None
Return cached credential data for user_id.
Section titled “Return cached credential data for user_id.” - async def update_credentials(self, creds: UserCredentials) -> None
Update credential fields in the cached user hash.
Section titled “Update credential fields in the cached user hash.”
storage/in_memory_stores.py
Section titled “storage/in_memory_stores.py”In-memory session store for development and testing.
Section titled “In-memory session store for development and testing.”-
class InMemorySessionStore:
In-memory session storage for development and testing.
Section titled “In-memory session storage for development and testing.”init(self) -> None
- def init(self) -> None
- async def create(self, session: UserSession) -> str
Create a new session.
Section titled “Create a new session.” - async def get(self, session_id: str) -> UserSession | None
Get a session by ID.
Section titled “Get a session by ID.” - async def update(self, session_id: str, **updates: Any) -> None
Update session fields.
Section titled “Update session fields.” - async def delete(self, session_id: str) -> bool
Delete a session.
Section titled “Delete a session.” - async def delete_all_for_user(self, user_id: str) -> int
Delete all sessions for a user.
Section titled “Delete all sessions for a user.” - async def list_for_user(self, user_id: str) -> list[UserSession]
List all sessions for a user.
Section titled “List all sessions for a user.”
-
class InMemoryMFAStore:
In-memory MFA storage for development and testing.
Section titled “In-memory MFA storage for development and testing.”init(self) -> None
- def init(self) -> None
- async def get(self, user_id: str) -> dict[str, Any] | None
Get MFA config for a user.
Section titled “Get MFA config for a user.” - async def save(self, mfa_data: dict[str, object]) -> None
Save MFA config for a user.
Section titled “Save MFA config for a user.” - async def delete(self, user_id: str) -> bool
Delete MFA config for a user.
Section titled “Delete MFA config for a user.”
storage/oauth_identity_store.py
Section titled “storage/oauth_identity_store.py”OAuth identity storage for linking users to OAuth providers.
Section titled “OAuth identity storage for linking users to OAuth providers.”-
@runtime_checkable
-
class OAuthIdentityStore:
Protocol for OAuth identity storage.
Section titled “Protocol for OAuth identity storage.” -
class OAuthIdentity:
OAuth identity linking user to provider
Section titled “OAuth identity linking user to provider”init(self, user_id: str, provider: str, provider_user_id: str, created_at: datetime | None = None, updated_at: datetime | None = None) -> None
- def init(self, user_id: str, provider: str, provider_user_id: str, created_at: datetime | None = None, updated_at: datetime | None = None) ->
-
@inject
-
class SQLAlchemyOAuthIdentityStore:
Database-backed OAuth identity store
Section titled “Database-backed OAuth identity store”init(self, db_provider: DatabaseProviderProtocol) -> None
- def init(self, db_provider: DatabaseProviderProtocol) ->
- async def _ensure_tables(self) -> None
Ensure oauth_identities table exists.
Section titled “Ensure oauth_identities table exists.” - async def _identity_from_row(self, row: Any) -> OAuthIdentity
Convert database row to OAuthIdentity object
Section titled “Convert database row to OAuthIdentity object”
- async def create_oauth_identity(self, user_id: str, provider: str, provider_user_id: str) -> OAuthIdentity
Create OAuth identity link
Section titled “Create OAuth identity link” - async def get_oauth_identity(self, provider: str, provider_user_id: str) -> OAuthIdentity | None
Get OAuth identity by provider and provider user ID
Section titled “Get OAuth identity by provider and provider user ID” - async def get_oauth_identities_for_user(self, user_id: str) -> list[OAuthIdentity]
Get all OAuth identities for a user
Section titled “Get all OAuth identities for a user” - async def delete_oauth_identity(self, provider: str, provider_user_id: str) -> bool
Delete OAuth identity
Section titled “Delete OAuth identity” - async def delete_oauth_identities_for_user(self, user_id: str) -> int
Delete all OAuth identities for a user
Section titled “Delete all OAuth identities for a user” - async def get_user_by_oauth_identity(self, provider: str, provider_user_id: str) -> str | None
Get local user_id by OAuth provider and external user ID.
Section titled “Get local user_id by OAuth provider and external user ID.” - async def resolve_user_id(self, user_id_or_oauth_id: str, provider: str = ‘google’) -> str | None
Resolve user_id from either UUID or OAuth external ID.
Section titled “Resolve user_id from either UUID or OAuth external ID.” - def resolve_user_id_sync(self, external_id: str, provider: str = ‘google’) -> str | None
Synchronous resolution is not supported for database-backed store.
Section titled “Synchronous resolution is not supported for database-backed store.”
-
@inject
-
class MongoDBOAuthIdentityStore:
MongoDB-backed OAuth identity store
Section titled “MongoDB-backed OAuth identity store”init(self, db_provider: DatabaseProviderProtocol, collection_name: str = ‘oauth_identities’) -> None
- def init(self, db_provider: DatabaseProviderProtocol, collection_name: str = ‘oauth_identities’) ->
- async def _ensure_collection(self) -> None
Ensure collection exists with indexes
Section titled “Ensure collection exists with indexes” - async def _identity_from_doc(self, doc: dict[str, Any]) -> OAuthIdentity
Convert MongoDB document to OAuthIdentity object
Section titled “Convert MongoDB document to OAuthIdentity object” - async def _doc_from_identity(self, identity: OAuthIdentity) -> dict[str, Any]
Convert OAuthIdentity object to MongoDB document
Section titled “Convert OAuthIdentity object to MongoDB document”
- async def create_oauth_identity(self, user_id: str, provider: str, provider_user_id: str) -> OAuthIdentity
Create OAuth identity link
Section titled “Create OAuth identity link” - async def get_oauth_identity(self, provider: str, provider_user_id: str) -> OAuthIdentity | None
Get OAuth identity by provider and provider user ID
Section titled “Get OAuth identity by provider and provider user ID” - async def get_oauth_identities_for_user(self, user_id: str) -> list[OAuthIdentity]
Get all OAuth identities for a user
Section titled “Get all OAuth identities for a user” - async def delete_oauth_identity(self, provider: str, provider_user_id: str) -> bool
Delete OAuth identity
Section titled “Delete OAuth identity” - async def delete_oauth_identities_for_user(self, user_id: str) -> int
Delete all OAuth identities for a user
Section titled “Delete all OAuth identities for a user” - async def get_user_by_oauth_identity(self, provider: str, provider_user_id: str) -> str | None
Get local user_id by OAuth provider and external user ID.
Section titled “Get local user_id by OAuth provider and external user ID.” - async def resolve_user_id(self, user_id_or_oauth_id: str, provider: str = ‘google’) -> str | None
Resolve user_id from either UUID or OAuth external ID.
Section titled “Resolve user_id from either UUID or OAuth external ID.” - def resolve_user_id_sync(self, external_id: str, provider: str = ‘google’) -> str | None
Synchronous resolution is not supported for database-backed store.
Section titled “Synchronous resolution is not supported for database-backed store.”
storage/session_store.py
Section titled “storage/session_store.py”Session store protocols for abstracting session storage.
Section titled “Session store protocols for abstracting session storage.”-
@runtime_checkable
-
class SessionStore:
Protocol for session storage backends.
Section titled “Protocol for session storage backends.” -
@runtime_checkable
-
class MFAStore:
Protocol for MFA storage backends.
Section titled “Protocol for MFA storage backends.”
storage/token_store.py
Section titled “storage/token_store.py”User and token storage interfaces and implementations
Section titled “User and token storage interfaces and implementations”-
@runtime_checkable
-
class CachedUserStore:
Protocol for read-through cache user stores (point-lookup only).
Section titled “Protocol for read-through cache user stores (point-lookup only).” -
@runtime_checkable
-
class UserStoreProtocol:
Protocol for user storage implementations.
Section titled “Protocol for user storage implementations.” -
class InMemoryUserStore:
Simple in-memory user store for development/testing
Section titled “Simple in-memory user store for development/testing”init(self) -> None
- def init(self) -> None
- async def create_user(self, name: str, email: str, hashed_password: str | None, roles: list[str] | None = None, permissions: list[str] | None = None, profile: dict[str, Any] | None = None, **kwargs: Any) -> User
Create a new user
Section titled “Create a new user” - async def get_user_by_id(self, user_id: str) -> User | None
Get user by ID
Section titled “Get user by ID” - async def get_user_by_email(self, email: str) -> User | None
Get user by email
Section titled “Get user by email” - async def get_user_by_username(self, username: str) -> User | None
Get user by username (name).
Section titled “Get user by username (name).” - async def update_user(self, user: User) -> None
Update user information
Section titled “Update user information” - async def delete_user(self, user_id: str) -> None
Delete a user
Section titled “Delete a user” - async def list_users(self, skip: int = 0, limit: int = 100) -> list[User]
List users with pagination
Section titled “List users with pagination” - async def count_users(self) -> int
Count total users
Section titled “Count total users” - async def get_credentials(self, user_id: str) -> UserCredentials | None
Return stored credentials for user_id.
Section titled “Return stored credentials for user_id.” - async def update_credentials(self, creds: UserCredentials) -> None
Persist updated credentials for the user.
Section titled “Persist updated credentials for the user.”
types.py
Section titled “types.py”Type definitions for Lexigram Auth.
Section titled “Type definitions for Lexigram Auth.”-
class AuthStatus:
Authentication status values.
Section titled “Authentication status values.”AUTHENTICATED = ‘authenticated’ UNAUTHENTICATED = ‘unauthenticated’ TOKEN_EXPIRED = ‘token_expired’ TOKEN_INVALID = ‘token_invalid’ USER_INACTIVE = ‘user_inactive’ USER_NOT_VERIFIED = ‘user_not_verified’
-
class TokenType:
Token type values.
Section titled “Token type values.”BEARER = ‘Bearer’ BASIC = ‘Basic’ API_KEY = ‘ApiKey’
-
class UserStatus:
User account status values.
Section titled “User account status values.”ACTIVE = ‘active’ INACTIVE = ‘inactive’ SUSPENDED = ‘suspended’ PENDING_VERIFICATION = ‘pending_verification’ DELETED = ‘deleted’
-
@dataclass(init=False)
-
class GuardContext:
Context passed to authorization guards.
Section titled “Context passed to authorization guards.”user: User | None = None request: Any | None = None request_context_user_id: str | None = None route: str | None = None method: str | None = None path: str | None = None headers: dict[str, str] = Field(default_factory=…) params: dict[str, Any] = Field(default_factory=…)
-
@dataclass(init=False)
-
class AuthResult:
Result of an authentication attempt.
Section titled “Result of an authentication attempt.”success: bool = Field(…) status: AuthStatus = Field(…) user: User | None = None token: AuthToken | None = None message: str | None = None
-
@dataclass(init=False)
-
class OAuth2UserInfo:
User information returned by an OAuth2 provider.
Section titled “User information returned by an OAuth2 provider.”provider: str = Field(…) provider_user_id: str = Field(…) email: str | None = None username: str | None = None name: str | None = None avatar_url: str | None = None raw_data: dict[str, Any] | None = None
-
@dataclass(init=False)
-
class RoleDefinition:
Definition of a role with its permissions.
Section titled “Definition of a role with its permissions.”name: str = Field(…) description: str = ” permissions: list[str] = Field(default_factory=…) inherits: list[str] = Field(default_factory=…)
-
@dataclass(init=False)
-
class AuthHealthResult:
Health check result for auth components.
Section titled “Health check result for auth components.”status: HealthStatus = Field(…) message: str = Field(…) users_count: int = 0 components: dict[str, dict[str, Any]] = Field(default_factory=…)
-
@dataclass(frozen=True)
-
class TokenPair:
A minimal access + refresh token pair.
Section titled “A minimal access + refresh token pair.”
web/guards.py
Section titled “web/guards.py”GuardProtocol services for authorization in Lexigram Framework
Section titled “GuardProtocol services for authorization in Lexigram Framework”- class _GuardBase:
Private abstract base for auth guards; provides default handle_rejection.
Section titled “Private abstract base for auth guards; provides default handle_rejection.”- @abstractmethod
- async def can_activate(self, context: GuardContext) -> bool
Check if the guard allows the request to proceed
Section titled “Check if the guard allows the request to proceed” - async def handle_rejection(self, context: GuardContext) -> ResponseProtocol
Handle guard rejection by returning appropriate response.
Section titled “Handle guard rejection by returning appropriate response.”
-
class AuthGuard:
GuardProtocol that requires authentication
Section titled “GuardProtocol that requires authentication”- async def can_activate(self, context: GuardContext) -> bool
Check if user is authenticated
Section titled “Check if user is authenticated” - async def handle_rejection(self, context: GuardContext) -> ResponseProtocol
Return 401 for unauthenticated requests.
Section titled “Return 401 for unauthenticated requests.”
- async def can_activate(self, context: GuardContext) -> bool
-
class RoleGuard:
GuardProtocol that requires specific roles
Section titled “GuardProtocol that requires specific roles”init(self, *roles: str) -> None
- def init(self, *roles: str) -> None
- async def can_activate(self, context: GuardContext) -> bool
Check if user has required roles
Section titled “Check if user has required roles”
-
class PermissionGuard:
GuardProtocol that requires specific permissions
Section titled “GuardProtocol that requires specific permissions”init(self, *permissions: str) -> None
- def init(self, *permissions: str) -> None
- async def can_activate(self, context: GuardContext) -> bool
Check if user has required permissions
Section titled “Check if user has required permissions”
-
class CompositeGuard:
GuardProtocol that combines multiple guards with AND logic
Section titled “GuardProtocol that combines multiple guards with AND logic”init(self, *guards: GuardProtocol) -> None
- def init(self, *guards: GuardProtocol) -> None
- async def can_activate(self, context: GuardContext) -> bool
Check if all guards pass
Section titled “Check if all guards pass”
-
class AdminGuard:
GuardProtocol that requires admin role
Section titled “GuardProtocol that requires admin role”init(self) -> None
- def init(self) -> None
-
class UserGuard:
GuardProtocol that requires any authenticated user
Section titled “GuardProtocol that requires any authenticated user” -
class GuardFactory:
Factory for creating guards via dependency injection.
Section titled “Factory for creating guards via dependency injection.”- @classmethod
- async def get_guard(cls, guard_type: type[GuardProtocol], resolver: Any | None = None) -> GuardProtocol
Get a guard instance, resolving from DI container if needed.
Section titled “Get a guard instance, resolving from DI container if needed.”
-
def _get_request_resolver(request: Any) -> Any | None
-
async def _get_request_context_user_id(request: Any) -> str | None
-
def use_guards(*guards: GuardProtocol) -> Callable[, Callable[…, Any]]
Apply guards to a route handler (auth-scoped internal implementation).
Section titled “Apply guards to a route handler (auth-scoped internal implementation).” -
def require_auth() -> Callable[, Callable[…, Any]]
Decorator requiring authentication.
Section titled “Decorator requiring authentication.” -
def require_admin() -> Callable[, Callable[…, Any]]
Decorator requiring admin role.
Section titled “Decorator requiring admin role.” -
def require_role(*roles: str) -> Callable[, Callable[…, Any]]
Decorator requiring specific roles
Section titled “Decorator requiring specific roles” -
def require_permission(*permissions: str) -> Callable[, Callable[…, Any]]
Decorator requiring specific permissions
Section titled “Decorator requiring specific permissions”
web/middleware/auth.py
Section titled “web/middleware/auth.py”Authentication middleware for web applications
Section titled “Authentication middleware for web applications”-
class AuthMiddleware:
Middleware for handling authentication and authorization - Pure ASGI implementation.
Section titled “Middleware for handling authentication and authorization - Pure ASGI implementation.”init(self, auth_provider: AuthProviderProtocol, config: AuthMiddlewareConfig | None = None, ctx: Context | None = None, attempt_tracker: LoginAttemptTracker | None = None) -> None
- def init(self, auth_provider: AuthProviderProtocol, config: AuthMiddlewareConfig | None = None, ctx: Context | None = None, attempt_tracker: LoginAttemptTracker | None = None) ->
- def should_skip_auth(self, path: str) -> bool
Check if authentication should be skipped for this path
Section titled “Check if authentication should be skipped for this path” - def extract_token(self, request: Request) -> str | None
Extract authentication token from request
Section titled “Extract authentication token from request” - async def authenticate_request(self, request: Any) -> User | None
Authenticate the request and return user if valid
Section titled “Authenticate the request and return user if valid” - def check_authorization(self, user: User) -> bool
Check if user is authorized based on roles/permissions
Section titled “Check if user is authorized based on roles/permissions” - async def call(self, scope: dict[str, Any], receive: Callable, send: Callable) -> None
Pure ASGI middleware entry point - OPT-AUTH-2.
Section titled “Pure ASGI middleware entry point - OPT-AUTH-2.” - def set_app(self, app: Callable) -> None
Set the ASGI app to wrap.
Section titled “Set the ASGI app to wrap.”
-
class AuthRouter:
Router extension with authentication helpers
Section titled “Router extension with authentication helpers”init(self, auth_provider: AuthProviderProtocol) -> None
- def init(self, auth_provider: AuthProviderProtocol) ->
- def require_auth(self, roles: list[str] | None = None, permissions: list[str] | None = None, optional: bool = False) -> Callable[, Callable[…, Any]]
Decorator to require authentication and authorization for routes
Section titled “Decorator to require authentication and authorization for routes” - def get_current_user(self, request: Request) -> User | None
Get current authenticated user from request
Section titled “Get current authenticated user from request”
-
def _extract_request(*args: Any, **kwargs: Any) -> Any
Extract the Starlette-like request object from positional or keyword args.
Section titled “Extract the Starlette-like request object from positional or keyword args.” -
async def _get_auth_provider(context: Any | None = None) -> AuthProviderProtocol
Resolve
Section titled “Resolve AuthProvider from dynamic context or global container.”AuthProviderfrom dynamic context or global container. -
async def _get_response_factory(context: Any | None = None) -> Any
Resolve
Section titled “Resolve ResponseFactoryProtocol from global container.”ResponseFactoryProtocolfrom global container.
- def require_mfa(max_age_seconds: int = 300) -> Callable[, Callable[…, Any]]
Decorator to require MFA verification (step-up).
Section titled “Decorator to require MFA verification (step-up).”
web/middleware/jwt_authenticator.py
Section titled “web/middleware/jwt_authenticator.py”JWT token authentication component.
Section titled “JWT token authentication component.”- class JwtAuthenticator:
init(self, auth_provider: AuthProviderProtocol, config = None, logger: Logger | None = None) -> None
Handles JWT token validation and user resolution.
Section titled “Handles JWT token validation and user resolution.”- def init(self, auth_provider: AuthProviderProtocol, config = None, logger: Logger | None = None) ->
- async def authenticate(self, request: Any) -> Any | None
Validate JWT token from request and return user if valid.
Section titled “Validate JWT token from request and return user if valid.”
web/middleware/response_handler.py
Section titled “web/middleware/response_handler.py”- class AuthResponseHandler:
Handles authentication-related HTTP responses.
Section titled “Handles authentication-related HTTP responses.”- @staticmethod
- async def unauthorized_response(message: str = ‘Authentication required’, request: Any = None, response_factory: Any | None = None) -> Any
Return 401 Unauthorized response or redirect to login.
Section titled “Return 401 Unauthorized response or redirect to login.” - @staticmethod
- async def forbidden_response(message: str = ‘Insufficient permissions’, request: Any | None = None, response_factory: Any | None = None) -> Any
Return 403 Forbidden response.
Section titled “Return 403 Forbidden response.”
- async def _get_response_factory(context: Any | None = None) -> Any
Resolve
Section titled “Resolve ResponseFactoryProtocol from context or global container.”ResponseFactoryProtocolfrom context or global container.
web/middleware/session_authenticator.py
Section titled “web/middleware/session_authenticator.py”Session-based authentication component.
Section titled “Session-based authentication component.”- class SessionAuthenticator:
init(self, auth_provider: AuthProviderProtocol) -> None
Handles session validation and user resolution.
Section titled “Handles session validation and user resolution.”- def init(self, auth_provider: AuthProviderProtocol) ->
- async def authenticate(self, request: Any) -> Any
Validate session from cookies and return user if valid.
Section titled “Validate session from cookies and return user if valid.”
web/middleware/session_validator.py
Section titled “web/middleware/session_validator.py”Session validation utilities for authentication middleware.
Section titled “Session validation utilities for authentication middleware.”- class SessionValidator:
init(self, config: Any, auth_provider: Any) -> None
Handles session validation and authorization checks.
Section titled “Handles session validation and authorization checks.”- def init(self, config: Any, auth_provider: Any) -> None
Initialize with configuration and auth provider.
Section titled “Initialize with configuration and auth provider.” - def check_authorization(self, user: User) -> bool
Check if user is authorized based on roles/permissions.
Section titled “Check if user is authorized based on roles/permissions.” - def should_skip_auth(self, path: str) -> bool
Check if authentication should be skipped for this path.
Section titled “Check if authentication should be skipped for this path.”
- def init(self, config: Any, auth_provider: Any) -> None
web/middleware/throttle.py
Section titled “web/middleware/throttle.py”Rate-limiting middleware for authentication endpoints.
Section titled “Rate-limiting middleware for authentication endpoints.”-
class RateLimitExceededError:
Raised when a client has exceeded the configured rate limit.
Section titled “Raised when a client has exceeded the configured rate limit.”init(self, retry_after: int) -> None
- def init(self, retry_after: int) -> None
-
class RateLimitMiddleware:
ASGI middleware that rate-limits authentication endpoints.
Section titled “ASGI middleware that rate-limits authentication endpoints.”init(self, app: Any, rate_limit: str = ‘5/minute’, block_duration: int = 60, paths: frozenset[str] | None = None, cache_service: Any | None = None) -> None
- def init(self, app: Any, rate_limit: str = ‘5/minute’, block_duration: int = 60, paths: frozenset[str] | None = None, cache_service: Any | None = None) -> None
- async def call(self, scope: dict[str, Any], receive: Any, send: Any) -> None
Process an ASGI request.
Section titled “Process an ASGI request.”
- @staticmethod
- async def _send_429(send: Any, retry_after: int) -> None
Send a 429 Too Many Requests ASGI response.
Section titled “Send a 429 Too Many Requests ASGI response.”
- def _parse_rate_limit(rate_limit: str) -> tuple[int, int]
Parse a rate-limit string such as
Section titled “Parse a rate-limit string such as "5/minute" into (max, seconds).”"5/minute"into(max, seconds).
- def throttle(rate_limit: str = ‘5/minute’, block_duration: int = 60, paths: frozenset[str] | None = None) -> Callable[, RateLimitMiddleware]
Decorator / factory that wraps an ASGI app with :class:
Section titled “Decorator / factory that wraps an ASGI app with :class:RateLimitMiddleware.”RateLimitMiddleware.
web/middleware/token_cache.py
Section titled “web/middleware/token_cache.py”Token caching utilities for authentication middleware.
Section titled “Token caching utilities for authentication middleware.”- class TokenCache:
init(self, max_size: int = 10000, ttl_seconds: float = 300.0, cache_backend: CacheBackendProtocol | None = None) -> None
Cache for JWT tokens with LRU eviction and TTL support.
Section titled “Cache for JWT tokens with LRU eviction and TTL support.”- def init(self, max_size: int = 10000, ttl_seconds: float = 300.0, cache_backend: CacheBackendProtocol | None = None) ->
Initialize token cache.
Section titled “Initialize token cache.”
- def _cleanup_expired(self) -> None
Remove expired entries from cache.
Section titled “Remove expired entries from cache.” - def _evict_if_needed(self) -> None
Evict oldest entry if cache is full.
Section titled “Evict oldest entry if cache is full.”
- async def get(self, token: str) -> Any | None
Get cached user for token, or None if not cached/expired.
Section titled “Get cached user for token, or None if not cached/expired.” - async def set(self, token: str, user: Any) -> None
Cache user for token.
Section titled “Cache user for token.” - async def invalidate(self, token: str) -> None
Invalidate a specific token from cache.
Section titled “Invalidate a specific token from cache.” - def clear(self) -> None
Clear all cached tokens.
Section titled “Clear all cached tokens.” - def stats(self) -> dict[str, Any]
Get cache statistics.
Section titled “Get cache statistics.”
- def init(self, max_size: int = 10000, ttl_seconds: float = 300.0, cache_backend: CacheBackendProtocol | None = None) ->
web/middleware/token_extractor.py
Section titled “web/middleware/token_extractor.py”Token extraction utilities for authentication middleware.
Section titled “Token extraction utilities for authentication middleware.”- class TokenExtractor:
init(self, config: Any) -> None
Handles extraction of authentication tokens from HTTP requests.
Section titled “Handles extraction of authentication tokens from HTTP requests.”- def init(self, config: Any) -> None
Initialize with middleware configuration.
Section titled “Initialize with middleware configuration.” - def extract_token(self, request: Request) -> str | None
Extract authentication token from request.
Section titled “Extract authentication token from request.”
- def init(self, config: Any) -> None