From 927017f2758fa24806affc01ec0e7d5bccd7f378 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Sat, 30 May 2026 14:07:19 +0530 Subject: [PATCH 1/4] fix: support REST auth configuration from environment variables --- pyiceberg/catalog/rest/__init__.py | 28 +++++++- tests/catalog/test_rest.py | 110 +++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index d085c6fd87..4f32d30dd5 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import json from collections import deque from enum import Enum from typing import ( @@ -435,7 +436,32 @@ def _create_session(self) -> Session: elif ssl_client_cert := ssl_client.get(CERT): session.cert = ssl_client_cert - if auth_config := self.properties.get(AUTH): + raw_auth = self.properties.get(AUTH) + if isinstance(raw_auth, str): + try: + auth_config: dict[str, Any] | None = json.loads(raw_auth) + except json.JSONDecodeError as e: + raise ValueError("Failed to parse auth configuration as JSON") from e + elif raw_auth is not None: + auth_config = raw_auth + elif auth_type := self.properties.get(f"{AUTH}.type"): + type_prefix = f"{AUTH}.{auth_type}." + auth_config = { + "type": auth_type, + "impl": self.properties.get(f"{AUTH}.impl"), + auth_type: { + key[len(type_prefix) :].replace("-", "_"): value + for key, value in self.properties.items() + if key.startswith(type_prefix) + }, + } + else: + auth_config = None + + if auth_config is not None and not isinstance(auth_config, dict): + raise ValueError("auth configuration must be a dictionary") + + if auth_config: auth_type = auth_config.get("type") if auth_type is None: raise ValueError("auth.type must be defined") diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 1eb9f26a56..91b9a4a7fc 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -18,6 +18,7 @@ from __future__ import annotations import base64 +import json import os from collections.abc import Callable from typing import Any, cast @@ -2470,6 +2471,115 @@ def test_rest_catalog_oauth2_non_200_token_response(requests_mock: Mocker) -> No RestCatalog("rest", **catalog_properties) # type: ignore +def _rest_catalog_properties_from_environment() -> RecursiveDict: + env_config = Config._from_environment_variables({}) + catalogs = cast(RecursiveDict, env_config["catalog"]) + return cast(RecursiveDict, catalogs["rest"]) + + +@mock.patch.dict( + os.environ, + { + "PYICEBERG_CATALOG__REST__URI": TEST_URI, + "PYICEBERG_CATALOG__REST__AUTH": json.dumps({"type": "basic", "basic": {"username": "one", "password": "two"}}), + }, + clear=True, +) +def test_rest_catalog_with_basic_auth_json_environment_variable(rest_mock: Mocker) -> None: + rest_mock.get(f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200) + + RestCatalog("rest", **_rest_catalog_properties_from_environment()) # type: ignore + + encoded_user_pass = base64.b64encode(b"one:two").decode() + assert rest_mock.last_request.headers["Authorization"] == f"Basic {encoded_user_pass}" + + +@mock.patch.dict( + os.environ, + { + "PYICEBERG_CATALOG__REST__URI": TEST_URI, + "PYICEBERG_CATALOG__REST__AUTH": json.dumps( + { + "type": "oauth2", + "oauth2": { + "client_id": "some_client_id", + "client_secret": "some_client_secret", + "token_url": f"{TEST_URI}oauth2/token", + }, + } + ), + }, + clear=True, +) +def test_rest_catalog_with_oauth2_auth_json_environment_variable(requests_mock: Mocker) -> None: + requests_mock.post( + f"{TEST_URI}oauth2/token", + json={"access_token": TEST_TOKEN, "token_type": "Bearer", "expires_in": 3600}, + status_code=200, + ) + requests_mock.get(f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200) + + catalog = RestCatalog("rest", **_rest_catalog_properties_from_environment()) # type: ignore + + assert catalog.uri == TEST_URI + + +@mock.patch.dict( + os.environ, + { + "PYICEBERG_CATALOG__REST__URI": TEST_URI, + "PYICEBERG_CATALOG__REST__AUTH": "not-valid-json", + }, + clear=True, +) +def test_rest_catalog_with_invalid_json_auth_environment_variable() -> None: + with pytest.raises(ValueError, match="Failed to parse auth configuration as JSON"): + RestCatalog("rest", **_rest_catalog_properties_from_environment()) # type: ignore + + +@mock.patch.dict( + os.environ, + { + "PYICEBERG_CATALOG__REST__URI": TEST_URI, + "PYICEBERG_CATALOG__REST__AUTH__TYPE": "basic", + "PYICEBERG_CATALOG__REST__AUTH__BASIC__USERNAME": "one", + "PYICEBERG_CATALOG__REST__AUTH__BASIC__PASSWORD": "two", + }, + clear=True, +) +def test_rest_catalog_with_basic_auth_flat_environment_variables(rest_mock: Mocker) -> None: + rest_mock.get(f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200) + + RestCatalog("rest", **_rest_catalog_properties_from_environment()) # type: ignore + + encoded_user_pass = base64.b64encode(b"one:two").decode() + assert rest_mock.last_request.headers["Authorization"] == f"Basic {encoded_user_pass}" + + +@mock.patch.dict( + os.environ, + { + "PYICEBERG_CATALOG__REST__URI": TEST_URI, + "PYICEBERG_CATALOG__REST__AUTH__TYPE": "oauth2", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__CLIENT_ID": "some_client_id", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__CLIENT_SECRET": "some_client_secret", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__TOKEN_URL": f"{TEST_URI}oauth2/token", + }, + clear=True, +) +def test_rest_catalog_with_oauth2_auth_flat_environment_variables(requests_mock: Mocker) -> None: + requests_mock.post( + f"{TEST_URI}oauth2/token", + json={"access_token": TEST_TOKEN, "token_type": "Bearer", "expires_in": 3600}, + status_code=200, + ) + requests_mock.get(f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200) + + catalog = RestCatalog("rest", **_rest_catalog_properties_from_environment()) # type: ignore + + assert catalog.uri == TEST_URI + + EXAMPLE_ENV = {"PYICEBERG_CATALOG__PRODUCTION__URI": TEST_URI} From 7403f51ee95712a5dc172a3ccabfdf31ac6e6c6b Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Sat, 30 May 2026 14:46:45 +0530 Subject: [PATCH 2/4] chore: retrigger CI From 41edaefed7fa9dfb44c4b8a6bb28830456cdbdce Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Mon, 1 Jun 2026 09:36:09 +0530 Subject: [PATCH 3/4] test: cover typed flat rest auth env vars --- pyiceberg/catalog/rest/__init__.py | 68 +++++++++++++++++++++++++++- tests/catalog/test_rest.py | 71 ++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 4f32d30dd5..75c3d84cb8 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -16,12 +16,18 @@ # under the License. from __future__ import annotations +import importlib import json from collections import deque from enum import Enum +from types import UnionType from typing import ( TYPE_CHECKING, Any, + Union, + get_args, + get_origin, + get_type_hints, ) from urllib.parse import quote, unquote @@ -397,6 +403,64 @@ class ListViewsResponse(IcebergBaseModel): _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse) +def _get_auth_manager_class(class_or_name: str) -> type[AuthManager]: + if class_or_name in AuthManagerFactory._registry: + return AuthManagerFactory._registry[class_or_name] + + try: + module_path, class_name = class_or_name.rsplit(".", 1) + module = importlib.import_module(module_path) + return getattr(module, class_name) + except Exception as err: + raise ValueError(f"Could not load AuthManager class for '{class_or_name}'") from err + + +def _coerce_auth_option_value(key: str, value: Any, annotation: Any) -> Any: + if not isinstance(value, str): + return value + + origin = get_origin(annotation) + if origin is list: + try: + parsed = json.loads(value) + except json.JSONDecodeError as err: + raise ValueError(f"Failed to parse auth configuration value '{key}' as JSON array") from err + + if not isinstance(parsed, list) or not all(isinstance(item, str) for item in parsed): + raise ValueError(f"auth configuration value '{key}' must be a JSON array of strings") + return parsed + + if origin in (Union, UnionType): + non_none_args = [arg for arg in get_args(annotation) if arg is not type(None)] + if len(non_none_args) == 1: + return _coerce_auth_option_value(key, value, non_none_args[0]) + + if origin is not None: + if origin is list: + try: + parsed = json.loads(value) + except json.JSONDecodeError as err: + raise ValueError(f"Failed to parse auth configuration value '{key}' as JSON array") from err + + if not isinstance(parsed, list) or not all(isinstance(item, str) for item in parsed): + raise ValueError(f"auth configuration value '{key}' must be a JSON array of strings") + return parsed + + if annotation is int: + try: + return int(value) + except ValueError as err: + raise ValueError(f"Failed to parse auth configuration value '{key}' as integer") from err + + return value + + +def _coerce_auth_config_values(class_or_name: str, config: dict[str, Any]) -> dict[str, Any]: + manager_class = _get_auth_manager_class(class_or_name) + hints = get_type_hints(manager_class.__init__) + return {key: _coerce_auth_option_value(key, value, hints.get(key, Any)) for key, value in config.items()} + + class RestCatalog(Catalog): uri: str _session: Session @@ -467,6 +531,7 @@ def _create_session(self) -> Session: raise ValueError("auth.type must be defined") auth_type_config = auth_config.get(auth_type, {}) auth_impl = auth_config.get("impl") + auth_manager_name = auth_impl or auth_type if auth_type == CUSTOM and not auth_impl: raise ValueError("auth.impl must be specified when using custom auth.type") @@ -474,7 +539,8 @@ def _create_session(self) -> Session: if auth_type != CUSTOM and auth_impl: raise ValueError("auth.impl can only be specified when using custom auth.type") - self._auth_manager = AuthManagerFactory.create(auth_impl or auth_type, auth_type_config) + typed_auth_type_config = _coerce_auth_config_values(auth_manager_name, auth_type_config) + self._auth_manager = AuthManagerFactory.create(auth_manager_name, typed_auth_type_config) session.auth = AuthManagerAdapter(self._auth_manager) else: self._auth_manager = self._create_legacy_oauth2_auth_manager(session) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 91b9a4a7fc..b6fecb049a 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -2580,6 +2580,77 @@ def test_rest_catalog_with_oauth2_auth_flat_environment_variables(requests_mock: assert catalog.uri == TEST_URI +@pytest.mark.parametrize( + "auth_type, env_overrides, expected_config", + [ + pytest.param( + "oauth2", + { + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__CLIENT_ID": "some_client_id", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__CLIENT_SECRET": "some_client_secret", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__TOKEN_URL": f"{TEST_URI}oauth2/token", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__REFRESH_MARGIN": "90", + "PYICEBERG_CATALOG__REST__AUTH__OAUTH2__EXPIRES_IN": "3600", + }, + { + "client_id": "some_client_id", + "client_secret": "some_client_secret", + "token_url": f"{TEST_URI}oauth2/token", + "refresh_margin": 90, + "expires_in": 3600, + }, + id="oauth2-numeric-fields", + ), + pytest.param( + "google", + { + "PYICEBERG_CATALOG__REST__AUTH__GOOGLE__CREDENTIALS_PATH": "/fake/path.json", + "PYICEBERG_CATALOG__REST__AUTH__GOOGLE__SCOPES": '["scope-a", "scope-b"]', + }, + { + "credentials_path": "/fake/path.json", + "scopes": ["scope-a", "scope-b"], + }, + id="google-scopes", + ), + pytest.param( + "entra", + { + "PYICEBERG_CATALOG__REST__AUTH__ENTRA__SCOPES": '["scope-a", "scope-b"]', + }, + { + "scopes": ["scope-a", "scope-b"], + }, + id="entra-scopes", + ), + ], +) +def test_rest_catalog_with_typed_auth_flat_environment_variables( + rest_mock: Mocker, + auth_type: str, + env_overrides: dict[str, str], + expected_config: dict[str, Any], +) -> None: + rest_mock.get(f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200) + + fake_auth_manager = mock.Mock() + fake_auth_manager.auth_header.return_value = "" + env = { + "PYICEBERG_CATALOG__REST__URI": TEST_URI, + "PYICEBERG_CATALOG__REST__AUTH__TYPE": auth_type, + **env_overrides, + } + + with ( + mock.patch.dict(os.environ, env, clear=True), + mock.patch("pyiceberg.catalog.rest.AuthManagerFactory.create", return_value=fake_auth_manager) as create_auth_manager, + ): + catalog = RestCatalog("rest", **_rest_catalog_properties_from_environment()) # type: ignore + + assert catalog.uri == TEST_URI + assert create_auth_manager.call_args_list == [mock.call(auth_type, expected_config), mock.call(auth_type, expected_config)] + + EXAMPLE_ENV = {"PYICEBERG_CATALOG__PRODUCTION__URI": TEST_URI} From 81f723552bbf0ae3ca2d562d44e265e05000dd72 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Thu, 4 Jun 2026 02:33:49 +0530 Subject: [PATCH 4/4] refactor(rest): add typed auth config parsing helpers --- pyiceberg/catalog/rest/__init__.py | 118 +++++++++++++++++++---------- 1 file changed, 77 insertions(+), 41 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 75c3d84cb8..f5cab70080 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -34,7 +34,7 @@ from pydantic import ConfigDict, Field, TypeAdapter, field_validator from requests import HTTPError, Session from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt -from typing_extensions import override +from typing_extensions import NotRequired, TypedDict, override from pyiceberg import __version__ from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary @@ -403,6 +403,17 @@ class ListViewsResponse(IcebergBaseModel): _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse) +class ParsedAuthConfig(TypedDict): + auth_type: str + auth_manager_name: str + auth_type_config: dict[str, Any] + + +class AuthConfigEnvelope(TypedDict): + type: str + impl: NotRequired[str] + + def _get_auth_manager_class(class_or_name: str) -> type[AuthManager]: if class_or_name in AuthManagerFactory._registry: return AuthManagerFactory._registry[class_or_name] @@ -461,6 +472,65 @@ def _coerce_auth_config_values(class_or_name: str, config: dict[str, Any]) -> di return {key: _coerce_auth_option_value(key, value, hints.get(key, Any)) for key, value in config.items()} +def _load_auth_config_from_properties(properties: Properties) -> AuthConfigEnvelope | dict[str, Any] | None: + raw_auth = properties.get(AUTH) + if isinstance(raw_auth, str): + try: + decoded_auth = json.loads(raw_auth) + except json.JSONDecodeError as e: + raise ValueError("Failed to parse auth configuration as JSON") from e + if decoded_auth is not None and not isinstance(decoded_auth, dict): + raise ValueError("auth configuration must be a dictionary") + return decoded_auth + + if raw_auth is not None: + if not isinstance(raw_auth, dict): + raise ValueError("auth configuration must be a dictionary") + return raw_auth + + if auth_type := properties.get(f"{AUTH}.type"): + type_prefix = f"{AUTH}.{auth_type}." + return { + "type": auth_type, + "impl": properties.get(f"{AUTH}.impl"), + auth_type: { + key[len(type_prefix) :].replace("-", "_"): value + for key, value in properties.items() + if key.startswith(type_prefix) + }, + } + + return None + + +def _resolve_auth_config(auth_config: AuthConfigEnvelope | dict[str, Any]) -> ParsedAuthConfig: + auth_type = auth_config.get("type") + if not isinstance(auth_type, str): + raise ValueError("auth.type must be defined") + + auth_type_config = auth_config.get(auth_type, {}) + if not isinstance(auth_type_config, dict): + raise ValueError(f"auth.{auth_type} must be a dictionary") + + auth_impl = auth_config.get("impl") + if auth_impl is not None and not isinstance(auth_impl, str): + raise ValueError("auth.impl must be a string") + + auth_manager_name = auth_impl or auth_type + + if auth_type == CUSTOM and not auth_impl: + raise ValueError("auth.impl must be specified when using custom auth.type") + + if auth_type != CUSTOM and auth_impl: + raise ValueError("auth.impl can only be specified when using custom auth.type") + + return { + "auth_type": auth_type, + "auth_manager_name": auth_manager_name, + "auth_type_config": auth_type_config, + } + + class RestCatalog(Catalog): uri: str _session: Session @@ -500,47 +570,13 @@ def _create_session(self) -> Session: elif ssl_client_cert := ssl_client.get(CERT): session.cert = ssl_client_cert - raw_auth = self.properties.get(AUTH) - if isinstance(raw_auth, str): - try: - auth_config: dict[str, Any] | None = json.loads(raw_auth) - except json.JSONDecodeError as e: - raise ValueError("Failed to parse auth configuration as JSON") from e - elif raw_auth is not None: - auth_config = raw_auth - elif auth_type := self.properties.get(f"{AUTH}.type"): - type_prefix = f"{AUTH}.{auth_type}." - auth_config = { - "type": auth_type, - "impl": self.properties.get(f"{AUTH}.impl"), - auth_type: { - key[len(type_prefix) :].replace("-", "_"): value - for key, value in self.properties.items() - if key.startswith(type_prefix) - }, - } - else: - auth_config = None - - if auth_config is not None and not isinstance(auth_config, dict): - raise ValueError("auth configuration must be a dictionary") - + auth_config = _load_auth_config_from_properties(self.properties) if auth_config: - auth_type = auth_config.get("type") - if auth_type is None: - raise ValueError("auth.type must be defined") - auth_type_config = auth_config.get(auth_type, {}) - auth_impl = auth_config.get("impl") - auth_manager_name = auth_impl or auth_type - - if auth_type == CUSTOM and not auth_impl: - raise ValueError("auth.impl must be specified when using custom auth.type") - - if auth_type != CUSTOM and auth_impl: - raise ValueError("auth.impl can only be specified when using custom auth.type") - - typed_auth_type_config = _coerce_auth_config_values(auth_manager_name, auth_type_config) - self._auth_manager = AuthManagerFactory.create(auth_manager_name, typed_auth_type_config) + resolved_auth = _resolve_auth_config(auth_config) + typed_auth_type_config = _coerce_auth_config_values( + resolved_auth["auth_manager_name"], resolved_auth["auth_type_config"] + ) + self._auth_manager = AuthManagerFactory.create(resolved_auth["auth_manager_name"], typed_auth_type_config) session.auth = AuthManagerAdapter(self._auth_manager) else: self._auth_manager = self._create_legacy_oauth2_auth_manager(session)