RATIS-1240. Add input stream to DataStreamApi for read operations in Server #1469
szetszwo
left a comment
@peterxcli , thanks a lot for working on stream read! It is going to be very useful.
This change is very big and quite involved. In particular, it defines several new public APIs. We need to design the APIs carefully.
Could you split this into two or three subtasks? One usual way is to change the server first and then the client.
private CompletableFuture<Void> submitReadOnlyRequest(DataStreamRequestByteBuf request,
    RaftClientRequest raftClientRequest, ChannelHandlerContext ctx) {
  try {
    final StateMachine.ReadOnlyDataStream readOnlyDataStream = new StateMachine.ReadOnlyDataStream() {
We should reuse StateMachine.DataChannel, which is a Java WritableByteChannel. Then we can use other Java APIs such as FileChannel.transferTo, which is a highly efficient, zero-copy data transfer operation.
final StateMachine.DataChannel readOnlyDataStream = new StateMachine.DataChannel() {
  private long streamOffset;
  private boolean closed = false;

  @Override
  public synchronized boolean isOpen() {
    return !closed;
  }

  @Override
  public synchronized void close() {
    closed = true;
  }

  @Override
  public synchronized void force(boolean metadata) throws AlreadyClosedException {
    if (!isOpen()) {
      throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
    }
    ctx.flush();
  }

  @Override
  public synchronized int write(ByteBuffer buffer) throws IOException {
    if (!isOpen()) {
      throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
    }
    final DataStreamReplyByteBuffer reply = newDataStreamReadOnlyReplyByteBuffer(request, streamOffset, buffer);
    final int length = buffer.remaining();
    final ChannelFuture future = ctx.write(reply);
    try {
      future.await();
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while writing " + length
          + " bytes at offset " + streamOffset);
    }
    streamOffset += length;
    return length;
  }
};
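To illustrate why a WritableByteChannel target is convenient, here is a minimal, self-contained sketch of sending a file with FileChannel.transferTo. CountingChannel and sendFile are hypothetical names used only for this example; CountingChannel merely stands in for a DataChannel and is not part of Ratis. Note that whether the transfer is actually zero-copy depends on the target channel type and the platform; for an arbitrary WritableByteChannel the JDK may fall back to copying through an intermediate buffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {
  /** Hypothetical stand-in for a DataChannel: it only counts the bytes written to it. */
  static class CountingChannel implements WritableByteChannel {
    private long bytesWritten;
    private boolean open = true;

    @Override
    public int write(ByteBuffer src) {
      final int n = src.remaining();
      src.position(src.limit()); // consume the buffer, as a real channel would
      bytesWritten += n;
      return n;
    }

    @Override
    public boolean isOpen() {
      return open;
    }

    @Override
    public void close() {
      open = false;
    }

    long getBytesWritten() {
      return bytesWritten;
    }
  }

  /** Send the whole file to the target. transferTo may move fewer bytes than requested, so loop. */
  static long sendFile(Path file, WritableByteChannel target) throws IOException {
    try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
      final long size = in.size();
      long position = 0;
      while (position < size) {
        position += in.transferTo(position, size - position, target);
      }
      return position;
    }
  }

  public static void main(String[] args) throws IOException {
    final Path file = Files.createTempFile("transfer-to-sketch", ".txt");
    try {
      Files.write(file, "hello data stream".getBytes(StandardCharsets.UTF_8));
      final CountingChannel target = new CountingChannel();
      final long sent = sendFile(file, target);
      System.out.println("sent=" + sent + ", received=" + target.getBytesWritten());
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

With this shape, a state machine that serves file-backed data would only need the channel handed to it and would not have to manage its own read buffers.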
Thanks for the review! Sure, I will split this into server and client patches!
Co-authored-by: Tsz-Wo Nicholas Sze <szetszwo@apache.org> Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@szetszwo thanks for the review. I addressed the data channel reuse suggestion and kept only the server-side code in this PR. Please take another look, thanks!
@szetszwo I integrated the Ratis stream read API into Ozone as a POC, and the results are promising. The no-MD5 sequential reads comparison:
The no-MD5 random reads comparison:
Ozone POC: peterxcli/ozone#16
szetszwo
left a comment
@peterxcli , thanks for working on this! Please see the inline comments, which also cover linearizable reads.
final Throwable cause = future.cause();
if (cause instanceof IOException) {
  throw (IOException) cause;
}
Let's create a new IOException in both cases (whether or not the cause is already an IOException), so that the message can include the offset.
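A minimal sketch of that idea, assuming the failure cause is available as a plain Throwable (in the PR it comes from the Netty future's cause()). The class and method names here are hypothetical and only illustrate the wrapping; the message wording is also just an example.

```java
import java.io.IOException;

final class WriteFailureSketch {
  /**
   * Build the exception for a failed write. A new IOException is created whether or not
   * the cause is already an IOException, so the message always carries the offset
   * and the original failure is preserved as the cause.
   */
  static IOException newWriteFailure(Throwable cause, int length, long offset) {
    return new IOException("Failed to write " + length + " bytes at offset " + offset, cause);
  }

  public static void main(String[] args) {
    final IOException e = newWriteFailure(new IOException("connection reset"), 1024, 4096L);
    System.out.println(e.getMessage());
    System.out.println("cause: " + e.getCause().getMessage());
  }
}
```

Callers can still reach the original exception through getCause() if they need to distinguish failure types.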
default CompletableFuture<RaftClientReply> streamReadOnlyAsync(
    RaftClientRequest request, StateMachine.DataChannel stream) throws IOException {
  throw new UnsupportedOperationException("This method is NOT supported.");
}
This new method does not seem to be needed, since we could:
- Phase 1: Directly call DataApi.streamReadOnly(..) and ignore all linearizable checks.
- Phase 2: Reuse RaftClientAsynchronousProtocol.submitClientRequestAsync(..) to submit a dummy read request for linearizable checks and then call DataApi.streamReadOnly(..).
Of course, we should start with Phase 1 for simplicity.
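The two phases could be sketched roughly as below. This is only an illustration with hypothetical types: StreamRead stands in for DataApi.streamReadOnly(..), a List<String> stands in for the data channel, and the Supplier stands in for the dummy read request used for the linearizable check. None of these names are Ratis APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class TwoPhaseReadSketch {
  /** Hypothetical stand-in for DataApi.streamReadOnly(..): writes the result to 'out'. */
  interface StreamRead {
    void streamReadOnly(String request, List<String> out);
  }

  /** Phase 1: call the state machine directly, with no linearizable check. */
  static CompletableFuture<Void> phase1(StreamRead stateMachine, String request, List<String> out) {
    stateMachine.streamReadOnly(request, out);
    return CompletableFuture.completedFuture(null);
  }

  /** Phase 2: wait for a (hypothetical) linearizable check first, then stream as in Phase 1. */
  static CompletableFuture<Void> phase2(Supplier<CompletableFuture<Boolean>> linearizableCheck,
      StreamRead stateMachine, String request, List<String> out) {
    return linearizableCheck.get().thenCompose(ok -> {
      if (!ok) {
        final CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(
            new IllegalStateException("Linearizable check failed for " + request));
        return failed;
      }
      return phase1(stateMachine, request, out);
    });
  }

  public static void main(String[] args) {
    final StreamRead echo = (request, out) -> out.add("result of " + request);
    final List<String> out = new ArrayList<>();
    phase1(echo, "r1", out).join();
    phase2(() -> CompletableFuture.completedFuture(true), echo, "r2", out).join();
    System.out.println(out);
  }
}
```

The point of the sketch is that Phase 2 only adds a step in front of the Phase 1 call, so no separate server-side method is required for it.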
 * @param stream the output stream for response data chunks
 * @return a future for the terminal reply message
 */
default CompletableFuture<Message> streamReadOnly(RaftClientRequest request, DataChannel stream) {
This method should be similar to query(Message):
/**
 * Similar to {@link #query(Message)} except below:
 * - In {@link #query(Message)}, the state machine returns the result in a future.
 * - In this method, the state machine sends the result using the given stream.
 *
 * @param request the client request
 * @param stream the output stream to send the result. The state machine must close the stream at the end.
 */
default void query(Message request, DataChannel stream) {
}
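For illustration, a state machine implementation of such a method might look like the sketch below. It is an assumption-laden example, not Ratis code: a String stands in for Message, a plain WritableByteChannel stands in for DataChannel, and the chunk contents are made up. The try-with-resources block is what enforces the "must close the stream at the end" contract, including on failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class StreamQuerySketch {
  /** Send the (made-up) result for the request in three chunks, then close the stream. */
  static void query(String request, WritableByteChannel stream) {
    try (WritableByteChannel out = stream) {
      for (int i = 0; i < 3; i++) {
        final ByteBuffer chunk = ByteBuffer.wrap(
            (request + "-chunk" + i + "\n").getBytes(StandardCharsets.UTF_8));
        while (chunk.hasRemaining()) {
          out.write(chunk); // a channel may accept only part of the buffer per call
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final WritableByteChannel channel = Channels.newChannel(bytes);
    query("key", channel);
    System.out.print(bytes.toString("UTF-8"));
    System.out.println("open after query: " + channel.isOpen());
  }
}
```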
What changes were proposed in this pull request?
New Server and StateMachine API:
- Add streamReadOnlyAsync(RaftClientRequest, StateMachine.DataChannel) to the RaftServer interface, allowing clients to submit read-only requests whose responses are streamed via the data stream RPC.
- Add streamReadOnly(RaftClientRequest, DataChannel) to the StateMachine.Data interface, enabling state machines to implement custom streaming logic for read-only queries.
Server Implementation:
- Update DataStreamManagement, including request parsing, channel management, and response streaming.
- Add executeStreamReadOnlyAsync to RaftServerImpl to coordinate read-only stream execution and integrate with the read index logic.
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/RATIS-1086
How was this patch tested?