Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat
public void createPendingStream() {
}

/**
* Called when a delay segment with a specific reason starts during load balancing.
*
* <p>The default implementation does nothing.
*
* @param reasonToken the reason for the delay, e.g., "pick_first:connecting"
* @since 1.82.0
*/
public void delayStarted(String reasonToken) {
}

/**
* Called when the current delay segment ends.
*
* <p>The default implementation does nothing.
*
* @since 1.82.0
*/
public void delayEnded() {
}

/**
* Headers have been sent to the socket.
*/
Expand Down
31 changes: 26 additions & 5 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -549,25 +549,30 @@ public static final class PickResult {
// True if the result is created by withDrop()
private final boolean drop;
@Nullable private final String authorityOverride;
@Nullable private final String delayReasonToken;

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = checkNotNull(status, "status");
this.drop = drop;
this.authorityOverride = null;
this(subchannel, streamTracerFactory, status, drop, null, null);
}

// Overload for results without a delay reason: forwards with a null delayReasonToken.
private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop, @Nullable String authorityOverride) {
this(subchannel, streamTracerFactory, status, drop, authorityOverride, null);
}

/**
 * Populates every field of the result. Only {@code status} is required to be non-null; the
 * other reference arguments may be absent.
 */
private PickResult(
    @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
    Status status, boolean drop, @Nullable String authorityOverride,
    @Nullable String delayReasonToken) {
  // Reject a missing status up front; the remaining values are stored as given.
  this.status = checkNotNull(status, "status");
  this.drop = drop;
  this.subchannel = subchannel;
  this.streamTracerFactory = streamTracerFactory;
  this.delayReasonToken = delayReasonToken;
  this.authorityOverride = authorityOverride;
}

/**
Expand Down Expand Up @@ -727,6 +732,22 @@ public static PickResult withNoResult() {
return NO_RESULT;
}

/**
 * Reports that no decision could be made, so the RPC stays buffered, and records the reason
 * for the delay.
 *
 * @param delayReasonToken the reason the pick is delayed; must not be {@code null}
 * @since 1.82.0
 */
public static PickResult withNoResult(String delayReasonToken) {
  String token = Preconditions.checkNotNull(delayReasonToken, "delayReasonToken");
  return new PickResult(
      /* subchannel= */ null,
      /* streamTracerFactory= */ null,
      Status.OK,
      /* drop= */ false,
      /* authorityOverride= */ null,
      token);
}

/**
* Returns the delay reason token if any.
*
* @return the token this result was constructed with, or {@code null} if none was set
*/
@Nullable
public String getDelayReasonToken() {
return delayReasonToken;
}

/** Returns the authority override if any. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656")
@Nullable
Expand Down
49 changes: 44 additions & 5 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -157,7 +158,8 @@ public final ClientStream newStream(
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers, pickResult);
String token = pickResult != null ? pickResult.getDelayReasonToken() : null;
return createPendingStream(args, tracers, pickResult, token);
}
state = newerState;
}
Expand All @@ -173,8 +175,8 @@ public final ClientStream newStream(
*/
@GuardedBy("lock")
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
PickResult pickResult, @Nullable String delayReasonToken) {
PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus = pickResult.getStatus();
}
Expand Down Expand Up @@ -303,6 +305,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
stream.endDelay();
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
Expand All @@ -315,7 +318,9 @@ final void reprocess(@Nullable SubchannelPicker picker) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
} else { // stay pending
stream.updateDelayReason(pickResult.getDelayReasonToken());
}
}

synchronized (lock) {
Expand Down Expand Up @@ -361,11 +366,44 @@ private class PendingStream extends DelayedStream {
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private volatile Status lastPickStatus;
@Nullable private String delayReasonToken;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
@Nullable String initialToken) {
super("connecting_and_lb");
this.args = args;
this.tracers = tracers;
this.delayReasonToken = initialToken;
if (initialToken != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayStarted(initialToken);
}
}
}

// Guards delayReasonToken and delayTrackingClosed. cancel() can be invoked on a different thread
// than reprocess(), and reprocess() walks the pending streams outside of the transport lock, so
// without mutual exclusion both callers could observe an open segment and report delayEnded()
// twice, or a new segment could be opened after the stream was already finished.
private final Object delayLock = new Object();
// Set once endDelay() has run. Later reason updates are ignored so no segment is left open.
private boolean delayTrackingClosed;

/**
 * Switches the stream to a new delay reason, closing the current delay segment (if any) and
 * opening a new one (if {@code newToken} is non-null). Does nothing when the reason is
 * unchanged or when {@link #endDelay()} has already been called.
 *
 * <p>Tracer callbacks are made while holding the private delay lock so that start/end
 * notifications are delivered in a consistent order.
 *
 * @param newToken the new delay reason, or {@code null} if the latest pick carried none
 */
void updateDelayReason(@Nullable String newToken) {
  synchronized (delayLock) {
    if (delayTrackingClosed || Objects.equals(delayReasonToken, newToken)) {
      return;
    }
    if (delayReasonToken != null) {
      notifyDelayEnded();
    }
    delayReasonToken = newToken;
    if (newToken != null) {
      for (ClientStreamTracer tracer : tracers) {
        tracer.delayStarted(newToken);
      }
    }
  }
}

/**
 * Closes the current delay segment, if one is open, and stops any further delay tracking for
 * this stream. Safe to call more than once; tracers are notified at most once per open segment.
 */
void endDelay() {
  synchronized (delayLock) {
    delayTrackingClosed = true;
    if (delayReasonToken != null) {
      notifyDelayEnded();
      delayReasonToken = null;
    }
  }
}

/** Tells every tracer that the current delay segment ended. Caller must hold delayLock. */
private void notifyDelayEnded() {
  for (ClientStreamTracer tracer : tracers) {
    tracer.delayEnded();
  }
}

/** Runnable may be null. */
Expand All @@ -391,6 +429,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve

@Override
public void cancel(Status reason) {
endDelay();
super.cancel(reason);
synchronized (lock) {
if (reportTransportTerminated != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void createPendingStream() {
delegate().createPendingStream();
}

// Forwards the delay-start notification, with its reason token, to delegate().
@Override
public void delayStarted(String reasonToken) {
delegate().delayStarted(reasonToken);
}

// Forwards the delay-end notification to delegate().
@Override
public void delayEnded() {
delegate().delayEnded();
}

@Override
public void outboundHeaders() {
delegate().outboundHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
* list and sticking to the first that works.
*/
final class PickFirstLoadBalancer extends LoadBalancer {
// Shared no-result pick tagged with the "pick_first:connecting" delay reason.
private static final PickResult CONNECTING_RESULT =
PickResult.withNoResult("pick_first:connecting");
private final Helper helper;
private Subchannel subchannel;
private ConnectivityState currentState = IDLE;
Expand Down Expand Up @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {

// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTING and call subchannel.requestConnection() immediately.
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
Expand Down Expand Up @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
case CONNECTING:
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
// the current picker in-place. But ignoring the potential optimization is simpler.
picker = new FixedResultPicker(PickResult.withNoResult());
picker = new FixedResultPicker(CONNECTING_RESULT);
break;
case READY:
picker = new FixedResultPicker(PickResult.withSubchannel(subchannel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,55 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure(
+ " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]");
}

/**
 * Checks that a pending stream's tracer sees a delay segment start, then end and restart when
 * the picker reports a different reason, then end once more after the final reprocess.
 */
@Test
public void streamDelayMetrics() {
  ClientStreamTracer tracer = mock(ClientStreamTracer.class);
  ClientStreamTracer[] tracerArray = new ClientStreamTracer[] { tracer };

  // First picker: buffered because pick_first is still connecting.
  SubchannelPicker pickFirstConnecting = mock(SubchannelPicker.class);
  when(pickFirstConnecting.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withNoResult("pick_first:connecting"));

  delayedTransport.reprocess(pickFirstConnecting);
  delayedTransport.newStream(method, headers, callOptions, tracerArray);

  InOrder ordered = inOrder(tracer);
  ordered.verify(tracer).delayStarted("pick_first:connecting");

  // Second picker: still buffered, but for a different reason.
  SubchannelPicker rlsPending = mock(SubchannelPicker.class);
  when(rlsPending.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withNoResult("rls:lookup_pending"));

  delayedTransport.reprocess(rlsPending);

  ordered.verify(tracer).delayEnded();
  ordered.verify(tracer).delayStarted("rls:lookup_pending");

  // NOTE(review): mockPicker is configured outside this method; presumably it yields a usable
  // transport so the stream leaves the pending state - confirm against the test fixture.
  delayedTransport.reprocess(mockPicker);

  ordered.verify(tracer).delayEnded();
}

/** Checks that cancelling a pending stream closes its open delay segment. */
@Test
public void streamDelayMetrics_cancelled() {
  ClientStreamTracer tracer = mock(ClientStreamTracer.class);
  ClientStreamTracer[] tracerArray = new ClientStreamTracer[] { tracer };

  // Picker that keeps the RPC buffered with a connecting reason.
  SubchannelPicker pickFirstConnecting = mock(SubchannelPicker.class);
  when(pickFirstConnecting.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withNoResult("pick_first:connecting"));

  delayedTransport.reprocess(pickFirstConnecting);
  ClientStream pending = delayedTransport.newStream(method, headers, callOptions, tracerArray);
  pending.start(streamListener);

  verify(tracer).delayStarted("pick_first:connecting");

  pending.cancel(Status.CANCELLED);

  verify(tracer).delayEnded();
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception {
verify(mockSubchannel).requestConnection();

// Calling pickSubchannel() twice gave the same result
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting");
assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs));

verifyNoMoreInteractions(mockHelper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
}

@VisibleForTesting
void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
delegate.configureChannelBuilder(builder);
}

Expand Down Expand Up @@ -115,6 +115,14 @@ public Builder sdk(OpenTelemetry sdk) {
return this;
}

/**
* Enables or disables tracing.
*
* @param enable {@code true} to enable tracing, {@code false} to disable it
* @return this builder
*/
public Builder enableTracing(boolean enable) {
InternalGrpcOpenTelemetry.enableTracing(delegate, enable);
return this;
}

/**
* Adds optionalLabelKey to all the metrics that can provide value for the
* optionalLabelKey.
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ opencensus-exporter-trace-stackdriver = { module = "io.opencensus:opencensus-exp
opencensus-impl = { module = "io.opencensus:opencensus-impl", version.ref = "opencensus" }
opentelemetry-api = "io.opentelemetry:opentelemetry-api:1.60.1"
opentelemetry-exporter-prometheus = "io.opentelemetry:opentelemetry-exporter-prometheus:1.60.1-alpha"
opentelemetry-exporter-otlp = "io.opentelemetry:opentelemetry-exporter-otlp:1.60.1"
opentelemetry-gcp-resources = "io.opentelemetry.contrib:opentelemetry-gcp-resources:1.54.0-alpha"
opentelemetry-sdk-extension-autoconfigure = "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.60.1"
opentelemetry-sdk-testing = "io.opentelemetry:opentelemetry-sdk-testing:1.60.1"
Expand Down
1 change: 1 addition & 0 deletions interop-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
libraries.netty.tcnative,
libraries.netty.tcnative.classes,
libraries.opentelemetry.exporter.prometheus, // For xds interop client
libraries.opentelemetry.exporter.otlp,
project(':grpc-googleapis'),
project(':grpc-grpclb'),
project(':grpc-rls')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import io.grpc.gcp.csm.observability.CsmObservability;

/**
* Application that starts a client for the {@link TestServiceGrpc.TestServiceImplBase} and runs
Expand All @@ -99,6 +100,13 @@ public class TestServiceClient {
public static void main(String[] args) throws Exception {
final TestServiceClient client = new TestServiceClient();
client.parseArgs(args);
if (client.enableOpentelemetryTracing) {
io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder()
.sdk(otel)
.build();
gotel.registerGlobal();
}
customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider();
LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider);
client.setUp();
Expand All @@ -107,6 +115,10 @@ public static void main(String[] args) throws Exception {
client.run();
} finally {
client.tearDown();
if (client.enableOpentelemetryTracing) {
System.out.println("Sleeping to flush spans...");
Thread.sleep(2000);
}
}
}

Expand Down Expand Up @@ -136,6 +148,7 @@ public static void main(String[] args) throws Exception {
private int soakResponseSize = 314159;
private int numThreads = 1;
private String additionalMetadata = "";
private boolean enableOpentelemetryTracing = false;
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;

private Tester tester = new Tester();
Expand Down Expand Up @@ -167,6 +180,8 @@ void parseArgs(String[] args) throws Exception {
serverHostOverride = value;
} else if ("server_port".equals(key)) {
serverPort = Integer.parseInt(value);
} else if ("enable_opentelemetry_tracing".equals(key)) {
enableOpentelemetryTracing = Boolean.parseBoolean(value);
} else if ("test_case".equals(key)) {
testCase = value;
} else if ("num_times".equals(key)) {
Expand Down Expand Up @@ -599,6 +614,9 @@ private class Tester extends AbstractInteropTest {
@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
boolean useGeneric = false;
if (enableOpentelemetryTracing) {
useGeneric = true;
}
ChannelCredentials channelCredentials;
if (customCredentialsType != null) {
useGeneric = true; // Retain old behavior; avoids erroring if incompatible
Expand Down
Loading
Loading