diff --git a/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py b/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py index ca7398733f..5a71833119 100644 --- a/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py +++ b/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py @@ -31,7 +31,7 @@ def resolve_file_path( session_state: Optional[Dict[str, Any]] = None, working_directory: Optional[str] = None, ) -> Path: - """Resolve a file path using root directory from session state. + """Resolve a file path under the root directory from session state. This is a helper function that other tools can use to resolve file paths without needing to be async or return detailed resolution information. @@ -43,32 +43,28 @@ def resolve_file_path( Returns: Resolved absolute Path object + + Raises: + ValueError: If the resolved file path escapes the session root directory. """ normalized_path = sanitize_generated_file_path(file_path) file_path_obj = Path(normalized_path) + resolved_root = _resolve_root_directory(session_state, working_directory) - # If already absolute, use as-is if file_path_obj.is_absolute(): - return file_path_obj - - # Get root directory from session state, default to "./" - root_directory = "./" - if session_state and "root_directory" in session_state: - root_directory = session_state["root_directory"] - - # Use the same resolution logic as the main function - root_path_obj = Path(root_directory) - - if root_path_obj.is_absolute(): - resolved_root = root_path_obj + resolved_path = file_path_obj.resolve() else: - if working_directory: - resolved_root = Path(working_directory) / root_directory - else: - resolved_root = Path(os.getcwd()) / root_directory + resolved_path = (resolved_root / file_path_obj).resolve() - # Resolve file path relative to root directory - return resolved_root / file_path_obj + try: + resolved_path.relative_to(resolved_root) + except ValueError as exc: + raise ValueError( + f"Resolved path escapes project root: {resolved_path} is not under " + f"{resolved_root}" + ) from exc + + return resolved_path def resolve_file_paths( @@ -90,3 +86,22 @@ def resolve_file_paths( resolve_file_path(path, session_state, working_directory) for path in file_paths ] + + +def _resolve_root_directory( + session_state: Optional[Dict[str, Any]] = None, + working_directory: Optional[str] = None, +) -> Path: + """Resolve the trusted project root used for builder file operations.""" + root_directory = "./" + if session_state and "root_directory" in session_state: + root_directory = session_state["root_directory"] + + root_path_obj = Path(root_directory) + if root_path_obj.is_absolute(): + return root_path_obj.resolve() + + base_directory = ( + Path(working_directory) if working_directory else Path(os.getcwd()) + ) + return (base_directory / root_path_obj).resolve() diff --git a/tests/unittests/cli/built_in_agents/test_builder_file_paths.py b/tests/unittests/cli/built_in_agents/test_builder_file_paths.py new file mode 100644 index 0000000000..453429733b --- /dev/null +++ b/tests/unittests/cli/built_in_agents/test_builder_file_paths.py @@ -0,0 +1,94 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from google.adk.cli.built_in_agents.tools.delete_files import delete_files +from google.adk.cli.built_in_agents.tools.read_files import read_files +from google.adk.cli.built_in_agents.tools.write_files import write_files +from google.adk.cli.built_in_agents.utils.resolve_root_directory import ( + resolve_file_path, +) + + +def _tool_context(root_directory): + return SimpleNamespace( + _invocation_context=SimpleNamespace( + session=SimpleNamespace(state={"root_directory": str(root_directory)}) + ) + ) + + +def test_resolve_file_path_allows_paths_inside_project_root(tmp_path): + project_root = tmp_path / "project" + project_root.mkdir() + + assert ( + resolve_file_path("tools/agent.py", {"root_directory": str(project_root)}) + == project_root / "tools" / "agent.py" + ) + assert ( + resolve_file_path( + str(project_root / "root_agent.yaml"), + {"root_directory": str(project_root)}, + ) + == project_root / "root_agent.yaml" + ) + + +@pytest.mark.parametrize("path", ["../outside.txt", "/tmp/outside.txt"]) +def test_resolve_file_path_rejects_paths_outside_project_root(tmp_path, path): + project_root = tmp_path / "project" + project_root.mkdir() + + with pytest.raises(ValueError, match="escapes project root"): + resolve_file_path(path, {"root_directory": str(project_root)}) + + +def test_resolve_file_path_rejects_symlink_escape(tmp_path): + project_root = tmp_path / "project" + outside_dir = tmp_path / "outside" + project_root.mkdir() + outside_dir.mkdir() + (project_root / "linked").symlink_to(outside_dir, target_is_directory=True) + + with pytest.raises(ValueError, match="escapes project root"): + resolve_file_path( + "linked/agent.py", + {"root_directory": str(project_root)}, + ) + + +@pytest.mark.asyncio +async def test_builder_file_tools_do_not_access_paths_outside_project_root( + tmp_path, +): + project_root = tmp_path / "project" + project_root.mkdir() + outside_file = tmp_path / "outside.txt" + outside_file.write_text("secret", encoding="utf-8") + tool_context = _tool_context(project_root) + + read_result = await read_files([str(outside_file)], tool_context) + write_result = await write_files({str(outside_file): "changed"}, tool_context) + delete_result = await delete_files([str(outside_file)], tool_context) + + assert not read_result["success"] + assert not write_result["success"] + assert not delete_result["success"] + assert outside_file.read_text(encoding="utf-8") == "secret"