diff --git a/pyproject.toml b/pyproject.toml index 1af967046f..7b425e92bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ # go/keep-sorted start "PyYAML>=6.0.2, <7.0.0", # For APIHubToolset. "aiosqlite>=0.21.0", # For SQLite database + "agentic_sandbox @ git+https://github.com/kubernetes-sigs/agent-sandbox.git@dbac66ecba5497ac493ca5e4ab5e0fcb1c945134#subdirectory=clients/python/agentic-sandbox-client", # For Agent Sandboxed Code Execution "anyio>=4.9.0, <5.0.0", # For MCP Session Manager "authlib>=1.5.1, <2.0.0", # For RestAPI Tool "click>=8.1.8, <9.0.0", # For CLI tools diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index d07253c267..9434fa4b01 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -17,6 +17,7 @@ import logging import uuid +from agentic_sandbox import SandboxClient import kubernetes as k8s from kubernetes.watch import Watch @@ -36,9 +37,19 @@ class GkeCodeExecutor(BaseCodeExecutor): """Executes Python code in a secure gVisor-sandboxed Pod on GKE. - This executor securely runs code by dynamically creating a Kubernetes Job for - each execution request. The user's code is mounted via a ConfigMap, and the - Pod is hardened with a strict security context and resource limits. + This executor supports two modes of execution: 'job' and 'sandbox'. + + Job Mode (default): + Securely runs code by dynamically creating a Kubernetes Job for each execution + request. The user's code is mounted via a ConfigMap, and the Pod is hardened + with a strict security context and resource limits. + + Sandbox Mode: + Executes code using the Agent Sandbox Client. This mode requires additional + infrastructure to be deployed in the cluster, specifically: + - Agent-sandbox controller + - Sandbox templates (e.g., python-sandbox-template) + - Sandbox router and gateway Key Features: - Sandboxed execution using the gVisor runtime. @@ -70,6 +81,8 @@ class GkeCodeExecutor(BaseCodeExecutor): namespace: str = "default" image: str = "python:3.11-slim" timeout_seconds: int = 300 + executor_type: str = "job" # "job" or "sandbox" + sandbox_gateway_name: str | None = None cpu_requested: str = "200m" mem_requested: str = "256Mi" # The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core. @@ -79,11 +92,17 @@ class GkeCodeExecutor(BaseCodeExecutor): kubeconfig_path: str | None = None kubeconfig_context: str | None = None + # Sandbox constants + sandbox_template: str = "python-sandbox-template" + _batch_v1: k8s.client.BatchV1Api _core_v1: k8s.client.CoreV1Api def __init__( self, + executor_type: str = "job", + sandbox_gateway_name: str | None = None, + sandbox_template: str = "python-sandbox-template", kubeconfig_path: str | None = None, kubeconfig_context: str | None = None, **data, @@ -96,9 +115,17 @@ def __init__( 3. Automatically via the default local kubeconfig file (~/.kube/config). """ super().__init__(**data) + self.executor_type = executor_type + self.sandbox_gateway_name = sandbox_gateway_name + self.sandbox_template = sandbox_template self.kubeconfig_path = kubeconfig_path self.kubeconfig_context = kubeconfig_context + if executor_type not in ["job", "sandbox"]: + raise ValueError( + f"Invalid executor_type: '{executor_type}'. Must be 'job' or" + " 'sandbox'." + ) if self.kubeconfig_path: try: logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") @@ -136,10 +163,28 @@ def __init__( self._batch_v1 = client.BatchV1Api() self._core_v1 = client.CoreV1Api() - def execute_code( - self, - invocation_context: InvocationContext, - code_execution_input: CodeExecutionInput, + def _execute_in_sandbox(self, code: str) -> CodeExecutionResult: + """Executes code using Agent Sandbox Client.""" + try: + with SandboxClient( + template_name=self.sandbox_template, + gateway_name=self.sandbox_gateway_name, + namespace=self.namespace, + ) as sandbox: + # Execute the code as a python script + logger.debug("Executing code in sandbox:\n```\n%s\n```", code) + sandbox.write("script.py", code) + result = sandbox.run("python3 script.py") + + return CodeExecutionResult(stdout=result.stdout) + except Exception as e: + logger.error("Sandbox execution failed", exc_info=True) + return CodeExecutionResult( + stderr=f"Sandbox execution failed: {str(e)}", + ) + + def _execute_as_job( + self, code: str, invocation_context: InvocationContext ) -> CodeExecutionResult: """Orchestrates the secure execution of a code snippet on GKE.""" job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" @@ -150,7 +195,7 @@ def execute_code( # 1. Create a ConfigMap to mount LLM-generated code into the Pod. # 2. Create a Job that runs the code from the ConfigMap. # 3. Set the Job as the ConfigMap's owner for automatic cleanup. - self._create_code_configmap(configmap_name, code_execution_input.code) + self._create_code_configmap(configmap_name, code) job_manifest = self._create_job_manifest( job_name, configmap_name, invocation_context ) @@ -162,7 +207,7 @@ def execute_code( logger.info( f"Submitted Job '{job_name}' to namespace '{self.namespace}'." ) - logger.debug("Executing code:\n```\n%s\n```", code_execution_input.code) + logger.debug("Executing code:\n```\n%s\n```", code) return self._watch_job_completion(job_name) except ApiException as e: @@ -186,6 +231,19 @@ def execute_code( stderr=f"An unexpected executor error occurred: {e}" ) + def execute_code( + self, + invocation_context: InvocationContext, + code_execution_input: CodeExecutionInput, + ) -> CodeExecutionResult: + """Overrides the base method to route execution based on executor_type.""" + code = code_execution_input.code + if self.executor_type == "sandbox": + return self._execute_in_sandbox(code) + else: + # Fallback to existing GKE Job logic + return self._execute_as_job(code, invocation_context) + def _create_job_manifest( self, job_name: str, diff --git a/tests/unittests/code_executors/test_gke_code_executor.py b/tests/unittests/code_executors/test_gke_code_executor.py index 5ef99792f3..c47d964f2a 100644 --- a/tests/unittests/code_executors/test_gke_code_executor.py +++ b/tests/unittests/code_executors/test_gke_code_executor.py @@ -71,6 +71,7 @@ def test_init_defaults(self): assert executor.timeout_seconds == 300 assert executor.cpu_requested == "200m" assert executor.mem_limit == "512Mi" + assert executor.executor_type == "job" def test_init_with_overrides(self): """Tests that class attributes can be overridden at instantiation.""" @@ -79,11 +80,19 @@ def test_init_with_overrides(self): image="custom-python:latest", timeout_seconds=60, cpu_limit="1000m", + executor_type="sandbox", ) assert executor.namespace == "test-ns" assert executor.image == "custom-python:latest" assert executor.timeout_seconds == 60 assert executor.cpu_limit == "1000m" + assert executor.executor_type == "sandbox" + assert executor.sandbox_template == "python-sandbox-template" + + def test_init_invalid_executor_type(self): + """Tests that init raises ValueError for invalid executor_type.""" + with pytest.raises(ValueError, match="Invalid executor_type"): + GkeCodeExecutor(executor_type="invalid_type") @patch("google.adk.code_executors.gke_code_executor.Watch") def test_execute_code_success( @@ -225,3 +234,100 @@ def test_create_job_manifest_structure(self, mock_invocation_context): assert sec_context.allow_privilege_escalation is False assert sec_context.read_only_root_filesystem is True assert sec_context.capabilities.drop == ["ALL"] + + @patch("google.adk.code_executors.gke_code_executor.SandboxClient") + def test_execute_code_forks_to_sandbox( + self, + mock_sandbox_client, + mock_invocation_context, + mock_k8s_clients, + ): + """Tests that execute_code uses SandboxClient when executor_type='sandbox'.""" + # Setup Sandbox mock + mock_sandbox_instance = ( + mock_sandbox_client.return_value.__enter__.return_value + ) + mock_run_result = MagicMock() + mock_run_result.stdout = "sandbox stdout" + mock_run_result.stderr = None + mock_sandbox_instance.run.return_value = mock_run_result + + # Instantiate with sandbox type + executor = GkeCodeExecutor(executor_type="sandbox") + code_input = CodeExecutionInput(code='print("sandbox")') + + # Execute + result = executor.execute_code(mock_invocation_context, code_input) + + # Assertions + assert result.stdout == "sandbox stdout" + + # Verify SandboxClient was used + mock_sandbox_client.assert_called_once() + mock_sandbox_instance.run.assert_called_once() + + # Verify Job path was NOT taken + mock_k8s_clients["batch_v1"].create_namespaced_job.assert_not_called() + + @patch("google.adk.code_executors.gke_code_executor.SandboxClient") + def test_execute_code_sandbox_exception( + self, + mock_sandbox_client, + mock_invocation_context, + ): + """Tests handling of exceptions from SandboxClient.""" + # Setup Sandbox mock to raise exception + mock_sandbox_client.return_value.__enter__.side_effect = Exception( + "Connection failed" + ) + + # Instantiate with sandbox type + executor = GkeCodeExecutor(executor_type="sandbox") + code_input = CodeExecutionInput(code='print("sandbox")') + + # Execute + result = executor.execute_code(mock_invocation_context, code_input) + + # Assertions + assert result.stdout == "" + assert "Sandbox execution failed: Connection failed" in result.stderr + + @patch("google.adk.code_executors.gke_code_executor.SandboxClient") + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_forks_to_job( + self, + mock_watch, + mock_sandbox_client, + mock_invocation_context, + mock_k8s_clients, + ): + """Tests that execute_code uses K8s Job when executor_type='job'.""" + # Setup K8s Job mocks (success path) + mock_job = MagicMock() + mock_job.status.succeeded = True + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + + mock_pod = MagicMock() + mock_pod.metadata.name = "pod-1" + mock_k8s_clients["core_v1"].list_namespaced_pod.return_value.items = [ + mock_pod + ] + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "job stdout" + ) + + # Instantiate with job type + executor = GkeCodeExecutor(executor_type="job") + code_input = CodeExecutionInput(code='print("job")') + + # Execute + result = executor.execute_code(mock_invocation_context, code_input) + + # Assertions + assert result.stdout == "job stdout" + + # Verify Job path WAS taken + mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once() + + # Verify SandboxClient was NOT used + mock_sandbox_client.assert_not_called()