From 3d9bee9af3eabb0dcb645a45255504284ff8ba5e Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Sat, 21 Mar 2026 14:02:55 +0800 Subject: [PATCH] Add IT for DELETE TIMESERIES replica consistency under IoTConsensusV2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add testDeleteTimeSeriesReplicaConsistency() to verify that DELETE TIMESERIES operations are properly replicated across all DataNode replicas in a 3C3D IoTConsensusV2 cluster. This test reproduces the scenario from the historical deletion replication bug where deletion events lacking replicateIndex were silently dropped by the Processor. The test inserts data with 3 measurements, leaves some data unflushed, deletes one timeseries, then verifies schema consistency on every DataNode — including after stopping and restarting each node in turn to trigger consensus pipe reconstruction and historical replay. Also unifies INSERTION constants to use 3 columns (speed, temperature, power) across all test methods, removing the prior 2-column variants. 
Co-Authored-By: Claude Opus 4.6 --- .../IoTDBIoTConsensusV23C3DBasicITBase.java | 210 +++++++++++++++++- .../IoTDBIoTConsensusV2Batch3C3DBasicIT.java | 6 + .../IoTDBIoTConsensusV2Stream3C3DBasicIT.java | 6 + 3 files changed, 213 insertions(+), 9 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index 9544ac5cf2b3..ec04aab39bd7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -37,6 +37,8 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300; protected static final String INSERTION1 = - "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)"; + "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (100, 1, 2, 3)"; protected static final String INSERTION2 = - "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)"; + "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (101, 4, 5, 6)"; protected static final String INSERTION3 = - "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)"; + "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (102, 7, 8, 9)"; protected static final String FLUSH_COMMAND = "flush on cluster"; protected static final String COUNT_QUERY = "select count(*) from root.sg.**"; - protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1"; + 
protected static final String SELECT_ALL_QUERY = + "select speed, temperature, power from root.sg.d1"; + protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES root.sg.d1.speed"; + protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*"; + protected static final String SELECT_SURVIVING_QUERY = + "SELECT temperature, power FROM root.sg.d1"; /** * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link @@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { } } + /** + * Test that DELETE TIMESERIES is properly replicated to all DataNode replicas via IoTConsensusV2. + * + *

<p>This test reproduces the scenario from the historical deletion replication bug: when a
+ * timeseries is deleted after data insertion (with some unflushed data), the deletion event must
+ * be consistently replicated to all replicas. After waiting for replication to complete, stopping
+ * each DataNode in turn should show the same schema on all surviving nodes.
+ *
+ * <p>Scenario:
+ *
+ * <ol>
+ *   <li>Insert data into root.sg.d1 with 3 measurements (speed, temperature, power), flush
+ *   <li>Insert more data (unflushed to create WAL-only entries)
+ *   <li>DELETE TIMESERIES root.sg.d1.speed
+ *   <li>Flush again to persist deletion
+ *   <li>Wait for replication to complete on all DataNodes
+ *   <li>Verify that every DataNode independently shows the same timeseries (speed is gone)
+ * </ol>
+ */ + public void testDeleteTimeSeriesReplicaConsistency() throws Exception { + try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + Statement statement = makeItCloseQuietly(connection.createStatement())) { + + // Step 1: Insert data with 3 measurements and flush + LOGGER.info( + "Step 1: Inserting data with 3 measurements and flushing (mode: {})...", + getIoTConsensusV2Mode()); + statement.execute(INSERTION1); + statement.execute(INSERTION2); + statement.execute(FLUSH_COMMAND); + + // Step 2: Insert more data without flush (creates WAL-only entries) + LOGGER.info("Step 2: Inserting more data without flush (WAL-only entries)..."); + statement.execute(INSERTION3); + + // Step 3: Delete one timeseries + LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed..."); + statement.execute(DELETE_TIMESERIES_SPEED); + + // Step 4: Flush again to persist the deletion + LOGGER.info("Step 4: Flushing to persist deletion..."); + statement.execute(FLUSH_COMMAND); + + // Verify on the current connection: speed should be gone, 2 timeseries remain + verifyTimeSeriesAfterDelete(statement, "via initial connection"); + + // Step 5: Wait for replication to complete on data region leaders + LOGGER.info("Step 5: Waiting for replication to complete on data region leaders..."); + Map<Integer, Pair<Integer, List<Integer>>> dataRegionMap = + getDataRegionMapWithLeader(statement); + Set<Integer> leaderNodeIds = new HashSet<>(); + for (Pair<Integer, List<Integer>> leaderAndReplicas : dataRegionMap.values()) { + if (leaderAndReplicas.getLeft() > 0) { + leaderNodeIds.add(leaderAndReplicas.getLeft()); + } + } + for (int leaderNodeId : leaderNodeIds) { + EnvFactory.getEnv() + .dataNodeIdToWrapper(leaderNodeId) + .ifPresent(this::waitForReplicationComplete); + } + + // Step 6: Verify schema consistency on each DataNode independently + LOGGER.info("Step 6: Verifying schema consistency on each DataNode independently..."); + List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList(); + for (DataNodeWrapper wrapper :
dataNodeWrappers) { + String nodeDescription = "DataNode " + wrapper.getIp() + ":" + wrapper.getPort(); + LOGGER.info("Verifying schema on {}", nodeDescription); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try (Connection nodeConn = + makeItCloseQuietly( + EnvFactory.getEnv() + .getConnection( + wrapper, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TREE_SQL_DIALECT)); + Statement nodeStmt = makeItCloseQuietly(nodeConn.createStatement())) { + verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription); + } + }); + } + + // Step 7: Stop each DataNode one by one and verify remaining nodes still consistent + LOGGER.info( + "Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema..."); + for (DataNodeWrapper stoppedNode : dataNodeWrappers) { + String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" + stoppedNode.getPort(); + LOGGER.info("Stopping {}", stoppedDesc); + stoppedNode.stopForcibly(); + Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive()); + + try { + // Verify schema on each surviving node + for (DataNodeWrapper aliveNode : dataNodeWrappers) { + if (aliveNode == stoppedNode) { + continue; + } + String aliveDesc = "DataNode " + aliveNode.getIp() + ":" + aliveNode.getPort(); + Awaitility.await() + .pollDelay(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try (Connection aliveConn = + makeItCloseQuietly( + EnvFactory.getEnv() + .getConnection( + aliveNode, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TREE_SQL_DIALECT)); + Statement aliveStmt = makeItCloseQuietly(aliveConn.createStatement())) { + verifyTimeSeriesAfterDelete( + aliveStmt, aliveDesc + " (while " + stoppedDesc + " is down)"); + } + }); + } + } finally { + // Restart the stopped node before moving to the next iteration + LOGGER.info("Restarting {}", stoppedDesc); + stoppedNode.start(); + // Wait for the 
restarted node to rejoin + Awaitility.await() + .atMost(120, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until(stoppedNode::isAlive); + } + } + + LOGGER.info( + "DELETE TIMESERIES replica consistency test passed for mode: {}", + getIoTConsensusV2Mode()); + } + } + + /** + * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and + * that data queries do not return the deleted timeseries. + */ + private void verifyTimeSeriesAfterDelete(Statement statement, String context) throws Exception { + // Verify via SHOW TIMESERIES: speed should be gone, only temperature and power remain + Set<String> timeseries = new HashSet<>(); + try (ResultSet resultSet = statement.executeQuery(SHOW_TIMESERIES_D1)) { + while (resultSet.next()) { + timeseries.add(resultSet.getString("Timeseries")); + } + } + LOGGER.info("[{}] SHOW TIMESERIES result: {}", context, timeseries); + Assert.assertEquals( + "[" + context + "] Expected exactly 2 timeseries after delete (temperature, power)", + 2, + timeseries.size()); + Assert.assertFalse( + "[" + context + "] root.sg.d1.speed should have been deleted", + timeseries.contains("root.sg.d1.speed")); + Assert.assertTrue( + "[" + context + "] root.sg.d1.temperature should still exist", + timeseries.contains("root.sg.d1.temperature")); + Assert.assertTrue( + "[" + context + "] root.sg.d1.power should still exist", + timeseries.contains("root.sg.d1.power")); + + // Verify via SELECT: only temperature and power columns should return data + try (ResultSet selectResult = statement.executeQuery(SELECT_SURVIVING_QUERY)) { + int rowCount = 0; + while (selectResult.next()) { + rowCount++; + } + // After delete, remaining data depends on whether unflushed data for the deleted + // timeseries was also cleaned up. We mainly verify that the query doesn't fail + // and that some rows are returned for the surviving measurements.
+ Assert.assertTrue( + "[" + context + "] Expected at least 1 row from SELECT on surviving timeseries", + rowCount >= 1); + } + } + private static final Pattern SYNC_LAG_PATTERN = Pattern.compile("iot_consensus_v2\\{[^}]*type=\"syncLag\"[^}]*}\\s+(\\S+)"); @@ -259,7 +447,7 @@ protected void verifyDataConsistency(Statement statement) throws Exception { totalCount += parseLongFromString(countResult.getString(i)); } Assert.assertEquals( - "Expected 6 total data points (3 timestamps x 2 measurements)", 6, totalCount); + "Expected 9 total data points (3 timestamps x 3 measurements)", 9, totalCount); } int rowCount = 0; @@ -269,15 +457,19 @@ protected void verifyDataConsistency(Statement statement) throws Exception { long timestamp = parseLongFromString(selectResult.getString(1)); long speed = parseLongFromString(selectResult.getString(2)); long temperature = parseLongFromString(selectResult.getString(3)); + long power = parseLongFromString(selectResult.getString(4)); if (timestamp == 100) { Assert.assertEquals(1, speed); Assert.assertEquals(2, temperature); + Assert.assertEquals(3, power); } else if (timestamp == 101) { - Assert.assertEquals(3, speed); - Assert.assertEquals(4, temperature); + Assert.assertEquals(4, speed); + Assert.assertEquals(5, temperature); + Assert.assertEquals(6, power); } else if (timestamp == 102) { - Assert.assertEquals(5, speed); - Assert.assertEquals(6, temperature); + Assert.assertEquals(7, speed); + Assert.assertEquals(8, temperature); + Assert.assertEquals(9, power); } } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java index f71462fa470a..bb97014d2134 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java +++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java @@ -49,4 +49,10 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { public void test3C3DWriteFlushAndQuery() throws Exception { super.test3C3DWriteFlushAndQuery(); } + + @Override + @Test + public void testDeleteTimeSeriesReplicaConsistency() throws Exception { + super.testDeleteTimeSeriesReplicaConsistency(); + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java index 856d3624bf18..d4c0bf22ab43 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java @@ -49,4 +49,10 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { public void test3C3DWriteFlushAndQuery() throws Exception { super.test3C3DWriteFlushAndQuery(); } + + @Override + @Test + public void testDeleteTimeSeriesReplicaConsistency() throws Exception { + super.testDeleteTimeSeriesReplicaConsistency(); + } }