diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index b576ac507b875..1cb50747479d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -53,7 +53,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; -import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.collector.TabletCollector; @@ -69,7 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; +import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -159,18 +159,11 @@ public PipeInsertNodeTabletInsertionEvent( this.allocatedMemoryBlock = new AtomicReference<>(); } + @Nonnull public InsertNode getInsertNode() { return insertNode; } - public ByteBuffer getByteBuffer() throws WALPipeException { - final InsertNode node = insertNode; - if (Objects.isNull(node)) { - throw new PipeException("InsertNode has been released"); - } - return node.serializeToByteBuffer(); - } - public String getDeviceId() { final InsertNode node = insertNode; if (Objects.isNull(node)) { @@ -399,9 +392,6 @@ public boolean mayEventTimeOverlappedWithTimeRange() { public boolean mayEventPathsOverlappedWithPattern() { try { final InsertNode insertNode = getInsertNode(); - if (Objects.isNull(insertNode)) { - return true; - } if (insertNode instanceof RelationalInsertRowNode || insertNode instanceof RelationalInsertTabletNode diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index 0db905a6dc086..065ad3be8401f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -51,12 +51,10 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { - private final List binaryBuffers = new ArrayList<>(); private final List insertNodeBuffers = new ArrayList<>(); private final List tabletBuffers = new ArrayList<>(); private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null; - private final List binaryDataBases = new ArrayList<>(); private final List insertNodeDataBases = new ArrayList<>(); private final List tabletDataBases = new ArrayList<>(); @@ -90,11 +88,9 @@ protected boolean constructBatch(final TabletInsertionEvent event) throws IOExce public synchronized void onSuccess() { super.onSuccess(); - binaryBuffers.clear(); insertNodeBuffers.clear(); tabletBuffers.clear(); - binaryDataBases.clear(); insertNodeDataBases.clear(); tabletDataBases.clear(); tableModelTabletMap.clear(); @@ -142,12 +138,7 @@ public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException { tableModelTabletMap.clear(); return PipeTransferTabletBatchReqV2.toTPipeTransferReq( - binaryBuffers, - insertNodeBuffers, - tabletBuffers, - binaryDataBases, - insertNodeDataBases, - tabletDataBases); + insertNodeBuffers, tabletBuffers, insertNodeDataBases, tabletDataBases); } public Map, Long> deepCopyPipeName2BytesAccumulated() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 48bd1016763e2..8d75e9864bb52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -45,7 +45,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { - private final transient List binaryReqs = new ArrayList<>(); private final transient List insertNodeReqs = new ArrayList<>(); private final transient List tabletReqs = new ArrayList<>(); @@ -61,26 +60,6 @@ public Pair constructStatement final List insertRowStatementList = new ArrayList<>(); final List insertTabletStatementList = new ArrayList<>(); - for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) { - final InsertBaseStatement statement = binaryReq.constructStatement(); - if (statement.isEmpty()) { - continue; - } - if (statement instanceof InsertRowStatement) { - insertRowStatementList.add((InsertRowStatement) statement); - } else if (statement instanceof InsertTabletStatement) { - insertTabletStatementList.add((InsertTabletStatement) statement); - } else if (statement instanceof InsertRowsStatement) { - insertRowStatementList.addAll( - ((InsertRowsStatement) statement).getInsertRowStatementList()); - } else { - throw new UnsupportedOperationException( - String.format( - "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.", - binaryReq)); - } - } - for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) { final InsertBaseStatement statement = insertNodeReq.constructStatement(); if (statement.isEmpty()) { @@ -117,24 +96,19 @@ public Pair constructStatement /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletBatchReq toTPipeTransferReq( - final List binaryBuffers, - final List insertNodeBuffers, - final List tabletBuffers) + final List insertNodeBuffers, final List tabletBuffers) throws IOException { final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); - // batchReq.binaryReqs, batchReq.insertNodeReqs, batchReq.tabletReqs are empty + // batchReq.insertNodeReqs, batchReq.tabletReqs are empty // when this method is called from PipeTransferTabletBatchReqBuilder.toTPipeTransferReq() batchReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - ReadWriteIOUtils.write(binaryBuffers.size(), outputStream); - for (final ByteBuffer binaryBuffer : binaryBuffers) { - ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream); - outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit()); - } + // Binary buffer, for rolling upgrade + ReadWriteIOUtils.write(0, outputStream); ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream); for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) { @@ -157,16 +131,9 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( final TPipeTransferReq transferReq) { final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); + // Binary size, for rolling upgrade + ReadWriteIOUtils.readInt(transferReq.body); int size = ReadWriteIOUtils.readInt(transferReq.body); - for (int i = 0; i < size; ++i) { - final int length = ReadWriteIOUtils.readInt(transferReq.body); - final byte[] body = new byte[length]; - transferReq.body.get(body); - batchReq.binaryReqs.add( - PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body))); - } - - size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { batchReq.insertNodeReqs.add( PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( @@ -188,11 +155,6 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( /////////////////////////////// TestOnly /////////////////////////////// - @TestOnly - public List getBinaryReqs() { - return binaryReqs; - } - @TestOnly public List getInsertNodeReqs() { return insertNodeReqs; @@ -214,8 +176,7 @@ public boolean equals(final Object obj) { return false; } final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj; - return binaryReqs.equals(that.binaryReqs) - && insertNodeReqs.equals(that.insertNodeReqs) + return insertNodeReqs.equals(that.insertNodeReqs) && tabletReqs.equals(that.tabletReqs) && version == that.version && type == that.type @@ -224,6 +185,6 @@ public boolean equals(final Object obj) { @Override public int hashCode() { - return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body); + return Objects.hash(insertNodeReqs, tabletReqs, version, type, body); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java index c136ffbe7d3e3..f626d496b5563 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java @@ -44,8 +44,6 @@ import java.util.Objects; public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { - - private final transient List binaryReqs = new ArrayList<>(); private final transient List insertNodeReqs = new ArrayList<>(); private final transient List tabletReqs = new ArrayList<>(); @@ -66,45 +64,6 @@ public List constructStatements() { final Map> tableModelDatabaseInsertRowStatementMap = new HashMap<>(); - for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) { - final InsertBaseStatement statement = binaryReq.constructStatement(); - if (statement.isEmpty()) { - continue; - } - if (statement.isWriteToTable()) { - if (statement instanceof InsertRowStatement) { - tableModelDatabaseInsertRowStatementMap - .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) - .add((InsertRowStatement) statement); - } else if (statement instanceof InsertTabletStatement) { - statements.add(statement); - } else if (statement instanceof InsertRowsStatement) { - tableModelDatabaseInsertRowStatementMap - .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) - .addAll(((InsertRowsStatement) statement).getInsertRowStatementList()); - } else { - throw new UnsupportedOperationException( - String.format( - "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.", - binaryReq)); - } - continue; - } - if (statement instanceof InsertRowStatement) { - insertRowStatementList.add((InsertRowStatement) statement); - } else if (statement instanceof InsertTabletStatement) { - insertTabletStatementList.add((InsertTabletStatement) statement); - } else if (statement instanceof InsertRowsStatement) { - insertRowStatementList.addAll( - ((InsertRowsStatement) statement).getInsertRowStatementList()); - } else { - throw new UnsupportedOperationException( - String.format( - "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.", - binaryReq)); - } - } - for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : insertNodeReqs) { final InsertBaseStatement statement = insertNodeReq.constructStatement(); if (statement.isEmpty()) { @@ -180,10 +139,8 @@ public List constructStatements() { /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletBatchReqV2 toTPipeTransferReq( - final List binaryBuffers, final List insertNodeBuffers, final List tabletBuffers, - final List binaryDataBases, final List insertNodeDataBases, final List tabletDataBases) throws IOException { @@ -193,13 +150,8 @@ public static PipeTransferTabletBatchReqV2 toTPipeTransferReq( batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - ReadWriteIOUtils.write(binaryBuffers.size(), outputStream); - for (int i = 0; i < binaryBuffers.size(); i++) { - final ByteBuffer binaryBuffer = binaryBuffers.get(i); - ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream); - outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit()); - ReadWriteIOUtils.write(binaryDataBases.get(i), outputStream); - } + // Binary buffer, for rolling upgrade + ReadWriteIOUtils.write(0, outputStream); ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream); for (int i = 0; i < insertNodeBuffers.size(); i++) { @@ -226,17 +178,10 @@ public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq( final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) { final PipeTransferTabletBatchReqV2 batchReq = new PipeTransferTabletBatchReqV2(); - int size = ReadWriteIOUtils.readInt(transferReq.body); - for (int i = 0; i < size; ++i) { - final int length = ReadWriteIOUtils.readInt(transferReq.body); - final byte[] body = new byte[length]; - transferReq.body.get(body); - batchReq.binaryReqs.add( - PipeTransferTabletBinaryReqV2.toTPipeTransferBinaryReq( - ByteBuffer.wrap(body), ReadWriteIOUtils.readString(transferReq.body))); - } + // Binary req, for rolling upgrade + ReadWriteIOUtils.readInt(transferReq.body); - size = ReadWriteIOUtils.readInt(transferReq.body); + int size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { batchReq.insertNodeReqs.add( PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq( @@ -258,11 +203,6 @@ public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq( /////////////////////////////// TestOnly /////////////////////////////// - @TestOnly - public List getBinaryReqs() { - return binaryReqs; - } - @TestOnly public List getInsertNodeReqs() { return insertNodeReqs; @@ -284,8 +224,7 @@ public boolean equals(final Object obj) { return false; } final PipeTransferTabletBatchReqV2 that = (PipeTransferTabletBatchReqV2) obj; - return Objects.equals(binaryReqs, that.binaryReqs) - && Objects.equals(insertNodeReqs, that.insertNodeReqs) + return Objects.equals(insertNodeReqs, that.insertNodeReqs) && Objects.equals(tabletReqs, that.tabletReqs) && version == that.version && type == that.type @@ -294,6 +233,6 @@ public boolean equals(final Object obj) { @Override public int hashCode() { - return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body); + return Objects.hash(insertNodeReqs, tabletReqs, version, type, body); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 5bee20c4dc0c8..622f4e4f0cd19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; -import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq; @@ -234,20 +233,14 @@ private void doTransferWrapper( private void doTransfer( final AirGapSocket socket, final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) - throws PipeException, WALPipeException, IOException { + throws PipeException, IOException { final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); final byte[] bytes = - Objects.isNull(insertNode) - ? PipeTransferTabletBinaryReqV2.toTPipeTransferBytes( - pipeInsertNodeTabletInsertionEvent.getByteBuffer(), - pipeInsertNodeTabletInsertionEvent.isTableModelEvent() - ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null) - : PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes( - insertNode, - pipeInsertNodeTabletInsertionEvent.isTableModelEvent() - ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null); + PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes( + insertNode, + pipeInsertNodeTabletInsertionEvent.isTableModelEvent() + ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() + : null); if (!send( pipeInsertNodeTabletInsertionEvent.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java index 7cdeef9e826ac..6912290bb2bbb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java @@ -51,7 +51,6 @@ import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2TsFileInsertionEventHandler; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2AsyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; -import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.pipe.api.annotation.TableModel; @@ -70,7 +69,6 @@ import java.io.IOException; import java.util.Comparator; import java.util.Iterator; -import java.util.Objects; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -324,15 +322,8 @@ private boolean transferInEventWithoutCheck(PipeInsertionEvent tabletInsertionEv final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); final ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex(); final TIoTConsensusV2TransferReq iotConsensusV2TransferReq = - Objects.isNull(insertNode) - ? IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq( - pipeInsertNodeTabletInsertionEvent.getByteBuffer(), - tCommitId, - tConsensusGroupId, - progressIndex, - thisDataNodeId) - : IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( - insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId); + IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( + insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId); final IoTConsensusV2TabletInsertNodeEventHandler iotConsensusV2InsertNodeReqHandler = new IoTConsensusV2TabletInsertNodeEventHandler( pipeInsertNodeTabletInsertionEvent, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java index 5866a5ceeaa6c..e6fe44b346742 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java @@ -40,7 +40,6 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; -import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq; @@ -326,21 +325,10 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex(); - if (insertNode != null) { - resp = - syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer( - IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( - insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId)); - } else { - resp = - syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer( - IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq( - pipeInsertNodeTabletInsertionEvent.getByteBuffer(), - tCommitId, - tConsensusGroupId, - progressIndex, - thisDataNodeId)); - } + resp = + syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer( + IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( + insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId)); } catch (final Exception e) { throw new PipeRuntimeSinkRetryTimesConfigurableException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java index 4e0cb17f27047..20ba2e0552e47 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java @@ -28,7 +28,6 @@ import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBatchReq; -import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; @@ -201,17 +200,10 @@ protected int buildTabletInsertionBuffer(TabletInsertionEvent event) throws WALP final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); // IoTConsensusV2 will transfer binary data to TIoTConsensusV2TransferReq final ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex(); - if (Objects.isNull(insertNode)) { - buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer(); - batchReqs.add( - IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq( - buffer, commitId, consensusGroupId, progressIndex, thisDataNodeId)); - } else { - buffer = insertNode.serializeToByteBuffer(); - batchReqs.add( - IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( - insertNode, commitId, consensusGroupId, progressIndex, thisDataNodeId)); - } + buffer = insertNode.serializeToByteBuffer(); + batchReqs.add( + IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( + insertNode, commitId, consensusGroupId, progressIndex, thisDataNodeId)); return buffer.limit(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index f8d0b104096f1..1a4d7cf3863c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -40,7 +40,6 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder; -import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler; @@ -296,10 +295,7 @@ private boolean transferInEventWithoutCheck(final TabletInsertionEvent tabletIns : null; final TPipeTransferReq pipeTransferReq = compressIfNeeded( - Objects.isNull(insertNode) - ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq( - pipeInsertNodeTabletInsertionEvent.getByteBuffer(), databaseName) - : PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, databaseName)); + PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, databaseName)); final PipeTransferTabletInsertNodeEventHandler pipeTransferInsertNodeReqHandler = new PipeTransferTabletInsertNodeEventHandler( pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 016f787afaa2a..d711c65b9e363 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -40,7 +40,6 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; -import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq; @@ -381,17 +380,11 @@ private void doTransfer( final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); final TPipeTransferReq req = compressIfNeeded( - insertNode != null - ? PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq( - insertNode, - pipeInsertNodeTabletInsertionEvent.isTableModelEvent() - ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null) - : PipeTransferTabletBinaryReqV2.toTPipeTransferReq( - pipeInsertNodeTabletInsertionEvent.getByteBuffer(), - pipeInsertNodeTabletInsertionEvent.isTableModelEvent() - ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : null)); + PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq( + insertNode, + pipeInsertNodeTabletInsertionEvent.isTableModelEvent() + ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() + : null)); rateLimitIfNeeded( pipeInsertNodeTabletInsertionEvent.getPipeName(), pipeInsertNodeTabletInsertionEvent.getCreationTime(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 07b88a98053b2..b5dccb55c880a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; -import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2; import org.apache.iotdb.db.protocol.session.IClientSession; @@ -271,16 +270,9 @@ private void doTransfer( : TREE_MODEL_DATABASE_NAME_IDENTIFIER; final InsertBaseStatement insertBaseStatement; - if (Objects.isNull(insertNode)) { - insertBaseStatement = - PipeTransferTabletBinaryReqV2.toTPipeTransferReq( - pipeInsertNodeTabletInsertionEvent.getByteBuffer(), dataBaseName) - .constructStatement(); - } else { - insertBaseStatement = - PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, dataBaseName) - .constructStatement(); - } + insertBaseStatement = + PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, dataBaseName) + .constructStatement(); final TSStatus status = insertBaseStatement.isWriteToTable() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index 52007765fe323..52ac137ae4e6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -109,9 +109,9 @@ public boolean isClosed() { private void mayPrintExceedingLog() { final long remainingCapacity = ringBuffer.remainingCapacity(); final long bufferSize = ringBuffer.getBufferSize(); - if ((double) remainingCapacity / bufferSize >= 0.5 + if ((double) remainingCapacity / bufferSize <= 0.5 && System.currentTimeMillis() - - PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() + - PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() * 1000L >= lastLogTime) { LOGGER.warn( "The assigner queue content has exceeded half, it may be stuck and may block insertion. regionId: {}, capacity: {}, bufferSize: {}", diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 0cc4470882efd..d4af2135c95e0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -309,7 +309,6 @@ public void testPipeTransferTabletReqV2() { @Test public void testPipeTransferTabletBatchReq() throws IOException { - final List binaryBuffers = new ArrayList<>(); final List insertNodeBuffers = new ArrayList<>(); final List tabletBuffers = new ArrayList<>(); @@ -327,10 +326,6 @@ public void testPipeTransferTabletBatchReq() throws IOException { // InsertNode buffer insertNodeBuffers.add(node.serializeToByteBuffer()); - // Binary buffer - // Not do real test here since "serializeToWal" needs private inner class of walBuffer - binaryBuffers.add(ByteBuffer.wrap(new byte[] {'a', 'b'})); - // Raw buffer List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT32)); @@ -367,8 +362,7 @@ public void testPipeTransferTabletBatchReq() throws IOException { } final PipeTransferTabletBatchReq req = - PipeTransferTabletBatchReq.toTPipeTransferReq( - binaryBuffers, insertNodeBuffers, tabletBuffers); + PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers, tabletBuffers); final PipeTransferTabletBatchReq deserializedReq = PipeTransferTabletBatchReq.fromTPipeTransferReq(req); @@ -380,10 +374,8 @@ public void testPipeTransferTabletBatchReq() throws IOException { @Test public void testPipeTransferTabletBatchReqV2() throws IOException { - final List binaryBuffers = new ArrayList<>(); final List insertNodeBuffers = new ArrayList<>(); final List tabletBuffers = new ArrayList<>(); - final List binaryDataBase = new ArrayList<>(); final List insertDataBase = new ArrayList<>(); final List tabletDataBase = new ArrayList<>(); @@ -402,11 +394,6 @@ public void testPipeTransferTabletBatchReqV2() throws IOException { insertNodeBuffers.add(node.serializeToByteBuffer()); insertDataBase.add("test"); - // Binary buffer - // Not do real test here since "serializeToWal" needs private inner class of walBuffer - binaryBuffers.add(ByteBuffer.wrap(new byte[] {'a', 'b'})); - binaryDataBase.add("test"); - // Raw buffer List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT32)); @@ -445,23 +432,15 @@ public void testPipeTransferTabletBatchReqV2() throws IOException { final PipeTransferTabletBatchReqV2 req = PipeTransferTabletBatchReqV2.toTPipeTransferReq( - binaryBuffers, - insertNodeBuffers, - tabletBuffers, - binaryDataBase, - insertDataBase, - tabletDataBase); + insertNodeBuffers, tabletBuffers, insertDataBase, tabletDataBase); final PipeTransferTabletBatchReqV2 deserializedReq = PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req); - Assert.assertArrayEquals( - new byte[] {'a', 'b'}, deserializedReq.getBinaryReqs().get(0).getByteBuffer().array()); Assert.assertEquals(node, deserializedReq.getInsertNodeReqs().get(0).getInsertNode()); Assert.assertEquals(t, deserializedReq.getTabletReqs().get(0).getTablet()); Assert.assertTrue(deserializedReq.getTabletReqs().get(0).getIsAligned()); - Assert.assertEquals("test", deserializedReq.getBinaryReqs().get(0).getDataBaseName()); Assert.assertEquals("test", deserializedReq.getTabletReqs().get(0).getDataBaseName()); Assert.assertEquals("test", deserializedReq.getInsertNodeReqs().get(0).getDataBaseName()); }