diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
index 9544ac5cf2b3..ec04aab39bd7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
@@ -37,6 +37,8 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;
protected static final String INSERTION1 =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (100, 1, 2, 3)";
protected static final String INSERTION2 =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (101, 4, 5, 6)";
protected static final String INSERTION3 =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)";
+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (102, 7, 8, 9)";
protected static final String FLUSH_COMMAND = "flush on cluster";
protected static final String COUNT_QUERY = "select count(*) from root.sg.**";
- protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1";
+ protected static final String SELECT_ALL_QUERY =
+ "select speed, temperature, power from root.sg.d1";
+ protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES root.sg.d1.speed";
+ protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*";
+ protected static final String SELECT_SURVIVING_QUERY =
+ "SELECT temperature, power FROM root.sg.d1";
/**
* Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link
@@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
}
}
+  /**
+   * Test that DELETE TIMESERIES is properly replicated to all DataNode replicas via IoTConsensusV2.
+   *
+   * <p>This test reproduces the scenario from the historical deletion replication bug: when a
+   * timeseries is deleted after data insertion (with some unflushed data), the deletion event must
+   * be consistently replicated to all replicas. After waiting for replication to complete, stopping
+   * each DataNode in turn should show the same schema on all surviving nodes.
+   *
+   * <p>Scenario:
+   *
+   * <ul>
+   *   <li>Insert data into root.sg.d1 with 3 measurements (speed, temperature, power), flush
+   *   <li>Insert more data (unflushed to create WAL-only entries)
+   *   <li>DELETE TIMESERIES root.sg.d1.speed
+   *   <li>Flush again to persist deletion
+   *   <li>Wait for replication to complete on all DataNodes
+   *   <li>Verify that every DataNode independently shows the same timeseries (speed is gone)
+   * </ul>
+   */
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement = makeItCloseQuietly(connection.createStatement())) {
+
+ // Step 1: Insert data with 3 measurements and flush
+ LOGGER.info(
+ "Step 1: Inserting data with 3 measurements and flushing (mode: {})...",
+ getIoTConsensusV2Mode());
+ statement.execute(INSERTION1);
+ statement.execute(INSERTION2);
+ statement.execute(FLUSH_COMMAND);
+
+ // Step 2: Insert more data without flush (creates WAL-only entries)
+ LOGGER.info("Step 2: Inserting more data without flush (WAL-only entries)...");
+ statement.execute(INSERTION3);
+
+ // Step 3: Delete one timeseries
+ LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed...");
+ statement.execute(DELETE_TIMESERIES_SPEED);
+
+ // Step 4: Flush again to persist the deletion
+ LOGGER.info("Step 4: Flushing to persist deletion...");
+ statement.execute(FLUSH_COMMAND);
+
+ // Verify on the current connection: speed should be gone, 2 timeseries remain
+ verifyTimeSeriesAfterDelete(statement, "via initial connection");
+
+ // Step 5: Wait for replication to complete on data region leaders
+ LOGGER.info("Step 5: Waiting for replication to complete on data region leaders...");
+      Map<Integer, Pair<Integer, List<Integer>>> dataRegionMap =
+          getDataRegionMapWithLeader(statement);
+      Set<Integer> leaderNodeIds = new HashSet<>();
+      for (Pair<Integer, List<Integer>> leaderAndReplicas : dataRegionMap.values()) {
+ if (leaderAndReplicas.getLeft() > 0) {
+ leaderNodeIds.add(leaderAndReplicas.getLeft());
+ }
+ }
+ for (int leaderNodeId : leaderNodeIds) {
+ EnvFactory.getEnv()
+ .dataNodeIdToWrapper(leaderNodeId)
+ .ifPresent(this::waitForReplicationComplete);
+ }
+
+ // Step 6: Verify schema consistency on each DataNode independently
+ LOGGER.info("Step 6: Verifying schema consistency on each DataNode independently...");
+      List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
+ for (DataNodeWrapper wrapper : dataNodeWrappers) {
+ String nodeDescription = "DataNode " + wrapper.getIp() + ":" + wrapper.getPort();
+ LOGGER.info("Verifying schema on {}", nodeDescription);
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (Connection nodeConn =
+ makeItCloseQuietly(
+ EnvFactory.getEnv()
+ .getConnection(
+ wrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT));
+ Statement nodeStmt = makeItCloseQuietly(nodeConn.createStatement())) {
+ verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription);
+ }
+ });
+ }
+
+ // Step 7: Stop each DataNode one by one and verify remaining nodes still consistent
+ LOGGER.info(
+ "Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema...");
+ for (DataNodeWrapper stoppedNode : dataNodeWrappers) {
+ String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" + stoppedNode.getPort();
+ LOGGER.info("Stopping {}", stoppedDesc);
+ stoppedNode.stopForcibly();
+ Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive());
+
+ try {
+ // Verify schema on each surviving node
+ for (DataNodeWrapper aliveNode : dataNodeWrappers) {
+ if (aliveNode == stoppedNode) {
+ continue;
+ }
+ String aliveDesc = "DataNode " + aliveNode.getIp() + ":" + aliveNode.getPort();
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (Connection aliveConn =
+ makeItCloseQuietly(
+ EnvFactory.getEnv()
+ .getConnection(
+ aliveNode,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT));
+ Statement aliveStmt = makeItCloseQuietly(aliveConn.createStatement())) {
+ verifyTimeSeriesAfterDelete(
+ aliveStmt, aliveDesc + " (while " + stoppedDesc + " is down)");
+ }
+ });
+ }
+ } finally {
+ // Restart the stopped node before moving to the next iteration
+ LOGGER.info("Restarting {}", stoppedDesc);
+ stoppedNode.start();
+ // Wait for the restarted node to rejoin
+ Awaitility.await()
+ .atMost(120, TimeUnit.SECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .until(stoppedNode::isAlive);
+ }
+ }
+
+ LOGGER.info(
+ "DELETE TIMESERIES replica consistency test passed for mode: {}",
+ getIoTConsensusV2Mode());
+ }
+ }
+
+ /**
+ * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and
+ * that data queries do not return the deleted timeseries.
+ */
+ private void verifyTimeSeriesAfterDelete(Statement statement, String context) throws Exception {
+ // Verify via SHOW TIMESERIES: speed should be gone, only temperature and power remain
+    Set<String> timeseries = new HashSet<>();
+ try (ResultSet resultSet = statement.executeQuery(SHOW_TIMESERIES_D1)) {
+ while (resultSet.next()) {
+ timeseries.add(resultSet.getString("Timeseries"));
+ }
+ }
+ LOGGER.info("[{}] SHOW TIMESERIES result: {}", context, timeseries);
+ Assert.assertEquals(
+ "[" + context + "] Expected exactly 2 timeseries after delete (temperature, power)",
+ 2,
+ timeseries.size());
+ Assert.assertFalse(
+ "[" + context + "] root.sg.d1.speed should have been deleted",
+ timeseries.contains("root.sg.d1.speed"));
+ Assert.assertTrue(
+ "[" + context + "] root.sg.d1.temperature should still exist",
+ timeseries.contains("root.sg.d1.temperature"));
+ Assert.assertTrue(
+ "[" + context + "] root.sg.d1.power should still exist",
+ timeseries.contains("root.sg.d1.power"));
+
+ // Verify via SELECT: only temperature and power columns should return data
+ try (ResultSet selectResult = statement.executeQuery(SELECT_SURVIVING_QUERY)) {
+ int rowCount = 0;
+ while (selectResult.next()) {
+ rowCount++;
+ }
+ // After delete, remaining data depends on whether unflushed data for the deleted
+ // timeseries was also cleaned up. We mainly verify that the query doesn't fail
+ // and that some rows are returned for the surviving measurements.
+ Assert.assertTrue(
+ "[" + context + "] Expected at least 1 row from SELECT on surviving timeseries",
+ rowCount >= 1);
+ }
+ }
+
private static final Pattern SYNC_LAG_PATTERN =
Pattern.compile("iot_consensus_v2\\{[^}]*type=\"syncLag\"[^}]*}\\s+(\\S+)");
@@ -259,7 +447,7 @@ protected void verifyDataConsistency(Statement statement) throws Exception {
totalCount += parseLongFromString(countResult.getString(i));
}
Assert.assertEquals(
- "Expected 6 total data points (3 timestamps x 2 measurements)", 6, totalCount);
+ "Expected 9 total data points (3 timestamps x 3 measurements)", 9, totalCount);
}
int rowCount = 0;
@@ -269,15 +457,19 @@ protected void verifyDataConsistency(Statement statement) throws Exception {
long timestamp = parseLongFromString(selectResult.getString(1));
long speed = parseLongFromString(selectResult.getString(2));
long temperature = parseLongFromString(selectResult.getString(3));
+ long power = parseLongFromString(selectResult.getString(4));
if (timestamp == 100) {
Assert.assertEquals(1, speed);
Assert.assertEquals(2, temperature);
+ Assert.assertEquals(3, power);
} else if (timestamp == 101) {
- Assert.assertEquals(3, speed);
- Assert.assertEquals(4, temperature);
+ Assert.assertEquals(4, speed);
+ Assert.assertEquals(5, temperature);
+ Assert.assertEquals(6, power);
} else if (timestamp == 102) {
- Assert.assertEquals(5, speed);
- Assert.assertEquals(6, temperature);
+ Assert.assertEquals(7, speed);
+ Assert.assertEquals(8, temperature);
+ Assert.assertEquals(9, power);
}
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
index f71462fa470a..bb97014d2134 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
@@ -49,4 +49,10 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
public void test3C3DWriteFlushAndQuery() throws Exception {
super.test3C3DWriteFlushAndQuery();
}
+
+ @Override
+ @Test
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ super.testDeleteTimeSeriesReplicaConsistency();
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
index 856d3624bf18..d4c0bf22ab43 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
@@ -49,4 +49,10 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
public void test3C3DWriteFlushAndQuery() throws Exception {
super.test3C3DWriteFlushAndQuery();
}
+
+ @Override
+ @Test
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ super.testDeleteTimeSeriesReplicaConsistency();
+ }
}