diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 1a3490344699..cbcf4ca3fd59 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -572,7 +572,7 @@ public void stopForcibly() { Thread.currentThread().interrupt(); logger.error("Waiting node to shutdown error.", e); } - logger.info("In test {} {} started forcibly.", getTestLogDirName(), getId()); + logger.info("In test {} {} stopped forcibly.", getTestLogDirName(), getId()); } @Override diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java index f7d12f10b8fe..181fd2da2b25 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.it.schema; +import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.utils.MetadataUtils; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.isession.ISession; @@ -2749,4 +2750,39 @@ public void testAlterIllegalDataType() { throw new RuntimeException(e); } } + + @Test + public void testCrossPartitionWrite() + throws IoTDBConnectionException, StatementExecutionException { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE root.cross_partition"); + session.executeNonQueryStatement( + "CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH DATATYPE=INT32,ENCODING=RLE"); + + // Insert data into two partitions + Tablet tablet = + new Tablet( + "root.cross_partition.device1", + Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE))); + tablet.addTimestamp(0, 0); + tablet.addValue("sensor1", 0, 0); + tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL); + tablet.addValue("sensor1", 1, 1); + session.insertTablet(tablet); + + session.executeNonQueryStatement( + "ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE INT64"); + + // Insert data with altered type + tablet = + new Tablet( + "root.cross_partition.device1", + Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.RLE))); + tablet.addTimestamp(0, 0); + tablet.addValue("sensor1", 0, 0L); + tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL); + tablet.addValue("sensor1", 1, 1L); + session.insertTablet(tablet); + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 7ff4ff75efef..35403c4dc6d0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -233,6 +233,48 @@ public void testNoPermission() throws Exception { } } + @Test + public void testSourcePermissionRestart() throws SQLException { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", connection); + TestUtils.executeNonQueries( + senderEnv, Collections.singletonList("grant READ on root.** to user thulab")); + + statement.execute( + String.format( + "create pipe a2b" + + " with source (" + + "'user'='thulab'" + + ", 'password'='passwD@123456')" + + " with sink (" + + "'node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)")); + TestUtils.executeNonQueries( + receiverEnv, + Arrays.asList( + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)")); + + TestUtils.executeNonQueries(senderEnv, Collections.singletonList("start pipe a2b")); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(pressure) from root.vehicle.plane", + "count(root.vehicle.plane.pressure),", + Collections.singleton("1,")); + } + } + @Test public void testSourcePermission() { TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", null); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 3768f05941b9..0061f41f9952 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1487,15 +1487,10 @@ private boolean insertTabletToTsFileProcessor( registerToTsFile(insertTabletNode, tsFileProcessor); tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics); } catch (DataTypeInconsistentException e) { - // flush both MemTables so that the new type can be inserted into a new MemTable - TsFileProcessor workSequenceProcessor = workSequenceTsFileProcessors.get(timePartitionId); - if (workSequenceProcessor != null) { - fileFlushPolicy.apply(this, workSequenceProcessor, workSequenceProcessor.isSequence()); - } - TsFileProcessor workUnsequenceProcessor = workUnsequenceTsFileProcessors.get(timePartitionId); - if (workUnsequenceProcessor != null) { - fileFlushPolicy.apply(this, workUnsequenceProcessor, workUnsequenceProcessor.isSequence()); - } + // flush all MemTables so that the new type can be inserted into a new MemTable + // cannot just flush the current TsFileProcessor, because the new type may be inserted into + // other TsFileProcessors of this region + asyncCloseAllWorkingTsFileProcessors(); throw e; } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index e0225b698a99..b0657272fc5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -55,6 +55,7 @@ public class CommonConfig { public static final String SYSTEM_CONFIG_NAME = "iotdb-system.properties"; public static final String SYSTEM_CONFIG_TEMPLATE_NAME = "iotdb-system.properties.template"; private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class); + public static final long DEFAULT_TIME_PARTITION_INTERVAL = 604_800_000L; // Open ID Secret private String openIdProviderUrl = ""; @@ -184,7 +185,7 @@ public class CommonConfig { private long timePartitionOrigin = 0; /** Time partition interval in milliseconds. */ - private long timePartitionInterval = 604_800_000; + private long timePartitionInterval = DEFAULT_TIME_PARTITION_INTERVAL; /** This variable set timestamp precision as millisecond, microsecond or nanosecond. */ private String timestampPrecision = "ms";