diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 42e1fdfebea..07ceb11fa59 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * A delay segment started with a specific reason during load balancing. + * + * @param reasonToken the reason for the delay, e.g., "pick_first:connecting" + * @since 1.82.0 + */ + public void delayStarted(String reasonToken) { + } + + /** + * The current delay segment ended. + * + * @since 1.82.0 + */ + public void delayEnded() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 3187ae8ef1b..d3af8822058 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -549,25 +549,30 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayReasonToken; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayReasonToken) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayReasonToken = delayReasonToken; } /** @@ -727,6 +732,22 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific reason. + * + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayReasonToken) { + Preconditions.checkNotNull(delayReasonToken, "delayReasonToken"); + return new PickResult(null, null, Status.OK, false, null, delayReasonToken); + } + + /** Returns the delay reason token if any. */ + @Nullable + public String getDelayReasonToken() { + return delayReasonToken; + } + /** Returns the authority override if any. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..bde573c7508 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -157,7 +158,8 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String token = pickResult != null ? pickResult.getDelayReasonToken() : null; + return createPendingStream(args, tracers, pickResult, token); } state = newerState; } @@ -173,8 +175,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayReasonToken) { + PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -303,6 +305,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +318,9 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + stream.updateDelayReason(pickResult.getDelayReasonToken()); + } } synchronized (lock) { @@ -361,11 +366,44 @@ private class PendingStream extends DelayedStream { private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.delayReasonToken = initialToken; + if (initialToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(initialToken); + } + } + } + + void updateDelayReason(String newToken) { + if (!Objects.equals(delayReasonToken, newToken)) { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + } + delayReasonToken = newToken; + if (newToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(newToken); + } + } + } + } + + void endDelay() { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + delayReasonToken = null; + } } /** Runnable may be null. */ @@ -391,6 +429,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..e4c2b3b9933 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..4111700fffe 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(CONNECTING_RESULT); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..8f34295a701 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -772,6 +772,55 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + InOrder inOrder = inOrder(mockTracer); + inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); + + SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); + when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("rls:lookup_pending")); + + delayedTransport.reprocess(customDelayPicker); + + inOrder.verify(mockTracer).delayEnded(); + inOrder.verify(mockTracer).delayStarted("rls:lookup_pending"); + + delayedTransport.reprocess(mockPicker); + + inOrder.verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cancelled() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + verify(mockTracer).delayStarted("pick_first:connecting"); + + stream.cancel(Status.CANCELLED); + + verify(mockTracer).delayEnded(); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..5bfabd7ea0e 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java b/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java index c345fb35d0a..f26caedc1b3 100644 --- a/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java +++ b/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java @@ -74,7 +74,7 @@ public void configureServerBuilder(ServerBuilder serverBuilder) { } @VisibleForTesting - void configureChannelBuilder(ManagedChannelBuilder builder) { + public void configureChannelBuilder(ManagedChannelBuilder builder) { delegate.configureChannelBuilder(builder); } @@ -115,6 +115,14 @@ public Builder sdk(OpenTelemetry sdk) { return this; } + /** + * Enables or disables tracing. + */ + public Builder enableTracing(boolean enable) { + InternalGrpcOpenTelemetry.enableTracing(delegate, enable); + return this; + } + /** * Adds optionalLabelKey to all the metrics that can provide value for the * optionalLabelKey. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 705026a3fe3..df18ce249a1 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -123,6 +123,7 @@ opencensus-exporter-trace-stackdriver = { module = "io.opencensus:opencensus-exp opencensus-impl = { module = "io.opencensus:opencensus-impl", version.ref = "opencensus" } opentelemetry-api = "io.opentelemetry:opentelemetry-api:1.60.1" opentelemetry-exporter-prometheus = "io.opentelemetry:opentelemetry-exporter-prometheus:1.60.1-alpha" +opentelemetry-exporter-otlp = "io.opentelemetry:opentelemetry-exporter-otlp:1.60.1" opentelemetry-gcp-resources = "io.opentelemetry.contrib:opentelemetry-gcp-resources:1.54.0-alpha" opentelemetry-sdk-extension-autoconfigure = "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.60.1" opentelemetry-sdk-testing = "io.opentelemetry:opentelemetry-sdk-testing:1.60.1" diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 5160759460c..b7b68482e0a 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -42,6 +42,7 @@ dependencies { libraries.netty.tcnative, libraries.netty.tcnative.classes, libraries.opentelemetry.exporter.prometheus, // For xds interop client + libraries.opentelemetry.exporter.otlp, project(':grpc-googleapis'), project(':grpc-grpclb'), project(':grpc-rls') diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 125d876b705..8b4582308ab 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -79,6 +79,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import io.grpc.gcp.csm.observability.CsmObservability; /** * Application that starts a client for the {@link TestServiceGrpc.TestServiceImplBase} and runs @@ -99,6 +100,13 @@ public class TestServiceClient { public static void main(String[] args) throws Exception { final TestServiceClient client = new TestServiceClient(); client.parseArgs(args); + if (client.enableOpentelemetryTracing) { + io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder() + .sdk(otel) + .build(); + gotel.registerGlobal(); + } customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider(); LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider); client.setUp(); @@ -107,6 +115,10 @@ public static void main(String[] args) throws Exception { client.run(); } finally { client.tearDown(); + if (client.enableOpentelemetryTracing) { + System.out.println("Sleeping to flush spans..."); + Thread.sleep(2000); + } } } @@ -136,6 +148,7 @@ public static void main(String[] args) throws Exception { private int soakResponseSize = 314159; private int numThreads = 1; private String additionalMetadata = ""; + private boolean enableOpentelemetryTracing = false; private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider; private Tester tester = new Tester(); @@ -167,6 +180,8 @@ void parseArgs(String[] args) throws Exception { serverHostOverride = value; } else if ("server_port".equals(key)) { serverPort = Integer.parseInt(value); + } else if ("enable_opentelemetry_tracing".equals(key)) { + enableOpentelemetryTracing = Boolean.parseBoolean(value); } else if ("test_case".equals(key)) { testCase = value; } else if ("num_times".equals(key)) { @@ -599,6 +614,9 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { boolean useGeneric = false; + if (enableOpentelemetryTracing) { + useGeneric = true; + } ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index fc4cdf9178f..9431790ee74 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import com.google.common.annotations.VisibleForTesting; +import io.grpc.gcp.csm.observability.CsmObservability; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.BindableService; import io.grpc.Grpc; @@ -46,6 +47,13 @@ public class TestServiceServer { public static void main(String[] args) throws Exception { final TestServiceServer server = new TestServiceServer(); server.parseArgs(args); + if (server.enableOpentelemetryTracing) { + io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder() + .sdk(otel) + .build(); + gotel.registerGlobal(); + } if (server.useTls) { System.out.println( "\nUsing fake CA for TLS certificate. Test clients should expect host\n" @@ -75,6 +83,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; + private boolean enableOpentelemetryTracing = false; private ScheduledExecutorService executor; private Server server; @@ -106,6 +115,8 @@ void parseArgs(String[] args) { port = Integer.parseInt(value); } else if ("use_tls".equals(key)) { useTls = Boolean.parseBoolean(value); + } else if ("enable_opentelemetry_tracing".equals(key)) { + enableOpentelemetryTracing = Boolean.parseBoolean(value); } else if ("use_alts".equals(key)) { useAlts = Boolean.parseBoolean(value); } else if ("local_handshaker_port".equals(key)) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index 89519041a79..26a7b187064 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -39,6 +39,7 @@ import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; @@ -60,6 +61,7 @@ import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsChannelCredentials; +import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.util.ArrayList; import java.util.Collections; @@ -104,6 +106,7 @@ public final class XdsTestClient { private long currentRequestId; private ListeningScheduledExecutorService exec; private CsmObservability csmObservability; + private OpenTelemetrySdk openTelemetrySdk; /** * The main application allowing this client to be launched from the command line. @@ -265,14 +268,23 @@ private static RpcType parseRpc(String rpc) { @IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs private void run() { if (enableCsmObservability) { + Map props = new HashMap<>(); + props.put("otel.logs.exporter", "none"); + props.put("otel.metrics.exporter", "otlp"); + String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER"); + if (tracesExporter != null) { + props.put("otel.traces.exporter", tracesExporter); + } else { + props.put("otel.traces.exporter", "none"); + } + + AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesSupplier(() -> props) + .build(); + openTelemetrySdk = autoSdk.getOpenTelemetrySdk(); csmObservability = CsmObservability.newBuilder() - .sdk(AutoConfiguredOpenTelemetrySdk.builder() - .addPropertiesSupplier(() -> ImmutableMap.of( - "otel.logs.exporter", "none", - "otel.metrics.exporter", "prometheus", - "otel.traces.exporter", "none")) - .build() - .getOpenTelemetrySdk()) + .sdk(openTelemetrySdk) + .enableTracing(!"none".equals(props.get("otel.traces.exporter"))) .build(); csmObservability.registerGlobal(); } @@ -289,14 +301,16 @@ private void run() { try { statsServer.start(); for (int i = 0; i < numChannels; i++) { - channels.add( - Grpc.newChannelBuilder( + ManagedChannelBuilder builder = Grpc.newChannelBuilder( server, secureMode ? XdsChannelCredentials.create(InsecureChannelCredentials.create()) : InsecureChannelCredentials.create()) - .enableRetry() - .build()); + .enableRetry(); + if (enableCsmObservability) { + csmObservability.configureChannelBuilder(builder); + } + channels.add(builder.build()); } exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); Payload requestPayload = Payload.newBuilder() @@ -325,6 +339,9 @@ private void stop() throws InterruptedException { if (csmObservability != null) { csmObservability.close(); } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } } @@ -373,6 +390,13 @@ public void start(Listener responseListener, Metadata headers) { @Override public void onHeaders(Metadata headers) { hostnameRef.set(headers.get(XdsTestServer.HOSTNAME_KEY)); + io.opentelemetry.api.trace.Span currentSpan = io.opentelemetry.api.trace.Span.current(); + for (String key : config.metadata.keys()) { + String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + if (value != null) { + currentSpan.setAttribute("custom.metadata." + key, value); + } + } super.onHeaders(headers); } }, @@ -406,44 +430,56 @@ public void onNext(EmptyProtos.Empty response) {} .setPayload(requestPayload) .setResponseSize(responseSize) .build(); - stub.unaryCall( - request, - new StreamObserver() { - @Override - public void onCompleted() { - handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); - } - @Override - public void onError(Throwable t) { - if (printResponse) { - logger.log(Level.WARNING, "Rpc failed", t); + io.opentelemetry.api.baggage.BaggageBuilder baggageBuilder = io.opentelemetry.api.baggage.Baggage.builder(); + for (String key : config.metadata.keys()) { + String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + if (value != null) { + baggageBuilder.put(key, value); + } + } + io.opentelemetry.api.baggage.Baggage baggage = baggageBuilder.build(); + + try (io.opentelemetry.context.Scope scope = io.opentelemetry.context.Context.current().with(baggage).makeCurrent()) { + stub.unaryCall( + request, + new StreamObserver() { + @Override + public void onCompleted() { + handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); } - handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), - savedWatchers); - } - @Override - public void onNext(SimpleResponse response) { - // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC - // service and rely on parsing stdout. - if (printResponse) { - System.out.println( - "Greeting: Hello world, this is " - + response.getHostname() - + ", from " - + clientCallRef - .get() - .getAttributes() - .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + @Override + public void onError(Throwable t) { + if (printResponse) { + logger.log(Level.WARNING, "Rpc failed", t); + } + handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), + savedWatchers); } - // Use the hostname from the response if not present in the metadata. - // TODO(ericgribkoff) Delete when server is deployed that sets metadata value. - if (hostnameRef.get() == null) { - hostnameRef.set(response.getHostname()); + + @Override + public void onNext(SimpleResponse response) { + // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC + // service and rely on parsing stdout. + if (printResponse) { + System.out.println( + "Greeting: Hello world, this is " + + response.getHostname() + + ", from " + + clientCallRef + .get() + .getAttributes() + .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + } + // Use the hostname from the response if not present in the metadata. + // TODO(ericgribkoff) Delete when server is deployed that sets metadata value. + if (hostnameRef.get() == null) { + hostnameRef.set(response.getHostname()); + } } - } - }); + }); + } } else { throw new AssertionError("Unknown RPC type: " + config.rpcType); } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java index 88f1bf468b6..5b48e59f8c2 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java @@ -46,13 +46,16 @@ import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsServerBuilder; import io.grpc.xds.XdsServerCredentials; +import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -92,6 +95,7 @@ public final class XdsTestServer { private String host; private Util.AddressType addressType = Util.AddressType.IPV4_IPV6; private CsmObservability csmObservability; + private OpenTelemetrySdk openTelemetrySdk; /** * The main application allowing this client to be launched from the command line. @@ -197,14 +201,23 @@ void parseArgs(String[] args) { @IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs void start() throws Exception { if (enableCsmObservability) { + Map props = new HashMap<>(); + props.put("otel.logs.exporter", "none"); + props.put("otel.metrics.exporter", "otlp"); + String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER"); + if (tracesExporter != null) { + props.put("otel.traces.exporter", tracesExporter); + } else { + props.put("otel.traces.exporter", "none"); + } + + AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesSupplier(() -> props) + .build(); + openTelemetrySdk = autoSdk.getOpenTelemetrySdk(); csmObservability = CsmObservability.newBuilder() - .sdk(AutoConfiguredOpenTelemetrySdk.builder() - .addPropertiesSupplier(() -> ImmutableMap.of( - "otel.logs.exporter", "none", - "otel.metrics.exporter", "prometheus", - "otel.traces.exporter", "none")) - .build() - .getOpenTelemetrySdk()) + .sdk(openTelemetrySdk) + .enableTracing(!"none".equals(props.get("otel.traces.exporter"))) .build(); csmObservability.registerGlobal(); } @@ -301,6 +314,9 @@ void stop() throws Exception { if (csmObservability != null) { csmObservability.close(); } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } } private void blockUntilShutdown() throws InterruptedException { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java new file mode 100644 index 00000000000..8232f7a00c0 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java @@ -0,0 +1,139 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertTrue; + +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.grpc.opentelemetry.InternalGrpcOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OpenTelemetryTracingInteropPocTest extends AbstractInteropTest { + + private TestSpanExporter spanExporter; + private OpenTelemetrySdk openTelemetrySdk; + private GrpcOpenTelemetry grpcOpenTelemetry; + + private static class TestSpanExporter implements SpanExporter { + private final List spans = new ArrayList<>(); + + @Override + public CompletableResultCode export(Collection spans) { + this.spans.addAll(spans); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + + public List getSpans() { + return spans; + } + } + + @Before + @Override + public void setUp() { + spanExporter = new TestSpanExporter(); + openTelemetrySdk = OpenTelemetrySdk.builder() + .setTracerProvider(SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build()) + .build(); + + GrpcOpenTelemetry.Builder grpcOpentelemetryBuilder = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetrySdk); + InternalGrpcOpenTelemetry.enableTracing(grpcOpentelemetryBuilder, true); + grpcOpenTelemetry = grpcOpentelemetryBuilder.build(); + + super.setUp(); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } + } + + @Override + protected ServerBuilder getServerBuilder() { + NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + grpcOpenTelemetry.configureServerBuilder(builder); + return builder; + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .usePlaintext(); + grpcOpenTelemetry.configureChannelBuilder(builder); + return builder; + } + + @Override + protected boolean metricsExpected() { + return false; + } + + @Test + public void verifySpansGenerated() throws Exception { + blockingStub.emptyCall(io.grpc.testing.integration.EmptyProtos.Empty.getDefaultInstance()); + + // Wait a bit for spans to be exported (SimpleSpanProcessor is synchronous, so they should be there) + Thread.sleep(500); + + List spans = spanExporter.getSpans(); + System.out.println("Captured spans: " + spans.size()); + for (SpanData span : spans) { + System.out.println("Span: " + span.getName()); + } + + assertTrue("Expected at least one span", spans.size() > 0); + } +} diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..2748a2679f1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + return PickResult.withNoResult("rls:lookup_pending"); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..d5d94c4dd6e 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..1bf24b12a19 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..ab0b2c49c21 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,10 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); public RoundRobinLoadBalancer(Helper helper) { super(helper); @@ -68,7 +70,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..895cf9b4251 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,7 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker(PickResult.withNoResult("round_robin:connecting")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index f6ee60ab1ef..8be155ec0f8 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -119,6 +120,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState( + CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 6e4566de76d..ab84c2b96e5 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,7 +322,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -357,4 +361,41 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = checkNotNull(delegate, "delegate"); + this.priority = checkNotNull(priority, "priority"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..15cd5dba621 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("ring_hash:connecting"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { @@ -487,7 +490,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..51e0d08f223 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -343,6 +343,34 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index beb568be9ce..6f0db55a8a7 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -531,7 +531,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -547,7 +548,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -573,7 +574,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -591,7 +592,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -869,7 +870,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -907,7 +908,37 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); + } + + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_token")); + + helper0.updateBalancingState(CONNECTING, mockChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayReasonToken()).isEqualTo("priority_p0:child_token"); } private void assertLatestConnectivityState(ConnectivityState expectedState) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..387bc525043 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,7 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +525,7 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(0); } @@ -546,6 +548,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(1); }