From 740b834b3089c9be4345a2024a04cfa21f9d62aa Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 12 May 2026 16:33:24 -0400 Subject: [PATCH 1/5] Make sure session creation happens before starting agent --- .../ml/inference/agent_development_kit.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit.py b/sdks/python/apache_beam/ml/inference/agent_development_kit.py index 1130598f06f8..8bcca74ee143 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit.py @@ -229,17 +229,6 @@ def run_inference( for element in batch: session_id: str = inference_args.get("session_id", str(uuid.uuid4())) - # Ensure a session exists for this invocation - try: - model.session_service.create_session( - app_name=self._app_name, - user_id=user_id, - session_id=session_id, - ) - except sessions.SessionExistsError: - # It's okay if the session already exists for shared session IDs. - pass - # Wrap plain strings in a Content object if isinstance(element, str): # pyrefly: ignore[bad-instantiation] @@ -288,6 +277,16 @@ async def _invoke_agent( The text of the agent's final response, or ``None`` if the agent produced no final text response. """ + # Ensure a session exists for this invocation + try: + await model.session_service.create_session( + app_name=self._app_name, + user_id=user_id, + session_id=session_id, + ) + except sessions.SessionExistsError: + # It's okay if the session already exists for shared session IDs. + pass async for event in runner.run_async( user_id=user_id, session_id=session_id, From 3054171eeaac7c7451a83eecb51b0d64e4e25023 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 13 May 2026 10:29:55 -0400 Subject: [PATCH 2/5] fix var --- sdks/python/apache_beam/ml/inference/agent_development_kit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit.py b/sdks/python/apache_beam/ml/inference/agent_development_kit.py index 8bcca74ee143..f930fe0c2421 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit.py @@ -279,7 +279,7 @@ async def _invoke_agent( """ # Ensure a session exists for this invocation try: - await model.session_service.create_session( + await runner.session_service.create_session( app_name=self._app_name, user_id=user_id, session_id=session_id, From efd77682ce8fff7928a9f6a68b4a6f38fe351589 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 13 May 2026 12:33:52 -0400 Subject: [PATCH 3/5] Fix up --- .../ml/inference/agent_development_kit.py | 15 +++++++++------ .../ml/inference/agent_development_kit_test.py | 11 ++++++++++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit.py b/sdks/python/apache_beam/ml/inference/agent_development_kit.py index f930fe0c2421..64103e9e019b 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit.py @@ -238,7 +238,8 @@ def run_inference( message = element agent_invocations.append( - self._invoke_agent(model, user_id, session_id, message)) + self._invoke_agent( + model, user_id, session_id, self._app_name, message)) elements_with_sessions.append(element) # Run all agent invocations concurrently @@ -263,6 +264,7 @@ async def _invoke_agent( runner: "Runner", user_id: str, session_id: str, + app_name: str, message: genai_Content, ) -> Optional[str]: """Drives the ADK event loop and returns the final response text. @@ -277,16 +279,17 @@ async def _invoke_agent( The text of the agent's final response, or ``None`` if the agent produced no final text response. """ - # Ensure a session exists for this invocation + # Check for your specific session ID try: + # Attempt to get the specific session + await runner.session_service.get_session(session_id) + except Exception as e: await runner.session_service.create_session( - app_name=self._app_name, + app_name=app_name, user_id=user_id, session_id=session_id, ) - except sessions.SessionExistsError: - # It's okay if the session already exists for shared session IDs. - pass + async for event in runner.run_async( user_id=user_id, session_id=session_id, diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py b/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py index 6d59bceb9d39..0a866f42f11a 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py @@ -56,6 +56,9 @@ async def _async_gen(*args, **kwargs): runner.agent = agent runner.run_async = mock.MagicMock(side_effect=_async_gen) runner.session_service = mock.MagicMock() + runner.session_service.get_session = mock.AsyncMock( + side_effect=Exception("Session not found")) + runner.session_service.create_session = mock.AsyncMock() return runner @@ -266,6 +269,8 @@ async def _async_gen(*args, **kwargs): runner.agent = agent runner.run_async = mock.MagicMock(side_effect=_async_gen) runner.session_service = mock.MagicMock() + runner.session_service.get_session = mock.AsyncMock(side_effect=Exception()) + runner.session_service.create_session = mock.AsyncMock() handler = ADKAgentModelHandler(agent=agent) results = list(handler.run_inference(batch=["hello"], model=runner)) @@ -287,6 +292,8 @@ async def _async_gen(*args, **kwargs): runner.agent = agent runner.run_async = mock.MagicMock(side_effect=_async_gen) runner.session_service = mock.MagicMock() + runner.session_service.get_session = mock.AsyncMock(side_effect=Exception()) + runner.session_service.create_session = mock.AsyncMock() handler = ADKAgentModelHandler(agent=agent) results = list(handler.run_inference(batch=["hello"], model=runner)) @@ -313,6 +320,8 @@ async def _async_gen(*args, **kwargs): runner.agent = agent runner.run_async = mock.MagicMock(side_effect=_async_gen) runner.session_service = mock.MagicMock() + runner.session_service.get_session = mock.AsyncMock(side_effect=Exception()) + runner.session_service.create_session = mock.AsyncMock() handler = ADKAgentModelHandler(agent=agent) results = list(handler.run_inference(batch=["hi"], model=runner)) @@ -326,7 +335,7 @@ def test_invoke_agent_static_method_directly(self): result = asyncio.run( ADKAgentModelHandler._invoke_agent( - runner, "user", "session-1", mock.MagicMock())) + runner, "user", "session-1", "test_app", mock.MagicMock())) self.assertEqual(result, "direct result") From b3f1f2b5dc87e03d7d561002de2de8222eff3d47 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 18 May 2026 16:32:19 -0400 Subject: [PATCH 4/5] Fix text parsing --- .../ml/inference/agent_development_kit.py | 11 +++++--- .../inference/agent_development_kit_test.py | 27 ++++++++++--------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit.py b/sdks/python/apache_beam/ml/inference/agent_development_kit.py index 64103e9e019b..e969cab05fc2 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit.py @@ -296,9 +296,14 @@ async def _invoke_agent( new_message=message, ): if event.is_final_response(): - if event.content: - return event.content.text - return None + if event.content and event.content.parts: + return "".join([p.text for p in event.content.parts]) + raise ValueError( + f"Agent {runner.agent.name} did not return a response, " + f"final event: {event}") + + raise ValueError( + f"Agent {runner.agent.name} did not return a response") def get_metrics_namespace(self) -> str: return "ADKAgentModelHandler" diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py b/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py index 0a866f42f11a..6c8b5c5b351c 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit_test.py @@ -41,9 +41,11 @@ def _make_mock_runner( final_text: str = "Hello from agent", ) -> mock.MagicMock: """Returns a mock Runner whose run_async yields one final-response event.""" - # Build a mock event that looks like a final response + part = mock.MagicMock() + part.text = final_text + content = mock.MagicMock() - content.text = final_text + content.parts = [part] event = mock.MagicMock() event.is_final_response.return_value = True @@ -254,8 +256,8 @@ def test_session_created_with_correct_app_name(self): class TestResponseExtraction(unittest.TestCase): """Tests for extraction of the final response from the event stream.""" - def test_returns_none_when_no_final_response(self): - """Agent emits only non-final events; inference should be None.""" + def test_raises_when_no_final_response(self): + """Agent emits only non-final events; should raise ValueError.""" agent = _make_mock_agent() # Build a runner that yields only non-final events @@ -273,12 +275,10 @@ async def _async_gen(*args, **kwargs): runner.session_service.create_session = mock.AsyncMock() handler = ADKAgentModelHandler(agent=agent) - results = list(handler.run_inference(batch=["hello"], model=runner)) - - self.assertEqual(len(results), 1) - self.assertIsNone(results[0].inference) + with self.assertRaisesRegex(ValueError, "did not return a response"): + list(handler.run_inference(batch=["hello"], model=runner)) - def test_returns_none_when_final_event_has_no_content(self): + def test_raises_when_final_event_has_no_content(self): agent = _make_mock_agent() event = mock.MagicMock() @@ -296,17 +296,18 @@ async def _async_gen(*args, **kwargs): runner.session_service.create_session = mock.AsyncMock() handler = ADKAgentModelHandler(agent=agent) - results = list(handler.run_inference(batch=["hello"], model=runner)) - - self.assertIsNone(results[0].inference) + with self.assertRaisesRegex(ValueError, "did not return a response"): + list(handler.run_inference(batch=["hello"], model=runner)) def test_stops_after_first_final_response(self): """Multiple final events: only the first one's text should be used.""" agent = _make_mock_agent() def _make_event(text: str): + part = mock.MagicMock() + part.text = text content = mock.MagicMock() - content.text = text + content.parts = [part] event = mock.MagicMock() event.is_final_response.return_value = True event.content = content From 005742dbfdde5e2b22186179649bea6ea83033ac Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 19 May 2026 13:47:03 -0400 Subject: [PATCH 5/5] yapf --- sdks/python/apache_beam/ml/inference/agent_development_kit.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/agent_development_kit.py b/sdks/python/apache_beam/ml/inference/agent_development_kit.py index e969cab05fc2..386955b0dfae 100644 --- a/sdks/python/apache_beam/ml/inference/agent_development_kit.py +++ b/sdks/python/apache_beam/ml/inference/agent_development_kit.py @@ -302,8 +302,7 @@ async def _invoke_agent( f"Agent {runner.agent.name} did not return a response, " f"final event: {event}") - raise ValueError( - f"Agent {runner.agent.name} did not return a response") + raise ValueError(f"Agent {runner.agent.name} did not return a response") def get_metrics_namespace(self) -> str: return "ADKAgentModelHandler"