RATIS-2529. Bound gRPC worker EventLoopGroup thread count.#1466
CRZbulabula wants to merge 9 commits
Add raft.grpc.server.worker.event-loop.threads and raft.grpc.client.worker.event-loop.threads (default 0 = gRPC default, i.e. availableProcessors * 2). When > 0, build a dedicated Epoll/Nio EventLoopGroup of that size and wire it into both the server NettyServerBuilders and the client NettyChannelBuilders so a follower catch-up burst can't permanently inflate the worker thread count.
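To illustrate the rule in the description above, here is a minimal sketch of how a configured value could be resolved to an effective worker thread count. The class and method names are hypothetical, and rejecting negative values is an assumption; the description only specifies the behavior for 0 and for positive values.

```java
/** Hypothetical helper, not part of Ratis: resolves the effective worker thread count. */
final class WorkerThreadCount {
  private WorkerThreadCount() {}

  /**
   * @param configured the configured thread count (0 means "use the default")
   * @param availableProcessors the number of CPU cores visible to the JVM
   * @return the number of worker event-loop threads to create
   */
  static int resolve(int configured, int availableProcessors) {
    if (configured < 0) {
      // Assumption: the PR text does not say how negative values are handled.
      throw new IllegalArgumentException("configured < 0: " + configured);
    }
    // 0 falls back to the default described in the PR (availableProcessors * 2);
    // a positive value caps the thread count.
    return configured > 0 ? configured : availableProcessors * 2;
  }

  public static void main(String[] args) {
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("default (0) -> " + resolve(0, cores) + " threads");
    System.out.println("capped  (4) -> " + resolve(4, cores) + " threads");
  }
}
```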
| * a positive value caps the worker event-loop thread count. | ||
| */ | ||
| String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads"; | ||
| int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0; |
We need a positive default here, such as 16, 24, or 32, or a value derived from the number of CPU cores.
yandrey321 left a comment
Did you run any performance tests to see how the thread pool size affects performance?
szetszwo left a comment
@CRZbulabula , thanks for working on this!
Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13082324/1466_review.patch
| * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2}); | ||
| * a positive value caps the worker event-loop thread count. | ||
| */ | ||
| String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads"; |
In NettyConfigKeys, we have the following conf:
key: raft.netty.dataStream.client.use-epoll (boolean, default=true)
key: raft.netty.dataStream.client.worker-group.size (int, default=28)
Let's use similar conf for gRPC.
| EventLoopGroup getBossEventLoopGroup() { | ||
| if (workerEventLoopThreads > 0 && bossEventLoopGroup == null) { | ||
| bossEventLoopGroup = GrpcEventLoops.newEventLoopGroup(1, server.getId() + "-grpc-boss-ELG"); |
Let's make bossEventLoopThreads configurable?
| private GrpcServicesImpl(Builder b) { | ||
| super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient)); | ||
| super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), | ||
| peer -> b.newGrpcServerProtocolClient(peer, b.getWorkerEventLoopGroup()))); |
We should create only one group for all clients.
| private final EventLoopGroup bossEventLoopGroup; | ||
| private final EventLoopGroup workerEventLoopGroup; |
We also need a group for client. BTW, let's use shorter names.
private EventLoopGroup serverBosses;
private EventLoopGroup serverWorkers;
private EventLoopGroup clientWorkers;
| GrpcEventLoops.shutdownGracefully(workerEventLoopGroup); | ||
| GrpcEventLoops.shutdownGracefully(bossEventLoopGroup); |
We should call shutdownGracefully() for all groups and then await(..)
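The ordering suggested here (request shutdown on every group first, then wait) can be sketched as follows. To stay runnable without dependencies, this sketch uses java.util.concurrent.ExecutorService as a stand-in for Netty's EventLoopGroup; the class name, the shared deadline, and the interrupt handling are assumptions for illustration, not the code in the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: shut down all groups first, then await them together. */
final class ShutdownAllThenAwait {
  private ShutdownAllThenAwait() {}

  /** @return true if every group terminated before the timeout elapsed. */
  static boolean shutdownAll(List<ExecutorService> groups, long timeout, TimeUnit unit) {
    // Phase 1: ask every group to stop, so that they wind down in parallel
    // instead of one after another.
    for (ExecutorService group : groups) {
      group.shutdown();
    }
    // Phase 2: wait for each group against one shared deadline.
    final long deadline = System.nanoTime() + unit.toNanos(timeout);
    boolean allTerminated = true;
    try {
      for (ExecutorService group : groups) {
        final long remaining = Math.max(0, deadline - System.nanoTime());
        allTerminated &= group.awaitTermination(remaining, TimeUnit.NANOSECONDS);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report that shutdown did not complete.
      Thread.currentThread().interrupt();
      return false;
    }
    return allTerminated;
  }

  public static void main(String[] args) {
    final ExecutorService bosses = Executors.newFixedThreadPool(1);
    final ExecutorService workers = Executors.newFixedThreadPool(2);
    workers.execute(() -> System.out.println("last task before shutdown"));
    final boolean done = shutdownAll(Arrays.asList(bosses, workers), 5, TimeUnit.SECONDS);
    System.out.println("all terminated: " + done);
  }
}
```

Waiting only after every group has been asked to stop means the total wait is bounded by the slowest group rather than by the sum of all groups.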
| * into {@link NettyServerBuilder} / {@link NettyChannelBuilder} consistently | ||
| * with the matching channel type. | ||
| */ | ||
| public final class GrpcEventLoops { |
Let's move NettyUtils to ratis-common and then add the new methods there.
Thanks for the review. I have pushed commit 2088cdf addressing the comments:
For the performance question: I ran a manual MiniRaftClusterWithGrpc benchmark on my macOS arm64 host with JDK 21, using 500 warmup ops and 5000 measured async sends at concurrency 16, with 3 iterations per config. Median results by worker size, with 0 failures:
worker size 8: about 290 ops/s, p50 53.4 ms, p99 75.6 ms
worker size 16: about 285 ops/s, p50 56.0 ms, p99 75.4 ms
worker size 24: about 291 ops/s, p50 54.3 ms, p99 71.2 ms
worker size 32: about 313 ops/s, p50 49.8 ms, p99 77.0 ms
This did not show a regression from using bounded/configurable worker groups; the gRPC worker group handles non-blocking I/O while request execution is still handled by raft.grpc.server.async.request.thread.pool.size.
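For readers who want to reproduce figures of this kind, here is a minimal sketch of how p50/p99 latencies and a median across iterations could be computed. The benchmark's actual aggregation method is not stated in this thread; the nearest-rank percentile definition, the class name, and the sample values in main are assumptions for illustration only.

```java
import java.util.Arrays;

/** Hypothetical sketch of latency aggregation; not the benchmark code used in this PR. */
final class LatencyStats {
  private LatencyStats() {}

  /** Nearest-rank percentile: the smallest sample with at least p percent of samples at or below it. */
  static double percentile(double[] samples, double p) {
    if (samples.length == 0) {
      throw new IllegalArgumentException("no samples");
    }
    if (p <= 0 || p > 100) {
      throw new IllegalArgumentException("p must be in (0, 100]: " + p);
    }
    final double[] sorted = samples.clone(); // leave the caller's array untouched
    Arrays.sort(sorted);
    final int rank = (int) Math.ceil(p / 100.0 * sorted.length);
    return sorted[rank - 1];
  }

  /** Median of per-iteration results, averaging the two middle values for even counts. */
  static double median(double[] values) {
    if (values.length == 0) {
      throw new IllegalArgumentException("no values");
    }
    final double[] sorted = values.clone();
    Arrays.sort(sorted);
    final int mid = sorted.length / 2;
    return sorted.length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0;
  }

  public static void main(String[] args) {
    // Made-up latencies in milliseconds, for illustration only.
    final double[] latenciesMs = {48.0, 52.5, 51.0, 70.2, 49.9};
    System.out.println("p50 = " + percentile(latenciesMs, 50) + " ms");
    System.out.println("p99 = " + percentile(latenciesMs, 99) + " ms");
  }
}
```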
@CRZbulabula , thanks for the update! The change looks good. However, there are some test failures (the run was attempted twice), and I am not sure whether they are related. It may be that some tests use too many threads, causing timeouts. Let's try rerunning the failing tests one more time.
szetszwo left a comment
@CRZbulabula , thanks for the update!
Thanks also for fixing the other problems in Leader and other tests. Since they are unrelated to gRPC, could you file one or more JIRAs for these changes?
Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13082486/1466_review2.patch
| private final int maxMessageSize; | ||
| private final TimeDuration requestTimeoutDuration; | ||
| private final TimeDuration watchRequestTimeoutDuration; | ||
| private final EventLoopGroup clientWorkers; |
Let's use a MemoizedSupplier for lazy initialization.
Done in 00bd24e. GrpcClientRpc now stores the client workers as a MemoizedSupplier<EventLoopGroup>, and GrpcClientProtocolClient obtains the group lazily when building a channel.
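As a rough illustration of the lazy-initialization pattern discussed in this thread, the sketch below shows a memoizing supplier that creates its value on first use and can report whether it has been created. It is a stand-in written for this example (the name LazySupplier is hypothetical) and is not the Ratis MemoizedSupplier implementation, whose internals are not shown in this PR.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical stand-in for a memoizing supplier: the value is created at most once, on first get(). */
final class LazySupplier<T> implements Supplier<T> {
  private final Supplier<T> initializer;
  private volatile T value; // null until the first successful get()

  LazySupplier(Supplier<T> initializer) {
    this.initializer = Objects.requireNonNull(initializer, "initializer == null");
  }

  @Override
  public T get() {
    T v = value;
    if (v == null) {
      // Double-checked locking: only the first caller runs the initializer.
      synchronized (this) {
        v = value;
        if (v == null) {
          v = Objects.requireNonNull(initializer.get(), "initializer returned null");
          value = v;
        }
      }
    }
    return v;
  }

  /** @return true once get() has created the value; lets close() skip work for unused resources. */
  boolean isInitialized() {
    return value != null;
  }

  public static void main(String[] args) {
    final LazySupplier<String> workers = new LazySupplier<>(() -> "client-workers");
    System.out.println("before get: initialized = " + workers.isInitialized());
    System.out.println("value = " + workers.get());
    System.out.println("after get: initialized = " + workers.isInitialized());
  }
}
```

With this shape, a client that never opens a channel never creates an event-loop group, and its close path has nothing to shut down.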
| public GrpcClientRpc(ClientId clientId, RaftProperties properties, | ||
| SslContext adminSslContext, SslContext clientSslContext) { | ||
| this(clientId, properties, adminSslContext, clientSslContext, newClientWorkers(clientId, properties)); | ||
| } |
Let's change it to a static method:
  public static GrpcClientRpc create(ClientId clientId, RaftProperties properties,
      SslContext adminSslContext, SslContext clientSslContext) {
    final MemoizedSupplier<EventLoopGroup> eventLoopGroup = MemoizedSupplier.valueOf(() -> NettyUtils.newEventLoopGroup(
        clientId + "-client-workers",
        GrpcConfigKeys.Client.workerGroupSize(properties),
        GrpcConfigKeys.useEpoll(properties)));
    return new GrpcClientRpc(clientId, properties, adminSslContext, clientSslContext, eventLoopGroup);
  }

  private final MemoizedSupplier<EventLoopGroup> clientWorkers;

  private GrpcClientRpc(ClientId clientId, RaftProperties properties,
      SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier<EventLoopGroup> clientWorkers) {
    super(new PeerProxyMap<>(clientId.toString(),
        p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext, clientWorkers)));
    this.clientWorkers = clientWorkers;
    this.clientId = clientId;
    this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
    this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
    this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
  }
Done in 00bd24e. I added GrpcClientRpc.create(...), made the constructor private, and updated GrpcFactory to use the factory method.
| try { | ||
| super.close(); | ||
| } finally { | ||
| NettyUtils.shutdownGracefully(clientWorkers); |
Check if it is initialized:
  if (clientWorkers.isInitialized()) {
    NettyUtils.shutdownGracefully(clientWorkers.get());
  }
Done in 00bd24e. close() now checks clientWorkers.isInitialized() before shutting down the shared client worker group.
| clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", | ||
| GrpcConfigKeys.Client.workerGroupSize(props), useEpoll); | ||
| return new GrpcServicesImpl(this); | ||
| } catch (RuntimeException | Error e) { |
Done in 00bd24e. Builder#build() now catches Throwable, shuts down clientWorkers, serverWorkers, and serverBosses, then rethrows RuntimeException and Error directly.
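The cleanup-on-failure behavior described in this reply can be sketched in plain Java as follows. The class and method names are hypothetical; the source only states that RuntimeException and Error are rethrown directly, so wrapping other throwables in IllegalStateException and recording cleanup failures as suppressed exceptions are assumptions of this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical sketch: release already-created resources when construction fails. */
final class BuildWithCleanup {
  private BuildWithCleanup() {}

  static <T> T build(Callable<T> constructor, List<Runnable> cleanups) {
    try {
      return constructor.call();
    } catch (Throwable t) {
      // Construction failed: run every cleanup so that no thread group leaks.
      for (Runnable cleanup : cleanups) {
        try {
          cleanup.run();
        } catch (Throwable suppressed) {
          // Assumption: keep the original failure and attach cleanup failures to it.
          t.addSuppressed(suppressed);
        }
      }
      if (t instanceof RuntimeException) {
        throw (RuntimeException) t;
      }
      if (t instanceof Error) {
        throw (Error) t;
      }
      // Assumption: checked throwables are wrapped; the PR does not say how they are handled.
      throw new IllegalStateException("Failed to build", t);
    }
  }

  public static void main(String[] args) {
    final List<Runnable> cleanups = Arrays.asList(
        () -> System.out.println("shut down clientWorkers"),
        () -> System.out.println("shut down serverWorkers"),
        () -> System.out.println("shut down serverBosses"));
    try {
      build(() -> {
        throw new IllegalArgumentException("simulated constructor failure");
      }, cleanups);
    } catch (IllegalArgumentException e) {
      System.out.println("rethrown: " + e.getMessage());
    }
  }
}
```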
| channelBuilder.negotiationType(NegotiationType.PLAINTEXT); | ||
| } | ||
| channelBuilder.disableRetry(); | ||
| if (eventLoopGroup != null) { |
It won't be null. Let's add requireNonNull:
  this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
Done in 00bd24e. The constructor now uses Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null"), and buildChannel(...) always sets the channel type and event-loop group.
| } else { | ||
| channelBuilder.negotiationType(NegotiationType.PLAINTEXT); | ||
| } | ||
| if (eventLoopGroup != null) { |
Similarly, it won't be null.
Done in 00bd24e. GrpcStubPool.buildManagedChannel(...) now requires a non-null event-loop group and always wires it into the Netty channel builder.
Thanks for the review. I have pushed commit 00bd24e addressing the latest comments:
I also created the JIRAs for the unrelated fixes:
What would you prefer as the next step for those unrelated fixes: keep them in this PR with the JIRA references, or split them into separate PRs?
Summary
Fixes RATIS-2529: gRPC worker threads permanently inflate to availableProcessors * 2 after follower restart catch-up.
- Add raft.grpc.server.worker.event-loop.threads and raft.grpc.client.worker.event-loop.threads (default 0 = current gRPC behavior).
- When > 0, build a dedicated EpollEventLoopGroup (or NioEventLoopGroup when Epoll is unavailable) of that size and wire it into both the server NettyServerBuilders and the client / server-to-server NettyChannelBuilders.
- The groups are shared across the NettyServerBuilders and the GrpcServerProtocolClient instances managed by one GrpcServicesImpl; both groups are shut down in closeImpl().
This lets operators cap the worker thread count (e.g. 4–8) so a follower catch-up burst can't permanently expand the shared gRPC default EventLoopGroup (which never shrinks its threads once started).
Test plan
- TestGrpcEventLoops covers the helper (thread count, channel-type detection, config-key roundtrip, null shutdown).
- TestGrpcWorkerEventLoopThreads brings up a 3-node MiniRaftClusterWithGrpc with capped server (2) and client (1) worker threads and asserts a client request succeeds.
- TestCustomGrpcServices, TestGrpcFactory, TestLeaderInstallSnapshotWithGrpc, TestLinearizableReadWithGrpc, TestGroupInfoWithGrpc all pass.