diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 4852b9d116e25..8e0eab23c0e42 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -375,6 +375,19 @@ public CommonConfig setWalBufferSize(int walBufferSize) { return this; } + @Override + public CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) { + setProperty("check_period_when_insert_blocked", String.valueOf(checkPeriodWhenInsertBlocked)); + return this; + } + + @Override + public CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked) { + setProperty( + "max_waiting_time_when_insert_blocked", String.valueOf(maxWaitingTimeWhenInsertBlocked)); + return this; + } + @Override public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) { setProperty("degree_of_query_parallelism", String.valueOf(degreeOfParallelism)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 582c9a049e492..1f806d1c372cc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -375,6 +375,20 @@ public CommonConfig setWalBufferSize(int walBufferSize) { return this; } + @Override + public CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) { + cnConfig.setCheckPeriodWhenInsertBlocked(checkPeriodWhenInsertBlocked); + dnConfig.setCheckPeriodWhenInsertBlocked(checkPeriodWhenInsertBlocked); + return this; + } + + @Override + public CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked) { + cnConfig.setMaxWaitingTimeWhenInsertBlocked(maxWaitingTimeWhenInsertBlocked); + dnConfig.setMaxWaitingTimeWhenInsertBlocked(maxWaitingTimeWhenInsertBlocked); + return this; + } + @Override public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) { cnConfig.setDegreeOfParallelism(degreeOfParallelism); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48c157e957be8..33dc3a30b8ba8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -264,6 +264,16 @@ public CommonConfig setWalBufferSize(int walBufferSize) { return this; } + @Override + public CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) { + return this; + } + + @Override + public CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked) { + return this; + } + @Override public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index dc21234e2bad2..e721f823a524b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -120,6 +120,10 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setWalBufferSize(int walBufferSize); + CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked); + + CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked); + CommonConfig setDegreeOfParallelism(int degreeOfParallelism); CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java new file mode 100644 index 0000000000000..0ccdf62f3863b --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.load; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.RegionRoleType; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBRegionGroupLeaderBalanceWithWALBlockIT { + + private static final String TEST_SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS = + ConsensusFactory.RATIS_CONSENSUS; + private static final String TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS = + ConsensusFactory.IOT_CONSENSUS; + private static final int TEST_REPLICATION_FACTOR = 3; + private static final int TEST_DATA_NODE_NUM = 3; + private static final int DATABASE_NUM = 3; + private static final int RETRY_NUM = 60; + + private static final String DATABASE = "root.wal_block_db"; + private static final String WAL_THROTTLE_THRESHOLD_IN_BYTE = "wal_throttle_threshold_in_byte"; + private static final String WAL_BLOCKED_STATUS = NodeStatus.ReadOnly.getStatus() + "(WALBlocked)"; + + @Before + public void setUp() { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setEnableAutoLeaderBalanceForRatisConsensus(true) + .setEnableAutoLeaderBalanceForIoTConsensus(true) + .setSchemaRegionConsensusProtocolClass(TEST_SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS) + .setDataRegionConsensusProtocolClass(TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS) + .setSchemaReplicationFactor(TEST_REPLICATION_FACTOR) + .setDataReplicationFactor(TEST_REPLICATION_FACTOR) + .setCheckPeriodWhenInsertBlocked(50) + .setMaxWaitingTimeWhenInsertBlocked(2000); + EnvFactory.getEnv().initClusterEnvironment(1, TEST_DATA_NODE_NUM); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRegionLeaderBalanceWhenWalLongTermBlocked() throws Exception { + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + createDataRegionGroups(client); + waitUntil( + "all DataNodes have balanced DataRegion leaders", + () -> isLeaderDistributionBalanced(client)); + + TRegionInfo targetLeader = findAnyDataRegionLeader(client); + triggerLongTermWalBlockingOnDataNode(client, targetLeader.getDataNodeId()); + + waitUntil( + "target leader DataNode becomes ReadOnly because of long-term WAL blocking", + () -> + WAL_BLOCKED_STATUS.equals( + getNodeStatusWithReason(client, targetLeader.getDataNodeId()))); + waitUntil( + "Region leaders are moved away from ReadOnly DataNodes", + () -> hasNoLeaderOnReadOnlyDataNode(client, targetLeader.getDataNodeId())); + } + } + + private void createDataRegionGroups(SyncConfigNodeIServiceClient client) throws Exception { + for (int i = 0; i < DATABASE_NUM; i++) { + TSStatus status = client.setDatabase(new TDatabaseSchema(DATABASE + i)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Map seriesSlotMap = new HashMap<>(); + seriesSlotMap.put( + new TSeriesPartitionSlot(1), + new TTimeSlotList() + .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100)))); + Map> databaseSlotsMap = new HashMap<>(); + databaseSlotsMap.put(DATABASE + i, seriesSlotMap); + + TDataPartitionTableResp dataPartitionTableResp = + client.getOrCreateDataPartitionTable(new TDataPartitionReq(databaseSlotsMap)); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + } + } + + private boolean isLeaderDistributionBalanced(SyncConfigNodeIServiceClient client) + throws Exception { + Map leaderCounter = new HashMap<>(); + for (TRegionInfo regionInfo : getUserDataRegionInfoList(client)) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + leaderCounter.merge(regionInfo.getDataNodeId(), 1, Integer::sum); + } + } + if (leaderCounter.size() != TEST_DATA_NODE_NUM) { + return false; + } + for (Integer leaderCount : leaderCounter.values()) { + if (leaderCount != DATABASE_NUM / TEST_DATA_NODE_NUM) { + return false; + } + } + return true; + } + + private TRegionInfo findAnyDataRegionLeader(SyncConfigNodeIServiceClient client) + throws Exception { + for (TRegionInfo regionInfo : getUserDataRegionInfoList(client)) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + return regionInfo; + } + } + throw new AssertionError("DataRegion leader not found"); + } + + private void triggerLongTermWalBlockingOnDataNode( + SyncConfigNodeIServiceClient client, int dataNodeId) throws Exception { + Map configItems = new HashMap<>(); + // The throttle threshold used by WALManager is 80% of this value, so 1 makes it 0 and + // deterministically triggers long-term WAL blocking on the target DataNode heartbeat. + configItems.put(WAL_THROTTLE_THRESHOLD_IN_BYTE, "1"); + TSStatus status = client.setConfiguration(new TSetConfigurationReq(configItems, dataNodeId)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + private String getNodeStatusWithReason(SyncConfigNodeIServiceClient client, int dataNodeId) + throws Exception { + TShowClusterResp showClusterResp = client.showCluster(); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showClusterResp.getStatus().getCode()); + return showClusterResp.getNodeStatus().get(dataNodeId); + } + + private boolean hasNoLeaderOnReadOnlyDataNode( + SyncConfigNodeIServiceClient client, int readOnlyDataNodeId) throws Exception { + for (TRegionInfo regionInfo : getUserDataRegionInfoList(client)) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + if (regionInfo.getDataNodeId() == readOnlyDataNodeId + || NodeStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) { + return false; + } + } + } + return true; + } + + private List getUserDataRegionInfoList(SyncConfigNodeIServiceClient client) + throws Exception { + TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode()); + + List result = new ArrayList<>(); + for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) { + if (TConsensusGroupType.DataRegion.equals(regionInfo.getConsensusGroupId().getType()) + && !regionInfo.getDatabase().startsWith(SystemConstant.SYSTEM_DATABASE) + && !regionInfo.getDatabase().startsWith(SystemConstant.AUDIT_DATABASE)) { + result.add(regionInfo); + } + } + return result; + } + + private void waitUntil(String condition, WaitCondition waitCondition) throws Exception { + for (int retry = 0; retry < RETRY_NUM; retry++) { + if (waitCondition.evaluate()) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + Assert.fail("Failed to wait until " + condition); + } + + @FunctionalInterface + private interface WaitCondition { + boolean evaluate() throws Exception; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e5c996405649d..1315bc79ad806 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -213,6 +213,8 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALWriteBlockStatus; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -2271,6 +2273,7 @@ public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) th resp.setDataRegionRawDataSize(regionRawDataSize); } AuthorityChecker.getAuthorityFetcher().refreshToken(); + updateWALBlockedStatus(); resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp()); resp.setStatus(commonConfig.getNodeStatus().getStatus()); if (commonConfig.getStatusReason() != null) { @@ -2314,6 +2317,11 @@ public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) th return resp; } + private void updateWALBlockedStatus() { + WALWriteBlockStatus.updateStatus( + commonConfig, WALManager.getInstance().isLongTermWriteBlocked()); + } + @Override public TSStatus updateRegionCache(TRegionRouteReq req) { boolean result = ClusterPartitionFetcher.getInstance().updateRegionCache(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 00eb47d40ae2b..ff8fae8e431cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -52,6 +52,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; @@ -69,6 +70,9 @@ public class WALManager implements IService { private final AtomicLong totalDiskUsage = new AtomicLong(); // total number of wal files private final AtomicLong totalFileNum = new AtomicLong(); + private final AtomicLong walThrottleStartTimeInMs = new AtomicLong(-1); + private final AtomicLong walBufferQueueBlockedStartTimeInMs = new AtomicLong(-1); + private final AtomicInteger walBufferQueueBlockedWriterCount = new AtomicInteger(0); private WALManager() { if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) @@ -235,6 +239,49 @@ public boolean shouldThrottle() { return getTotalDiskUsage() >= getThrottleThreshold(); } + public boolean isLongTermWriteBlocked() { + return isLongTermWalThrottle() || isLongTermWalBufferQueueBlocked(); + } + + private boolean isLongTermWalThrottle() { + if (!shouldThrottle()) { + walThrottleStartTimeInMs.set(-1); + return false; + } + return isLongTermBlocked(walThrottleStartTimeInMs); + } + + private boolean isLongTermWalBufferQueueBlocked() { + if (walBufferQueueBlockedWriterCount.get() <= 0) { + walBufferQueueBlockedStartTimeInMs.set(-1); + return false; + } + return isLongTermBlocked(walBufferQueueBlockedStartTimeInMs); + } + + private boolean isLongTermBlocked(AtomicLong blockStartTimeInMs) { + long currentTimeInMs = System.currentTimeMillis(); + long blockStartTime = blockStartTimeInMs.get(); + if (blockStartTime < 0) { + blockStartTimeInMs.compareAndSet(-1, currentTimeInMs); + blockStartTime = blockStartTimeInMs.get(); + } + return currentTimeInMs - blockStartTime >= config.getMaxWaitingTimeWhenInsertBlocked(); + } + + public void markWalBufferQueueBlocked() { + if (walBufferQueueBlockedWriterCount.getAndIncrement() == 0) { + walBufferQueueBlockedStartTimeInMs.compareAndSet(-1, System.currentTimeMillis()); + } + } + + public void markWalBufferQueueAvailable() { + if (walBufferQueueBlockedWriterCount.decrementAndGet() <= 0) { + walBufferQueueBlockedWriterCount.set(0); + walBufferQueueBlockedStartTimeInMs.set(-1); + } + } + public long getThrottleThreshold() { return (long) (config.getThrottleThreshold() * 0.8); } @@ -321,6 +368,9 @@ public void syncDeleteOutdatedFilesInWALNodes() { public void clear() { totalDiskUsage.set(0); + walThrottleStartTimeInMs.set(-1); + walBufferQueueBlockedStartTimeInMs.set(-1); + walBufferQueueBlockedWriterCount.set(0); walNodesManager.clear(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java new file mode 100644 index 0000000000000..70dad8a9d3795 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal; + +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonConfig; + +public final class WALWriteBlockStatus { + + public static final String WAL_BLOCKED = "WALBlocked"; + + private WALWriteBlockStatus() {} + + public static void updateStatus(CommonConfig commonConfig, boolean longTermWriteBlocked) { + if (longTermWriteBlocked) { + if (NodeStatus.Running.equals(commonConfig.getNodeStatus())) { + commonConfig.setNodeStatus(NodeStatus.ReadOnly); + commonConfig.setStatusReason(WAL_BLOCKED); + } + } else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus()) + && WAL_BLOCKED.equals(commonConfig.getStatusReason())) { + commonConfig.setNodeStatus(NodeStatus.Running); + commonConfig.setStatusReason(null); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java index 2bd56a1a4fedb..061e0dea23e59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.utils; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; @@ -50,23 +51,36 @@ public WALEntry poll(long timeout, TimeUnit unit) throws InterruptedException { public void put(WALEntry e) throws InterruptedException { long elementSize = getElementSize(e); - synchronized (nonFullCondition) { - while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) { - if (elementSize - > SystemInfo.getInstance().getWalBufferQueueMemoryBlock().getTotalMemorySizeInBytes()) { - throw new IoTDBRuntimeException( - "The element size of WALEntry " - + elementSize - + " is larger than the total memory size of wal buffer queue " - + SystemInfo.getInstance() - .getWalBufferQueueMemoryBlock() - .getTotalMemorySizeInBytes(), - WAL_ENTRY_TOO_LARGE.getStatusCode()); + boolean blocked = false; + try { + synchronized (nonFullCondition) { + while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) { + if (elementSize + > SystemInfo.getInstance() + .getWalBufferQueueMemoryBlock() + .getTotalMemorySizeInBytes()) { + throw new IoTDBRuntimeException( + "The element size of WALEntry " + + elementSize + + " is larger than the total memory size of wal buffer queue " + + SystemInfo.getInstance() + .getWalBufferQueueMemoryBlock() + .getTotalMemorySizeInBytes(), + WAL_ENTRY_TOO_LARGE.getStatusCode()); + } + if (!blocked) { + blocked = true; + WALManager.getInstance().markWalBufferQueueBlocked(); + } + nonFullCondition.wait(); } - nonFullCondition.wait(); + } + queue.put(e); + } finally { + if (blocked) { + WALManager.getInstance().markWalBufferQueueAvailable(); } } - queue.put(e); } public WALEntry take() throws InterruptedException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java index 2017805ac7e24..1727d98522eb9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java @@ -44,6 +44,7 @@ import java.io.File; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -58,11 +59,13 @@ public class WALManagerTest { }; private String[] prevWalDirs; private String prevConsensus; + private int prevMaxWaitingTimeWhenInsertBlocked; @Before public void setUp() throws Exception { prevConsensus = config.getDataRegionConsensusProtocolClass(); prevWalDirs = commonConfig.getWalDirs(); + prevMaxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked(); config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); commonConfig.setWalDirs(walDirs); EnvironmentUtils.envSetUp(); @@ -76,6 +79,7 @@ public void tearDown() throws Exception { } config.setDataRegionConsensusProtocolClass(prevConsensus); commonConfig.setWalDirs(prevWalDirs); + config.setMaxWaitingTimeWhenInsertBlocked(prevMaxWaitingTimeWhenInsertBlocked); } @Test @@ -118,6 +122,34 @@ public void testDeleteOutdatedWALFiles() throws IllegalPathException { } } + @Test + public void testLongTermWriteBlockedByWalThrottle() { + WALManager walManager = WALManager.getInstance(); + walManager.addTotalDiskUsage(walManager.getThrottleThreshold()); + + assertFalse(walManager.isLongTermWriteBlocked()); + + config.setMaxWaitingTimeWhenInsertBlocked(0); + assertTrue(walManager.isLongTermWriteBlocked()); + + walManager.clear(); + assertFalse(walManager.isLongTermWriteBlocked()); + } + + @Test + public void testLongTermWriteBlockedByWalBufferQueue() { + WALManager walManager = WALManager.getInstance(); + walManager.markWalBufferQueueBlocked(); + + assertFalse(walManager.isLongTermWriteBlocked()); + + config.setMaxWaitingTimeWhenInsertBlocked(0); + assertTrue(walManager.isLongTermWriteBlocked()); + + walManager.markWalBufferQueueAvailable(); + assertFalse(walManager.isLongTermWriteBlocked()); + } + private InsertRowNode getInsertRowNode() throws IllegalPathException { long time = 110L; TSDataType[] dataTypes = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java new file mode 100644 index 0000000000000..2546457d456bb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal; + +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonConfig; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class WALWriteBlockStatusTest { + + @Test + public void testRunningNodeTurnsReadOnlyWhenWalBlocked() { + CommonConfig commonConfig = mockCommonConfig(NodeStatus.Running, null); + + WALWriteBlockStatus.updateStatus(commonConfig, true); + + assertEquals(NodeStatus.ReadOnly, commonConfig.getNodeStatus()); + assertEquals(WALWriteBlockStatus.WAL_BLOCKED, commonConfig.getStatusReason()); + } + + @Test + public void testWalBlockedReadOnlyNodeRecovers() { + CommonConfig commonConfig = + mockCommonConfig(NodeStatus.ReadOnly, WALWriteBlockStatus.WAL_BLOCKED); + + WALWriteBlockStatus.updateStatus(commonConfig, false); + + assertEquals(NodeStatus.Running, commonConfig.getNodeStatus()); + assertNull(commonConfig.getStatusReason()); + } + + @Test + public void testOtherReadOnlyReasonIsNotOverwrittenOrRecovered() { + CommonConfig commonConfig = mockCommonConfig(NodeStatus.ReadOnly, NodeStatus.DISK_FULL); + + WALWriteBlockStatus.updateStatus(commonConfig, true); + assertEquals(NodeStatus.ReadOnly, commonConfig.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, commonConfig.getStatusReason()); + + WALWriteBlockStatus.updateStatus(commonConfig, false); + assertEquals(NodeStatus.ReadOnly, commonConfig.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, commonConfig.getStatusReason()); + } + + private CommonConfig mockCommonConfig(NodeStatus initialStatus, String initialStatusReason) { + AtomicReference status = new AtomicReference<>(initialStatus); + AtomicReference statusReason = new AtomicReference<>(initialStatusReason); + CommonConfig commonConfig = Mockito.mock(CommonConfig.class); + + Mockito.when(commonConfig.getNodeStatus()).thenAnswer(invocation -> status.get()); + Mockito.when(commonConfig.getStatusReason()).thenAnswer(invocation -> statusReason.get()); + Mockito.doAnswer( + invocation -> { + status.set(invocation.getArgument(0)); + statusReason.set(null); + return null; + }) + .when(commonConfig) + .setNodeStatus(Mockito.any(NodeStatus.class)); + Mockito.doAnswer( + invocation -> { + statusReason.set(invocation.getArgument(0)); + return null; + }) + .when(commonConfig) + .setStatusReason(Mockito.any()); + return commonConfig; + } +}