Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.collector.TabletCollector;
Expand All @@ -69,7 +68,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -159,18 +159,11 @@
this.allocatedMemoryBlock = new AtomicReference<>();
}

@Nonnull
public InsertNode getInsertNode() {
return insertNode;
}

public ByteBuffer getByteBuffer() throws WALPipeException {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
throw new PipeException("InsertNode has been released");
}
return node.serializeToByteBuffer();
}

public String getDeviceId() {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
Expand Down Expand Up @@ -355,7 +348,7 @@
public boolean mayEventTimeOverlappedWithTimeRange() {
try {
final InsertNode insertNode = getInsertNode();
if (Objects.isNull(insertNode)) {

Check warning on line 351 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change this condition so that it does not always evaluate to "false"

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0fVBw8oNdODzBrdpcs&open=AZ0fVBw8oNdODzBrdpcs&pullRequest=17341
return true;
}

Expand Down Expand Up @@ -399,9 +392,6 @@
public boolean mayEventPathsOverlappedWithPattern() {
try {
final InsertNode insertNode = getInsertNode();
if (Objects.isNull(insertNode)) {
return true;
}

if (insertNode instanceof RelationalInsertRowNode
|| insertNode instanceof RelationalInsertTabletNode
Expand Down Expand Up @@ -503,7 +493,7 @@

eventParsers = new ArrayList<>();
final InsertNode node = getInsertNode();
if (Objects.isNull(node)) {

Check warning on line 496 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change this condition so that it does not always evaluate to "false"

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0fVBw8oNdODzBrdpct&open=AZ0fVBw8oNdODzBrdpct&pullRequest=17341
throw new PipeException("InsertNode has been released");
}
switch (node.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@

public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {

private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();

private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
private final List<String> binaryDataBases = new ArrayList<>();
private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();

Expand Down Expand Up @@ -90,11 +88,9 @@ protected boolean constructBatch(final TabletInsertionEvent event) throws IOExce
public synchronized void onSuccess() {
super.onSuccess();

binaryBuffers.clear();
insertNodeBuffers.clear();
tabletBuffers.clear();

binaryDataBases.clear();
insertNodeDataBases.clear();
tabletDataBases.clear();
tableModelTabletMap.clear();
Expand Down Expand Up @@ -142,12 +138,7 @@ public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
tableModelTabletMap.clear();

return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
binaryBuffers,
insertNodeBuffers,
tabletBuffers,
binaryDataBases,
insertNodeDataBases,
tabletDataBases);
insertNodeBuffers, tabletBuffers, insertNodeDataBases, tabletDataBases);
}

public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

public class PipeTransferTabletBatchReq extends TPipeTransferReq {

private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();

Expand All @@ -61,26 +60,6 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();

for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
} else if (statement instanceof InsertRowsStatement) {
insertRowStatementList.addAll(
((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
binaryReq));
}
}

for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
Expand Down Expand Up @@ -117,24 +96,19 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferTabletBatchReq toTPipeTransferReq(
final List<ByteBuffer> binaryBuffers,
final List<ByteBuffer> insertNodeBuffers,
final List<ByteBuffer> tabletBuffers)
final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> tabletBuffers)
throws IOException {
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();

// batchReq.binaryReqs, batchReq.insertNodeReqs, batchReq.tabletReqs are empty
// batchReq.insertNodeReqs, batchReq.tabletReqs are empty
// when this method is called from PipeTransferTabletBatchReqBuilder.toTPipeTransferReq()

batchReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
for (final ByteBuffer binaryBuffer : binaryBuffers) {
ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
}
// Binary buffer, for rolling upgrade
ReadWriteIOUtils.write(0, outputStream);

ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
Expand All @@ -157,16 +131,9 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
final TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();

// Binary size, for rolling upgrade
ReadWriteIOUtils.readInt(transferReq.body);
int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
final int length = ReadWriteIOUtils.readInt(transferReq.body);
final byte[] body = new byte[length];
transferReq.body.get(body);
batchReq.binaryReqs.add(
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
}

size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
Expand All @@ -188,11 +155,6 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(

/////////////////////////////// TestOnly ///////////////////////////////

@TestOnly
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
return binaryReqs;
}

@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
Expand All @@ -214,8 +176,7 @@ public boolean equals(final Object obj) {
return false;
}
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
return binaryReqs.equals(that.binaryReqs)
&& insertNodeReqs.equals(that.insertNodeReqs)
return insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
Expand All @@ -224,6 +185,6 @@ public boolean equals(final Object obj) {

@Override
public int hashCode() {
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
import java.util.Objects;

public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {

private final transient List<PipeTransferTabletBinaryReqV2> binaryReqs = new ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReqV2> insertNodeReqs =
new ArrayList<>();
private final transient List<PipeTransferTabletRawReqV2> tabletReqs = new ArrayList<>();
Expand All @@ -54,7 +52,7 @@
// Empty constructor
}

public List<InsertBaseStatement> constructStatements() {

Check warning on line 55 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 76 to 64, Complexity from 16 to 14, Nesting Level from 4 to 2, Number of Variables from 14 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0fVBwXoNdODzBrdpcr&open=AZ0fVBwXoNdODzBrdpcr&pullRequest=17341
final List<InsertBaseStatement> statements = new ArrayList<>();

final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
Expand All @@ -66,45 +64,6 @@
final Map<String, List<InsertRowStatement>> tableModelDatabaseInsertRowStatementMap =
new HashMap<>();

for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement.isWriteToTable()) {
if (statement instanceof InsertRowStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
statements.add(statement);
} else if (statement instanceof InsertRowsStatement) {
tableModelDatabaseInsertRowStatementMap
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
binaryReq));
}
continue;
}
if (statement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
} else if (statement instanceof InsertRowsStatement) {
insertRowStatementList.addAll(
((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
binaryReq));
}
}

for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
Expand Down Expand Up @@ -180,10 +139,8 @@
/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
final List<ByteBuffer> binaryBuffers,
final List<ByteBuffer> insertNodeBuffers,
final List<ByteBuffer> tabletBuffers,
final List<String> binaryDataBases,
final List<String> insertNodeDataBases,
final List<String> tabletDataBases)
throws IOException {
Expand All @@ -193,13 +150,8 @@
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
for (int i = 0; i < binaryBuffers.size(); i++) {
final ByteBuffer binaryBuffer = binaryBuffers.get(i);
ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
ReadWriteIOUtils.write(binaryDataBases.get(i), outputStream);
}
// Binary buffer, for rolling upgrade
ReadWriteIOUtils.write(0, outputStream);

ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (int i = 0; i < insertNodeBuffers.size(); i++) {
Expand All @@ -226,17 +178,10 @@
final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReqV2 batchReq = new PipeTransferTabletBatchReqV2();

int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
final int length = ReadWriteIOUtils.readInt(transferReq.body);
final byte[] body = new byte[length];
transferReq.body.get(body);
batchReq.binaryReqs.add(
PipeTransferTabletBinaryReqV2.toTPipeTransferBinaryReq(
ByteBuffer.wrap(body), ReadWriteIOUtils.readString(transferReq.body)));
}
// Binary req, for rolling upgrade
ReadWriteIOUtils.readInt(transferReq.body);

size = ReadWriteIOUtils.readInt(transferReq.body);
int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
Expand All @@ -258,11 +203,6 @@

/////////////////////////////// TestOnly ///////////////////////////////

@TestOnly
public List<PipeTransferTabletBinaryReqV2> getBinaryReqs() {
return binaryReqs;
}

@TestOnly
public List<PipeTransferTabletInsertNodeReqV2> getInsertNodeReqs() {
return insertNodeReqs;
Expand All @@ -284,8 +224,7 @@
return false;
}
final PipeTransferTabletBatchReqV2 that = (PipeTransferTabletBatchReqV2) obj;
return Objects.equals(binaryReqs, that.binaryReqs)
&& Objects.equals(insertNodeReqs, that.insertNodeReqs)
return Objects.equals(insertNodeReqs, that.insertNodeReqs)
&& Objects.equals(tabletReqs, that.tabletReqs)
&& version == that.version
&& type == that.type
Expand All @@ -294,6 +233,6 @@

@Override
public int hashCode() {
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
Expand Down Expand Up @@ -217,7 +216,7 @@
private void doTransferWrapper(
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException, IOException {

Check warning on line 219 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0fVBvcoNdODzBrdpco&open=AZ0fVBvcoNdODzBrdpco&pullRequest=17341
// We increase the reference count for this event to determine if the event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAirGapSink.class.getName())) {
Expand All @@ -234,20 +233,14 @@
private void doTransfer(
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException, IOException {
throws PipeException, IOException {
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
final byte[] bytes =
Objects.isNull(insertNode)
? PipeTransferTabletBinaryReqV2.toTPipeTransferBytes(
pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: null)
: PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
insertNode,
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: null);
PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
insertNode,
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: null);

if (!send(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
Expand Down
Loading
Loading