diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index d1d8618278..548ff7c159 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -596,7 +596,6 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); - public static class KafkaHeaderAndRequest { private static final String DEFAULT_CLIENT_HOST = ""; @@ -606,6 +605,8 @@ public static class KafkaHeaderAndRequest { private final ByteBuf buffer; private final SocketAddress remoteAddress; + private final AtomicBoolean released = new AtomicBoolean(); + public KafkaHeaderAndRequest(RequestHeader header, AbstractRequest request, ByteBuf buffer, @@ -617,6 +618,9 @@ public KafkaHeaderAndRequest(RequestHeader header, } public ByteBuf getBuffer() { + if (released.get()) { + throw new IllegalStateException("Already released"); + } return buffer; } @@ -650,7 +654,16 @@ public String toString() { this.header, this.request, this.remoteAddress); } + public void bufferReleased() { + if (!released.compareAndSet(false, true)) { + throw new IllegalStateException("Already released"); + } + } + public void close() { + if (!released.compareAndSet(false, true)) { + return; + } ReferenceCountUtil.safeRelease(this.buffer); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java index c7e5ef6d60..d718d9424c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.coordinator.transaction; +import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -41,7 +42,7 @@ public PendingRequest(final ApiKeys apiKeys, this.responseConsumerHandler = responseConsumerHandler; } - public ByteBuffer serialize() { + public ByteBuf serialize() { return KopResponseUtils.serializeRequest(requestHeader, request); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java index 39a75c5af6..4ed3156755 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java @@ -17,7 +17,6 @@ import static org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.streamnative.pulsar.handlers.kop.security.PlainSaslServer; @@ -73,7 +72,7 @@ public TransactionMarkerChannelHandler( private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) { final long correlationId = pendingRequest.getCorrelationId(); pendingRequestMap.put(correlationId, pendingRequest); - channel.writeAndFlush(Unpooled.wrappedBuffer(pendingRequest.serialize())).addListener(writeFuture -> { + channel.writeAndFlush(pendingRequest.serialize()).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { pendingRequest.completeExceptionally(writeFuture.cause()); pendingRequestMap.remove(correlationId); diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java index aa9de7a32c..9b65c5fe49 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java @@ -17,6 +17,9 @@ import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Message; +import org.apache.kafka.common.protocol.ObjectSerializationCache; /** * Provide util classes to access protected fields in kafka structures. @@ -34,12 +37,34 @@ public class KopResponseUtils { public static ByteBuf serializeResponse(short version, ResponseHeader responseHeader, AbstractResponse response) { - return Unpooled.wrappedBuffer(response.serializeWithHeader(responseHeader, version)); + return serializeWithHeader(response, responseHeader, version); } - public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) { - return RequestUtils.serialize(requestHeader.data(), requestHeader.headerVersion(), + private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHeader header, short version) { + return serialize(header.data(), header.headerVersion(), response.data(), version); + } + + public static ByteBuf serializeRequest(RequestHeader requestHeader, AbstractRequest request) { + return serialize(requestHeader.data(), requestHeader.headerVersion(), request.data(), request.version()); } + public static ByteBuf serialize( + Message header, + short headerVersion, + Message apiMessage, + short apiVersion + ) { + ObjectSerializationCache cache = new ObjectSerializationCache(); + + int headerSize = header.size(cache, headerVersion); + int messageSize = apiMessage.size(cache, apiVersion); + ByteBuffer result = ByteBuffer.allocate(headerSize + messageSize); + ByteBufferAccessor writable = new ByteBufferAccessor(result); + header.write(writable, cache, headerVersion); + apiMessage.write(writable, cache, apiVersion); + result.flip(); + return Unpooled.wrappedBuffer(result); + } + } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index 1a28d88c3e..6ddaacbe49 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -13,8 +13,9 @@ */ package io.streamnative.pulsar.handlers.kop; +import static org.testng.Assert.assertEquals; + import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Collections; @@ -108,22 +109,24 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset return listOffsetsPartitionResponse; } - public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, - SocketAddress serviceAddress) { - AbstractRequest request = builder.build(builder.apiKey().latestVersion()); + SocketAddress serviceAddress) { + return buildRequest(builder, serviceAddress, builder.latestAllowedVersion()); + } + public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, + SocketAddress serviceAddress, short version) { + AbstractRequest request = builder.build(version); + assertEquals(version, request.version()); RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233); - ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(mockHeader, request); - - ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest); - - RequestHeader header = RequestHeader.parse(serializedRequest); + ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request); + ByteBuffer byteBuffer = byteBuf.nioBuffer(); + RequestHeader header = RequestHeader.parse(byteBuffer); ApiKeys apiKey = header.apiKey(); short apiVersion = header.apiVersion(); - AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, serializedRequest).request; + AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuffer).request; return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 134f46c548..a41e50cbb8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -188,17 +188,16 @@ public void testByteBufToRequest() { correlationId); // 1. serialize request into ByteBuf - ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest); - int size = serializedRequest.remaining(); - ByteBuf inputBuf = Unpooled.buffer(size); - inputBuf.writeBytes(serializedRequest); + ByteBuf serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest); // 2. turn Bytebuf into KafkaHeaderAndRequest. - KafkaHeaderAndRequest request = handler.byteBufToRequest(inputBuf, null); + KafkaHeaderAndRequest request = handler.byteBufToRequest(serializedRequest, null); // 3. verify byteBufToRequest works well. assertEquals(request.getHeader().data(), header.data()); assertTrue(request.getRequest() instanceof ApiVersionsRequest); + + request.close(); }