-
Notifications
You must be signed in to change notification settings - Fork 4k
MCS connection scaling interop tests for Java #12651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
fb52bbf
f518d5e
3874631
0bad82f
51b38e2
4da06a4
df70a27
bee005f
efce818
2ac2d47
4214889
e099d1d
3d75bf8
84d9528
3136bca
c3fc7c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,7 @@ | |
| import io.grpc.testing.integration.Messages.TestOrcaReport; | ||
| import java.io.File; | ||
| import java.io.FileInputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.nio.charset.Charset; | ||
| import java.util.Arrays; | ||
|
|
@@ -563,7 +564,46 @@ private void runTest(TestCases testCase) throws Exception { | |
| tester.testOrcaOob(); | ||
| break; | ||
| } | ||
|
|
||
|
|
||
| case MCS_CS: { | ||
| ChannelCredentials channelCredentials; | ||
| if (useTls) { | ||
| if (!useTestCa) { | ||
| channelCredentials = TlsChannelCredentials.create(); | ||
| } else { | ||
| try { | ||
| channelCredentials = TlsChannelCredentials.newBuilder() | ||
| .trustManager(TlsTesting.loadCert("ca.pem")) | ||
| .build(); | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
| } else { | ||
| channelCredentials = InsecureChannelCredentials.create(); | ||
| } | ||
| ManagedChannelBuilder<?> channelBuilder; | ||
| if (serverPort == 0) { | ||
| channelBuilder = Grpc.newChannelBuilder(serverHost, channelCredentials); | ||
| } else { | ||
| channelBuilder = | ||
| Grpc.newChannelBuilderForAddress(serverHost, serverPort, channelCredentials); | ||
| } | ||
| if (serverHostOverride != null) { | ||
| channelBuilder.overrideAuthority(serverHostOverride); | ||
| } | ||
| channelBuilder.disableServiceConfigLookUp(); | ||
| try { | ||
| @SuppressWarnings("unchecked") | ||
| Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse( | ||
| "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); | ||
| channelBuilder.defaultServiceConfig(serviceConfigMap); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| tester.testMcs(TestServiceGrpc.newStub(channelBuilder.build())); | ||
| break; | ||
| } | ||
| default: | ||
| throw new IllegalArgumentException("Unknown test case: " + testCase); | ||
| } | ||
|
|
@@ -596,6 +636,7 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor( | |
| } | ||
|
|
||
| private class Tester extends AbstractInteropTest { | ||
|
|
||
| @Override | ||
| protected ManagedChannelBuilder<?> createChannelBuilder() { | ||
| boolean useGeneric = false; | ||
|
|
@@ -979,31 +1020,16 @@ public void testOrcaOob() throws Exception { | |
| .build(); | ||
|
|
||
| final int retryLimit = 5; | ||
| BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); | ||
| final Object lastItem = new Object(); | ||
| StreamingOutputCallResponseObserver streamingOutputCallResponseObserver = | ||
| new StreamingOutputCallResponseObserver(); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver = | ||
| asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() { | ||
|
|
||
| @Override | ||
| public void onNext(StreamingOutputCallResponse value) { | ||
| queue.add(value); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| queue.add(t); | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| queue.add(lastItem); | ||
| } | ||
| }); | ||
| asyncStub.fullDuplexCall(streamingOutputCallResponseObserver); | ||
|
|
||
| streamObserver.onNext(StreamingOutputCallRequest.newBuilder() | ||
| .setOrcaOobReport(answer) | ||
| .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); | ||
| assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); | ||
| assertThat(streamingOutputCallResponseObserver.take()) | ||
| .isInstanceOf(StreamingOutputCallResponse.class); | ||
| int i = 0; | ||
| for (; i < retryLimit; i++) { | ||
| Thread.sleep(1000); | ||
|
|
@@ -1016,7 +1042,8 @@ public void onCompleted() { | |
| streamObserver.onNext(StreamingOutputCallRequest.newBuilder() | ||
| .setOrcaOobReport(answer2) | ||
| .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); | ||
| assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); | ||
| assertThat(streamingOutputCallResponseObserver.take()) | ||
| .isInstanceOf(StreamingOutputCallResponse.class); | ||
|
|
||
| for (i = 0; i < retryLimit; i++) { | ||
| Thread.sleep(1000); | ||
|
|
@@ -1027,7 +1054,7 @@ public void onCompleted() { | |
| } | ||
| assertThat(i).isLessThan(retryLimit); | ||
| streamObserver.onCompleted(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we need this? Otherwise we are orphaning the RPC.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| assertThat(queue.take()).isSameInstanceAs(lastItem); | ||
| assertThat(streamingOutputCallResponseObserver.verifiedCompleted()).isTrue(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -1054,6 +1081,84 @@ protected ServerBuilder<?> getHandshakerServerBuilder() { | |
| protected int operationTimeoutMillis() { | ||
| return 15000; | ||
| } | ||
|
|
||
| class StreamingOutputCallResponseObserver implements | ||
| StreamObserver<StreamingOutputCallResponse> { | ||
| private final Object lastItem = new Object(); | ||
| private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only reason this held
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preserved the existing approach to identify rpc completion. |
||
|
|
||
| @Override | ||
| public void onNext(StreamingOutputCallResponse value) { | ||
| queue.add(value); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| queue.add(t); | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| queue.add(lastItem); | ||
| } | ||
|
|
||
| Object take() throws InterruptedException { | ||
| return queue.take(); | ||
| } | ||
|
|
||
| boolean verifiedCompleted() throws InterruptedException { | ||
| return queue.take() == lastItem; | ||
| } | ||
| } | ||
|
|
||
| public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception { | ||
| StreamingOutputCallResponseObserver responseObserver1 = | ||
| new StreamingOutputCallResponseObserver(); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver1 = | ||
| asyncStub.fullDuplexCall(responseObserver1); | ||
| StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() | ||
| .addResponseParameters(ResponseParameters.newBuilder() | ||
| .setSendClientSocketAddressInResponse( | ||
| Messages.BoolValue.newBuilder().setValue(true).build()) | ||
| .build()) | ||
| .build(); | ||
| streamObserver1.onNext(request); | ||
| Object responseObj = responseObserver1.take(); | ||
| StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj; | ||
| String clientSocketAddressInCall1 = callResponse.getClientSocketAddress(); | ||
| assertThat(clientSocketAddressInCall1).isNotEmpty(); | ||
|
|
||
| StreamingOutputCallResponseObserver responseObserver2 = | ||
| new StreamingOutputCallResponseObserver(); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver2 = | ||
| asyncStub.fullDuplexCall(responseObserver2); | ||
| streamObserver2.onNext(request); | ||
| callResponse = (StreamingOutputCallResponse) responseObserver2.take(); | ||
| String clientSocketAddressInCall2 = callResponse.getClientSocketAddress(); | ||
|
|
||
| assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2); | ||
|
|
||
| // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new | ||
| // connection to be created in the same subchannel and not get queued. | ||
| StreamingOutputCallResponseObserver responseObserver3 = | ||
| new StreamingOutputCallResponseObserver(); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver3 = | ||
| asyncStub.fullDuplexCall(responseObserver3); | ||
| streamObserver3.onNext(request); | ||
| callResponse = (StreamingOutputCallResponse) responseObserver3.take(); | ||
| String clientSocketAddressInCall3 = callResponse.getClientSocketAddress(); | ||
|
|
||
| // This assertion is currently failing because connection scaling when MCS limit has been | ||
| // reached is not yet implemented in gRPC Java. | ||
| assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); | ||
|
|
||
| streamObserver1.onCompleted(); | ||
| assertThat(responseObserver1.verifiedCompleted()).isTrue(); | ||
| streamObserver2.onCompleted(); | ||
| assertThat(responseObserver2.verifiedCompleted()).isTrue(); | ||
| streamObserver3.onCompleted(); | ||
| assertThat(responseObserver3.verifiedCompleted()).isTrue(); | ||
| } | ||
| } | ||
|
|
||
| private static String validTestCasesHelpText() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,6 +75,7 @@ public void run() { | |
| private int port = 8080; | ||
| private boolean useTls = true; | ||
| private boolean useAlts = false; | ||
| private boolean setMcsLimit = false; | ||
|
|
||
| private ScheduledExecutorService executor; | ||
| private Server server; | ||
|
|
@@ -118,6 +119,10 @@ void parseArgs(String[] args) { | |
| usage = true; | ||
| break; | ||
| } | ||
| } else if ("set_max_concurrent_streams_limit".equals(key)) { | ||
| setMcsLimit = Boolean.parseBoolean(value); | ||
| // TODO: Make Netty server builder usable for IPV6 as well (not limited to MCS handling) | ||
| addressType = Util.AddressType.IPV4; // To use NettyServerBuilder | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's unfortunate. We really do want to support IPv6; IPv6-only is increasingly common. Also, it'd be good to stop hard-coding Netty for IPv4/IPv6. When this was done we only had the Netty server, but now we have the OkHttp server as well. But it also looks pretty hard to fix in a "cleaner" way.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a TODO to fix Netty server builder usage to work with either IPv4/6. |
||
| } else { | ||
| System.err.println("Unknown argument: " + key); | ||
| usage = true; | ||
|
|
@@ -141,6 +146,8 @@ void parseArgs(String[] args) { | |
| + "\n for testing. Only effective when --use_alts=true." | ||
| + "\n --address_type=IPV4|IPV6|IPV4_IPV6" | ||
| + "\n What type of addresses to listen on. Default IPV4_IPV6" | ||
| + "\n --set_max_concurrent_streams_limit" | ||
| + "\n Whether to set the maximum concurrent streams limit" | ||
| ); | ||
| System.exit(1); | ||
| } | ||
|
|
@@ -186,6 +193,9 @@ void start() throws Exception { | |
| if (v4Address != null && !v4Address.equals(localV4Address)) { | ||
| ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); | ||
| } | ||
| if (setMcsLimit) { | ||
| ((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2); | ||
| } | ||
| break; | ||
| case IPV6: | ||
| List<SocketAddress> v6Addresses = Util.getV6Addresses(port); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there's a new test, then that should be defined in https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do in the PR on the core repo.