Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions core/src/main/java/com/google/adk/runner/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,8 @@ protected Flowable<Event> runAsyncImpl(
session.appName(), session.userId(), session.id(), Optional.empty())
.flatMapPublisher(
updatedSession ->
runAgentWithFreshSession(
session,
updatedSession,
event,
invocationId,
runConfig,
rootAgent))
runAgentWithUpdatedSession(
initialContext, updatedSession, event, rootAgent))
.compose(Tracing.<Event>withContext(capturedContext));
});
})
Expand All @@ -495,19 +490,27 @@ protected Flowable<Event> runAsyncImpl(
});
}

private Flowable<Event> runAgentWithFreshSession(
Session session,
Session updatedSession,
Event event,
String invocationId,
RunConfig runConfig,
BaseAgent rootAgent) {
/**
* Runs the agent with the updated session state.
*
* <p>This method is called after the user message has been persistent in the session. It creates
* a final {@link InvocationContext} that inherits state from the {@code initialContext} but uses
* the {@code updatedSession} to ensure the agent can access the latest conversation history.
*
* @param initialContext the context from the start of the invocation, used to preserve metadata
* and callback data.
* @param updatedSession the session object containing the latest message.
* @param event the event representing the user message that was just appended.
* @param rootAgent the agent to be executed.
* @return a stream of events from the agent execution and subsequent plugin callbacks.
*/
private Flowable<Event> runAgentWithUpdatedSession(
InvocationContext initialContext, Session updatedSession, Event event, BaseAgent rootAgent) {
// Create context with updated session for beforeRunCallback
InvocationContext contextWithUpdatedSession =
newInvocationContextBuilder(updatedSession)
.invocationId(invocationId)
initialContext.toBuilder()
.session(updatedSession)
.agent(this.findAgentToRun(updatedSession, rootAgent))
.runConfig(runConfig)
.userContent(event.content().orElseGet(Content::fromParts))
.build();

Expand Down Expand Up @@ -536,7 +539,7 @@ private Flowable<Event> runAgentWithFreshSession(
.flatMap(
registeredEvent -> {
// TODO: remove this hack after deprecating runAsync with Session.
copySessionStates(updatedSession, session);
copySessionStates(updatedSession, initialContext.session());
return contextWithUpdatedSession
.pluginManager()
.onEventCallback(contextWithUpdatedSession, registeredEvent)
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/java/com/google/adk/runner/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,29 @@ public void onEventCallback_success() {
verify(plugin).onEventCallback(any(), any());
}

@Test
public void callbackContextData_preservedAcrossInvocation() {
String testKey = "testKey";
String testValue = "testValue";

when(plugin.onUserMessageCallback(any(), any()))
.thenAnswer(
invocation -> {
InvocationContext context = invocation.getArgument(0);
context.callbackContextData().put(testKey, testValue);
return Maybe.empty();
});

ArgumentCaptor<InvocationContext> contextCaptor =
ArgumentCaptor.forClass(InvocationContext.class);
when(plugin.afterRunCallback(contextCaptor.capture())).thenReturn(Completable.complete());

var unused =
runner.runAsync("user", session.id(), createContent("test")).toList().blockingGet();

assertThat(contextCaptor.getValue().callbackContextData()).containsEntry(testKey, testValue);
}

@Test
public void runAsync_withSessionKey_success() {
var events =
Expand Down