diff --git a/examples/MFA.md b/examples/MFA.md new file mode 100644 index 0000000..d60cd56 --- /dev/null +++ b/examples/MFA.md @@ -0,0 +1,630 @@ +# Multi-Factor Authentication (MFA) + +The Auth0 MFA API allows you to manage multi-factor authentication for users in server-side applications. This guide covers how to handle MFA requirements, enroll authenticators, and verify MFA challenges. + +> [!NOTE] +> Multi-Factor Authentication support for server SDKs is in Early Access. For detailed information, refer to the [Auth0 MFA documentation](https://auth0.com/docs/secure/multi-factor-authentication). + +## Table of Contents + +- [Multi-Factor Authentication (MFA)](#multi-factor-authentication-mfa) + - [Table of Contents](#table-of-contents) + - [Setup](#setup) + - [Understanding MFA Responses](#understanding-mfa-responses) + - [Challenge Flow Response](#challenge-flow-response) + - [Enroll Flow Response](#enroll-flow-response) + - [Handling MFA Required Errors](#handling-mfa-required-errors) + - [Getting Authenticators](#getting-authenticators) + - [Response Structure](#response-structure) + - [Getting Enrollment Factors](#getting-enrollment-factors) + - [Enrollment](#enrollment) + - [Enrolling OTP (Authenticator App)](#enrolling-otp-authenticator-app) + - [Enrolling SMS](#enrolling-sms) + - [Enrolling Voice](#enrolling-voice) + - [Enrolling Email](#enrolling-email) + - [Challenge](#challenge) + - [Challenge with SMS](#challenge-with-sms) + - [Challenge with Email](#challenge-with-email) + - [Challenge with OTP](#challenge-with-otp) + - [Verify](#verify) + - [Verify with OOB (SMS or Email)](#verify-with-oob-sms-or-email) + - [Verify with OTP](#verify-with-otp) + - [Verify with Recovery Code](#verify-with-recovery-code) + - [Session Persistence](#session-persistence) + - [Automatic Session Update](#automatic-session-update) + - [Manual Session Update](#manual-session-update) + - [Complete MFA Flow Examples](#complete-mfa-flow-examples) + - [Enrollment Flow](#enrollment-flow) + - [Challenge Flow](#challenge-flow) + - [Complete Login with MFA](#complete-login-with-mfa) + - [Error Handling](#error-handling) + - [Common Error Scenarios](#common-error-scenarios) + - [Additional Resources](#additional-resources) + +## Setup + +Before using the MFA API, ensure MFA is configured in your [Auth0 Dashboard](https://manage.auth0.com) under **Security** > **Multi-factor Auth**. For detailed configuration, see the [Auth0 MFA documentation](https://auth0.com/docs/secure/multi-factor-authentication/customize-mfa/customize-mfa-enrollments-universal-login). + +## Understanding MFA Responses + +When MFA is required during authentication, the error response contains an `mfa_requirements` object that indicates either: + +1. **Challenge Flow** (user has enrolled authenticators) +2. **Enroll Flow** (user needs to set up MFA) + +### Challenge Flow Response + +```json +{ + "error": "mfa_required", + "error_description": "Multifactor authentication required", + "mfa_token": "Fe26.2*...", + "mfa_requirements": { + "challenge": [ + { "type": "otp" }, + { "type": "sms" }, + { "type": "email" } + ] + } +} +``` + +**Interpretation**: User has enrolled authenticators → proceed with **Get Authenticators → Challenge → Verify** flow + +### Enroll Flow Response + +```json +{ + "error": "mfa_required", + "error_description": "Multifactor authentication required", + "mfa_token": "Fe26.2*...", + "mfa_requirements": { + "enroll": [ + { "type": "otp" }, + { "type": "sms" }, + { "type": "email" } + ] + } +} +``` + +**Interpretation**: User needs to set up MFA → proceed with **Get Enrollment Factors → Enroll → Verify** flow + +## Handling MFA Required Errors + +When `get_access_token()` encounters an MFA requirement, it raises an `MfaRequiredError` with the `mfa_token` context: + +```python +from auth0_server_python.error import MfaRequiredError +from auth0_server_python.auth_server import ServerClient + +server_client = ServerClient( + domain="", + client_id="", + client_secret="" +) + +try: + access_token = await server_client.get_access_token() +except MfaRequiredError as error: + mfa_token = error.mfa_token + print(f"MFA Required: {error.error_description}") + + # The MFA context is automatically stored in the client + # You can now use the MFA methods +``` + +## Getting Authenticators + +Retrieve the list of authenticators the user has already enrolled: + +```python +try: + authenticators = await server_client.mfa.list_authenticators({ + "mfa_token": mfa_token + }) + + for auth in authenticators: + print(f"Authenticator: {auth.id}") + print(f" Type: {auth.authenticator_type}") + print(f" Created: {auth.created_at}") + +except Exception as error: + print(f"Error retrieving authenticators: {error}") +``` + +### Response Structure + +Each authenticator is an `AuthenticatorResponse` object with: +- `id`: Authenticator identifier (e.g., `otp|dev_xxx`) +- `authenticator_type`: Type of authenticator (`otp`, `sms`, `voice`, `email`) +- `created_at`: Creation timestamp +- `active`: Boolean indicating if authenticator is active +- `oob_channels`: For OOB authenticators, the channels used (`sms`, `voice`, `email`) + +## Getting Enrollment Factors + +Check what MFA factors are available for enrollment: + +```python +try: + factors = await server_client.mfa.list_authenticators({ + "mfa_token": mfa_token + }) + + if len(factors) > 0: + print("Available enrollment factors:") + for factor in factors: + print(f" - {factor.authenticator_type}") + else: + print("User already has all available authenticators enrolled") + +except Exception as error: + print(f"Error retrieving enrollment factors: {error}") +``` + +## Enrollment + +### Enrolling OTP (Authenticator App) + +Enroll an OTP authenticator (Google Authenticator, Microsoft Authenticator, etc.): + +```python +try: + enrollment = await server_client.mfa.enroll_authenticator({ + "mfa_token": mfa_token, + "factor_type": "otp" + }) + + # Display QR code to user + print(f"QR Code URI: {enrollment.barcode_uri}") # otpauth://totp/... + print(f"Secret Key: {enrollment.secret}") # Base32 secret for manual entry + +except Exception as error: + print(f"Enrollment failed: {error}") +``` + +### Enrolling SMS + +Enroll an SMS authenticator: + +```python +try: + enrollment = await server_client.mfa.enroll_authenticator({ + "mfa_token": mfa_token, + "factor_type": "sms", + "phone_number": "+12025551234" # E.164 format + }) + + # Save oobCode for enrollment verification + print(f"OOB Code: {enrollment.oob_code}") + print("SMS sent to user's phone number") + +except Exception as error: + print(f"SMS enrollment failed: {error}") +``` + +### Enrolling Voice + +Enroll a voice call authenticator: + +```python +try: + enrollment = await server_client.mfa.enroll_authenticator({ + "mfa_token": mfa_token, + "factor_type": "voice", + "phone_number": "+12025551234" # E.164 format + }) + + print(f"OOB Code: {enrollment.oob_code}") + print("Voice call initiated to user's phone number") + +except Exception as error: + print(f"Voice enrollment failed: {error}") +``` + +### Enrolling Email + +Enroll an email authenticator: + +```python +try: + enrollment = await server_client.mfa.enroll_authenticator({ + "mfa_token": mfa_token, + "factor_type": "email", + "email": "user@example.com" + }) + + print(f"OOB Code: {enrollment.oob_code}") + print("Verification email sent to user") + +except Exception as error: + print(f"Email enrollment failed: {error}") +``` + +## Challenge + +After enrolling an authenticator, or when the user has existing authenticators, initiate a challenge: + +### Challenge with SMS + +```python +try: + challenge = await server_client.mfa.challenge_authenticator({ + "mfa_token": mfa_token, + "factor_type": "sms", + "authenticator_id": "sms|dev_xxx" + }) + + print(f"OOB Code: {challenge.oob_code}") + print(f"Challenge Expires In: {challenge.expires_in} seconds") + print("User will receive SMS with verification code") + +except Exception as error: + print(f"Challenge failed: {error}") +``` + +### Challenge with Email + +```python +try: + challenge = await server_client.mfa.challenge_authenticator({ + "mfa_token": mfa_token, + "factor_type": "email", + "authenticator_id": "email|dev_xxx" + }) + + print(f"OOB Code: {challenge.oob_code}") + print("User will receive verification email") + +except Exception as error: + print(f"Challenge failed: {error}") +``` + +### Challenge with OTP + +> [!NOTE] +> For OTP authenticators, you do not need to explicitly call challenge. The code is generated automatically by the user's authenticator app. Simply prompt the user to open their app and provide the 6-digit code. + +```python +try: + challenge = await server_client.mfa.challenge_authenticator({ + "mfa_token": mfa_token, + "factor_type": "otp", + "authenticator_id": "otp|dev_xxx" + }) + + print("User should open their authenticator app and provide the code") + +except Exception as error: + print(f"Challenge failed: {error}") +``` + +## Verify + +Complete MFA verification with the challenge response: + +### Verify with OOB (SMS or Email) + +```python +try: + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "oob_code": challenge.oob_code, + "binding_code": "123456", # Code user received via SMS/Email + "persist": True, # Persist tokens to session store + "audience": "https://api.example.com", # Required when persist=True + "scope": "openid profile email" # Optional scope + }) + + access_token = verify_response.access_token + id_token = verify_response.id_token + + print(f"MFA verification successful!") + print(f"Access Token: {access_token}") + print(f"ID Token: {id_token}") + print("Tokens have been persisted to session store") + +except Exception as error: + print(f"Verification failed: {error}") +``` + +> [!NOTE] +> Setting `persist=True` automatically updates the session store with the new tokens, similar to nextjs-auth0 and auth0-spa-js SDKs. This eliminates the need for manual token management after MFA verification. + +### Verify with OTP + +```python +try: + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "otp": "123456", # 6-digit code from authenticator app + "persist": True, # Persist tokens to session store + "audience": "https://api.example.com", # Required when persist=True + "scope": "openid profile email" + }) + + access_token = verify_response.access_token + + print("MFA verification successful!") + print("Tokens have been persisted to session store") + +except Exception as error: + print(f"Invalid OTP code: {error}") +``` + +### Verify with Recovery Code + +Recovery codes can be used to complete MFA verification without initiating a challenge: + +```python +try: + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "recovery_code": "XXXX-XXXX-XXXX", # One of the recovery codes + "persist": True, # Persist tokens to session store + "audience": "https://api.example.com" # Required when persist=True + }) + + access_token = verify_response.access_token + + print("MFA verification successful using recovery code!") + print("Tokens have been persisted to session store") + +except Exception as error: + print(f"Verification failed: {error}") +``` + +## Session Persistence + +By default, `verify()` returns tokens without persisting them to the session store. However, you can automatically persist tokens by setting `persist=True`, similar to how nextjs-auth0 and auth0-spa-js handle MFA. + +### Automatic Session Update + +When you set `persist=True`, the SDK will: +1. Update the session's `access_token` for the specified audience +2. Update the session's `id_token` if present +3. Add the token to the `token_sets` array with expiration information + +```python +verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "otp": "123456", + "persist": True, # Enable automatic persistence + "audience": "https://api.example.com", # Required when persist=True + "scope": "openid profile email" # Optional +}) + +# Tokens are now available in the session store +# User can call server_client.get_user() to access updated session +user = await server_client.get_user() +``` + +### Manual Session Update + +If you prefer to manage session updates yourself: + +```python +verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "otp": "123456" + # persist=False (default) +}) + +# Handle token storage manually if needed +access_token = verify_response.access_token +id_token = verify_response.id_token + +# Store tokens in your application's session management +await my_session_store.update_tokens(access_token, id_token) +``` + +## Complete MFA Flow Examples + +### Enrollment Flow + +When a user needs to set up MFA for the first time: + +```python +async def handle_mfa_enrollment_flow(server_client, mfa_token): + try: + # Get available enrollment factors + factors = await server_client.mfa.list_authenticators({ + "mfa_token": mfa_token + }) + + print("Available MFA options:") + for factor in factors: + print(f" - {factor.authenticator_type}") + + # User selects OTP + enrollment = await server_client.mfa.enroll_authenticator({ + "mfa_token": mfa_token, + "factor_type": "otp" + }) + + # Display QR code to user + print(f"QR Code: {enrollment.barcode_uri}") + + # Wait for user to scan and enter verification code + user_code = input("Enter 6-digit code from authenticator: ") + + # Verify enrollment + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "otp": user_code, + "persist": True, + "audience": "https://api.example.com", + "scope": "openid profile email" + }) + + print("MFA enrollment successful!") + print("Tokens have been persisted to session store") + return verify_response.access_token + + except Exception as error: + print(f"Enrollment flow failed: {error}") + raise +``` + +### Challenge Flow + +When a user with existing authenticators needs to verify: + +```python +async def handle_mfa_challenge_flow(server_client, mfa_token): + try: + # Get user's enrolled authenticators + authenticators = await server_client.mfa.list_authenticators({ + "mfa_token": mfa_token + }) + + print("Select an authenticator:") + for i, auth in enumerate(authenticators): + print(f" {i + 1}. {auth.authenticator_type} ({auth.id})") + + # User selects authenticator + selected_index = int(input("Selection: ")) - 1 + selected_auth = authenticators[selected_index] + + # Initiate challenge + challenge = await server_client.mfa.challenge_authenticator({ + "mfa_token": mfa_token, + "factor_type": selected_auth.authenticator_type, + "authenticator_id": selected_auth.id + }) + + # Get verification code from user + if selected_auth.authenticator_type == "otp": + user_code = input("Enter 6-digit code from authenticator: ") + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "otp": user_code, + "persist": True, + "audience": "https://api.example.com", + "scope": "openid profile email" + }) + else: + user_code = input(f"Enter code from {selected_auth.authenticator_type}: ") + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "oob_code": challenge.oob_code, + "binding_code": user_code, + "persist": True, + "audience": "https://api.example.com", + "scope": "openid profile email" + }) + + print("MFA verification successful!") + print("Tokens have been persisted to session store") + return verify_response.access_token + + except Exception as error: + print(f"Challenge flow failed: {error}") + raise +``` + +### Complete Login with MFA + +```python +from auth0_server_python.error import MfaRequiredError + +async def login_with_mfa(server_client): + try: + # Attempt to get access token + access_token = await server_client.get_access_token() + return access_token + + except MfaRequiredError as mfa_error: + mfa_token = mfa_error.mfa_token + + # Determine flow: check if user needs to enroll or has authenticators + authenticators = await server_client.mfa.list_authenticators({ + "mfa_token": mfa_token + }) + + if len(authenticators) == 0: + # User needs to enroll + print("MFA enrollment required") + access_token = await handle_mfa_enrollment_flow( + server_client, mfa_token + ) + else: + # User has authenticators, proceed with challenge + print("MFA verification required") + access_token = await handle_mfa_challenge_flow( + server_client, mfa_token + ) + + return access_token + + except Exception as error: + print(f"Login failed: {error}") + raise +``` + +## Error Handling + +Each MFA operation has specific error handling: + +```python +from auth0_server_python.error import MfaRequiredError, ApiError + +async def handle_mfa_with_error_handling(server_client): + try: + # Attempt token exchange + access_token = await server_client.get_access_token() + + except MfaRequiredError as error: + print(f"MFA Required: {error.error_description}") + mfa_token = error.mfa_token + + try: + # Get authenticators + authenticators = await server_client.mfa.list_authenticators({ + "mfa_token": mfa_token + }) + except ApiError as list_error: + print(f"Failed to retrieve authenticators: {list_error}") + raise + + try: + # Initiate challenge + challenge = await server_client.mfa.challenge_authenticator({ + "mfa_token": mfa_token, + "factor_type": "sms", + "authenticator_id": authenticators[0].id + }) + except ApiError as challenge_error: + print(f"Challenge failed: {challenge_error}") + raise + + try: + # Verify challenge + verify_response = await server_client.mfa.verify({ + "mfa_token": mfa_token, + "oob_code": challenge.oob_code, + "binding_code": "123456" + }) + except ApiError as verify_error: + if verify_error.status_code == 403: + print("Invalid code or challenge expired") + else: + print(f"Verification error: {verify_error}") + raise + + except Exception as error: + print(f"Unexpected error: {error}") + raise +``` + +### Common Error Scenarios + +- **Invalid OTP Code**: HTTP 403 with error details +- **Expired Challenge**: HTTP 403 with expired_token error +- **MFA Token Expired**: HTTP 400 with context_not_found error +- **Network Issues**: Connection errors with descriptive messages + +## Additional Resources + +- [Auth0 MFA Documentation](https://auth0.com/docs/secure/multi-factor-authentication) diff --git a/src/auth0_server_python/auth_server/__init__.py b/src/auth0_server_python/auth_server/__init__.py index 72818be..611f6b7 100644 --- a/src/auth0_server_python/auth_server/__init__.py +++ b/src/auth0_server_python/auth_server/__init__.py @@ -1,4 +1,5 @@ +from .mfa_client import MfaClient from .my_account_client import MyAccountClient from .server_client import ServerClient -__all__ = ["ServerClient", "MyAccountClient"] +__all__ = ["ServerClient", "MyAccountClient", "MfaClient"] diff --git a/src/auth0_server_python/auth_server/mfa_client.py b/src/auth0_server_python/auth_server/mfa_client.py new file mode 100644 index 0000000..878199f --- /dev/null +++ b/src/auth0_server_python/auth_server/mfa_client.py @@ -0,0 +1,478 @@ +""" +MFA Client for auth0-server-python SDK. +Handles Multi-Factor Authentication operations against the Auth0 MFA API. +""" + +import time +from typing import Any, Optional + +import httpx +from auth0_server_python.auth_schemes.bearer_auth import BearerAuth +from auth0_server_python.auth_types import ( + AuthenticatorResponse, + ChallengeResponse, + EnrollmentResponse, + MfaRequirements, + MfaTokenContext, + MfaVerifyResponse, + OobEnrollmentResponse, + OtpEnrollmentResponse, +) +from auth0_server_python.encryption.encrypt import decrypt, encrypt +from auth0_server_python.error import ( + MfaChallengeError, + MfaEnrollmentError, + MfaListAuthenticatorsError, + MfaRequiredError, + MfaTokenExpiredError, + MfaTokenInvalidError, + MfaVerifyError, +) + +DEFAULT_MFA_TOKEN_TTL = 300 # 5 minutes + + +class MfaClient: + """ + Client for Auth0 MFA API operations. + + Provides methods for listing authenticators, enrolling new authenticators, + deleting authenticators, challenging authenticators, and verifying MFA codes. + + All operations require an mfa_token which is obtained either: + 1. From MfaRequiredError raised during get_access_token() (encrypted) + 2. Directly from the Auth0 MFA challenge response (raw) + """ + + def __init__( + self, + domain: str, + client_id: str, + client_secret: str, + secret: str, + state_store=None, + state_identifier: str = "_a0_session" + ): + self._domain = domain + self._base_url = f"https://{domain}" + self._client_id = client_id + self._client_secret = client_secret + self._secret = secret + self._state_store = state_store + self._state_identifier = state_identifier + + # ============================================================================ + # MFA TOKEN ENCRYPTION / DECRYPTION + # ============================================================================ + + def encrypt_mfa_token( + self, + raw_mfa_token: str, + audience: str, + scope: str, + mfa_requirements: Optional[MfaRequirements] = None, + ttl: int = DEFAULT_MFA_TOKEN_TTL + ) -> str: + """Encrypt an MFA token with context for secure client-side storage.""" + context = MfaTokenContext( + mfa_token=raw_mfa_token, + audience=audience, + scope=scope, + mfa_requirements=mfa_requirements, + created_at=int(time.time()) + ) + return encrypt(context.model_dump(), self._secret, "mfa_token") + + def decrypt_mfa_token(self, encrypted_token: str) -> MfaTokenContext: + """Decrypt an MFA token and validate TTL.""" + try: + payload = decrypt(encrypted_token, self._secret, "mfa_token") + context = MfaTokenContext(**payload) + except Exception: + raise MfaTokenInvalidError() + + # Check TTL + elapsed = int(time.time()) - context.created_at + if elapsed > DEFAULT_MFA_TOKEN_TTL: + raise MfaTokenExpiredError() + + return context + + # ============================================================================ + # MFA API OPERATIONS + # ============================================================================ + + async def list_authenticators( + self, + options: dict[str, Any] + ) -> list[AuthenticatorResponse]: + """ + Lists all MFA authenticators enrolled by the user. + + Args: + options: Dict containing 'mfa_token' (encrypted or raw). + + Returns: + List of enrolled authenticators. + + Raises: + MfaListAuthenticatorsError: When the request fails. + """ + mfa_token = options["mfa_token"] + url = f"{self._base_url}/mfa/authenticators" + + try: + async with httpx.AsyncClient() as client: + response = await client.get( + url, + auth=BearerAuth(mfa_token) + ) + + if response.status_code != 200: + error_data = response.json() + raise MfaListAuthenticatorsError( + error_data.get("error_description", "Failed to list authenticators"), + error_data + ) + + api_response = response.json() + return [AuthenticatorResponse(**auth) for auth in api_response] + + except MfaListAuthenticatorsError: + raise + except Exception as e: + raise MfaListAuthenticatorsError( + f"Unexpected error listing authenticators: {str(e)}" + ) + + async def enroll_authenticator( + self, + options: dict[str, Any] + ) -> EnrollmentResponse: + """ + Enrolls a new MFA authenticator for the user. + + Args: + options: Dict containing enrollment parameters. + Required: 'mfa_token', 'factor_type' (otp, sms, voice, email). + Optional: 'phone_number', 'email'. + + Returns: + OtpEnrollmentResponse or OobEnrollmentResponse. + + Raises: + MfaEnrollmentError: When enrollment fails. + """ + mfa_token = options["mfa_token"] + factor_type = options["factor_type"] + url = f"{self._base_url}/mfa/associate" + + # Map factor_type to Auth0 API parameters + if factor_type == "otp": + authenticator_type = "otp" + oob_channels = None + elif factor_type in ["sms", "voice", "email"]: + authenticator_type = "oob" + oob_channels = factor_type + else: + raise MfaEnrollmentError( + f"Unsupported factor_type: {factor_type}. Supported types: otp, sms, voice, email" + ) + + # Build API request body + body: dict[str, Any] = { + "authenticator_types": authenticator_type + } + + if oob_channels: + body["oob_channels"] = oob_channels + + if "phone_number" in options and options["phone_number"]: + body["phone_number"] = options["phone_number"] + + if "email" in options and options["email"]: + body["email"] = options["email"] + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + url, + json=body, + auth=BearerAuth(mfa_token), + headers={"Content-Type": "application/json"} + ) + + if response.status_code != 200: + error_data = response.json() + raise MfaEnrollmentError( + error_data.get("error_description", "Failed to enroll authenticator"), + error_data + ) + + api_response = response.json() + authenticator_type = api_response.get("authenticator_type") + + if authenticator_type == "otp": + return OtpEnrollmentResponse(**api_response) + elif authenticator_type == "oob": + return OobEnrollmentResponse(**api_response) + else: + raise MfaEnrollmentError( + f"Unexpected authenticator type: {authenticator_type}" + ) + + except MfaEnrollmentError: + raise + except Exception as e: + raise MfaEnrollmentError( + f"Unexpected error enrolling authenticator: {str(e)}" + ) + + async def challenge_authenticator( + self, + options: dict[str, Any] + ) -> ChallengeResponse: + """ + Initiates an MFA challenge for user verification. + + Args: + options: Dict containing 'mfa_token', 'factor_type' (otp, sms, voice, email), + and optionally 'authenticator_id'. + + Returns: + ChallengeResponse with challenge details. + + Raises: + MfaChallengeError: When the challenge fails. + """ + mfa_token = options["mfa_token"] + factor_type = options["factor_type"] + url = f"{self._base_url}/mfa/challenge" + + # Map factor_type to Auth0 API challenge_type + if factor_type == "otp": + challenge_type = "otp" + elif factor_type in ["sms", "voice", "email"]: + challenge_type = "oob" + else: + raise MfaChallengeError( + f"Unsupported factor_type: {factor_type}. Supported types: otp, sms, voice, email" + ) + + body: dict[str, Any] = { + "mfa_token": mfa_token, + "client_id": self._client_id, + "challenge_type": challenge_type + } + + if "authenticator_id" in options and options["authenticator_id"]: + body["authenticator_id"] = options["authenticator_id"] + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + url, + json=body, + headers={"Content-Type": "application/json"} + ) + + if response.status_code != 200: + error_data = response.json() + raise MfaChallengeError( + error_data.get("error_description", "Failed to challenge authenticator"), + error_data + ) + + api_response = response.json() + return ChallengeResponse(**api_response) + + except MfaChallengeError: + raise + except Exception as e: + raise MfaChallengeError( + f"Unexpected error challenging authenticator: {str(e)}" + ) + + async def verify( + self, + options: dict[str, Any] + ) -> MfaVerifyResponse: + """ + Verifies an MFA code and completes authentication. + + Supports OTP, OOB (with binding code), and recovery code verification. + + If Auth0 returns 'mfa_required' again (chained MFA), raises MfaRequiredError + with a new encrypted mfa_token. + + Args: + options: Dict containing 'mfa_token' and one of: + - 'otp': OTP code + - 'oob_code' + 'binding_code': OOB verification + - 'recovery_code': Recovery code + - 'persist': bool (optional, default=False) - Persist tokens to state store + - 'audience': str (optional, required if persist=True) - Audience for token_set + - 'scope': str (optional) - Scope for token_set + - 'store_options': dict (optional) - Store-specific options + + Returns: + MfaVerifyResponse with access_token, token_type, etc. + + Raises: + MfaVerifyError: When verification fails. + MfaRequiredError: When chained MFA is required. + """ + mfa_token = options["mfa_token"] + + # Determine grant type and build body + body: dict[str, Any] = { + "client_id": self._client_id, + "client_secret": self._client_secret, + "mfa_token": mfa_token + } + + if "otp" in options: + body["grant_type"] = "http://auth0.com/oauth/grant-type/mfa-otp" + body["otp"] = options["otp"] + elif "oob_code" in options: + body["grant_type"] = "http://auth0.com/oauth/grant-type/mfa-oob" + body["oob_code"] = options["oob_code"] + body["binding_code"] = options.get("binding_code", "") + elif "recovery_code" in options: + body["grant_type"] = "http://auth0.com/oauth/grant-type/mfa-recovery-code" + body["recovery_code"] = options["recovery_code"] + else: + raise MfaVerifyError( + "No verification credential provided (otp, oob_code, or recovery_code)" + ) + + try: + token_endpoint = f"{self._base_url}/oauth/token" + + async with httpx.AsyncClient() as client: + response = await client.post( + token_endpoint, + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"} + ) + + if response.status_code != 200: + error_data = response.json() + + # Handle chained MFA + if error_data.get("error") == "mfa_required": + new_mfa_token = error_data.get("mfa_token") + mfa_requirements_data = error_data.get("mfa_requirements") + mfa_requirements = None + if mfa_requirements_data: + mfa_requirements = MfaRequirements(**mfa_requirements_data) + + raise MfaRequiredError( + error_data.get("error_description", "Additional MFA factor required"), + mfa_token=new_mfa_token, + mfa_requirements=mfa_requirements + ) + + raise MfaVerifyError( + error_data.get("error_description", "MFA verification failed"), + error_data + ) + + token_response = response.json() + verify_response = MfaVerifyResponse(**token_response) + + # Persist tokens to state store if requested + if options.get("persist") and self._state_store: + await self._persist_mfa_tokens( + verify_response=verify_response, + options=options + ) + + return verify_response + + except (MfaVerifyError, MfaRequiredError): + raise + except Exception as e: + raise MfaVerifyError( + f"Unexpected error during MFA verification: {str(e)}" + ) + + async def _persist_mfa_tokens( + self, + verify_response: MfaVerifyResponse, + options: dict[str, Any] + ) -> None: + """ + Persist MFA verification tokens to the state store. + + Updates the session with the new access_token and id_token from MFA verification. + + Args: + verify_response: The response from verify() containing tokens + options: Dict containing: + - 'audience': str - Audience for token_set + - 'scope': str (optional) - Scope for token_set + - 'store_options': dict (optional) - Store-specific options + """ + import time + + from auth0_server_python.auth_types import StateData, TokenSet + + audience = options.get("audience") + scope = options.get("scope") + store_options = options.get("store_options") + + if not audience: + raise MfaVerifyError( + "audience is required when persist=True" + ) + + try: + # Get existing state + state_data = await self._state_store.get( + self._state_identifier, + store_options + ) + + if not state_data: + raise MfaVerifyError( + "No existing session found to update with MFA tokens" + ) + + # Parse state data + existing_state = StateData(**state_data) if isinstance(state_data, dict) else state_data + + # Update id_token if present + if verify_response.id_token: + existing_state.id_token = verify_response.id_token + + # Create token_set for the access_token + expires_in = verify_response.get("expires_in", 86400) # Default 24 hours + expires_at = int(time.time()) + expires_in + + new_token_set = TokenSet( + audience=audience, + access_token=verify_response.access_token, + scope=scope, + expires_at=expires_at + ) + + # Add to token_sets, replacing any existing token_set for this audience + existing_state.token_sets = [ + ts for ts in existing_state.token_sets if ts.audience != audience + ] + existing_state.token_sets.append(new_token_set) + + # Persist updated state + await self._state_store.set( + self._state_identifier, + existing_state.model_dump() if hasattr(existing_state, 'model_dump') else existing_state, + options=store_options + ) + + except MfaVerifyError: + raise + except Exception as e: + raise MfaVerifyError( + f"Failed to persist MFA tokens to state store: {str(e)}" + ) diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 1e62aa5..359ec39 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -11,6 +11,7 @@ import httpx import jwt +from auth0_server_python.auth_server.mfa_client import MfaClient from auth0_server_python.auth_server.my_account_client import MyAccountClient from auth0_server_python.auth_types import ( CompleteConnectAccountRequest, @@ -24,6 +25,7 @@ LoginWithCustomTokenExchangeResult, LogoutOptions, LogoutTokenClaims, + MfaRequirements, StartInteractiveLoginOptions, StateData, TokenExchangeResponse, @@ -41,6 +43,7 @@ CustomTokenExchangeError, CustomTokenExchangeErrorCode, InvalidArgumentError, + MfaRequiredError, MissingRequiredArgumentError, MissingTransactionError, PollingApiError, @@ -105,6 +108,7 @@ def __init__( self._client_id = client_id self._client_secret = client_secret self._redirect_uri = redirect_uri + self._secret = secret self._default_authorization_params = authorization_params or {} self._pushed_authorization_requests = pushed_authorization_requests # store the flag @@ -122,6 +126,21 @@ def __init__( self._my_account_client = MyAccountClient(domain=domain) + # Initialize MFA client + self._mfa_client = MfaClient( + domain=self._domain, + client_id=self._client_id, + client_secret=self._client_secret, + secret=self._secret, + state_store=self._state_store, + state_identifier=self._state_identifier + ) + + @property + def mfa(self) -> MfaClient: + """Access the MFA client for multi-factor authentication operations.""" + return self._mfa_client + async def _fetch_oidc_metadata(self, domain: str) -> dict: """Fetch OpenID Connect discovery metadata from the Auth0 domain.""" metadata_url = f"https://{domain}/.well-known/openid-configuration" @@ -550,6 +569,24 @@ async def get_access_token( return token_endpoint_response["access_token"] except Exception as e: + # Check for mfa_required error from token refresh + if isinstance(e, ApiError) and e.code == "mfa_required": + raw_mfa_token = getattr(e, "mfa_token", None) + mfa_requirements = getattr(e, "mfa_requirements", None) + + if raw_mfa_token: + encrypted_token = self._mfa_client.encrypt_mfa_token( + raw_mfa_token=raw_mfa_token, + audience=audience or self.DEFAULT_AUDIENCE_STATE_KEY, + scope=merged_scope or "", + mfa_requirements=mfa_requirements + ) + raise MfaRequiredError( + "Multifactor authentication required", + mfa_token=encrypted_token, + mfa_requirements=mfa_requirements + ) + if isinstance(e, AccessTokenError): raise raise AccessTokenError( @@ -615,8 +652,23 @@ async def get_token_by_refresh_token(self, options: dict[str, Any]) -> dict[str, if response.status_code != 200: error_data = response.json() + error_code = error_data.get("error", "refresh_token_error") + + # Preserve mfa_required details for upstream handling + if error_code == "mfa_required": + error = ApiError( + error_code, + error_data.get("error_description", "MFA required") + ) + error.mfa_token = error_data.get("mfa_token") + mfa_requirements_data = error_data.get("mfa_requirements") + error.mfa_requirements = None + if mfa_requirements_data: + error.mfa_requirements = MfaRequirements(**mfa_requirements_data) + raise error + raise ApiError( - error_data.get("error", "refresh_token_error"), + error_code, error_data.get("error_description", "Failed to exchange refresh token") ) diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 4b36ca3..4525d0b 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -3,7 +3,7 @@ These Pydantic models provide type safety and validation for all SDK data structures. """ -from typing import Any, Optional +from typing import Any, Literal, Optional, Union from pydantic import BaseModel, Field, field_validator, model_validator @@ -398,3 +398,162 @@ class ListConnectedAccountConnectionsResponse(BaseModel): connections: list[ConnectedAccountConnection] next: Optional[str] = None + +# ============================================================================= +# MFA Types +# ============================================================================= + +# Type aliases using Literal types +AuthenticatorType = Literal["otp", "oob", "recovery-code"] +OobChannel = Literal["sms", "voice", "auth0", "email"] +ChallengeType = Literal["otp", "oob"] + + +class AuthenticatorResponse(BaseModel): + """Represents an MFA authenticator enrolled by a user.""" + id: str + authenticator_type: AuthenticatorType + active: bool + name: Optional[str] = None + oob_channels: Optional[list[OobChannel]] = None + type: Optional[str] = None + + +# Enrollment Options + +class EnrollOtpOptions(BaseModel): + """Options for enrolling an OTP authenticator.""" + authenticator_types: list[str] + mfa_token: str + + +class EnrollOobOptions(BaseModel): + """Options for enrolling an OOB authenticator (SMS, Voice, Push).""" + authenticator_types: list[str] + oob_channels: list[OobChannel] + phone_number: Optional[str] = None + mfa_token: str + + +class EnrollEmailOptions(BaseModel): + """Options for enrolling an email authenticator.""" + authenticator_types: list[str] + oob_channels: list[OobChannel] + email: Optional[str] = None + mfa_token: str + + +EnrollAuthenticatorOptions = Union[EnrollOtpOptions, EnrollOobOptions, EnrollEmailOptions] + + +# Enrollment Responses + +class OtpEnrollmentResponse(BaseModel): + """Response when enrolling an OTP authenticator.""" + authenticator_type: Literal["otp"] + secret: str + barcode_uri: str + recovery_codes: Optional[list[str]] = None + id: Optional[str] = None + + +class OobEnrollmentResponse(BaseModel): + """Response when enrolling an OOB authenticator.""" + authenticator_type: Literal["oob"] + oob_channel: OobChannel + oob_code: Optional[str] = None + binding_method: Optional[str] = None + id: Optional[str] = None + + +EnrollmentResponse = Union[OtpEnrollmentResponse, OobEnrollmentResponse] + + +# Challenge Types + +class ChallengeOptions(BaseModel): + """Options for initiating an MFA challenge.""" + challenge_type: ChallengeType + authenticator_id: Optional[str] = None + mfa_token: str + + +class ChallengeResponse(BaseModel): + """Response from initiating an MFA challenge.""" + challenge_type: ChallengeType + oob_code: Optional[str] = None + binding_method: Optional[str] = None + + +# List / Delete Options + +class ListAuthenticatorsOptions(BaseModel): + """Options for listing MFA authenticators.""" + mfa_token: str + + +class DeleteAuthenticatorOptions(BaseModel): + """Options for deleting an MFA authenticator.""" + authenticator_id: str + mfa_token: str + + +# Verify Types + +class VerifyOtpOptions(BaseModel): + """Verify with OTP code.""" + mfa_token: str + otp: str + + +class VerifyOobOptions(BaseModel): + """Verify with OOB code + binding code.""" + mfa_token: str + oob_code: str + binding_code: str + + +class VerifyRecoveryCodeOptions(BaseModel): + """Verify with recovery code.""" + mfa_token: str + recovery_code: str + + +VerifyMfaOptions = Union[VerifyOtpOptions, VerifyOobOptions, VerifyRecoveryCodeOptions] + + +class MfaVerifyResponse(BaseModel): + """Response from MFA verification.""" + access_token: str + token_type: str = "Bearer" + expires_in: int + id_token: Optional[str] = None + refresh_token: Optional[str] = None + scope: Optional[str] = None + audience: Optional[str] = None + recovery_code: Optional[str] = None + + +# MFA Requirements + +class MfaRequirement(BaseModel): + """A single MFA requirement entry.""" + type: str + + +class MfaRequirements(BaseModel): + """MFA requirements from an mfa_required error response.""" + enroll: Optional[list[MfaRequirement]] = None + challenge: Optional[list[MfaRequirement]] = None + + +# MFA Token Context (for encrypted storage) + +class MfaTokenContext(BaseModel): + """Internal context stored inside encrypted mfa_token.""" + mfa_token: str + audience: str + scope: str + mfa_requirements: Optional[MfaRequirements] = None + created_at: int + diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index c593368..8eb13eb 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -2,7 +2,7 @@ Error classes for the auth0-server-python SDK. These exceptions provide specific error types for different failure scenarios. """ -from typing import Optional +from typing import Any, Optional class Auth0Error(Exception): @@ -184,3 +184,95 @@ class CustomTokenExchangeErrorCode: MISSING_ACTOR_TOKEN_TYPE = "missing_actor_token_type" TOKEN_EXCHANGE_FAILED = "token_exchange_failed" INVALID_RESPONSE = "invalid_response" + + +# ============================================================================= +# MFA Error Classes +# ============================================================================= + +class MfaApiError(Auth0Error): + """Base class for MFA API errors.""" + + def __init__( + self, + code: str, + message: str, + cause: Optional[dict[str, Any]] = None + ): + super().__init__(message) + self.code = code + self.cause = cause + + +class MfaListAuthenticatorsError(MfaApiError): + """Error thrown when listing authenticators fails.""" + + def __init__(self, message: str, cause: Optional[dict] = None): + super().__init__("mfa_list_authenticators_error", message, cause) + + +class MfaEnrollmentError(MfaApiError): + """Error thrown when enrolling an authenticator fails.""" + + def __init__(self, message: str, cause: Optional[dict] = None): + super().__init__("mfa_enrollment_error", message, cause) + + +class MfaDeleteAuthenticatorError(MfaApiError): + """Error thrown when deleting an authenticator fails.""" + + def __init__(self, message: str, cause: Optional[dict] = None): + super().__init__("mfa_delete_authenticator_error", message, cause) + + +class MfaChallengeError(MfaApiError): + """Error thrown when initiating an MFA challenge fails.""" + + def __init__(self, message: str, cause: Optional[dict] = None): + super().__init__("mfa_challenge_error", message, cause) + + +class MfaVerifyError(MfaApiError): + """Error thrown when MFA verification fails.""" + + def __init__(self, message: str, cause: Optional[dict] = None): + super().__init__("mfa_verify_error", message, cause) + + +class MfaRequiredError(Auth0Error): + """ + Error thrown when MFA step-up is required during token refresh. + + Contains an encrypted mfa_token that can be passed to MfaClient methods. + This error is raised in get_access_token() when the token endpoint returns + 'mfa_required'. + """ + + def __init__( + self, + message: str, + mfa_token: str, + mfa_requirements=None, + cause: Optional[Exception] = None + ): + super().__init__(message) + self.code = "mfa_required" + self.mfa_token = mfa_token + self.mfa_requirements = mfa_requirements + self.cause = cause + + +class MfaTokenExpiredError(Auth0Error): + """Error thrown when the encrypted MFA token has expired.""" + + def __init__(self): + super().__init__("The MFA token has expired.") + self.code = "mfa_token_expired" + + +class MfaTokenInvalidError(Auth0Error): + """Error thrown when the encrypted MFA token is invalid or tampered.""" + + def __init__(self): + super().__init__("The MFA token is invalid.") + self.code = "mfa_token_invalid" diff --git a/src/auth0_server_python/tests/test_mfa_client.py b/src/auth0_server_python/tests/test_mfa_client.py new file mode 100644 index 0000000..a53bacd --- /dev/null +++ b/src/auth0_server_python/tests/test_mfa_client.py @@ -0,0 +1,632 @@ +""" +Tests for MfaClient — MFA API operations. +""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest +from auth0_server_python.auth_server.mfa_client import DEFAULT_MFA_TOKEN_TTL, MfaClient +from auth0_server_python.auth_types import ( + AuthenticatorResponse, + ChallengeResponse, + MfaRequirements, + MfaVerifyResponse, + OobEnrollmentResponse, + OtpEnrollmentResponse, +) +from auth0_server_python.error import ( + MfaChallengeError, + MfaEnrollmentError, + MfaListAuthenticatorsError, + MfaRequiredError, + MfaTokenExpiredError, + MfaTokenInvalidError, + MfaVerifyError, +) + +# Shared fixtures +DOMAIN = "auth0.local" +CLIENT_ID = "" +CLIENT_SECRET = "" +SECRET = "test-secret-long-enough-for-encryption" + + +def _make_client() -> MfaClient: + return MfaClient( + domain=DOMAIN, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + secret=SECRET + ) + + +# ── Constructor ────────────────────────────────────────────────────────────── + +class TestMfaClientConstructor: + def test_constructor_sets_properties(self): + client = _make_client() + assert client._domain == DOMAIN + assert client._base_url == f"https://{DOMAIN}" + assert client._client_id == CLIENT_ID + assert client._client_secret == CLIENT_SECRET + assert client._secret == SECRET + + +# ── Token Encryption / Decryption ──────────────────────────────────────────── + +class TestMfaTokenEncryption: + def test_encrypt_decrypt_roundtrip(self): + client = _make_client() + requirements = MfaRequirements( + enroll=[{"type": "otp"}], + challenge=[{"type": "oob"}] + ) + encrypted = client.encrypt_mfa_token( + raw_mfa_token="raw_token_123", + audience="https://api.example.com", + scope="openid profile", + mfa_requirements=requirements + ) + assert isinstance(encrypted, str) + assert encrypted != "raw_token_123" + + context = client.decrypt_mfa_token(encrypted) + assert context.mfa_token == "raw_token_123" + assert context.audience == "https://api.example.com" + assert context.scope == "openid profile" + assert context.mfa_requirements is not None + + def test_decrypt_expired_token_raises(self, mocker): + client = _make_client() + mocker.patch("auth0_server_python.auth_server.mfa_client.time.time", + return_value=1000) + encrypted = client.encrypt_mfa_token( + raw_mfa_token="raw", + audience="aud", + scope="scope" + ) + + # Move time forward past TTL + mocker.patch("auth0_server_python.auth_server.mfa_client.time.time", + return_value=1000 + DEFAULT_MFA_TOKEN_TTL + 1) + with pytest.raises(MfaTokenExpiredError): + client.decrypt_mfa_token(encrypted) + + def test_decrypt_invalid_token_raises(self): + client = _make_client() + with pytest.raises(MfaTokenInvalidError): + client.decrypt_mfa_token("not-a-valid-encrypted-token") + + def test_decrypt_tampered_token_raises(self): + client = _make_client() + encrypted = client.encrypt_mfa_token( + raw_mfa_token="raw", audience="aud", scope="scope" + ) + tampered = encrypted[:-5] + "XXXXX" + with pytest.raises(MfaTokenInvalidError): + client.decrypt_mfa_token(tampered) + + def test_encrypt_without_mfa_requirements(self): + client = _make_client() + encrypted = client.encrypt_mfa_token( + raw_mfa_token="raw", audience="aud", scope="scope" + ) + context = client.decrypt_mfa_token(encrypted) + assert context.mfa_requirements is None + + +# ── list_authenticators ────────────────────────────────────────────────────── + +class TestListAuthenticators: + @pytest.mark.asyncio + async def test_list_authenticators_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value=[ + { + "id": "auth|123", + "authenticator_type": "otp", + "active": True, + "name": "Google Authenticator" + }, + { + "id": "auth|456", + "authenticator_type": "oob", + "active": True, + "oob_channels": ["sms"] + } + ]) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.list_authenticators({"mfa_token": "mfa_tok"}) + assert len(result) == 2 + assert isinstance(result[0], AuthenticatorResponse) + assert result[0].id == "auth|123" + assert result[1].oob_channels == ["sms"] + + @pytest.mark.asyncio + async def test_list_authenticators_api_error(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 401 + response.json = MagicMock(return_value={ + "error": "invalid_token", + "error_description": "Invalid MFA token" + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaListAuthenticatorsError) as exc: + await client.list_authenticators({"mfa_token": "bad_tok"}) + assert "Invalid MFA token" in str(exc.value) + + @pytest.mark.asyncio + async def test_list_authenticators_unexpected_error(self, mocker): + client = _make_client() + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, side_effect=Exception("network down")) + + with pytest.raises(MfaListAuthenticatorsError) as exc: + await client.list_authenticators({"mfa_token": "tok"}) + assert "network down" in str(exc.value) + + +# ── enroll_authenticator ───────────────────────────────────────────────────── + +class TestEnrollAuthenticator: + @pytest.mark.asyncio + async def test_enroll_otp_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "authenticator_type": "otp", + "secret": "JBSWY3DPEHPK3PXP", + "barcode_uri": "otpauth://totp/auth0:user?secret=JBSWY3DPEHPK3PXP", + "recovery_codes": ["code1", "code2"] + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.enroll_authenticator({ + "mfa_token": "tok", + "factor_type": "otp" + }) + assert isinstance(result, OtpEnrollmentResponse) + assert result.secret == "JBSWY3DPEHPK3PXP" + assert result.recovery_codes == ["code1", "code2"] + + @pytest.mark.asyncio + async def test_enroll_sms_oob_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "authenticator_type": "oob", + "oob_channel": "sms", + "oob_code": "oob_123", + "binding_method": "prompt" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.enroll_authenticator({ + "mfa_token": "tok", + "factor_type": "sms", + "phone_number": "+1234567890" + }) + assert isinstance(result, OobEnrollmentResponse) + assert result.oob_channel == "sms" + + @pytest.mark.asyncio + async def test_enroll_email_oob_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "authenticator_type": "oob", + "oob_channel": "email", + "oob_code": "oob_email_123" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.enroll_authenticator({ + "mfa_token": "tok", + "factor_type": "email", + "email": "user@example.com" + }) + assert isinstance(result, OobEnrollmentResponse) + assert result.oob_channel == "email" + + @pytest.mark.asyncio + async def test_enroll_push_auth0_channel_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "authenticator_type": "oob", + "oob_channel": "auth0", + "oob_code": "oob_push_123", + "binding_method": "prompt", + "recovery_codes": ["rc1", "rc2"] + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.enroll_authenticator({ + "mfa_token": "tok", + "factor_type": "sms" + }) + assert isinstance(result, OobEnrollmentResponse) + assert result.oob_channel == "auth0" + + @pytest.mark.asyncio + async def test_enroll_api_error(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 400 + response.json = MagicMock(return_value={ + "error": "invalid_request", + "error_description": "Bad enrollment request" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaEnrollmentError) as exc: + await client.enroll_authenticator({ + "mfa_token": "tok", + "factor_type": "otp" + }) + assert "Bad enrollment request" in str(exc.value) + + @pytest.mark.asyncio + async def test_enroll_unexpected_authenticator_type(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "authenticator_type": "unknown_type" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaEnrollmentError) as exc: + await client.enroll_authenticator({ + "mfa_token": "tok", + "factor_type": "unknown" + }) + assert "Unsupported factor_type" in str(exc.value) + + +# ── delete_authenticator ───────────────────────────────────────────────────── + +# ── challenge_authenticator ────────────────────────────────────────────────── + +class TestChallengeAuthenticator: + @pytest.mark.asyncio + async def test_challenge_otp_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "challenge_type": "otp" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.challenge_authenticator({ + "mfa_token": "tok", + "factor_type": "otp" + }) + assert isinstance(result, ChallengeResponse) + assert result.challenge_type == "otp" + + @pytest.mark.asyncio + async def test_challenge_oob_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "challenge_type": "oob", + "oob_code": "oob_challenge_123", + "binding_method": "prompt" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.challenge_authenticator({ + "mfa_token": "tok", + "factor_type": "sms", + "authenticator_id": "auth|456" + }) + assert result.challenge_type == "oob" + assert result.oob_code == "oob_challenge_123" + + @pytest.mark.asyncio + async def test_challenge_api_error(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 403 + response.json = MagicMock(return_value={ + "error": "invalid_token", + "error_description": "Token expired" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaChallengeError) as exc: + await client.challenge_authenticator({ + "mfa_token": "tok", + "factor_type": "otp" + }) + assert "Token expired" in str(exc.value) + + @pytest.mark.asyncio + async def test_challenge_expired_mfa_token(self, mocker): + """When Auth0 returns expired_token for an expired mfa_token.""" + client = _make_client() + response = AsyncMock() + response.status_code = 401 + response.json = MagicMock(return_value={ + "error": "expired_token", + "error_description": "mfa_token is expired" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaChallengeError) as exc: + await client.challenge_authenticator({ + "mfa_token": "expired_tok", + "factor_type": "otp" + }) + assert "mfa_token is expired" in str(exc.value) + + @pytest.mark.asyncio + async def test_challenge_email_with_authenticator_id(self, mocker): + """Challenge an email authenticator with a specific authenticator_id.""" + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "challenge_type": "oob", + "oob_code": "oob_email_challenge_123", + "binding_method": "prompt" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.challenge_authenticator({ + "mfa_token": "tok", + "factor_type": "email", + "authenticator_id": "email|dev_Fvx38nHufsGL5lWI" + }) + assert result.challenge_type == "oob" + assert result.oob_code == "oob_email_challenge_123" + assert result.binding_method == "prompt" + + @pytest.mark.asyncio + async def test_challenge_sms_with_authenticator_id(self, mocker): + """Challenge an SMS authenticator with a specific authenticator_id.""" + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "challenge_type": "oob", + "oob_code": "oob_sms_challenge_456", + "binding_method": "prompt" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.challenge_authenticator({ + "mfa_token": "tok", + "factor_type": "sms", + "authenticator_id": "sms|dev_h1uXXoVjQ5BpU9iQ" + }) + assert result.challenge_type == "oob" + assert result.oob_code == "oob_sms_challenge_456" + + +# ── verify ─────────────────────────────────────────────────────────────────── + +class TestVerify: + @pytest.mark.asyncio + async def test_verify_otp_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "access_token": "new_at", + "token_type": "Bearer", + "expires_in": 3600 + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.verify({ + "mfa_token": "tok", + "otp": "123456" + }) + assert isinstance(result, MfaVerifyResponse) + assert result.access_token == "new_at" + + @pytest.mark.asyncio + async def test_verify_oob_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "access_token": "new_at", + "token_type": "Bearer", + "expires_in": 3600 + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.verify({ + "mfa_token": "tok", + "oob_code": "oob_123", + "binding_code": "bind_456" + }) + assert isinstance(result, MfaVerifyResponse) + + @pytest.mark.asyncio + async def test_verify_recovery_code_success(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "access_token": "new_at", + "token_type": "Bearer", + "expires_in": 3600 + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.verify({ + "mfa_token": "tok", + "recovery_code": "ABCD-1234-EFGH" + }) + assert isinstance(result, MfaVerifyResponse) + + @pytest.mark.asyncio + async def test_verify_no_credential_raises(self): + client = _make_client() + with pytest.raises(MfaVerifyError) as exc: + await client.verify({"mfa_token": "tok"}) + assert "No verification credential" in str(exc.value) + + @pytest.mark.asyncio + async def test_verify_sends_mfa_token_as_form_data(self, mocker): + """Verify that mfa_token is sent as form_data, not as Authorization header.""" + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "access_token": "at", + "token_type": "Bearer", + "expires_in": 3600 + }) + + captured_request = {} + + async def mock_post(self_client, url, **kwargs): + captured_request["url"] = url + captured_request["kwargs"] = kwargs + return response + + mocker.patch("httpx.AsyncClient.post", new=mock_post) + + await client.verify({ + "mfa_token": "my_mfa_token", + "otp": "123456" + }) + + # Verify: mfa_token in form data body, NOT in Authorization header + assert "data" in captured_request["kwargs"] + form_data = captured_request["kwargs"]["data"] + assert form_data["mfa_token"] == "my_mfa_token" + assert "Content-Type" in captured_request["kwargs"].get("headers", {}) + assert captured_request["kwargs"]["headers"]["Content-Type"] == "application/x-www-form-urlencoded" + # Should NOT use auth= parameter (no BearerAuth) + assert "auth" not in captured_request["kwargs"] + + @pytest.mark.asyncio + async def test_verify_expired_mfa_token(self, mocker): + """When Auth0 returns expired_token for an expired mfa_token.""" + client = _make_client() + response = AsyncMock() + response.status_code = 401 + response.json = MagicMock(return_value={ + "error": "expired_token", + "error_description": "mfa_token is expired" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaVerifyError) as exc: + await client.verify({"mfa_token": "expired_tok", "otp": "123456"}) + assert "mfa_token is expired" in str(exc.value) + + @pytest.mark.asyncio + async def test_verify_invalid_challenge_type(self, mocker): + """When Auth0 returns invalid_request for an unsupported challenge type.""" + client = _make_client() + response = AsyncMock() + response.status_code = 400 + response.json = MagicMock(return_value={ + "error": "invalid_request", + "error_description": "Invalid challenge type" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaVerifyError) as exc: + await client.verify({"mfa_token": "tok", "recovery_code": "ABCD-1234"}) + assert "Invalid challenge type" in str(exc.value) + + @pytest.mark.asyncio + async def test_verify_response_includes_recovery_code(self, mocker): + """When MFA verification returns a new recovery_code (e.g., after recovery code use).""" + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "access_token": "new_at", + "token_type": "Bearer", + "expires_in": 3600, + "recovery_code": "NEW-RECOVERY-CODE-XYZ" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.verify({ + "mfa_token": "tok", + "recovery_code": "OLD-RECOVERY-CODE" + }) + assert isinstance(result, MfaVerifyResponse) + assert result.recovery_code == "NEW-RECOVERY-CODE-XYZ" + + @pytest.mark.asyncio + async def test_verify_push_oob_success(self, mocker): + """Verify with OOB code from push notification challenge.""" + client = _make_client() + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "access_token": "push_at", + "token_type": "Bearer", + "expires_in": 3600 + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + result = await client.verify({ + "mfa_token": "tok", + "oob_code": "oob_push_code", + "binding_code": "" + }) + assert isinstance(result, MfaVerifyResponse) + assert result.access_token == "push_at" + + @pytest.mark.asyncio + async def test_verify_wrong_code_raises(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 403 + response.json = MagicMock(return_value={ + "error": "invalid_grant", + "error_description": "Invalid OTP" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaVerifyError) as exc: + await client.verify({"mfa_token": "tok", "otp": "000000"}) + assert "Invalid OTP" in str(exc.value) + + @pytest.mark.asyncio + async def test_verify_chained_mfa_raises_mfa_required(self, mocker): + client = _make_client() + response = AsyncMock() + response.status_code = 403 + response.json = MagicMock(return_value={ + "error": "mfa_required", + "error_description": "Additional factor required", + "mfa_token": "new_raw_mfa_token" + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MfaRequiredError) as exc: + await client.verify({"mfa_token": "tok", "otp": "123456"}) + assert exc.value.mfa_token == "new_raw_mfa_token" + assert exc.value.code == "mfa_required" + + @pytest.mark.asyncio + async def test_verify_unexpected_error(self, mocker): + client = _make_client() + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, side_effect=Exception("connection reset")) + + with pytest.raises(MfaVerifyError) as exc: + await client.verify({"mfa_token": "tok", "otp": "123456"}) + assert "connection reset" in str(exc.value) diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 4f1b90b..d3c6868 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -19,6 +19,7 @@ ListConnectedAccountsResponse, LoginWithCustomTokenExchangeOptions, LogoutOptions, + MfaRequirements, TransactionData, ) from auth0_server_python.error import ( @@ -28,6 +29,7 @@ CustomTokenExchangeError, CustomTokenExchangeErrorCode, InvalidArgumentError, + MfaRequiredError, MissingRequiredArgumentError, MissingTransactionError, PollingApiError, @@ -2824,3 +2826,188 @@ async def test_login_with_custom_token_exchange_failure_propagates(mocker): ) ) assert exc.value.code == "unauthorized" + + +# ── MFA Integration Tests ──────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_server_client_mfa_property(): + """ + The ServerClient should expose an 'mfa' property returning an MfaClient instance. + """ + from auth0_server_python.auth_server.mfa_client import MfaClient + + mock_secret = "a-test-secret-with-enough-length" + mock_store = MagicMock() + mock_store.get = AsyncMock(return_value=None) + mock_store.set = AsyncMock() + mock_store.delete = AsyncMock() + + # Patch OIDC metadata + import auth0_server_python.auth_server.server_client as sc_mod + + original_fetch = sc_mod.ServerClient._fetch_oidc_metadata + + async def _fake_fetch(self, domain): + return { + "authorization_endpoint": f"https://{domain}/authorize", + "token_endpoint": f"https://{domain}/oauth/token", + "end_session_endpoint": f"https://{domain}/v2/logout", + "backchannel_logout_supported": True, + } + + sc_mod.ServerClient._fetch_oidc_metadata = _fake_fetch + try: + client = ServerClient( + domain="auth0.local", + client_id="cid", + client_secret="csecret", + secret=mock_secret, + transaction_store=mock_store, + state_store=mock_store, + ) + assert isinstance(client.mfa, MfaClient) + finally: + sc_mod.ServerClient._fetch_oidc_metadata = original_fetch + + +@pytest.mark.asyncio +async def test_get_access_token_mfa_required(mocker): + """ + When get_token_by_refresh_token returns an mfa_required error, + get_access_token should raise MfaRequiredError with an encrypted mfa_token. + """ + mock_secret = "a-test-secret-with-enough-length" + mock_store = MagicMock() + mock_store.get = AsyncMock(return_value=None) + mock_store.set = AsyncMock() + mock_store.delete = AsyncMock() + + import auth0_server_python.auth_server.server_client as sc_mod + + original_fetch = sc_mod.ServerClient._fetch_oidc_metadata + + async def _fake_fetch(self, domain): + return { + "authorization_endpoint": f"https://{domain}/authorize", + "token_endpoint": f"https://{domain}/oauth/token", + "end_session_endpoint": f"https://{domain}/v2/logout", + "backchannel_logout_supported": True, + } + + sc_mod.ServerClient._fetch_oidc_metadata = _fake_fetch + try: + client = ServerClient( + domain="auth0.local", + client_id="cid", + client_secret="csecret", + secret=mock_secret, + transaction_store=mock_store, + state_store=mock_store, + ) + + # Simulate state with a refresh_token and expired access token + mock_store.get = AsyncMock(return_value={ + "refresh_token": "rt_123", + "token_sets": [ + { + "audience": "default", + "access_token": "expired_at", + "expires_at": 0, + } + ] + }) + + # Simulate mfa_required ApiError from token refresh + mfa_err = ApiError( + code="mfa_required", + message="Multifactor authentication required", + ) + mfa_err.mfa_token = "raw_mfa_token_xyz" + mfa_err.mfa_requirements = None + + mocker.patch.object(client, "get_token_by_refresh_token", + new_callable=AsyncMock, side_effect=mfa_err) + + with pytest.raises(MfaRequiredError) as exc: + await client.get_access_token() + + assert exc.value.mfa_token is not None + assert exc.value.mfa_token != "raw_mfa_token_xyz" # encrypted + finally: + sc_mod.ServerClient._fetch_oidc_metadata = original_fetch + + +@pytest.mark.asyncio +async def test_get_access_token_mfa_required_with_enroll_requirements(mocker): + """ + When get_token_by_refresh_token returns mfa_required with enroll requirements, + get_access_token should raise MfaRequiredError with mfa_requirements containing enroll. + """ + mock_secret = "a-test-secret-with-enough-length" + mock_store = MagicMock() + mock_store.get = AsyncMock(return_value=None) + mock_store.set = AsyncMock() + mock_store.delete = AsyncMock() + + import auth0_server_python.auth_server.server_client as sc_mod + + original_fetch = sc_mod.ServerClient._fetch_oidc_metadata + + async def _fake_fetch(self, domain): + return { + "authorization_endpoint": f"https://{domain}/authorize", + "token_endpoint": f"https://{domain}/oauth/token", + "end_session_endpoint": f"https://{domain}/v2/logout", + "backchannel_logout_supported": True, + } + + sc_mod.ServerClient._fetch_oidc_metadata = _fake_fetch + try: + client = ServerClient( + domain="auth0.local", + client_id="cid", + client_secret="csecret", + secret=mock_secret, + transaction_store=mock_store, + state_store=mock_store, + ) + + # Simulate state with a refresh_token and expired access token + mock_store.get = AsyncMock(return_value={ + "refresh_token": "rt_123", + "token_sets": [ + { + "audience": "default", + "access_token": "expired_at", + "expires_at": 0, + } + ] + }) + + # Simulate mfa_required with enroll requirements + mfa_err = ApiError( + code="mfa_required", + message="Multifactor authentication required", + ) + mfa_err.mfa_token = "raw_mfa_token_enroll" + mfa_err.mfa_requirements = MfaRequirements( + enroll=[ + {"type": "otp"}, + {"type": "phone"}, + {"type": "push-notification"} + ] + ) + + mocker.patch.object(client, "get_token_by_refresh_token", + new_callable=AsyncMock, side_effect=mfa_err) + + with pytest.raises(MfaRequiredError) as exc: + await client.get_access_token() + + assert exc.value.mfa_token is not None + assert exc.value.mfa_token != "raw_mfa_token_enroll" # encrypted + assert exc.value.mfa_requirements is not None + finally: + sc_mod.ServerClient._fetch_oidc_metadata = original_fetch