From 03e724525230e765227d4680929e248b59ec212a Mon Sep 17 00:00:00 2001 From: libo Date: Thu, 21 May 2026 10:27:05 +0800 Subject: [PATCH] Trigger leader balance on long-term WAL write blocking Mark DataNode as ReadOnly(WALBlocked) when WAL write blocking persists, and let ConfigNode move Region leaders away from the blocked DataNode. Add UT and IT coverage for WAL block status and leader balance behavior. --- .../env/cluster/config/MppCommonConfig.java | 13 + .../cluster/config/MppSharedCommonConfig.java | 14 ++ .../env/remote/config/RemoteCommonConfig.java | 10 + .../apache/iotdb/itbase/env/CommonConfig.java | 4 + ...egionGroupLeaderBalanceWithWALBlockIT.java | 232 ++++++++++++++++++ .../impl/DataNodeInternalRPCServiceImpl.java | 8 + .../dataregion/wal/WALManager.java | 50 ++++ .../dataregion/wal/WALWriteBlockStatus.java | 43 ++++ .../utils/MemoryControlledWALEntryQueue.java | 42 ++-- .../dataregion/wal/WALManagerTest.java | 32 +++ .../wal/WALWriteBlockStatusTest.java | 93 +++++++ 11 files changed, 527 insertions(+), 14 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 4852b9d116e25..8e0eab23c0e42 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -375,6 +375,19 @@ public CommonConfig setWalBufferSize(int walBufferSize) { return this; } + @Override + public CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) { + setProperty("check_period_when_insert_blocked", String.valueOf(checkPeriodWhenInsertBlocked)); + return this; + } + + @Override + public CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked) { + setProperty( + "max_waiting_time_when_insert_blocked", String.valueOf(maxWaitingTimeWhenInsertBlocked)); + return this; + } + @Override public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) { setProperty("degree_of_query_parallelism", String.valueOf(degreeOfParallelism)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 582c9a049e492..1f806d1c372cc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -375,6 +375,20 @@ public CommonConfig setWalBufferSize(int walBufferSize) { return this; } + @Override + public CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) { + cnConfig.setCheckPeriodWhenInsertBlocked(checkPeriodWhenInsertBlocked); + dnConfig.setCheckPeriodWhenInsertBlocked(checkPeriodWhenInsertBlocked); + return this; + } + + @Override + public CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked) { + cnConfig.setMaxWaitingTimeWhenInsertBlocked(maxWaitingTimeWhenInsertBlocked); + dnConfig.setMaxWaitingTimeWhenInsertBlocked(maxWaitingTimeWhenInsertBlocked); + return this; + } + @Override public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) { cnConfig.setDegreeOfParallelism(degreeOfParallelism); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48c157e957be8..33dc3a30b8ba8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -264,6 +264,16 @@ public CommonConfig setWalBufferSize(int walBufferSize) { return this; } + @Override + public CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) { + return this; + } + + @Override + public CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked) { + return this; + } + @Override public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index dc21234e2bad2..e721f823a524b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -120,6 +120,10 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setWalBufferSize(int walBufferSize); + CommonConfig setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked); + + CommonConfig setMaxWaitingTimeWhenInsertBlocked(int maxWaitingTimeWhenInsertBlocked); + CommonConfig setDegreeOfParallelism(int degreeOfParallelism); CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java new file mode 100644 index 0000000000000..0ccdf62f3863b --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderBalanceWithWALBlockIT.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.load; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.RegionRoleType; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBRegionGroupLeaderBalanceWithWALBlockIT { + + private static final String TEST_SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS = + ConsensusFactory.RATIS_CONSENSUS; + private static final String TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS = + ConsensusFactory.IOT_CONSENSUS; + private static final int TEST_REPLICATION_FACTOR = 3; + private static final int TEST_DATA_NODE_NUM = 3; + private static final int DATABASE_NUM = 3; + private static final int RETRY_NUM = 60; + + private static final String DATABASE = "root.wal_block_db"; + private static final String WAL_THROTTLE_THRESHOLD_IN_BYTE = "wal_throttle_threshold_in_byte"; + private static final String WAL_BLOCKED_STATUS = NodeStatus.ReadOnly.getStatus() + "(WALBlocked)"; + + @Before + public void setUp() { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setEnableAutoLeaderBalanceForRatisConsensus(true) + .setEnableAutoLeaderBalanceForIoTConsensus(true) + .setSchemaRegionConsensusProtocolClass(TEST_SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS) + .setDataRegionConsensusProtocolClass(TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS) + .setSchemaReplicationFactor(TEST_REPLICATION_FACTOR) + .setDataReplicationFactor(TEST_REPLICATION_FACTOR) + .setCheckPeriodWhenInsertBlocked(50) + .setMaxWaitingTimeWhenInsertBlocked(2000); + EnvFactory.getEnv().initClusterEnvironment(1, TEST_DATA_NODE_NUM); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRegionLeaderBalanceWhenWalLongTermBlocked() throws Exception { + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + createDataRegionGroups(client); + waitUntil( + "all DataNodes have balanced DataRegion leaders", + () -> isLeaderDistributionBalanced(client)); + + TRegionInfo targetLeader = findAnyDataRegionLeader(client); + triggerLongTermWalBlockingOnDataNode(client, targetLeader.getDataNodeId()); + + waitUntil( + "target leader DataNode becomes ReadOnly because of long-term WAL blocking", + () -> + WAL_BLOCKED_STATUS.equals( + getNodeStatusWithReason(client, targetLeader.getDataNodeId()))); + waitUntil( + "Region leaders are moved away from ReadOnly DataNodes", + () -> hasNoLeaderOnReadOnlyDataNode(client, targetLeader.getDataNodeId())); + } + } + + private void createDataRegionGroups(SyncConfigNodeIServiceClient client) throws Exception { + for (int i = 0; i < DATABASE_NUM; i++) { + TSStatus status = client.setDatabase(new TDatabaseSchema(DATABASE + i)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Map seriesSlotMap = new HashMap<>(); + seriesSlotMap.put( + new TSeriesPartitionSlot(1), + new TTimeSlotList() + .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100)))); + Map> databaseSlotsMap = new HashMap<>(); + databaseSlotsMap.put(DATABASE + i, seriesSlotMap); + + TDataPartitionTableResp dataPartitionTableResp = + client.getOrCreateDataPartitionTable(new TDataPartitionReq(databaseSlotsMap)); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + } + } + + private boolean isLeaderDistributionBalanced(SyncConfigNodeIServiceClient client) + throws Exception { + Map leaderCounter = new HashMap<>(); + for (TRegionInfo regionInfo : getUserDataRegionInfoList(client)) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + leaderCounter.merge(regionInfo.getDataNodeId(), 1, Integer::sum); + } + } + if (leaderCounter.size() != TEST_DATA_NODE_NUM) { + return false; + } + for (Integer leaderCount : leaderCounter.values()) { + if (leaderCount != DATABASE_NUM / TEST_DATA_NODE_NUM) { + return false; + } + } + return true; + } + + private TRegionInfo findAnyDataRegionLeader(SyncConfigNodeIServiceClient client) + throws Exception { + for (TRegionInfo regionInfo : getUserDataRegionInfoList(client)) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + return regionInfo; + } + } + throw new AssertionError("DataRegion leader not found"); + } + + private void triggerLongTermWalBlockingOnDataNode( + SyncConfigNodeIServiceClient client, int dataNodeId) throws Exception { + Map configItems = new HashMap<>(); + // The throttle threshold used by WALManager is 80% of this value, so 1 makes it 0 and + // deterministically triggers long-term WAL blocking on the target DataNode heartbeat. + configItems.put(WAL_THROTTLE_THRESHOLD_IN_BYTE, "1"); + TSStatus status = client.setConfiguration(new TSetConfigurationReq(configItems, dataNodeId)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + private String getNodeStatusWithReason(SyncConfigNodeIServiceClient client, int dataNodeId) + throws Exception { + TShowClusterResp showClusterResp = client.showCluster(); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showClusterResp.getStatus().getCode()); + return showClusterResp.getNodeStatus().get(dataNodeId); + } + + private boolean hasNoLeaderOnReadOnlyDataNode( + SyncConfigNodeIServiceClient client, int readOnlyDataNodeId) throws Exception { + for (TRegionInfo regionInfo : getUserDataRegionInfoList(client)) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + if (regionInfo.getDataNodeId() == readOnlyDataNodeId + || NodeStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) { + return false; + } + } + } + return true; + } + + private List getUserDataRegionInfoList(SyncConfigNodeIServiceClient client) + throws Exception { + TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode()); + + List result = new ArrayList<>(); + for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) { + if (TConsensusGroupType.DataRegion.equals(regionInfo.getConsensusGroupId().getType()) + && !regionInfo.getDatabase().startsWith(SystemConstant.SYSTEM_DATABASE) + && !regionInfo.getDatabase().startsWith(SystemConstant.AUDIT_DATABASE)) { + result.add(regionInfo); + } + } + return result; + } + + private void waitUntil(String condition, WaitCondition waitCondition) throws Exception { + for (int retry = 0; retry < RETRY_NUM; retry++) { + if (waitCondition.evaluate()) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + Assert.fail("Failed to wait until " + condition); + } + + @FunctionalInterface + private interface WaitCondition { + boolean evaluate() throws Exception; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e5c996405649d..1315bc79ad806 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -213,6 +213,8 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALWriteBlockStatus; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -2271,6 +2273,7 @@ public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) th resp.setDataRegionRawDataSize(regionRawDataSize); } AuthorityChecker.getAuthorityFetcher().refreshToken(); + updateWALBlockedStatus(); resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp()); resp.setStatus(commonConfig.getNodeStatus().getStatus()); if (commonConfig.getStatusReason() != null) { @@ -2314,6 +2317,11 @@ public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) th return resp; } + private void updateWALBlockedStatus() { + WALWriteBlockStatus.updateStatus( + commonConfig, WALManager.getInstance().isLongTermWriteBlocked()); + } + @Override public TSStatus updateRegionCache(TRegionRouteReq req) { boolean result = ClusterPartitionFetcher.getInstance().updateRegionCache(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 00eb47d40ae2b..ff8fae8e431cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -52,6 +52,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; @@ -69,6 +70,9 @@ public class WALManager implements IService { private final AtomicLong totalDiskUsage = new AtomicLong(); // total number of wal files private final AtomicLong totalFileNum = new AtomicLong(); + private final AtomicLong walThrottleStartTimeInMs = new AtomicLong(-1); + private final AtomicLong walBufferQueueBlockedStartTimeInMs = new AtomicLong(-1); + private final AtomicInteger walBufferQueueBlockedWriterCount = new AtomicInteger(0); private WALManager() { if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) @@ -235,6 +239,49 @@ public boolean shouldThrottle() { return getTotalDiskUsage() >= getThrottleThreshold(); } + public boolean isLongTermWriteBlocked() { + return isLongTermWalThrottle() || isLongTermWalBufferQueueBlocked(); + } + + private boolean isLongTermWalThrottle() { + if (!shouldThrottle()) { + walThrottleStartTimeInMs.set(-1); + return false; + } + return isLongTermBlocked(walThrottleStartTimeInMs); + } + + private boolean isLongTermWalBufferQueueBlocked() { + if (walBufferQueueBlockedWriterCount.get() <= 0) { + walBufferQueueBlockedStartTimeInMs.set(-1); + return false; + } + return isLongTermBlocked(walBufferQueueBlockedStartTimeInMs); + } + + private boolean isLongTermBlocked(AtomicLong blockStartTimeInMs) { + long currentTimeInMs = System.currentTimeMillis(); + long blockStartTime = blockStartTimeInMs.get(); + if (blockStartTime < 0) { + blockStartTimeInMs.compareAndSet(-1, currentTimeInMs); + blockStartTime = blockStartTimeInMs.get(); + } + return currentTimeInMs - blockStartTime >= config.getMaxWaitingTimeWhenInsertBlocked(); + } + + public void markWalBufferQueueBlocked() { + if (walBufferQueueBlockedWriterCount.getAndIncrement() == 0) { + walBufferQueueBlockedStartTimeInMs.compareAndSet(-1, System.currentTimeMillis()); + } + } + + public void markWalBufferQueueAvailable() { + if (walBufferQueueBlockedWriterCount.decrementAndGet() <= 0) { + walBufferQueueBlockedWriterCount.set(0); + walBufferQueueBlockedStartTimeInMs.set(-1); + } + } + public long getThrottleThreshold() { return (long) (config.getThrottleThreshold() * 0.8); } @@ -321,6 +368,9 @@ public void syncDeleteOutdatedFilesInWALNodes() { public void clear() { totalDiskUsage.set(0); + walThrottleStartTimeInMs.set(-1); + walBufferQueueBlockedStartTimeInMs.set(-1); + walBufferQueueBlockedWriterCount.set(0); walNodesManager.clear(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java new file mode 100644 index 0000000000000..70dad8a9d3795 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatus.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal; + +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonConfig; + +public final class WALWriteBlockStatus { + + public static final String WAL_BLOCKED = "WALBlocked"; + + private WALWriteBlockStatus() {} + + public static void updateStatus(CommonConfig commonConfig, boolean longTermWriteBlocked) { + if (longTermWriteBlocked) { + if (NodeStatus.Running.equals(commonConfig.getNodeStatus())) { + commonConfig.setNodeStatus(NodeStatus.ReadOnly); + commonConfig.setStatusReason(WAL_BLOCKED); + } + } else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus()) + && WAL_BLOCKED.equals(commonConfig.getStatusReason())) { + commonConfig.setNodeStatus(NodeStatus.Running); + commonConfig.setStatusReason(null); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java index 2bd56a1a4fedb..061e0dea23e59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.utils; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; @@ -50,23 +51,36 @@ public WALEntry poll(long timeout, TimeUnit unit) throws InterruptedException { public void put(WALEntry e) throws InterruptedException { long elementSize = getElementSize(e); - synchronized (nonFullCondition) { - while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) { - if (elementSize - > SystemInfo.getInstance().getWalBufferQueueMemoryBlock().getTotalMemorySizeInBytes()) { - throw new IoTDBRuntimeException( - "The element size of WALEntry " - + elementSize - + " is larger than the total memory size of wal buffer queue " - + SystemInfo.getInstance() - .getWalBufferQueueMemoryBlock() - .getTotalMemorySizeInBytes(), - WAL_ENTRY_TOO_LARGE.getStatusCode()); + boolean blocked = false; + try { + synchronized (nonFullCondition) { + while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) { + if (elementSize + > SystemInfo.getInstance() + .getWalBufferQueueMemoryBlock() + .getTotalMemorySizeInBytes()) { + throw new IoTDBRuntimeException( + "The element size of WALEntry " + + elementSize + + " is larger than the total memory size of wal buffer queue " + + SystemInfo.getInstance() + .getWalBufferQueueMemoryBlock() + .getTotalMemorySizeInBytes(), + WAL_ENTRY_TOO_LARGE.getStatusCode()); + } + if (!blocked) { + blocked = true; + WALManager.getInstance().markWalBufferQueueBlocked(); + } + nonFullCondition.wait(); } - nonFullCondition.wait(); + } + queue.put(e); + } finally { + if (blocked) { + WALManager.getInstance().markWalBufferQueueAvailable(); } } - queue.put(e); } public WALEntry take() throws InterruptedException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java index 2017805ac7e24..1727d98522eb9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManagerTest.java @@ -44,6 +44,7 @@ import java.io.File; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -58,11 +59,13 @@ public class WALManagerTest { }; private String[] prevWalDirs; private String prevConsensus; + private int prevMaxWaitingTimeWhenInsertBlocked; @Before public void setUp() throws Exception { prevConsensus = config.getDataRegionConsensusProtocolClass(); prevWalDirs = commonConfig.getWalDirs(); + prevMaxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked(); config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); commonConfig.setWalDirs(walDirs); EnvironmentUtils.envSetUp(); @@ -76,6 +79,7 @@ public void tearDown() throws Exception { } config.setDataRegionConsensusProtocolClass(prevConsensus); commonConfig.setWalDirs(prevWalDirs); + config.setMaxWaitingTimeWhenInsertBlocked(prevMaxWaitingTimeWhenInsertBlocked); } @Test @@ -118,6 +122,34 @@ public void testDeleteOutdatedWALFiles() throws IllegalPathException { } } + @Test + public void testLongTermWriteBlockedByWalThrottle() { + WALManager walManager = WALManager.getInstance(); + walManager.addTotalDiskUsage(walManager.getThrottleThreshold()); + + assertFalse(walManager.isLongTermWriteBlocked()); + + config.setMaxWaitingTimeWhenInsertBlocked(0); + assertTrue(walManager.isLongTermWriteBlocked()); + + walManager.clear(); + assertFalse(walManager.isLongTermWriteBlocked()); + } + + @Test + public void testLongTermWriteBlockedByWalBufferQueue() { + WALManager walManager = WALManager.getInstance(); + walManager.markWalBufferQueueBlocked(); + + assertFalse(walManager.isLongTermWriteBlocked()); + + config.setMaxWaitingTimeWhenInsertBlocked(0); + assertTrue(walManager.isLongTermWriteBlocked()); + + walManager.markWalBufferQueueAvailable(); + assertFalse(walManager.isLongTermWriteBlocked()); + } + private InsertRowNode getInsertRowNode() throws IllegalPathException { long time = 110L; TSDataType[] dataTypes = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java new file mode 100644 index 0000000000000..2546457d456bb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALWriteBlockStatusTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal; + +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonConfig; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class WALWriteBlockStatusTest { + + @Test + public void testRunningNodeTurnsReadOnlyWhenWalBlocked() { + CommonConfig commonConfig = mockCommonConfig(NodeStatus.Running, null); + + WALWriteBlockStatus.updateStatus(commonConfig, true); + + assertEquals(NodeStatus.ReadOnly, commonConfig.getNodeStatus()); + assertEquals(WALWriteBlockStatus.WAL_BLOCKED, commonConfig.getStatusReason()); + } + + @Test + public void testWalBlockedReadOnlyNodeRecovers() { + CommonConfig commonConfig = + mockCommonConfig(NodeStatus.ReadOnly, WALWriteBlockStatus.WAL_BLOCKED); + + WALWriteBlockStatus.updateStatus(commonConfig, false); + + assertEquals(NodeStatus.Running, commonConfig.getNodeStatus()); + assertNull(commonConfig.getStatusReason()); + } + + @Test + public void testOtherReadOnlyReasonIsNotOverwrittenOrRecovered() { + CommonConfig commonConfig = mockCommonConfig(NodeStatus.ReadOnly, NodeStatus.DISK_FULL); + + WALWriteBlockStatus.updateStatus(commonConfig, true); + assertEquals(NodeStatus.ReadOnly, commonConfig.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, commonConfig.getStatusReason()); + + WALWriteBlockStatus.updateStatus(commonConfig, false); + assertEquals(NodeStatus.ReadOnly, commonConfig.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, commonConfig.getStatusReason()); + } + + private CommonConfig mockCommonConfig(NodeStatus initialStatus, String initialStatusReason) { + AtomicReference status = new AtomicReference<>(initialStatus); + AtomicReference statusReason = new AtomicReference<>(initialStatusReason); + CommonConfig commonConfig = Mockito.mock(CommonConfig.class); + + Mockito.when(commonConfig.getNodeStatus()).thenAnswer(invocation -> status.get()); + Mockito.when(commonConfig.getStatusReason()).thenAnswer(invocation -> statusReason.get()); + Mockito.doAnswer( + invocation -> { + status.set(invocation.getArgument(0)); + statusReason.set(null); + return null; + }) + .when(commonConfig) + .setNodeStatus(Mockito.any(NodeStatus.class)); + Mockito.doAnswer( + invocation -> { + statusReason.set(invocation.getArgument(0)); + return null; + }) + .when(commonConfig) + .setStatusReason(Mockito.any()); + return commonConfig; + } +}