@@ -596,7 +596,6 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

-
public static class KafkaHeaderAndRequest {

private static final String DEFAULT_CLIENT_HOST = "";
@@ -606,6 +605,8 @@ public static class KafkaHeaderAndRequest {
private final ByteBuf buffer;
private final SocketAddress remoteAddress;

+private final AtomicBoolean released = new AtomicBoolean();
+
public KafkaHeaderAndRequest(RequestHeader header,
AbstractRequest request,
ByteBuf buffer,
@@ -617,6 +618,9 @@ public KafkaHeaderAndRequest(RequestHeader header,
}

public ByteBuf getBuffer() {
+if (released.get()) {
+throw new IllegalStateException("Already released");
+}
return buffer;
}

@@ -650,7 +654,16 @@ public String toString() {
this.header, this.request, this.remoteAddress);
}

+public void bufferReleased() {
+if (!released.compareAndSet(false, true)) {
+throw new IllegalStateException("Already released");
+}
+}
+
public void close() {
+if (!released.compareAndSet(false, true)) {
+return;
+}
ReferenceCountUtil.safeRelease(this.buffer);
}
}
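Taken together, the hunks above give KafkaHeaderAndRequest an explicit buffer-ownership contract: getBuffer() refuses to return an already-released buffer, bufferReleased() records that another component (for example a Netty write) has taken over releasing it, and close() now releases at most once instead of unconditionally. The following self-contained sketch mirrors that contract with a hypothetical BufferHolder class; only the release-tracking logic is taken from the patch, the rest is illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for KafkaHeaderAndRequest, reduced to the buffer
// lifecycle introduced by this change.
public class BufferHolder {

    private final ByteBuf buffer = Unpooled.buffer(16);
    private final AtomicBoolean released = new AtomicBoolean();

    ByteBuf getBuffer() {
        if (released.get()) {
            throw new IllegalStateException("Already released");
        }
        return buffer;
    }

    // Marks the buffer as released by someone else (e.g. handed to a Netty
    // write, which releases it after flushing); close() then becomes a no-op.
    void bufferReleased() {
        if (!released.compareAndSet(false, true)) {
            throw new IllegalStateException("Already released");
        }
    }

    // Idempotent: only the first call actually releases the buffer.
    void close() {
        if (!released.compareAndSet(false, true)) {
            return;
        }
        ReferenceCountUtil.safeRelease(buffer);
    }

    public static void main(String[] args) {
        BufferHolder holder = new BufferHolder();
        holder.getBuffer().writeByte(1);
        holder.close();    // releases the buffer
        holder.close();    // second call is a safe no-op
        try {
            holder.getBuffer();
        } catch (IllegalStateException expected) {
            System.out.println("buffer is no longer accessible after close()");
        }
    }
}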
@@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

+import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -41,7 +42,7 @@ public PendingRequest(final ApiKeys apiKeys,
this.responseConsumerHandler = responseConsumerHandler;
}

-public ByteBuffer serialize() {
+public ByteBuf serialize() {
return KopResponseUtils.serializeRequest(requestHeader, request);
}

@@ -17,7 +17,6 @@
import static org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry;

import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.streamnative.pulsar.handlers.kop.security.PlainSaslServer;
@@ -73,7 +72,7 @@ public TransactionMarkerChannelHandler(
private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) {
final long correlationId = pendingRequest.getCorrelationId();
pendingRequestMap.put(correlationId, pendingRequest);
-channel.writeAndFlush(Unpooled.wrappedBuffer(pendingRequest.serialize())).addListener(writeFuture -> {
+channel.writeAndFlush(pendingRequest.serialize()).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
pendingRequest.completeExceptionally(writeFuture.cause());
pendingRequestMap.remove(correlationId);
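Because PendingRequest.serialize() now returns a ByteBuf, the handler passes it to writeAndFlush() directly, dropping the extra Unpooled.wrappedBuffer allocation. The write transfers ownership of the buffer to Netty's outbound pipeline, which releases it once the bytes are flushed. A minimal sketch of that ownership rule, using EmbeddedChannel in place of the real broker connection (all names here are illustrative, not part of the patch):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;

public class WriteOwnershipDemo {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();
        ByteBuf payload = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});

        // writeAndFlush takes ownership: on a real transport the buffer is
        // released after it is written out, so the caller must not touch it
        // again. Failures are observed through the returned future.
        channel.writeAndFlush(payload).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                writeFuture.cause().printStackTrace();
            }
        });

        // EmbeddedChannel queues the message instead of writing to a socket,
        // so this demo reads it back and releases it explicitly.
        ByteBuf written = channel.readOutbound();
        System.out.println("flushed " + written.readableBytes() + " bytes");
        written.release();
        channel.finish();
    }
}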
@@ -17,6 +17,9 @@
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;

/**
* Provide util classes to access protected fields in kafka structures.
@@ -34,12 +37,34 @@ public class KopResponseUtils {
public static ByteBuf serializeResponse(short version,
ResponseHeader responseHeader,
AbstractResponse response) {
-return Unpooled.wrappedBuffer(response.serializeWithHeader(responseHeader, version));
+return serializeWithHeader(response, responseHeader, version);
}

-public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
-return RequestUtils.serialize(requestHeader.data(), requestHeader.headerVersion(),
+private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHeader header, short version) {
+return serialize(header.data(), header.headerVersion(), response.data(), version);
+}
+
+public static ByteBuf serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
+return serialize(requestHeader.data(), requestHeader.headerVersion(),
request.data(), request.version());
}
+
+public static ByteBuf serialize(
+Message header,
+short headerVersion,
+Message apiMessage,
+short apiVersion
+) {
+ObjectSerializationCache cache = new ObjectSerializationCache();
+
+int headerSize = header.size(cache, headerVersion);
+int messageSize = apiMessage.size(cache, apiVersion);
+ByteBuffer result = ByteBuffer.allocate(headerSize + messageSize);
+ByteBufferAccessor writable = new ByteBufferAccessor(result);
+header.write(writable, cache, headerVersion);
+apiMessage.write(writable, cache, apiVersion);
+result.flip();
+return Unpooled.wrappedBuffer(result);
+}

}
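The new serialize() method sizes the header and body up front through an ObjectSerializationCache, writes both into one ByteBuffer, and hands it to Unpooled.wrappedBuffer. Crucially, wrappedBuffer shares the backing array rather than copying it, so returning a ByteBuf here costs one small wrapper object, not a second copy of the message. A short demo of that sharing (hypothetical class name, requires only Netty):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;

public class WrappedBufferDemo {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(8);
        heap.putLong(42L);
        heap.flip();

        // wrappedBuffer shares the backing array: no copy is made
        ByteBuf wrapped = Unpooled.wrappedBuffer(heap);
        System.out.println(wrapped.getLong(0)); // 42

        // a write through the original buffer is visible through the wrapper
        heap.putLong(0, 7L);
        System.out.println(wrapped.getLong(0)); // 7

        wrapped.release();
    }
}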
@@ -13,8 +13,9 @@
*/
package io.streamnative.pulsar.handlers.kop;

+import static org.testng.Assert.assertEquals;
+
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -108,22 +109,24 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset
return listOffsetsPartitionResponse;
}

-
public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder,
-SocketAddress serviceAddress) {
-AbstractRequest request = builder.build(builder.apiKey().latestVersion());
+SocketAddress serviceAddress) {
+return buildRequest(builder, serviceAddress, builder.latestAllowedVersion());
+}
+
+public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder,
+SocketAddress serviceAddress, short version) {
+AbstractRequest request = builder.build(version);
+assertEquals(version, request.version());
RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233);

-
-ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(mockHeader, request);
-
-ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest);
-
-RequestHeader header = RequestHeader.parse(serializedRequest);
+ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request);
+ByteBuffer byteBuffer = byteBuf.nioBuffer();
+RequestHeader header = RequestHeader.parse(byteBuffer);

ApiKeys apiKey = header.apiKey();
short apiVersion = header.apiVersion();
-AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, serializedRequest).request;
+AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuffer).request;
return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress);
}
}
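The rewritten buildRequest() serializes through the real KopResponseUtils path and parses the header and body back out of byteBuf.nioBuffer(). That view shares its content with the ByteBuf, but consuming it moves only the view's position, not the ByteBuf's reader index, so the buffer stored in the returned KafkaHeaderAndRequest stays fully readable. A minimal illustration (hypothetical demo class):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;

public class NioBufferViewDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{0, 0, 0, 42});

        // nioBuffer() exposes the readable bytes as a ByteBuffer view
        ByteBuffer view = buf.nioBuffer();
        System.out.println(view.getInt());      // 42: advances the view's position...
        System.out.println(buf.readerIndex());  // ...but buf's readerIndex is still 0

        buf.release();
    }
}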
@@ -188,17 +188,16 @@ public void testByteBufToRequest() {
correlationId);

// 1. serialize request into ByteBuf
-ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);
-int size = serializedRequest.remaining();
-ByteBuf inputBuf = Unpooled.buffer(size);
-inputBuf.writeBytes(serializedRequest);
+ByteBuf serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);

// 2. turn ByteBuf into KafkaHeaderAndRequest.
-KafkaHeaderAndRequest request = handler.byteBufToRequest(inputBuf, null);
+KafkaHeaderAndRequest request = handler.byteBufToRequest(serializedRequest, null);

// 3. verify byteBufToRequest works well.
assertEquals(request.getHeader().data(), header.data());
assertTrue(request.getRequest() instanceof ApiVersionsRequest);
+
+request.close();
}

