Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,23 @@ def _resolve_azure_region(config: AgentAuthConfiguration) -> str | None:
return azure_region
return None

@staticmethod
def _resolve_idpm_resource(config: AgentAuthConfiguration) -> str:
"""Resolves the resource URL for Identity Proxy Manager (IDPM) token acquisition.

When no resource is configured, defaults to the AzureAdTokenExchange resource.
Otherwise the configured value must be a valid absolute URI.
"""
idpm_resource = getattr(config, "IDPM_RESOURCE", None)
if not idpm_resource:
return "api://AzureAdTokenExchange/.default"

valid_uri, _ = MsalAuth._uri_validator(idpm_resource)
if not valid_uri:
raise ValueError("IDPM_RESOURCE must be a valid absolute URI")
Comment thread
axelsrz marked this conversation as resolved.

return idpm_resource
Comment thread
axelsrz marked this conversation as resolved.

@staticmethod
def _resolve_tenant_id(
config: AgentAuthConfiguration, tenant_id: str | None = None
Expand All @@ -206,7 +223,10 @@ def _create_client_application(
self, tenant_id: str | None = None
) -> ConfidentialClientApplication | ManagedIdentityClient:

if self._msal_configuration.AUTH_TYPE == AuthTypes.user_managed_identity:
if self._msal_configuration.AUTH_TYPE in (
AuthTypes.user_managed_identity,
AuthTypes.identity_proxy_manager,
):
return ManagedIdentityClient(
UserAssignedManagedIdentity(
client_id=self._msal_configuration.CLIENT_ID
Expand Down Expand Up @@ -348,7 +368,29 @@ async def get_agentic_application_token(
if auth_result_payload:
return auth_result_payload.get("access_token")

return None
return None

if (
self._msal_configuration.AUTH_TYPE == AuthTypes.identity_proxy_manager
and isinstance(msal_auth_client, ManagedIdentityClient)
):
Comment thread
axelsrz marked this conversation as resolved.
resource = MsalAuth._resolve_idpm_resource(self._msal_configuration)
logger.info(
"Acquiring agentic application token using Identity Proxy Manager for resource %s",
resource,
)
auth_result_payload = await _async_acquire_token_for_client(
msal_auth_client, resource=resource
)

if auth_result_payload:
return auth_result_payload.get("access_token")

return None

raise RuntimeError(
"Agentic token acquisition supports ConfidentialClientApplication, or ManagedIdentityClient when AUTH_TYPE is AuthTypes.identity_proxy_manager."
)
Comment thread
Copilot marked this conversation as resolved.

async def get_agentic_instance_token(
self, tenant_id: str, agent_app_instance_id: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class AgentAuthConfiguration:
ALT_BLUEPRINT_ID: An optional alternative blueprint ID used when constructing a connector client.
AZURE_REGION: The Azure regional token service to use for token acquisition (ESTS-R).
This feature is currently available to first-party applications only.
IDPM_RESOURCE: The resource URL for Identity Proxy Manager (IDPM) token acquisition.
Only meaningful when AUTH_TYPE is AuthTypes.identity_proxy_manager. When not set,
it defaults to "api://AzureAdTokenExchange/.default".
Comment thread
axelsrz marked this conversation as resolved.
Comment thread
axelsrz marked this conversation as resolved.
"""

TENANT_ID: str | None
Expand All @@ -35,6 +38,7 @@ class AgentAuthConfiguration:
AUTHORITY: str | None
ALT_BLUEPRINT_ID: str | None
AZURE_REGION: str | None
IDPM_RESOURCE: str | None
ANONYMOUS_ALLOWED: bool = False

# Multi-connection support: Maintains a map of all configured connections
Expand All @@ -58,6 +62,7 @@ def __init__(
authority: str | None = None,
scopes: list[str] | None = None,
azure_region: str | None = None,
idpm_resource: str | None = None,
anonymous_allowed: bool = False,
**kwargs: str,
):
Expand All @@ -80,6 +85,9 @@ def __init__(
or kwargs.get("AZUREREGION", None)
or kwargs.get("REGIONALAUTHORITY", None)
)
# Resource URL for Identity Proxy Manager (IDPM) token acquisition.
# Only meaningful when AUTH_TYPE is AuthTypes.identity_proxy_manager.
self.IDPM_RESOURCE = idpm_resource or kwargs.get("IDPMRESOURCE", None)
self.ALT_BLUEPRINT_ID = kwargs.get("ALT_BLUEPRINT_NAME", None)
self.ANONYMOUS_ALLOWED = anonymous_allowed or kwargs.get(
"ANONYMOUS_ALLOWED", False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class AuthTypes(str, Enum):
user_managed_identity = "UserManagedIdentity"
system_managed_identity = "SystemManagedIdentity"
federated_credentials = "FederatedCredentials"
identity_proxy_manager = "IdentityProxyManager"
94 changes: 94 additions & 0 deletions tests/authentication_msal/test_msal_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,100 @@ def test_create_client_application_azure_region_defaults_none(self, mocker):
assert mock_cca.call_args.kwargs["azure_region"] is None


class TestMsalAuthIdentityProxyManager:
"""
Test suite for the Identity Proxy Manager (IDPM) authentication type.
"""

def test_resolve_idpm_resource_defaults_when_unset(self):
config = AgentAuthConfiguration(
auth_type=AuthTypes.identity_proxy_manager,
client_id="test-client-id",
)
assert (
MsalAuth._resolve_idpm_resource(config)
== "api://AzureAdTokenExchange/.default"
)

def test_resolve_idpm_resource_uses_custom_resource(self):
config = AgentAuthConfiguration(
auth_type=AuthTypes.identity_proxy_manager,
client_id="test-client-id",
idpm_resource="https://custom-resource/.default",
)
assert (
MsalAuth._resolve_idpm_resource(config)
== "https://custom-resource/.default"
)

def test_resolve_idpm_resource_raises_on_invalid_uri(self):
config = AgentAuthConfiguration(
auth_type=AuthTypes.identity_proxy_manager,
client_id="test-client-id",
idpm_resource="not-a-valid-uri",
)
with pytest.raises(ValueError):
MsalAuth._resolve_idpm_resource(config)

def test_create_client_application_returns_managed_identity_client(self, mocker):
config = AgentAuthConfiguration(
auth_type=AuthTypes.identity_proxy_manager,
client_id="test-client-id",
)
mock_mic = mocker.patch(
"microsoft_agents.authentication.msal.msal_auth.ManagedIdentityClient"
)
mock_umi = mocker.patch(
"microsoft_agents.authentication.msal.msal_auth.UserAssignedManagedIdentity"
)
auth = MsalAuth(config)
auth._create_client_application()
mock_umi.assert_called_once_with(client_id="test-client-id")
mock_mic.assert_called_once()

@pytest.mark.asyncio
async def test_get_agentic_application_token_identity_proxy_manager(self, mocker):
config = AgentAuthConfiguration(
auth_type=AuthTypes.identity_proxy_manager,
client_id="test-client-id",
idpm_resource="https://custom-resource/.default",
)
auth = MsalAuth(config)

mock_client = mocker.Mock(spec=ManagedIdentityClient)
mock_client.acquire_token_for_client.return_value = {
"access_token": "idpm-token"
}
mocker.patch.object(auth, "_get_client", return_value=mock_client)

token = await auth.get_agentic_application_token(
"test-tenant-id", "test-agent-app-instance-id"
)

assert token == "idpm-token"
mock_client.acquire_token_for_client.assert_called_once_with(
resource="https://custom-resource/.default"
)

@pytest.mark.asyncio
async def test_get_agentic_application_token_unsupported_client_raises(
self, mocker
):
config = AgentAuthConfiguration(
auth_type=AuthTypes.user_managed_identity,
client_id="test-client-id",
)
auth = MsalAuth(config)

mock_client = mocker.Mock(spec=ManagedIdentityClient)
mocker.patch.object(auth, "_get_client", return_value=mock_client)

with pytest.raises(RuntimeError):
await auth.get_agentic_application_token(
"test-tenant-id", "test-agent-app-instance-id"
)


# class TestMsalAuthAgentic:

# @pytest.mark.asyncio
Expand Down
19 changes: 19 additions & 0 deletions tests/hosting_core/test_auth_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,22 @@ def test_azure_region_prefers_azure_region_over_legacy(self):
AZUREREGION="eastus", REGIONALAUTHORITY="westeurope"
)
assert auth_config.AZURE_REGION == "eastus"

def test_idpm_resource_defaults_none(self):
auth_config = AgentAuthConfiguration()
assert auth_config.IDPM_RESOURCE is None

def test_idpm_resource_from_parameter(self):
auth_config = AgentAuthConfiguration(
auth_type=AuthTypes.identity_proxy_manager,
client_id="test-client-id",
idpm_resource="https://custom-resource/.default",
)
assert auth_config.AUTH_TYPE == AuthTypes.identity_proxy_manager
assert auth_config.IDPM_RESOURCE == "https://custom-resource/.default"

def test_idpm_resource_from_kwargs(self):
auth_config = AgentAuthConfiguration(
IDPMRESOURCE="https://custom-resource/.default"
)
assert auth_config.IDPM_RESOURCE == "https://custom-resource/.default"
Loading