Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private Flowable<Event> callLlm(
callLlmContext)
.doOnSubscribe(
s ->
Tracing.traceCallLlm(
traceCallLlm(
span,
context,
eventForCallbackUsage.id(),
Expand Down Expand Up @@ -520,6 +520,7 @@ public Flowable<Event> runLive(InvocationContext invocationContext) {
.doOnComplete(
() ->
Tracing.traceSendData(
Span.current(),
invocationContext,
eventIdForSendData,
llmRequestAfterPreprocess.contents()))
Expand All @@ -529,6 +530,7 @@ public Flowable<Event> runLive(InvocationContext invocationContext) {
span.setStatus(StatusCode.ERROR, error.getMessage());
span.recordException(error);
Tracing.traceSendData(
Span.current(),
invocationContext,
eventIdForSendData,
llmRequestAfterPreprocess.contents());
Expand Down Expand Up @@ -706,6 +708,19 @@ private Flowable<Event> buildPostprocessingEvents(
return processorEvents.concatWith(Flowable.just(modelResponseEvent)).concatWith(functionEvents);
}

/**
* Traces an LLM call without an associated exception. This is an overload for {@link
* Tracing#traceCallLlm} for successful calls.
*/
private void traceCallLlm(
Span span,
InvocationContext context,
String eventId,
LlmRequest llmRequest,
LlmResponse llmResponse) {
Tracing.traceCallLlm(span, context, eventId, llmRequest, llmResponse, null);
}

private Event buildModelResponseEvent(
Event baseEventForLlmResponse, LlmRequest llmRequest, LlmResponse llmResponse) {
Event.Builder eventBuilder =
Expand Down
42 changes: 23 additions & 19 deletions core/src/main/java/com/google/adk/flows/llmflows/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ public static Maybe<Event> handleFunctionCalls(

if (events.size() > 1) {
return Maybe.just(mergedEvent)
.doOnSuccess(event -> Tracing.traceToolResponse(event.id(), event))
.compose(Tracing.<Event>trace("tool_response").setParent(parentContext));
.compose(
Tracing.<Event>trace("execute_tool (merged)")
.setParent(parentContext)
.onSuccess(
(span, event) ->
Tracing.traceMergedToolCalls(span, event.id(), event)));
}
return Maybe.just(mergedEvent);
});
Expand Down Expand Up @@ -269,10 +273,8 @@ private static Function<FunctionCall, Maybe<Event>> getFunctionCallMapper(
tool,
toolContext,
functionCall,
functionArgs,
parentContext)
: callTool(
tool, functionArgs, toolContext, parentContext))
functionArgs)
: callTool(tool, functionArgs, toolContext))
.compose(Tracing.withContext(parentContext)));

return postProcessFunctionResult(
Expand All @@ -296,8 +298,7 @@ private static Maybe<Map<String, Object>> processFunctionLive(
BaseTool tool,
ToolContext toolContext,
FunctionCall functionCall,
Map<String, Object> args,
Context parentContext) {
Map<String, Object> args) {
// Case 1: Handle a call to stopStreaming
if (functionCall.name().get().equals("stopStreaming") && args.containsKey("functionName")) {
String functionNameToStop = (String) args.get("functionName");
Expand Down Expand Up @@ -365,7 +366,7 @@ private static Maybe<Map<String, Object>> processFunctionLive(
}

// Case 3: Fallback for regular, non-streaming tools
return callTool(tool, args, toolContext, parentContext);
return callTool(tool, args, toolContext);
}

public static Set<String> getLongRunningFunctionCalls(
Expand Down Expand Up @@ -426,12 +427,22 @@ private static Maybe<Event> postProcessFunctionResult(
Event event =
buildResponseEvent(
tool, finalFunctionResult, toolContext, invocationContext);
Tracing.traceToolResponse(event.id(), event);
return Maybe.just(event);
});
})
.compose(
Tracing.<Event>trace("tool_response [" + tool.name() + "]").setParent(parentContext));
Tracing.<Event>trace("execute_tool [" + tool.name() + "]")
.setParent(parentContext)
.onSuccess(
(span, event) ->
Tracing.traceToolExecution(
span,
tool.name(),
tool.description(),
tool.getClass().getSimpleName(),
functionArgs,
event,
null)));
}

private static Optional<Event> mergeParallelFunctionResponseEvents(
Expand Down Expand Up @@ -579,17 +590,10 @@ private static Maybe<Map<String, Object>> maybeInvokeAfterToolCall(
}

private static Maybe<Map<String, Object>> callTool(
BaseTool tool, Map<String, Object> args, ToolContext toolContext, Context parentContext) {
BaseTool tool, Map<String, Object> args, ToolContext toolContext) {
return tool.runAsync(args, toolContext)
.toMaybe()
.doOnSubscribe(
d ->
Tracing.traceToolCall(
tool.name(), tool.description(), tool.getClass().getSimpleName(), args))
.doOnError(t -> Span.current().recordException(t))
.compose(
Tracing.<Map<String, Object>>trace("tool_call [" + tool.name() + "]")
.setParent(parentContext))
.onErrorResumeNext(
e ->
Maybe.error(
Expand Down
184 changes: 107 additions & 77 deletions core/src/main/java/com/google/adk/telemetry/Tracing.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -77,6 +79,11 @@ public class Tracing {

private static final Logger log = LoggerFactory.getLogger(Tracing.class);

private static final String INVOKE_AGENT_OPERATION = "invoke_agent";
private static final String EXECUTE_TOOL_OPERATION = "execute_tool";
private static final String SEND_DATA_OPERATION = "send_data";
private static final String CALL_LLM_OPERATION = "call_llm";

private static final AttributeKey<List<String>> GEN_AI_RESPONSE_FINISH_REASONS =
AttributeKey.stringArrayKey("gen_ai.response.finish_reasons");

Expand Down Expand Up @@ -134,15 +141,6 @@ public class Tracing {

private Tracing() {}

private static void traceWithSpan(String methodName, Consumer<Span> traceAction) {
Span span = Span.current();
if (!span.getSpanContext().isValid()) {
log.trace("{}: No valid span in current context.", methodName);
return;
}
traceAction.accept(span);
}

private static void setInvocationAttributes(
Span span, InvocationContext invocationContext, String eventId) {
span.setAttribute(ADK_INVOCATION_ID, invocationContext.invocationId());
Expand All @@ -159,12 +157,6 @@ private static void setInvocationAttributes(
}
}

private static void setToolExecutionAttributes(Span span) {
span.setAttribute(GEN_AI_OPERATION_NAME, "execute_tool");
span.setAttribute(ADK_LLM_REQUEST, "{}");
span.setAttribute(ADK_LLM_RESPONSE, "{}");
}

private static void setJsonAttribute(Span span, AttributeKey<String> key, Object value) {
if (!CAPTURE_MESSAGE_CONTENT_IN_SPANS) {
span.setAttribute(key, "{}");
Expand Down Expand Up @@ -198,7 +190,7 @@ public static void setTracerForTesting(Tracer tracer) {
*/
public static void traceAgentInvocation(
Span span, String agentName, String agentDescription, InvocationContext invocationContext) {
span.setAttribute(GEN_AI_OPERATION_NAME, "invoke_agent");
span.setAttribute(GEN_AI_OPERATION_NAME, INVOKE_AGENT_OPERATION);
span.setAttribute(GEN_AI_AGENT_DESCRIPTION, agentDescription);
span.setAttribute(GEN_AI_AGENT_NAME, agentName);
if (invocationContext.session() != null && invocationContext.session().id() != null) {
Expand All @@ -207,58 +199,62 @@ public static void traceAgentInvocation(
}

/**
* Traces tool call arguments.
*
* @param args The arguments to the tool call.
*/
public static void traceToolCall(
String toolName, String toolDescription, String toolType, Map<String, Object> args) {
traceWithSpan(
"traceToolCall",
span -> {
setToolExecutionAttributes(span);
span.setAttribute(GEN_AI_TOOL_NAME, toolName);
span.setAttribute(GEN_AI_TOOL_DESCRIPTION, toolDescription);
span.setAttribute(GEN_AI_TOOL_TYPE, toolType);

setJsonAttribute(span, ADK_TOOL_CALL_ARGS, args);
});
}

/**
* Traces tool response event.
* Traces a tool execution, including its arguments, response, and any potential error.
*
* @param eventId The ID of the event.
* @param functionResponseEvent The function response event.
* @param span The span representing the tool execution.
* @param toolName The name of the tool.
* @param toolDescription The tool's description.
* @param toolType The tool's type (e.g., "FunctionTool").
* @param args The arguments passed to the tool.
* @param functionResponseEvent The event containing the tool's response, if successful.
* @param error The exception thrown during execution, if any.
*/
public static void traceToolResponse(String eventId, Event functionResponseEvent) {
traceWithSpan(
"traceToolResponse",
span -> {
setToolExecutionAttributes(span);
span.setAttribute(ADK_EVENT_ID, eventId);

FunctionResponse functionResponse =
functionResponseEvent.functionResponses().stream().findFirst().orElse(null);

String toolCallId = "<not specified>";
Object toolResponse = "<not specified>";
if (functionResponse != null) {
toolCallId = functionResponse.id().orElse(toolCallId);
if (functionResponse.response().isPresent()) {
toolResponse = functionResponse.response().get();
}
}

span.setAttribute(GEN_AI_TOOL_CALL_ID, toolCallId);
public static void traceToolExecution(
Span span,
String toolName,
String toolDescription,
String toolType,
Map<String, Object> args,
@Nullable Event functionResponseEvent,
@Nullable Exception error) {
span.setAttribute(GEN_AI_OPERATION_NAME, EXECUTE_TOOL_OPERATION);
span.setAttribute(GEN_AI_TOOL_NAME, toolName);
span.setAttribute(GEN_AI_TOOL_DESCRIPTION, toolDescription);
span.setAttribute(GEN_AI_TOOL_TYPE, toolType);

setJsonAttribute(span, ADK_TOOL_CALL_ARGS, args);

if (functionResponseEvent != null) {
span.setAttribute(ADK_EVENT_ID, functionResponseEvent.id());
FunctionResponse functionResponse =
functionResponseEvent.functionResponses().stream().findFirst().orElse(null);

String toolCallId = "<not specified>";
Object toolResponse = "<not specified>";
if (functionResponse != null) {
toolCallId = functionResponse.id().orElse(toolCallId);
if (functionResponse.response().isPresent()) {
toolResponse = functionResponse.response().get();
}
}
span.setAttribute(GEN_AI_TOOL_CALL_ID, toolCallId);
Object finalToolResponse =
(toolResponse instanceof Map) ? toolResponse : ImmutableMap.of("result", toolResponse);
setJsonAttribute(span, ADK_TOOL_RESPONSE, finalToolResponse);
} else {
// Set placeholder if no response event is available (e.g., due to an error)
span.setAttribute(GEN_AI_TOOL_CALL_ID, "<not specified>");
setJsonAttribute(span, ADK_TOOL_RESPONSE, "{}");
}

Object finalToolResponse =
(toolResponse instanceof Map)
? toolResponse
: ImmutableMap.of("result", toolResponse);
// Also set empty LLM attributes for UI compatibility, like in traceToolResponse
span.setAttribute(ADK_LLM_REQUEST, "{}");
span.setAttribute(ADK_LLM_RESPONSE, "{}");

setJsonAttribute(span, ADK_TOOL_RESPONSE, finalToolResponse);
});
if (error != null) {
span.setStatus(StatusCode.ERROR, error.getMessage());
span.recordException(error);
}
}

/**
Expand Down Expand Up @@ -303,15 +299,22 @@ public static void traceCallLlm(
InvocationContext invocationContext,
String eventId,
LlmRequest llmRequest,
LlmResponse llmResponse) {
LlmResponse llmResponse,
@Nullable Exception error) {
span.setAttribute(GEN_AI_SYSTEM, "gcp.vertex.agent");
span.setAttribute(GEN_AI_OPERATION_NAME, CALL_LLM_OPERATION);
llmRequest.model().ifPresent(modelName -> span.setAttribute(GEN_AI_REQUEST_MODEL, modelName));

setInvocationAttributes(span, invocationContext, eventId);

setJsonAttribute(span, ADK_LLM_REQUEST, buildLlmRequestForTrace(llmRequest));
setJsonAttribute(span, ADK_LLM_RESPONSE, llmResponse);

if (error != null) {
span.setStatus(StatusCode.ERROR, error.getMessage());
span.recordException(error);
}

llmRequest
.config()
.ifPresent(
Expand Down Expand Up @@ -352,18 +355,45 @@ public static void traceCallLlm(
* @param data A list of content objects being sent.
*/
public static void traceSendData(
InvocationContext invocationContext, String eventId, List<Content> data) {
traceWithSpan(
"traceSendData",
span -> {
setInvocationAttributes(span, invocationContext, eventId);

ImmutableList<Content> safeData =
Optional.ofNullable(data).orElse(ImmutableList.of()).stream()
.filter(Objects::nonNull)
.collect(toImmutableList());
setJsonAttribute(span, ADK_DATA, safeData);
});
Span span, InvocationContext invocationContext, String eventId, List<Content> data) {
if (!span.getSpanContext().isValid()) {
log.trace("traceSendData: No valid span in current context.");
return;
}
setInvocationAttributes(span, invocationContext, eventId);
span.setAttribute(GEN_AI_OPERATION_NAME, SEND_DATA_OPERATION);

ImmutableList<Content> safeData =
Optional.ofNullable(data).orElse(ImmutableList.of()).stream()
.filter(Objects::nonNull)
.collect(toImmutableList());
setJsonAttribute(span, ADK_DATA, safeData);
}

/**
* Traces merged tool call events.
*
* <p>Calling this function is not needed for telemetry purposes. This is provided for preventing
* /debug/trace requests (typically sent by web UI).
*
* @param responseEventId The ID of the response event.
* @param functionResponseEvent The merged response event.
*/
public static void traceMergedToolCalls(
Span span, String responseEventId, Event functionResponseEvent) {
if (!span.getSpanContext().isValid()) {
log.trace("traceMergedToolCalls: No valid span in current context.");
return;
}
span.setAttribute(GEN_AI_OPERATION_NAME, EXECUTE_TOOL_OPERATION);
span.setAttribute(GEN_AI_TOOL_NAME, "(merged tools)");
span.setAttribute(GEN_AI_TOOL_DESCRIPTION, "(merged tools)");
span.setAttribute(GEN_AI_TOOL_CALL_ID, responseEventId);
span.setAttribute(ADK_TOOL_CALL_ARGS, "N/A");
span.setAttribute(ADK_EVENT_ID, responseEventId);
setJsonAttribute(span, ADK_TOOL_RESPONSE, functionResponseEvent);
span.setAttribute(ADK_LLM_REQUEST, "{}");
span.setAttribute(ADK_LLM_RESPONSE, "{}");
}

/**
Expand Down
Loading
Loading