From fb52bbf17a6e983980879e0dcefa16ff27e5a1e6 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 19 Jan 2026 06:40:18 +0000 Subject: [PATCH 01/17] Save changes. --- .../main/java/io/grpc/testing/integration/TestCases.java | 3 ++- .../io/grpc/testing/integration/TestServiceClient.java | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 2d16065254a..9e7d31c55b0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -59,7 +59,8 @@ public enum TestCases { RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), ORCA_PER_RPC("report backend metrics per query"), - ORCA_OOB("report backend metrics out-of-band"); + ORCA_OOB("report backend metrics out-of-band"), + MCS("max concurrent streaming"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 125d876b705..aa700e43494 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -563,6 +563,11 @@ private void runTest(TestCases testCase) throws Exception { tester.testOrcaOob(); break; } + + case MCS: { + tester.testMcs(); + break; + } default: throw new IllegalArgumentException("Unknown test case: " + testCase); @@ -1054,6 +1059,9 @@ protected ServerBuilder getHandshakerServerBuilder() { protected int operationTimeoutMillis() { return 15000; } + + public void testMcs() { + } } private static String validTestCasesHelpText() { From f518d5efaf011eb8c57bff1dcf703c6d7d017b41 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 20 Jan 2026 06:23:27 +0000 Subject: [PATCH 02/17] Save changes. --- bom/build.gradle | 38 ---------- build.gradle | 5 ++ interop-testing/build.gradle | 5 ++ .../integration/TestServiceClient.java | 76 ++++++++++++++----- .../integration/TestServiceServer.java | 7 ++ 5 files changed, 73 insertions(+), 58 deletions(-) delete mode 100644 bom/build.gradle diff --git a/bom/build.gradle b/bom/build.gradle deleted file mode 100644 index f7f3918372f..00000000000 --- a/bom/build.gradle +++ /dev/null @@ -1,38 +0,0 @@ -plugins { - id 'java-platform' - id "maven-publish" -} - -description = 'gRPC: BOM' - -gradle.projectsEvaluated { - def projectsToInclude = rootProject.subprojects.findAll { - return it.name != 'grpc-compiler' - && it.plugins.hasPlugin('java') - && it.plugins.hasPlugin('maven-publish') - && it.tasks.findByName('publishMavenPublicationToMavenRepository')?.enabled - } - dependencies { - constraints { - projectsToInclude.each { api it } - } - } -} - -publishing { - publications { - maven(MavenPublication) { - from components.javaPlatform - pom.withXml { - def dependencies = asNode().dependencyManagement.dependencies.last() - // add protoc gen (produced by grpc-compiler with different artifact name) - // not sure how to express "pom" in gradle, kept in XML - def dependencyNode = dependencies.appendNode('dependency') - dependencyNode.appendNode('groupId', project.group) - dependencyNode.appendNode('artifactId', 'protoc-gen-grpc-java') - dependencyNode.appendNode('version', project.version) - dependencyNode.appendNode('type', 'pom') - } - } - } -} diff --git a/build.gradle b/build.gradle index 91690c1e3c3..01e09a5c920 100644 --- a/build.gradle +++ b/build.gradle @@ -19,6 +19,11 @@ subprojects { apply plugin: "com.google.osdetector" apply plugin: "net.ltgt.errorprone" + apply plugin: "java" + test { + testLogging.showStandardStreams = true + systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" + } group = "io.grpc" version = "1.79.0-SNAPSHOT" // CURRENT_GRPC_VERSION diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 5160759460c..7be6dac279b 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -8,6 +8,11 @@ plugins { } description = "gRPC: Integration Testing" +apply plugin: "java-library" +test { + testLogging.showStandardStreams = true + systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" +} dependencies { implementation project(path: ':grpc-alts', configuration: 'shadow'), diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index aa700e43494..2412b8cc52d 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.testing.integration.TestCases.MCS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -30,28 +31,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; import com.google.protobuf.ByteString; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ChannelCredentials; -import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors; -import io.grpc.Grpc; -import io.grpc.InsecureChannelCredentials; -import io.grpc.InsecureServerCredentials; -import io.grpc.LoadBalancerProvider; -import io.grpc.LoadBalancerRegistry; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.ServerBuilder; -import io.grpc.TlsChannelCredentials; +import io.grpc.*; import io.grpc.alts.AltsChannelCredentials; import io.grpc.alts.ComputeEngineChannelCredentials; import io.grpc.alts.GoogleDefaultChannelCredentials; import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.GrpcUtil; import io.grpc.internal.JsonParser; +import io.grpc.internal.testing.StreamRecorder; import io.grpc.netty.InsecureFromHttp1ChannelCredentials; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder; @@ -65,6 +52,8 @@ import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; +import io.grpc.testing.integration.Messages.StreamingInputCallRequest; +import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.TestOrcaReport; @@ -72,8 +61,7 @@ import java.io.FileInputStream; import java.io.InputStream; import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Map; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -601,9 +589,12 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor( } private class Tester extends AbstractInteropTest { + private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink(); + @Override protected ManagedChannelBuilder createChannelBuilder() { - boolean useGeneric = false; + boolean useSubchannelMetricsSink = testCase.equals(MCS.toString()); + boolean useGeneric = useSubchannelMetricsSink? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -670,6 +661,9 @@ protected ManagedChannelBuilder createChannelBuilder() { if (addMdInterceptor != null) { channelBuilder.intercept(addMdInterceptor); } + if (useSubchannelMetricsSink) { + InternalManagedChannelBuilder.addMetricSink(channelBuilder, fakeMetricsSink); + } return channelBuilder; } if (!useOkHttp) { @@ -1060,7 +1054,19 @@ protected int operationTimeoutMillis() { return 15000; } - public void testMcs() { + public void testMcs() throws Exception { + final StreamingInputCallRequest request = StreamingInputCallRequest.newBuilder() + .setPayload(Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[27182]))) + .build(); + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = + asyncStub.streamingInputCall(responseObserver); + requestObserver.onNext(request); + + // assertThat(fakeMetricsSink.longUpDownCounterMetricInstrumentValues.get("grpc.subchannel.open_connections")).isEqualTo(1); + requestObserver.onCompleted(); + responseObserver.awaitCompletion(); } } @@ -1075,4 +1081,34 @@ private static String validTestCasesHelpText() { } return builder.toString(); } + + static class FakeMetricsSink implements MetricSink { + Map longUpDownCounterMetricInstrumentValues = new HashMap<>(); + @Override + public Map getEnabledMetrics() { + return null; + } + + @Override + public Set getOptionalLabels() { + return null; + } + + @Override + public int getMeasuresSize() { + return 0; + } + + @Override + public void updateMeasures(List instruments) { + System.out.println("updateMeasures"); + } + + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + longUpDownCounterMetricInstrumentValues.put(metricInstrument, value); + } + } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index fc4cdf9178f..511e264410c 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,6 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; + private int mcsLimit = 0; private ScheduledExecutorService executor; private Server server; @@ -118,6 +119,9 @@ void parseArgs(String[] args) { usage = true; break; } + } else if ("mcs_limit".equals(key)) { + mcsLimit = Integer.parseInt(value); + addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); usage = true; @@ -186,6 +190,9 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } + if (mcsLimit > 0) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcsLimit); + } break; case IPV6: List v6Addresses = Util.getV6Addresses(port); From 3874631bc39e347e98995c7341a749c5e26673e8 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 20 Jan 2026 12:15:52 +0000 Subject: [PATCH 03/17] Save changes. --- .../integration/TestServiceClient.java | 52 +++++++++++++++---- .../integration/TestServiceServer.java | 10 ++-- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 2412b8cc52d..b97a8f8f911 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -30,6 +30,8 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import io.grpc.*; import io.grpc.alts.AltsChannelCredentials; @@ -1059,17 +1061,51 @@ public void testMcs() throws Exception { .setPayload(Payload.newBuilder() .setBody(ByteString.copyFrom(new byte[27182]))) .build(); - StreamRecorder responseObserver = StreamRecorder.create(); - StreamObserver requestObserver = - asyncStub.streamingInputCall(responseObserver); - requestObserver.onNext(request); + StreamRecorder responseObserver1 = StreamRecorder.create(); + StreamObserver requestObserver1 = + asyncStub.streamingInputCall(responseObserver1); + requestObserver1.onNext(request); + StreamRecorder responseObserver2 = StreamRecorder.create(); + StreamObserver requestObserver2 = + asyncStub.streamingInputCall(responseObserver2); + requestObserver2.onNext(request); // assertThat(fakeMetricsSink.longUpDownCounterMetricInstrumentValues.get("grpc.subchannel.open_connections")).isEqualTo(1); - requestObserver.onCompleted(); - responseObserver.awaitCompletion(); + requestObserver2.onCompleted(); + responseObserver2.awaitCompletion(); + requestObserver1.onCompleted(); + + responseObserver1.awaitCompletion(); + } } + /* + public static ListenableFuture performTaskAsync() { + // Create a SettableFuture. This is the "handle" we control externally. + final SettableFuture future = SettableFuture.create(); + + // Submit the actual work to the executor service + executorService.submit(() -> { + try { + System.out.println("Worker thread: Task starting..."); + // Simulate some work that takes time + TimeUnit.SECONDS.sleep(2); + System.out.println("Worker thread: Task finished."); + + // When the work is done, set the future's value to true + future.set(true); + + } catch (InterruptedException e) { + // If something goes wrong, set the future to an exception + future.setException(e); + } + }); + + // Return the future immediately + return future; + } +*/ private static String validTestCasesHelpText() { StringBuilder builder = new StringBuilder(); for (TestCases testCase : TestCases.values()) { @@ -1100,9 +1136,7 @@ public int getMeasuresSize() { } @Override - public void updateMeasures(List instruments) { - System.out.println("updateMeasures"); - } + public void updateMeasures(List instruments) {} @Override public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index 511e264410c..5845b1c387f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,7 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; - private int mcsLimit = 0; + private int mcs = 0; private ScheduledExecutorService executor; private Server server; @@ -119,8 +119,8 @@ void parseArgs(String[] args) { usage = true; break; } - } else if ("mcs_limit".equals(key)) { - mcsLimit = Integer.parseInt(value); + } else if ("mcs".equals(key)) { + mcs = Integer.parseInt(value); addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); @@ -190,8 +190,8 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } - if (mcsLimit > 0) { - ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcsLimit); + if (mcs > 0) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcs); } break; case IPV6: From 0bad82f1555fec97a24032cac82911129e3e7501 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:11:27 +0000 Subject: [PATCH 04/17] Save changes. --- .../integration/TestServiceClient.java | 172 ++++++++++-------- .../integration/TestServiceServer.java | 10 +- 2 files changed, 104 insertions(+), 78 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index b97a8f8f911..b375ee7e4bf 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; @@ -30,10 +31,27 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; -import io.grpc.*; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ChannelCredentials; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.InternalManagedChannelBuilder; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MetricInstrument; +import io.grpc.MetricSink; +import io.grpc.ServerBuilder; +import io.grpc.TlsChannelCredentials; import io.grpc.alts.AltsChannelCredentials; import io.grpc.alts.ComputeEngineChannelCredentials; import io.grpc.alts.GoogleDefaultChannelCredentials; @@ -61,9 +79,13 @@ import io.grpc.testing.integration.Messages.TestOrcaReport; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -596,7 +618,7 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { boolean useSubchannelMetricsSink = testCase.equals(MCS.toString()); - boolean useGeneric = useSubchannelMetricsSink? true : false; + boolean useGeneric = testCase.equals(MCS.toString())? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -656,7 +678,17 @@ protected ManagedChannelBuilder createChannelBuilder() { if (serverHostOverride != null) { channelBuilder.overrideAuthority(serverHostOverride); } - if (serviceConfig != null) { + if (testCase.equals(MCS.toString())) { + channelBuilder.disableServiceConfigLookUp(); + try { + @SuppressWarnings("unchecked") + Map serviceConfigMap = (Map) JsonParser.parse( + "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); + channelBuilder.defaultServiceConfig(serviceConfigMap); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (serviceConfig != null) { channelBuilder.disableServiceConfigLookUp(); channelBuilder.defaultServiceConfig(serviceConfig); } @@ -980,31 +1012,16 @@ public void testOrcaOob() throws Exception { .build(); final int retryLimit = 5; - BlockingQueue queue = new LinkedBlockingQueue<>(); - final Object lastItem = new Object(); + StreamingOutputCallResponseObserver streamingOutputCallResponseObserver = + new StreamingOutputCallResponseObserver(); StreamObserver streamObserver = - asyncStub.fullDuplexCall(new StreamObserver() { - - @Override - public void onNext(StreamingOutputCallResponse value) { - queue.add(value); - } - - @Override - public void onError(Throwable t) { - queue.add(t); - } - - @Override - public void onCompleted() { - queue.add(lastItem); - } - }); + asyncStub.fullDuplexCall(streamingOutputCallResponseObserver); streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + assertThat(streamingOutputCallResponseObserver.take()) + .isInstanceOf(StreamingOutputCallResponse.class); int i = 0; for (; i < retryLimit; i++) { Thread.sleep(1000); @@ -1017,7 +1034,7 @@ public void onCompleted() { streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer2) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue(); for (i = 0; i < retryLimit; i++) { Thread.sleep(1000); @@ -1027,8 +1044,6 @@ public void onCompleted() { } } assertThat(i).isLessThan(retryLimit); - streamObserver.onCompleted(); - assertThat(queue.take()).isSameInstanceAs(lastItem); } @Override @@ -1056,56 +1071,60 @@ protected int operationTimeoutMillis() { return 15000; } - public void testMcs() throws Exception { - final StreamingInputCallRequest request = StreamingInputCallRequest.newBuilder() - .setPayload(Payload.newBuilder() - .setBody(ByteString.copyFrom(new byte[27182]))) - .build(); - StreamRecorder responseObserver1 = StreamRecorder.create(); - StreamObserver requestObserver1 = - asyncStub.streamingInputCall(responseObserver1); - requestObserver1.onNext(request); - StreamRecorder responseObserver2 = StreamRecorder.create(); - StreamObserver requestObserver2 = - asyncStub.streamingInputCall(responseObserver2); - requestObserver2.onNext(request); - - // assertThat(fakeMetricsSink.longUpDownCounterMetricInstrumentValues.get("grpc.subchannel.open_connections")).isEqualTo(1); - requestObserver2.onCompleted(); - responseObserver2.awaitCompletion(); - requestObserver1.onCompleted(); - - responseObserver1.awaitCompletion(); + class StreamingOutputCallResponseObserver implements StreamObserver { + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private volatile boolean isCompleted = true; + + @Override + public void onNext(StreamingOutputCallResponse value) { + queue.add(value); + } + + @Override + public void onError(Throwable t) { + queue.add(t); + } + + @Override + public void onCompleted() { + isCompleted = true; + } + Object take() throws InterruptedException { + return queue.take(); + } } - } - /* - public static ListenableFuture performTaskAsync() { - // Create a SettableFuture. This is the "handle" we control externally. - final SettableFuture future = SettableFuture.create(); + public void testMcs() throws Exception { + StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver1 = + asyncStub.fullDuplexCall(responseObserver1); + streamObserver1.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class); - // Submit the actual work to the executor service - executorService.submit(() -> { - try { - System.out.println("Worker thread: Task starting..."); - // Simulate some work that takes time - TimeUnit.SECONDS.sleep(2); - System.out.println("Worker thread: Task finished."); + StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver2 = + asyncStub.fullDuplexCall(responseObserver2); + streamObserver2.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver2.take()).isInstanceOf(StreamingOutputCallResponse.class); - // When the work is done, set the future's value to true - future.set(true); + assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(1); - } catch (InterruptedException e) { - // If something goes wrong, set the future to an exception - future.setException(e); - } - }); + // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new + // connection to be created in the same subchannel and not get queued. + StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); + StreamObserver streamObserver3 = + asyncStub.fullDuplexCall(responseObserver3); + streamObserver3.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(responseObserver3.take()).isInstanceOf(StreamingOutputCallResponse.class); - // Return the future immediately - return future; + assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(2); + } } -*/ + private static String validTestCasesHelpText() { StringBuilder builder = new StringBuilder(); for (TestCases testCase : TestCases.values()) { @@ -1119,7 +1138,8 @@ private static String validTestCasesHelpText() { } static class FakeMetricsSink implements MetricSink { - Map longUpDownCounterMetricInstrumentValues = new HashMap<>(); + private volatile long openConnectionCount; + @Override public Map getEnabledMetrics() { return null; @@ -1142,7 +1162,13 @@ public void updateMeasures(List instruments) {} public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, List requiredLabelValues, List optionalLabelValues) { - longUpDownCounterMetricInstrumentValues.put(metricInstrument, value); + if (metricInstrument.getName().equals("grpc.subchannel.open_connections")) { + openConnectionCount = value; + } + } + + synchronized long getOpenConnectionCount() { + return openConnectionCount; } } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index 5845b1c387f..cf995e4f8d4 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,7 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; - private int mcs = 0; + private boolean useMcs = false; private ScheduledExecutorService executor; private Server server; @@ -119,8 +119,8 @@ void parseArgs(String[] args) { usage = true; break; } - } else if ("mcs".equals(key)) { - mcs = Integer.parseInt(value); + } else if ("use_mcs".equals(key)) { + useMcs = Boolean.parseBoolean(value); addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); @@ -190,8 +190,8 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } - if (mcs > 0) { - ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcs); + if (useMcs) { + ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2); } break; case IPV6: From 51b38e27726eeb8b8f75fabbee17bd8b3370e5c1 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:19:19 +0000 Subject: [PATCH 05/17] Fix enum test. --- .../test/java/io/grpc/testing/integration/TestCasesTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index ab32d584e7c..4f6ea8e7931 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -67,7 +67,8 @@ public void testCaseNamesShouldMapToEnums() { "cancel_after_first_response", "timeout_on_sleeping_server", "orca_per_rpc", - "orca_oob" + "orca_oob", + "mcs", }; // additional test cases From 4da06a418cfabfcee9980f5c59e5597dd489ef2b Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:37:36 +0000 Subject: [PATCH 06/17] Revert temp changes. --- bom/build.gradle | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 bom/build.gradle diff --git a/bom/build.gradle b/bom/build.gradle new file mode 100644 index 00000000000..f7f3918372f --- /dev/null +++ b/bom/build.gradle @@ -0,0 +1,38 @@ +plugins { + id 'java-platform' + id "maven-publish" +} + +description = 'gRPC: BOM' + +gradle.projectsEvaluated { + def projectsToInclude = rootProject.subprojects.findAll { + return it.name != 'grpc-compiler' + && it.plugins.hasPlugin('java') + && it.plugins.hasPlugin('maven-publish') + && it.tasks.findByName('publishMavenPublicationToMavenRepository')?.enabled + } + dependencies { + constraints { + projectsToInclude.each { api it } + } + } +} + +publishing { + publications { + maven(MavenPublication) { + from components.javaPlatform + pom.withXml { + def dependencies = asNode().dependencyManagement.dependencies.last() + // add protoc gen (produced by grpc-compiler with different artifact name) + // not sure how to express "pom" in gradle, kept in XML + def dependencyNode = dependencies.appendNode('dependency') + dependencyNode.appendNode('groupId', project.group) + dependencyNode.appendNode('artifactId', 'protoc-gen-grpc-java') + dependencyNode.appendNode('version', project.version) + dependencyNode.appendNode('type', 'pom') + } + } + } +} From df70a2721e437203760a5cfaea9ea0dcfff1baad Mon Sep 17 00:00:00 2001 From: Kannan J Date: Wed, 21 Jan 2026 14:39:45 +0000 Subject: [PATCH 07/17] Revert temp changes. --- build.gradle | 5 ----- interop-testing/build.gradle | 5 ----- 2 files changed, 10 deletions(-) diff --git a/build.gradle b/build.gradle index 01e09a5c920..91690c1e3c3 100644 --- a/build.gradle +++ b/build.gradle @@ -19,11 +19,6 @@ subprojects { apply plugin: "com.google.osdetector" apply plugin: "net.ltgt.errorprone" - apply plugin: "java" - test { - testLogging.showStandardStreams = true - systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" - } group = "io.grpc" version = "1.79.0-SNAPSHOT" // CURRENT_GRPC_VERSION diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 7be6dac279b..5160759460c 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -8,11 +8,6 @@ plugins { } description = "gRPC: Integration Testing" -apply plugin: "java-library" -test { - testLogging.showStandardStreams = true - systemProperty 'java.util.logging.config.file', "/home/kannanj/grpc-logger.properties" -} dependencies { implementation project(path: ':grpc-alts', configuration: 'shadow'), From bee005f6fdee2904b2f7fdfb8c5196339fe96b0f Mon Sep 17 00:00:00 2001 From: Kannan J Date: Fri, 23 Jan 2026 14:23:07 +0000 Subject: [PATCH 08/17] Try with server streaming alone. --- .../io/grpc/testing/integration/TestCases.java | 3 ++- .../testing/integration/TestServiceClient.java | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 9e7d31c55b0..23508999eb3 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -60,7 +60,8 @@ public enum TestCases { CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), ORCA_PER_RPC("report backend metrics per query"), ORCA_OOB("report backend metrics out-of-band"), - MCS("max concurrent streaming"); + MCS("max concurrent streaming"), + MCSSS("mcs server streaming"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index b375ee7e4bf..c1251e59da2 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -581,6 +581,11 @@ private void runTest(TestCases testCase) throws Exception { break; } + case MCSSS: { + tester.testMcs_serverStreaming(); + break; + } + default: throw new IllegalArgumentException("Unknown test case: " + testCase); } @@ -1123,7 +1128,16 @@ public void testMcs() throws Exception { assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(2); } - } + + public void testMcs_serverStreaming() throws Exception { + StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build(); + StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); + asyncStub.streamingOutputCall(request, responseObserver1); + assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class); + } + + } private static String validTestCasesHelpText() { StringBuilder builder = new StringBuilder(); From efce8189cf4ad6903834d680f21843edf2c94d08 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Sat, 31 Jan 2026 14:34:27 +0000 Subject: [PATCH 09/17] Interceptor logic. --- .../grpc/testing/integration/TestCases.java | 3 +- .../integration/TestServiceClient.java | 45 +++++++++---------- .../testing/integration/TestServiceImpl.java | 41 ++++++++++++++++- 3 files changed, 63 insertions(+), 26 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 23508999eb3..f058a9190cf 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -60,8 +60,7 @@ public enum TestCases { CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), ORCA_PER_RPC("report backend metrics per query"), ORCA_OOB("report backend metrics out-of-band"), - MCS("max concurrent streaming"), - MCSSS("mcs server streaming"); + MCS_CS("max concurrent streaming connection scaling"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index c1251e59da2..01fa9c21cf9 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -17,7 +17,7 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.testing.integration.TestCases.MCS; +import static io.grpc.testing.integration.TestCases.MCS_CS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -576,16 +576,10 @@ private void runTest(TestCases testCase) throws Exception { break; } - case MCS: { + case MCS_CS: { tester.testMcs(); break; } - - case MCSSS: { - tester.testMcs_serverStreaming(); - break; - } - default: throw new IllegalArgumentException("Unknown test case: " + testCase); } @@ -622,8 +616,8 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { - boolean useSubchannelMetricsSink = testCase.equals(MCS.toString()); - boolean useGeneric = testCase.equals(MCS.toString())? true : false; + boolean useSubchannelMetricsSink = testCase.equals(MCS_CS.toString()); + boolean useGeneric = testCase.equals(MCS_CS.toString())? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -683,7 +677,7 @@ protected ManagedChannelBuilder createChannelBuilder() { if (serverHostOverride != null) { channelBuilder.overrideAuthority(serverHostOverride); } - if (testCase.equals(MCS.toString())) { + if (testCase.equals(MCS_CS.toString())) { channelBuilder.disableServiceConfigLookUp(); try { @SuppressWarnings("unchecked") @@ -1101,32 +1095,37 @@ Object take() throws InterruptedException { } public void testMcs() throws Exception { - StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); + StreamingOutputCallResponseObserver responseObserver1 = + new StreamingOutputCallResponseObserver(); StreamObserver streamObserver1 = asyncStub.fullDuplexCall(responseObserver1); - streamObserver1.onNext(StreamingOutputCallRequest.newBuilder() - .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class); + StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() + .setPayload(Payload.newBuilder().setBody( + ByteString.copyFrom(MCS_CS.description().getBytes())).build()).build(); + streamObserver1.onNext(request); + Object responseObj = responseObserver1.take(); + StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj; + String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody().toByteArray()); StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver(); StreamObserver streamObserver2 = asyncStub.fullDuplexCall(responseObserver2); - streamObserver2.onNext(StreamingOutputCallRequest.newBuilder() - .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(responseObserver2.take()).isInstanceOf(StreamingOutputCallResponse.class); + streamObserver2.onNext(request); + callResponse = (StreamingOutputCallResponse) responseObserver2.take(); + String clientSocketAddressInCall2 = new String(callResponse.getPayload().getBody().toByteArray()); - assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(1); + assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2); // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new // connection to be created in the same subchannel and not get queued. StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); StreamObserver streamObserver3 = asyncStub.fullDuplexCall(responseObserver3); - streamObserver3.onNext(StreamingOutputCallRequest.newBuilder() - .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(responseObserver3.take()).isInstanceOf(StreamingOutputCallResponse.class); + streamObserver3.onNext(request); + callResponse = (StreamingOutputCallResponse) responseObserver3.take(); + String clientSocketAddressInCall3 = new String(callResponse.getPayload().getBody().toByteArray()); - assertThat(fakeMetricsSink.openConnectionCount).isEqualTo(2); + assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); } public void testMcs_serverStreaming() throws Exception { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index a9ee9382495..0aa3c3e686c 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -16,13 +16,19 @@ package io.grpc.testing.integration; +import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; +import static io.grpc.testing.integration.TestCases.MCS_CS; + import com.google.common.base.Preconditions; import com.google.common.collect.Queues; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.protobuf.ByteString; +import io.grpc.Attributes; +import io.grpc.Contexts; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; @@ -42,6 +48,7 @@ import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.TestOrcaReport; import io.grpc.testing.integration.TestServiceGrpc.AsyncService; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; @@ -55,12 +62,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import io.grpc.Context; /** * Implementation of the business logic for the TestService. Uses an executor to schedule chunks * sent in response streams. */ public class TestServiceImpl implements io.grpc.BindableService, AsyncService { + static Context.Key PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address"); private final Random random = new Random(); private final ScheduledExecutorService executor; @@ -235,6 +244,16 @@ public void onNext(StreamingOutputCallRequest request) { .asRuntimeException()); return; } + if (new String(request.getPayload().getBody().toByteArray()).equals(MCS_CS.description())) { + SocketAddress peerAddress = PEER_ADDRESS_CONTEXT_KEY.get(); + ByteString payload = ByteString.copyFrom(peerAddress.toString().getBytes()); + StreamingOutputCallResponse.Builder responseBuilder = + StreamingOutputCallResponse.newBuilder(); + responseBuilder.setPayload( + Payload.newBuilder() + .setBody(payload)); + responseObserver.onNext(responseBuilder.build()); + } dispatcher.enqueue(toChunkQueue(request)); } @@ -507,7 +526,8 @@ public static List interceptors() { return Arrays.asList( echoRequestHeadersInterceptor(Util.METADATA_KEY), echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY), - echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY)); + echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY), + new McsScalingTestcaseInterceptor()); } /** @@ -539,6 +559,25 @@ public void close(Status status, Metadata trailers) { }; } + static class McsScalingTestcaseInterceptor implements ServerInterceptor { + @Override + public Listener interceptCall(ServerCall call, + Metadata headers, ServerCallHandler next) { + SocketAddress peerAddress = call.getAttributes().get(TRANSPORT_ATTR_REMOTE_ADDR); + + // Create a new context with the peer address value + Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress); + try { + + // Continue the call processing within the new context + // return newContext.call(() -> next.startCall(call, headers)); + return Contexts.interceptCall(newContext, call, headers, next); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + /** * Echoes request headers with the specified key(s) from a client into response headers only. */ From 2ac2d4735bc1d53dcc60cdffb93a787830892a10 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 10 Feb 2026 09:21:18 +0000 Subject: [PATCH 10/17] Closing the stream after the rpc is done. --- .../testing/integration/TestServiceClient.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 01fa9c21cf9..8260a1c5371 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -1106,6 +1106,7 @@ public void testMcs() throws Exception { Object responseObj = responseObserver1.take(); StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj; String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody().toByteArray()); + assertThat(clientSocketAddressInCall1).isNotEmpty(); StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver(); StreamObserver streamObserver2 = @@ -1126,14 +1127,13 @@ public void testMcs() throws Exception { String clientSocketAddressInCall3 = new String(callResponse.getPayload().getBody().toByteArray()); assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); - } - public void testMcs_serverStreaming() throws Exception { - StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() - .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build(); - StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); - asyncStub.streamingOutputCall(request, responseObserver1); - assertThat(responseObserver1.take()).isInstanceOf(StreamingOutputCallResponse.class); + streamObserver1.onCompleted(); + assertThat(responseObserver1.isCompleted).isTrue(); + streamObserver2.onCompleted(); + assertThat(responseObserver2.isCompleted).isTrue(); + streamObserver3.onCompleted(); + assertThat(responseObserver3.isCompleted).isTrue(); } } From 42148896ee20cecb9b6a99044ceebc472d42fc80 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 10 Feb 2026 12:40:06 +0000 Subject: [PATCH 11/17] Style fixes. --- .../integration/TestServiceClient.java | 75 ++++--------------- .../testing/integration/TestServiceImpl.java | 7 +- 2 files changed, 16 insertions(+), 66 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 8260a1c5371..d6b2e1ee112 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; @@ -40,16 +39,12 @@ import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; -import io.grpc.InternalManagedChannelBuilder; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; -import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.MetricInstrument; -import io.grpc.MetricSink; import io.grpc.ServerBuilder; import io.grpc.TlsChannelCredentials; import io.grpc.alts.AltsChannelCredentials; @@ -58,7 +53,6 @@ import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.GrpcUtil; import io.grpc.internal.JsonParser; -import io.grpc.internal.testing.StreamRecorder; import io.grpc.netty.InsecureFromHttp1ChannelCredentials; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder; @@ -72,8 +66,6 @@ import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; -import io.grpc.testing.integration.Messages.StreamingInputCallRequest; -import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.TestOrcaReport; @@ -83,9 +75,7 @@ import java.io.InputStream; import java.nio.charset.Charset; import java.util.Arrays; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -612,12 +602,10 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor( } private class Tester extends AbstractInteropTest { - private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink(); @Override protected ManagedChannelBuilder createChannelBuilder() { - boolean useSubchannelMetricsSink = testCase.equals(MCS_CS.toString()); - boolean useGeneric = testCase.equals(MCS_CS.toString())? true : false; + boolean useGeneric = testCase.equals(MCS_CS.toString()) ? true : false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -694,9 +682,6 @@ protected ManagedChannelBuilder createChannelBuilder() { if (addMdInterceptor != null) { channelBuilder.intercept(addMdInterceptor); } - if (useSubchannelMetricsSink) { - InternalManagedChannelBuilder.addMetricSink(channelBuilder, fakeMetricsSink); - } return channelBuilder; } if (!useOkHttp) { @@ -1070,7 +1055,8 @@ protected int operationTimeoutMillis() { return 15000; } - class StreamingOutputCallResponseObserver implements StreamObserver { + class StreamingOutputCallResponseObserver implements + StreamObserver { private final BlockingQueue queue = new LinkedBlockingQueue<>(); private volatile boolean isCompleted = true; @@ -1101,30 +1087,35 @@ public void testMcs() throws Exception { asyncStub.fullDuplexCall(responseObserver1); StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() .setPayload(Payload.newBuilder().setBody( - ByteString.copyFrom(MCS_CS.description().getBytes())).build()).build(); + ByteString.copyFromUtf8(MCS_CS.description())).build()).build(); streamObserver1.onNext(request); Object responseObj = responseObserver1.take(); StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj; - String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody().toByteArray()); + String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody() + .toByteArray(), UTF_8); assertThat(clientSocketAddressInCall1).isNotEmpty(); - StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver(); + StreamingOutputCallResponseObserver responseObserver2 = + new StreamingOutputCallResponseObserver(); StreamObserver streamObserver2 = asyncStub.fullDuplexCall(responseObserver2); streamObserver2.onNext(request); callResponse = (StreamingOutputCallResponse) responseObserver2.take(); - String clientSocketAddressInCall2 = new String(callResponse.getPayload().getBody().toByteArray()); + String clientSocketAddressInCall2 = + new String(callResponse.getPayload().getBody().toByteArray(), UTF_8); assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2); // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new // connection to be created in the same subchannel and not get queued. - StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); + StreamingOutputCallResponseObserver responseObserver3 = + new StreamingOutputCallResponseObserver(); StreamObserver streamObserver3 = asyncStub.fullDuplexCall(responseObserver3); streamObserver3.onNext(request); callResponse = (StreamingOutputCallResponse) responseObserver3.take(); - String clientSocketAddressInCall3 = new String(callResponse.getPayload().getBody().toByteArray()); + String clientSocketAddressInCall3 = + new String(callResponse.getPayload().getBody().toByteArray(), UTF_8); assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); @@ -1135,8 +1126,7 @@ public void testMcs() throws Exception { streamObserver3.onCompleted(); assertThat(responseObserver3.isCompleted).isTrue(); } - - } + } private static String validTestCasesHelpText() { StringBuilder builder = new StringBuilder(); @@ -1149,39 +1139,4 @@ private static String validTestCasesHelpText() { } return builder.toString(); } - - static class FakeMetricsSink implements MetricSink { - private volatile long openConnectionCount; - - @Override - public Map getEnabledMetrics() { - return null; - } - - @Override - public Set getOptionalLabels() { - return null; - } - - @Override - public int getMeasuresSize() { - return 0; - } - - @Override - public void updateMeasures(List instruments) {} - - @Override - public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, - List requiredLabelValues, - List optionalLabelValues) { - if (metricInstrument.getName().equals("grpc.subchannel.open_connections")) { - openConnectionCount = value; - } - } - - synchronized long getOpenConnectionCount() { - return openConnectionCount; - } - } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 0aa3c3e686c..88219408261 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -23,7 +23,7 @@ import com.google.common.collect.Queues; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.protobuf.ByteString; -import io.grpc.Attributes; +import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; @@ -62,7 +62,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import io.grpc.Context; /** * Implementation of the business logic for the TestService. Uses an executor to schedule chunks @@ -71,7 +70,6 @@ public class TestServiceImpl implements io.grpc.BindableService, AsyncService { static Context.Key PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address"); private final Random random = new Random(); - private final ScheduledExecutorService executor; private final ByteString compressableBuffer; private final MetricRecorder metricRecorder; @@ -568,9 +566,6 @@ public Listener interceptCall(ServerCall call, // Create a new context with the peer address value Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress); try { - - // Continue the call processing within the new context - // return newContext.call(() -> next.startCall(call, headers)); return Contexts.interceptCall(newContext, call, headers, next); } catch (Exception ex) { throw new RuntimeException(ex); From e099d1d7e696082105e3dd1a5d635a912badd175 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 10 Feb 2026 12:42:17 +0000 Subject: [PATCH 12/17] Fix test name. --- .../test/java/io/grpc/testing/integration/TestCasesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index 4f6ea8e7931..4f035749111 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -68,7 +68,7 @@ public void testCaseNamesShouldMapToEnums() { "timeout_on_sleeping_server", "orca_per_rpc", "orca_oob", - "mcs", + "mcs_cs", }; // additional test cases From 3d75bf8ab425b9222c721ce6adce1fe999c8e56d Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 10 Feb 2026 16:09:34 +0000 Subject: [PATCH 13/17] Fix style warnings. --- .../java/io/grpc/testing/integration/TestServiceImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 88219408261..6838b48c413 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -18,6 +18,7 @@ import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; import static io.grpc.testing.integration.TestCases.MCS_CS; +import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.base.Preconditions; import com.google.common.collect.Queues; @@ -242,9 +243,10 @@ public void onNext(StreamingOutputCallRequest request) { .asRuntimeException()); return; } - if (new String(request.getPayload().getBody().toByteArray()).equals(MCS_CS.description())) { + if (new String(request.getPayload().getBody().toByteArray(), UTF_8) + .equals(MCS_CS.description())) { SocketAddress peerAddress = PEER_ADDRESS_CONTEXT_KEY.get(); - ByteString payload = ByteString.copyFrom(peerAddress.toString().getBytes()); + ByteString payload = ByteString.copyFromUtf8(peerAddress.toString()); StreamingOutputCallResponse.Builder responseBuilder = StreamingOutputCallResponse.newBuilder(); responseBuilder.setPayload( From 84d9528afb7790a521dfbfc42fa7f26ca5a7ecc8 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 10 Feb 2026 16:29:56 +0000 Subject: [PATCH 14/17] Fix build. --- android-interop-testing/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/android-interop-testing/build.gradle b/android-interop-testing/build.gradle index 17551465f05..ee163ef190c 100644 --- a/android-interop-testing/build.gradle +++ b/android-interop-testing/build.gradle @@ -17,6 +17,7 @@ android { srcDirs += "${projectDir}/../interop-testing/src/main/java/" setIncludes(["io/grpc/android/integrationtest/**", "io/grpc/testing/integration/AbstractInteropTest.java", + "io/grpc/testing/integration/TestCases.java", "io/grpc/testing/integration/TestServiceImpl.java", "io/grpc/testing/integration/Util.java"]) } From 3136bca59184c6e776bbcf8504b2441d5e881f6a Mon Sep 17 00:00:00 2001 From: Kannan J Date: Thu, 12 Feb 2026 12:44:08 +0000 Subject: [PATCH 15/17] Review comments - Build channel separately for MCS connection scaling test. --- .../integration/TestServiceClient.java | 63 +++++++++++++------ 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index d6b2e1ee112..45c97c2330e 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -567,7 +567,44 @@ private void runTest(TestCases testCase) throws Exception { } case MCS_CS: { - tester.testMcs(); + ChannelCredentials channelCredentials; + if (useTls) { + if (!useTestCa) { + channelCredentials = TlsChannelCredentials.create(); + } else { + try { + channelCredentials = TlsChannelCredentials.newBuilder() + .trustManager(TlsTesting.loadCert("ca.pem")) + .build(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } else { + channelCredentials = InsecureChannelCredentials.create(); + } + ManagedChannelBuilder channelBuilder; + if (serverPort == 0) { + channelBuilder = Grpc.newChannelBuilder(serverHost, channelCredentials); + } else { + channelBuilder = + Grpc.newChannelBuilderForAddress(serverHost, serverPort, channelCredentials); + } + if (serverHostOverride != null) { + channelBuilder.overrideAuthority(serverHostOverride); + } + if (testCase.equals(MCS_CS.toString())) { + channelBuilder.disableServiceConfigLookUp(); + try { + @SuppressWarnings("unchecked") + Map serviceConfigMap = (Map) JsonParser.parse( + "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); + channelBuilder.defaultServiceConfig(serviceConfigMap); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + tester.testMcs(TestServiceGrpc.newStub(channelBuilder.build())); break; } default: @@ -605,7 +642,7 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { - boolean useGeneric = testCase.equals(MCS_CS.toString()) ? true : false; + boolean useGeneric = false; ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible @@ -665,17 +702,7 @@ protected ManagedChannelBuilder createChannelBuilder() { if (serverHostOverride != null) { channelBuilder.overrideAuthority(serverHostOverride); } - if (testCase.equals(MCS_CS.toString())) { - channelBuilder.disableServiceConfigLookUp(); - try { - @SuppressWarnings("unchecked") - Map serviceConfigMap = (Map) JsonParser.parse( - "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); - channelBuilder.defaultServiceConfig(serviceConfigMap); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (serviceConfig != null) { + if (serviceConfig != null) { channelBuilder.disableServiceConfigLookUp(); channelBuilder.defaultServiceConfig(serviceConfig); } @@ -1080,7 +1107,7 @@ Object take() throws InterruptedException { } } - public void testMcs() throws Exception { + public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception { StreamingOutputCallResponseObserver responseObserver1 = new StreamingOutputCallResponseObserver(); StreamObserver streamObserver1 = @@ -1108,7 +1135,7 @@ public void testMcs() throws Exception { // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new // connection to be created in the same subchannel and not get queued. - StreamingOutputCallResponseObserver responseObserver3 = + /*StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); StreamObserver streamObserver3 = asyncStub.fullDuplexCall(responseObserver3); @@ -1117,14 +1144,14 @@ public void testMcs() throws Exception { String clientSocketAddressInCall3 = new String(callResponse.getPayload().getBody().toByteArray(), UTF_8); - assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); + assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);*/ streamObserver1.onCompleted(); assertThat(responseObserver1.isCompleted).isTrue(); streamObserver2.onCompleted(); assertThat(responseObserver2.isCompleted).isTrue(); - streamObserver3.onCompleted(); - assertThat(responseObserver3.isCompleted).isTrue(); + /*streamObserver3.onCompleted(); + assertThat(responseObserver3.isCompleted).isTrue();*/ } } From c3fc7c307c6bdb7e43fe8a3be60e97bc9a366d29 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Thu, 12 Feb 2026 18:11:30 +0000 Subject: [PATCH 16/17] Address Review comments. --- .../integration/TestServiceClient.java | 62 ++++++++++--------- .../testing/integration/TestServiceImpl.java | 30 +++++---- .../integration/TestServiceServer.java | 11 ++-- .../main/proto/grpc/testing/messages.proto | 7 +++ 4 files changed, 66 insertions(+), 44 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 45c97c2330e..7dbe8483d91 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -17,7 +17,6 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.testing.integration.TestCases.MCS_CS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -593,16 +592,14 @@ private void runTest(TestCases testCase) throws Exception { if (serverHostOverride != null) { channelBuilder.overrideAuthority(serverHostOverride); } - if (testCase.equals(MCS_CS.toString())) { - channelBuilder.disableServiceConfigLookUp(); - try { - @SuppressWarnings("unchecked") - Map serviceConfigMap = (Map) JsonParser.parse( - "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); - channelBuilder.defaultServiceConfig(serviceConfigMap); - } catch (IOException e) { - throw new RuntimeException(e); - } + channelBuilder.disableServiceConfigLookUp(); + try { + @SuppressWarnings("unchecked") + Map serviceConfigMap = (Map) JsonParser.parse( + "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); + channelBuilder.defaultServiceConfig(serviceConfigMap); + } catch (IOException e) { + throw new RuntimeException(e); } tester.testMcs(TestServiceGrpc.newStub(channelBuilder.build())); break; @@ -1045,7 +1042,8 @@ public void testOrcaOob() throws Exception { streamObserver.onNext(StreamingOutputCallRequest.newBuilder() .setOrcaOobReport(answer2) .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); - assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue(); + assertThat(streamingOutputCallResponseObserver.take()) + .isInstanceOf(StreamingOutputCallResponse.class); for (i = 0; i < retryLimit; i++) { Thread.sleep(1000); @@ -1055,6 +1053,8 @@ public void testOrcaOob() throws Exception { } } assertThat(i).isLessThan(retryLimit); + streamObserver.onCompleted(); + assertThat(streamingOutputCallResponseObserver.verifiedCompleted()).isTrue(); } @Override @@ -1084,8 +1084,8 @@ protected int operationTimeoutMillis() { class StreamingOutputCallResponseObserver implements StreamObserver { + private final Object lastItem = new Object(); private final BlockingQueue queue = new LinkedBlockingQueue<>(); - private volatile boolean isCompleted = true; @Override public void onNext(StreamingOutputCallResponse value) { @@ -1099,12 +1099,16 @@ public void onError(Throwable t) { @Override public void onCompleted() { - isCompleted = true; + queue.add(lastItem); } Object take() throws InterruptedException { return queue.take(); } + + boolean verifiedCompleted() throws InterruptedException { + return queue.take() == lastItem; + } } public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception { @@ -1113,13 +1117,15 @@ public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception StreamObserver streamObserver1 = asyncStub.fullDuplexCall(responseObserver1); StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() - .setPayload(Payload.newBuilder().setBody( - ByteString.copyFromUtf8(MCS_CS.description())).build()).build(); + .addResponseParameters(ResponseParameters.newBuilder() + .setSendClientSocketAddressInResponse( + Messages.BoolValue.newBuilder().setValue(true).build()) + .build()) + .build(); streamObserver1.onNext(request); Object responseObj = responseObserver1.take(); StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj; - String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody() - .toByteArray(), UTF_8); + String clientSocketAddressInCall1 = callResponse.getClientSocketAddress(); assertThat(clientSocketAddressInCall1).isNotEmpty(); StreamingOutputCallResponseObserver responseObserver2 = @@ -1128,30 +1134,30 @@ public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception asyncStub.fullDuplexCall(responseObserver2); streamObserver2.onNext(request); callResponse = (StreamingOutputCallResponse) responseObserver2.take(); - String clientSocketAddressInCall2 = - new String(callResponse.getPayload().getBody().toByteArray(), UTF_8); + String clientSocketAddressInCall2 = callResponse.getClientSocketAddress(); assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2); // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new // connection to be created in the same subchannel and not get queued. - /*StreamingOutputCallResponseObserver responseObserver3 = + StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver(); StreamObserver streamObserver3 = asyncStub.fullDuplexCall(responseObserver3); streamObserver3.onNext(request); callResponse = (StreamingOutputCallResponse) responseObserver3.take(); - String clientSocketAddressInCall3 = - new String(callResponse.getPayload().getBody().toByteArray(), UTF_8); + String clientSocketAddressInCall3 = callResponse.getClientSocketAddress(); - assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);*/ + // This assertion is currently failing because connection scaling when MCS limit has been + // reached is not yet implemented in gRPC Java. + assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); streamObserver1.onCompleted(); - assertThat(responseObserver1.isCompleted).isTrue(); + assertThat(responseObserver1.verifiedCompleted()).isTrue(); streamObserver2.onCompleted(); - assertThat(responseObserver2.isCompleted).isTrue(); - /*streamObserver3.onCompleted(); - assertThat(responseObserver3.isCompleted).isTrue();*/ + assertThat(responseObserver2.verifiedCompleted()).isTrue(); + streamObserver3.onCompleted(); + assertThat(responseObserver3.verifiedCompleted()).isTrue(); } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 6838b48c413..27cfbb9fdb2 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -17,8 +17,6 @@ package io.grpc.testing.integration; import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; -import static io.grpc.testing.integration.TestCases.MCS_CS; -import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.base.Preconditions; import com.google.common.collect.Queues; @@ -54,6 +52,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Queue; @@ -243,20 +242,27 @@ public void onNext(StreamingOutputCallRequest request) { .asRuntimeException()); return; } - if (new String(request.getPayload().getBody().toByteArray(), UTF_8) - .equals(MCS_CS.description())) { - SocketAddress peerAddress = PEER_ADDRESS_CONTEXT_KEY.get(); - ByteString payload = ByteString.copyFromUtf8(peerAddress.toString()); - StreamingOutputCallResponse.Builder responseBuilder = - StreamingOutputCallResponse.newBuilder(); - responseBuilder.setPayload( - Payload.newBuilder() - .setBody(payload)); - responseObserver.onNext(responseBuilder.build()); + if (whetherSendClientSocketAddressInResponse(request)) { + responseObserver.onNext( + StreamingOutputCallResponse.newBuilder() + .setClientSocketAddress(PEER_ADDRESS_CONTEXT_KEY.get().toString()) + .build()); + return; } dispatcher.enqueue(toChunkQueue(request)); } + private boolean whetherSendClientSocketAddressInResponse(StreamingOutputCallRequest request) { + Iterator responseParametersIterator = + request.getResponseParametersList().iterator(); + while (responseParametersIterator.hasNext()) { + if (responseParametersIterator.next().getSendClientSocketAddressInResponse().getValue()) { + return true; + } + } + return false; + } + @Override public void onCompleted() { if (oobTestLocked) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index cf995e4f8d4..0227482f3f5 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -75,7 +75,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; - private boolean useMcs = false; + private boolean setMcsLimit = false; private ScheduledExecutorService executor; private Server server; @@ -119,8 +119,9 @@ void parseArgs(String[] args) { usage = true; break; } - } else if ("use_mcs".equals(key)) { - useMcs = Boolean.parseBoolean(value); + } else if ("set_max_concurrent_streams_limit".equals(key)) { + setMcsLimit = Boolean.parseBoolean(value); + // TODO: Make Netty server builder usable for IPV6 as well (not limited to MCS handling) addressType = Util.AddressType.IPV4; // To use NettyServerBuilder } else { System.err.println("Unknown argument: " + key); @@ -145,6 +146,8 @@ void parseArgs(String[] args) { + "\n for testing. Only effective when --use_alts=true." + "\n --address_type=IPV4|IPV6|IPV4_IPV6" + "\n What type of addresses to listen on. Default IPV4_IPV6" + + "\n --set_max_concurrent_streams_limit" + + "\n Whether to set the maximum concurrent streams limit" ); System.exit(1); } @@ -190,7 +193,7 @@ void start() throws Exception { if (v4Address != null && !v4Address.equals(localV4Address)) { ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); } - if (useMcs) { + if (setMcsLimit) { ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2); } break; diff --git a/interop-testing/src/main/proto/grpc/testing/messages.proto b/interop-testing/src/main/proto/grpc/testing/messages.proto index fbcb6b4ce9b..74deae15afc 100644 --- a/interop-testing/src/main/proto/grpc/testing/messages.proto +++ b/interop-testing/src/main/proto/grpc/testing/messages.proto @@ -159,6 +159,10 @@ message ResponseParameters { // implement the full compression tests by introspecting the call to verify // the response's compression status. BoolValue compressed = 3; + + // Whether to request the server to send the requesting client's socket + // address in the response. + BoolValue send_client_socket_address_in_response = 4; } // Server-streaming request. @@ -186,6 +190,9 @@ message StreamingOutputCallRequest { message StreamingOutputCallResponse { // Payload to increase response size. Payload payload = 1; + + // The client's socket address if requested. + string client_socket_address = 2; } // For reconnect interop test only. From 93cb9adb9aee4853c2473c19efa9185a5a8789cb Mon Sep 17 00:00:00 2001 From: Kannan J Date: Fri, 13 Feb 2026 07:55:56 +0000 Subject: [PATCH 17/17] Expand the test name on the client side as well. --- .../src/main/java/io/grpc/testing/integration/TestCases.java | 2 +- .../java/io/grpc/testing/integration/TestServiceClient.java | 2 +- .../test/java/io/grpc/testing/integration/TestCasesTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index f058a9190cf..1a6de6c8da4 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -60,7 +60,7 @@ public enum TestCases { CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), ORCA_PER_RPC("report backend metrics per query"), ORCA_OOB("report backend metrics out-of-band"), - MCS_CS("max concurrent streaming connection scaling"); + MAX_CONCURRENT_STREAMS_CONNECTION_SCALING("max concurrent streaming connection scaling"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 7dbe8483d91..458ea60330b 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -565,7 +565,7 @@ private void runTest(TestCases testCase) throws Exception { break; } - case MCS_CS: { + case MAX_CONCURRENT_STREAMS_CONNECTION_SCALING: { ChannelCredentials channelCredentials; if (useTls) { if (!useTestCa) { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index 4f035749111..51099ea2498 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -68,7 +68,7 @@ public void testCaseNamesShouldMapToEnums() { "timeout_on_sleeping_server", "orca_per_rpc", "orca_oob", - "mcs_cs", + "max_concurrent_streams_connection_scaling", }; // additional test cases