-
Notifications
You must be signed in to change notification settings - Fork 2
feat: Add MFA (Multi-Factor Authentication) API support #79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 9 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
0a2d6b3
feat(mfa): Implement MfaClient for Multi-Factor Authentication operat…
subhankarmaiti eda205d
Revert "feat(mfa): Implement MfaClient for Multi-Factor Authenticatio…
subhankarmaiti 6027a25
Merge branch 'main' of https://github.com/auth0/auth0-server-python i…
subhankarmaiti f99df07
feat(mfa): Implement Multi-Factor Authentication (MFA) client and int…
subhankarmaiti e9bec1c
fix: reorder import
subhankarmaiti 3fcdbf9
fix: reorder imports
subhankarmaiti c8cf2d7
added test cases to cover enroll, challenge and verify steps
subhankarmaiti 1b3d60a
refactor: remove delete_authenticator method and its tests
subhankarmaiti 69b96f8
feat(mfa): Add comprehensive MFA documentation and usage examples
subhankarmaiti f2746e1
feat(mfa): Implement session persistence for MFA tokens in the client
subhankarmaiti 038c934
feat(mfa): Update MFA API to use 'factor_type' instead of 'authentica…
subhankarmaiti 31a75d9
refactor: Remove unused MfaDeleteAuthenticatorError import from mfa_c…
subhankarmaiti d69d6c0
feat(mfa): Update MFA API to support 'auth0' factor type and refactor…
subhankarmaiti 6327758
feat(mfa): Enhance MFA documentation and add support for Push Notific…
subhankarmaiti 49bda6d
Merge branch 'main' into feat/mfa-api
kishore7snehil 1ea1169
fix(mfa): Add MCD support and consistent store_options to MfaClient
kishore7snehil c600dc1
test: Add tests for verify method with persist option in MfaClient
kishore7snehil 2654fef
fix(test): Correct push enrollment test factor_type and remove stale …
kishore7snehil 16ef484
refactor: Move inline imports to top-level and add PLC0415 ruff rule
kishore7snehil 1d589e3
docs(mfa): Document token encryption design and chained MFA contract
kishore7snehil b94f46d
fix(mfa): Ensure existing state is always dumped before persisting in…
kishore7snehil a61e49d
docs(mfa): Add decrypt step to Complete Login and Error Handling exam…
kishore7snehil 1939d38
docs(mfa): Fix misleading docstrings about encrypted vs raw mfa_token
kishore7snehil File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| from .mfa_client import MfaClient | ||
| from .my_account_client import MyAccountClient | ||
| from .server_client import ServerClient | ||
|
|
||
| __all__ = ["ServerClient", "MyAccountClient"] | ||
| __all__ = ["ServerClient", "MyAccountClient", "MfaClient"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,358 @@ | ||
| """ | ||
| MFA Client for auth0-server-python SDK. | ||
| Handles Multi-Factor Authentication operations against the Auth0 MFA API. | ||
| """ | ||
|
|
||
| import time | ||
| from typing import Any, Optional | ||
|
|
||
| import httpx | ||
| from auth0_server_python.auth_schemes.bearer_auth import BearerAuth | ||
| from auth0_server_python.auth_types import ( | ||
| AuthenticatorResponse, | ||
| ChallengeResponse, | ||
| EnrollmentResponse, | ||
| MfaRequirements, | ||
| MfaTokenContext, | ||
| MfaVerifyResponse, | ||
| OobEnrollmentResponse, | ||
| OtpEnrollmentResponse, | ||
| ) | ||
| from auth0_server_python.encryption.encrypt import decrypt, encrypt | ||
| from auth0_server_python.error import ( | ||
| MfaChallengeError, | ||
| MfaDeleteAuthenticatorError, | ||
| MfaEnrollmentError, | ||
| MfaListAuthenticatorsError, | ||
| MfaRequiredError, | ||
| MfaTokenExpiredError, | ||
| MfaTokenInvalidError, | ||
| MfaVerifyError, | ||
| ) | ||
|
|
||
| DEFAULT_MFA_TOKEN_TTL = 300 # 5 minutes | ||
|
|
||
|
|
||
| class MfaClient: | ||
| """ | ||
| Client for Auth0 MFA API operations. | ||
|
|
||
| Provides methods for listing authenticators, enrolling new authenticators, | ||
| deleting authenticators, challenging authenticators, and verifying MFA codes. | ||
|
|
||
| All operations require an mfa_token which is obtained either: | ||
| 1. From MfaRequiredError raised during get_access_token() (encrypted) | ||
| 2. Directly from the Auth0 MFA challenge response (raw) | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| domain: str, | ||
| client_id: str, | ||
| client_secret: str, | ||
| secret: str | ||
| ): | ||
| self._domain = domain | ||
| self._base_url = f"https://{domain}" | ||
| self._client_id = client_id | ||
| self._client_secret = client_secret | ||
| self._secret = secret | ||
|
|
||
| # ============================================================================ | ||
| # MFA TOKEN ENCRYPTION / DECRYPTION | ||
| # ============================================================================ | ||
|
|
||
| def encrypt_mfa_token( | ||
| self, | ||
| raw_mfa_token: str, | ||
| audience: str, | ||
| scope: str, | ||
| mfa_requirements: Optional[MfaRequirements] = None, | ||
| ttl: int = DEFAULT_MFA_TOKEN_TTL | ||
| ) -> str: | ||
| """Encrypt an MFA token with context for secure client-side storage.""" | ||
| context = MfaTokenContext( | ||
| mfa_token=raw_mfa_token, | ||
| audience=audience, | ||
| scope=scope, | ||
| mfa_requirements=mfa_requirements, | ||
| created_at=int(time.time()) | ||
| ) | ||
| return encrypt(context.model_dump(), self._secret, "mfa_token") | ||
|
|
||
| def decrypt_mfa_token(self, encrypted_token: str) -> MfaTokenContext: | ||
| """Decrypt an MFA token and validate TTL.""" | ||
| try: | ||
| payload = decrypt(encrypted_token, self._secret, "mfa_token") | ||
| context = MfaTokenContext(**payload) | ||
| except Exception: | ||
| raise MfaTokenInvalidError() | ||
|
|
||
| # Check TTL | ||
| elapsed = int(time.time()) - context.created_at | ||
| if elapsed > DEFAULT_MFA_TOKEN_TTL: | ||
| raise MfaTokenExpiredError() | ||
|
|
||
| return context | ||
|
|
||
| # ============================================================================ | ||
| # MFA API OPERATIONS | ||
| # ============================================================================ | ||
|
|
||
| async def list_authenticators( | ||
| self, | ||
| options: dict[str, Any] | ||
| ) -> list[AuthenticatorResponse]: | ||
| """ | ||
| Lists all MFA authenticators enrolled by the user. | ||
|
|
||
| Args: | ||
| options: Dict containing 'mfa_token' (encrypted or raw). | ||
|
|
||
| Returns: | ||
| List of enrolled authenticators. | ||
|
|
||
| Raises: | ||
| MfaListAuthenticatorsError: When the request fails. | ||
| """ | ||
| mfa_token = options["mfa_token"] | ||
| url = f"{self._base_url}/mfa/authenticators" | ||
|
|
||
| try: | ||
| async with httpx.AsyncClient() as client: | ||
| response = await client.get( | ||
| url, | ||
| auth=BearerAuth(mfa_token) | ||
| ) | ||
|
|
||
| if response.status_code != 200: | ||
| error_data = response.json() | ||
| raise MfaListAuthenticatorsError( | ||
| error_data.get("error_description", "Failed to list authenticators"), | ||
| error_data | ||
| ) | ||
|
|
||
| api_response = response.json() | ||
| return [AuthenticatorResponse(**auth) for auth in api_response] | ||
|
|
||
| except MfaListAuthenticatorsError: | ||
| raise | ||
| except Exception as e: | ||
| raise MfaListAuthenticatorsError( | ||
| f"Unexpected error listing authenticators: {str(e)}" | ||
| ) | ||
|
|
||
| async def enroll_authenticator( | ||
| self, | ||
| options: dict[str, Any] | ||
| ) -> EnrollmentResponse: | ||
| """ | ||
| Enrolls a new MFA authenticator for the user. | ||
|
|
||
| Args: | ||
| options: Dict containing enrollment parameters. | ||
| Required: 'mfa_token', 'authenticator_types'. | ||
| Optional: 'oob_channels', 'phone_number', 'email'. | ||
|
|
||
| Returns: | ||
| OtpEnrollmentResponse or OobEnrollmentResponse. | ||
|
|
||
| Raises: | ||
| MfaEnrollmentError: When enrollment fails. | ||
| """ | ||
| mfa_token = options["mfa_token"] | ||
| url = f"{self._base_url}/mfa/associate" | ||
|
|
||
| # Build API request body | ||
| body: dict[str, Any] = { | ||
| "authenticator_types": options["authenticator_types"] | ||
| } | ||
|
|
||
| if "oob_channels" in options: | ||
| body["oob_channels"] = options["oob_channels"] | ||
|
|
||
| if "phone_number" in options and options["phone_number"]: | ||
| body["phone_number"] = options["phone_number"] | ||
|
|
||
| if "email" in options and options["email"]: | ||
| body["email"] = options["email"] | ||
|
|
||
| try: | ||
| async with httpx.AsyncClient() as client: | ||
| response = await client.post( | ||
| url, | ||
| json=body, | ||
| auth=BearerAuth(mfa_token), | ||
| headers={"Content-Type": "application/json"} | ||
| ) | ||
|
|
||
| if response.status_code != 200: | ||
| error_data = response.json() | ||
| raise MfaEnrollmentError( | ||
| error_data.get("error_description", "Failed to enroll authenticator"), | ||
| error_data | ||
| ) | ||
|
|
||
| api_response = response.json() | ||
| authenticator_type = api_response.get("authenticator_type") | ||
|
|
||
| if authenticator_type == "otp": | ||
| return OtpEnrollmentResponse(**api_response) | ||
| elif authenticator_type == "oob": | ||
| return OobEnrollmentResponse(**api_response) | ||
| else: | ||
| raise MfaEnrollmentError( | ||
| f"Unexpected authenticator type: {authenticator_type}" | ||
| ) | ||
|
|
||
| except MfaEnrollmentError: | ||
| raise | ||
| except Exception as e: | ||
| raise MfaEnrollmentError( | ||
| f"Unexpected error enrolling authenticator: {str(e)}" | ||
| ) | ||
|
|
||
| async def challenge_authenticator( | ||
| self, | ||
| options: dict[str, Any] | ||
| ) -> ChallengeResponse: | ||
| """ | ||
| Initiates an MFA challenge for user verification. | ||
|
|
||
| Args: | ||
| options: Dict containing 'mfa_token', 'challenge_type', | ||
| and optionally 'authenticator_id'. | ||
|
|
||
| Returns: | ||
| ChallengeResponse with challenge details. | ||
|
|
||
| Raises: | ||
| MfaChallengeError: When the challenge fails. | ||
| """ | ||
| mfa_token = options["mfa_token"] | ||
| url = f"{self._base_url}/mfa/challenge" | ||
|
|
||
| body: dict[str, Any] = { | ||
| "mfa_token": mfa_token, | ||
| "client_id": self._client_id, | ||
| "challenge_type": options["challenge_type"] | ||
| } | ||
|
|
||
| if "authenticator_id" in options and options["authenticator_id"]: | ||
| body["authenticator_id"] = options["authenticator_id"] | ||
|
|
||
| try: | ||
| async with httpx.AsyncClient() as client: | ||
| response = await client.post( | ||
| url, | ||
| json=body, | ||
| headers={"Content-Type": "application/json"} | ||
| ) | ||
|
|
||
| if response.status_code != 200: | ||
| error_data = response.json() | ||
| raise MfaChallengeError( | ||
| error_data.get("error_description", "Failed to challenge authenticator"), | ||
| error_data | ||
| ) | ||
|
|
||
| api_response = response.json() | ||
| return ChallengeResponse(**api_response) | ||
|
|
||
| except MfaChallengeError: | ||
| raise | ||
| except Exception as e: | ||
| raise MfaChallengeError( | ||
| f"Unexpected error challenging authenticator: {str(e)}" | ||
| ) | ||
|
|
||
| async def verify( | ||
| self, | ||
| options: dict[str, Any] | ||
| ) -> MfaVerifyResponse: | ||
| """ | ||
| Verifies an MFA code and completes authentication. | ||
|
|
||
| Supports OTP, OOB (with binding code), and recovery code verification. | ||
|
|
||
| If Auth0 returns 'mfa_required' again (chained MFA), raises MfaRequiredError | ||
| with a new encrypted mfa_token. | ||
|
|
||
| Args: | ||
| options: Dict containing 'mfa_token' and one of: | ||
| - 'otp': OTP code | ||
| - 'oob_code' + 'binding_code': OOB verification | ||
| - 'recovery_code': Recovery code | ||
|
|
||
| Returns: | ||
| MfaVerifyResponse with access_token, token_type, etc. | ||
|
|
||
| Raises: | ||
| MfaVerifyError: When verification fails. | ||
| MfaRequiredError: When chained MFA is required. | ||
| """ | ||
| mfa_token = options["mfa_token"] | ||
|
|
||
| # Determine grant type and build body | ||
| body: dict[str, Any] = { | ||
| "client_id": self._client_id, | ||
| "client_secret": self._client_secret, | ||
| "mfa_token": mfa_token | ||
| } | ||
|
|
||
| if "otp" in options: | ||
| body["grant_type"] = "http://auth0.com/oauth/grant-type/mfa-otp" | ||
| body["otp"] = options["otp"] | ||
| elif "oob_code" in options: | ||
| body["grant_type"] = "http://auth0.com/oauth/grant-type/mfa-oob" | ||
| body["oob_code"] = options["oob_code"] | ||
| body["binding_code"] = options.get("binding_code", "") | ||
| elif "recovery_code" in options: | ||
| body["grant_type"] = "http://auth0.com/oauth/grant-type/mfa-recovery-code" | ||
| body["recovery_code"] = options["recovery_code"] | ||
| else: | ||
| raise MfaVerifyError( | ||
| "No verification credential provided (otp, oob_code, or recovery_code)" | ||
| ) | ||
|
|
||
| try: | ||
| token_endpoint = f"{self._base_url}/oauth/token" | ||
|
|
||
| async with httpx.AsyncClient() as client: | ||
| response = await client.post( | ||
| token_endpoint, | ||
| data=body, | ||
| headers={"Content-Type": "application/x-www-form-urlencoded"} | ||
| ) | ||
|
|
||
| if response.status_code != 200: | ||
| error_data = response.json() | ||
|
|
||
| # Handle chained MFA | ||
| if error_data.get("error") == "mfa_required": | ||
| new_mfa_token = error_data.get("mfa_token") | ||
| mfa_requirements_data = error_data.get("mfa_requirements") | ||
| mfa_requirements = None | ||
| if mfa_requirements_data: | ||
| mfa_requirements = MfaRequirements(**mfa_requirements_data) | ||
|
|
||
| raise MfaRequiredError( | ||
| error_data.get("error_description", "Additional MFA factor required"), | ||
| mfa_token=new_mfa_token, | ||
| mfa_requirements=mfa_requirements | ||
| ) | ||
|
|
||
| raise MfaVerifyError( | ||
| error_data.get("error_description", "MFA verification failed"), | ||
| error_data | ||
| ) | ||
|
|
||
| token_response = response.json() | ||
| return MfaVerifyResponse(**token_response) | ||
|
|
||
| except (MfaVerifyError, MfaRequiredError): | ||
| raise | ||
| except Exception as e: | ||
| raise MfaVerifyError( | ||
| f"Unexpected error during MFA verification: {str(e)}" | ||
| ) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.