Skip to content

Commit 20c0341

Browse files
azatilejn
authored and committed
Merge pull request ClickHouse#91574 from amosbird/merge-tree-clean-up-thread
1 parent c63afbe commit 20c0341

22 files changed

Lines changed: 547 additions & 274 deletions

src/Common/FailPoint.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ static struct InitFiu
2929
/// We should define different types of failpoints here. There are four types of them:
3030
/// - ONCE: the failpoint will only be triggered once.
3131
/// - REGULAR: the failpoint will always be triggered until disableFailPoint is called.
32-
/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, util disableFailPoint is called.
33-
/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, util disableFailPoint is called.
32+
/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, until disableFailPoint is called.
33+
/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, until disableFailPoint is called.
3434

3535
#define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \
3636
ONCE(replicated_merge_tree_commit_zk_fail_after_op) \
@@ -131,9 +131,13 @@ static struct InitFiu
131131
REGULAR(rmt_delay_commit_part) \
132132
ONCE(smt_commit_exception_before_op) \
133133
ONCE(backup_add_empty_memory_table) \
134+
ONCE(local_object_storage_network_error_during_remove) \
135+
ONCE(parallel_replicas_check_read_mode_always) \
136+
REGULAR(lightweight_show_tables) \
137+
PAUSEABLE_ONCE(drop_database_before_exclusive_ddl_lock) \
138+
REGULAR(storage_merge_tree_background_schedule_merge_fail) \
134139
REGULAR(refresh_task_stop_racing_for_running_refresh)
135140

136-
137141
namespace FailPoints
138142
{
139143
#define M(NAME) extern const char(NAME)[] = #NAME "";
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#include <Storages/MergeTree/IMergeTreeCleanupThread.h>
2+
3+
#include <Interpreters/Context.h>
4+
#include <Storages/MergeTree/MergeTreeData.h>
5+
#include <Storages/MergeTree/MergeTreeSettings.h>
6+
#include <Common/ZooKeeper/KeeperException.h>
7+
8+
namespace DB
9+
{
10+
11+
namespace MergeTreeSetting
12+
{
13+
extern const MergeTreeSettingsUInt64 cleanup_delay_period;
14+
extern const MergeTreeSettingsUInt64 cleanup_delay_period_random_add;
15+
extern const MergeTreeSettingsUInt64 cleanup_thread_preferred_points_per_iteration;
16+
extern const MergeTreeSettingsUInt64 max_cleanup_delay_period;
17+
}
18+
19+
/// Binds the cleanup thread to the table `data_` and creates its background task
/// in the context's schedule pool. The task is only created here; start() activates it.
/// The initial sleep interval is the table's cleanup_delay_period (seconds) converted to milliseconds.
IMergeTreeCleanupThread::IMergeTreeCleanupThread(MergeTreeData & data_)
    : data(data_)
    , log_name(data.getStorageID().getFullTableName() + " (CleanupThread)")
    , log(getLogger(log_name))
    , sleep_ms((*data.getSettings())[MergeTreeSetting::cleanup_delay_period] * 1000)
{
    const auto context = data.getContext();
    task = context->getSchedulePool().createTask(log_name, [this] { run(); });
}

IMergeTreeCleanupThread::~IMergeTreeCleanupThread() = default;
29+
30+
void IMergeTreeCleanupThread::start()
31+
{
32+
task->activateAndSchedule();
33+
}
34+
35+
void IMergeTreeCleanupThread::wakeup()
36+
{
37+
task->schedule();
38+
}
39+
40+
void IMergeTreeCleanupThread::stop()
41+
{
42+
task->deactivate();
43+
}
44+
45+
void IMergeTreeCleanupThread::wakeupEarlierIfNeeded()
46+
{
47+
/// It may happen that the tables was idle for a long time, but then a user started to aggressively insert (or mutate) data.
48+
/// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon,
49+
/// but the number of objects to clean up is growing. We need to wakeup the task earlier.
50+
auto storage_settings = data.getSettings();
51+
if (!(*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration])
52+
return;
53+
54+
/// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts.
55+
/// Do not wake up unless we have too many.
56+
size_t number_of_outdated_objects = data.getOutdatedPartsCount();
57+
if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2)
58+
return;
59+
60+
/// A race condition is possible here, but it's okay
61+
if (is_running.load(std::memory_order_relaxed))
62+
return;
63+
64+
/// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime())
65+
if (!wakeup_check_timer.compareAndRestart(static_cast<double>((*storage_settings)[MergeTreeSetting::cleanup_delay_period]) / 4.0))
66+
return;
67+
68+
UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
69+
UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000;
70+
if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms)
71+
return;
72+
73+
/// Don't run it more often than cleanup_delay_period
74+
UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000;
75+
if (seconds_passed < (*storage_settings)[MergeTreeSetting::cleanup_delay_period])
76+
return;
77+
78+
/// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many.
79+
number_of_outdated_objects = data.getNumberOfOutdatedPartsWithExpiredRemovalTime();
80+
if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2)
81+
return;
82+
83+
LOG_TRACE(
84+
log,
85+
"Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago",
86+
number_of_outdated_objects,
87+
seconds_passed);
88+
89+
wakeup();
90+
}
91+
92+
void IMergeTreeCleanupThread::run()
93+
{
94+
if (cleanup_blocker.isCancelled())
95+
{
96+
LOG_TRACE(LogFrequencyLimiter(log, 30), "Cleanup is cancelled, exiting");
97+
return;
98+
}
99+
100+
SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); });
101+
is_running.store(true, std::memory_order_relaxed);
102+
103+
auto storage_settings = data.getSettings();
104+
105+
Float32 cleanup_points = 0;
106+
try
107+
{
108+
cleanup_points = iterate();
109+
}
110+
catch (const Coordination::Exception & e)
111+
{
112+
tryLogCurrentException(log, __PRETTY_FUNCTION__);
113+
114+
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
115+
return;
116+
}
117+
catch (...)
118+
{
119+
tryLogCurrentException(log, __PRETTY_FUNCTION__);
120+
}
121+
122+
UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
123+
UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000;
124+
125+
/// Do not adjust sleep_ms on the first run after starting the server
126+
if (prev_timestamp && (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration])
127+
{
128+
/// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup.
129+
/// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s)
130+
/// when we removed something, because inserting one part per 30s will lead to running cleanup each 30s just to remove one part.
131+
/// So we need some interpolation based on preferred batch size.
132+
auto expected_cleanup_points = (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration];
133+
134+
/// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration?
135+
Float32 ratio = cleanup_points / static_cast<Float32>(expected_cleanup_points);
136+
if (ratio == 0)
137+
sleep_ms = (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000;
138+
else
139+
sleep_ms = static_cast<UInt64>(static_cast<Float32>(sleep_ms) / ratio);
140+
141+
sleep_ms = std::clamp(
142+
sleep_ms,
143+
(*storage_settings)[MergeTreeSetting::cleanup_delay_period] * 1000,
144+
(*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000);
145+
146+
UInt64 interval_ms = now_ms - prev_timestamp;
147+
LOG_TRACE(
148+
log,
149+
"Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})",
150+
sleep_ms,
151+
cleanup_points,
152+
interval_ms,
153+
ratio,
154+
cleanup_points / static_cast<Float32>(interval_ms * 60'000));
155+
}
156+
prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed);
157+
158+
sleep_ms += std::uniform_int_distribution<UInt64>(0, (*storage_settings)[MergeTreeSetting::cleanup_delay_period_random_add] * 1000)(rng);
159+
task->scheduleAfter(sleep_ms);
160+
}
161+
162+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#pragma once
2+
3+
#include <Core/BackgroundSchedulePool.h>
4+
#include <Common/ActionBlocker.h>
5+
#include <Common/Stopwatch.h>
6+
#include <Common/randomSeed.h>
7+
8+
#include <pcg_random.hpp>
9+
10+
namespace DB
11+
{
12+
13+
class MergeTreeData;
14+
15+
/// Removes obsolete data from a table of type [Replicated]MergeTree.
16+
class IMergeTreeCleanupThread
17+
{
18+
public:
19+
explicit IMergeTreeCleanupThread(MergeTreeData & data_);
20+
21+
virtual ~IMergeTreeCleanupThread();
22+
23+
void start();
24+
25+
void wakeup();
26+
27+
void stop();
28+
29+
void wakeupEarlierIfNeeded();
30+
31+
ActionLock getCleanupLock() { return cleanup_blocker.cancel(); }
32+
33+
protected:
34+
MergeTreeData & data;
35+
36+
String log_name;
37+
LoggerPtr log;
38+
BackgroundSchedulePoolTaskHolder task;
39+
pcg64 rng{randomSeed()};
40+
41+
UInt64 sleep_ms;
42+
43+
std::atomic<UInt64> prev_cleanup_timestamp_ms = 0;
44+
std::atomic<bool> is_running = false;
45+
46+
AtomicStopwatch wakeup_check_timer;
47+
48+
ActionBlocker cleanup_blocker;
49+
50+
void run();
51+
52+
/// Returns a number this is directly proportional to the number of cleaned up blocks
53+
virtual Float32 iterate() = 0;
54+
};
55+
56+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include <Storages/MergeTree/MergeTreeCleanupThread.h>
2+
3+
#include <Storages/MergeTree/MergeTreeSettings.h>
4+
#include <Storages/StorageMergeTree.h>
5+
6+
namespace DB
7+
{
8+
9+
namespace MergeTreeSetting
10+
{
11+
extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations;
12+
extern const MergeTreeSettingsUInt64 merge_tree_clear_old_parts_interval_seconds;
13+
extern const MergeTreeSettingsUInt64 merge_tree_clear_old_temporary_directories_interval_seconds;
14+
extern const MergeTreeSettingsSeconds temporary_directories_lifetime;
15+
}
16+
17+
/// Creates the cleanup thread for a non-replicated MergeTree table.
/// The same storage object is passed to the base class (as MergeTreeData) and kept
/// as StorageMergeTree for the calls made from iterate().
MergeTreeCleanupThread::MergeTreeCleanupThread(StorageMergeTree & storage_)
    : IMergeTreeCleanupThread(storage_), storage(storage_)
{
}
22+
23+
void MergeTreeCleanupThread::start()
24+
{
25+
time_after_previous_cleanup_parts.restart();
26+
time_after_previous_cleanup_temporary_directories.restart();
27+
IMergeTreeCleanupThread::start();
28+
}
29+
30+
/// Performs one cleanup pass over the table and returns the number of "cleanup points" it earned.
///
/// Temporary directories and old parts are each cleaned at most once per their own
/// interval setting (merge_tree_clear_old_temporary_directories_interval_seconds and
/// merge_tree_clear_old_parts_interval_seconds); a pass in which neither interval has
/// elapsed returns 0.
///
/// The result is the sum of all cleaned objects, with removed parts divided by
/// parts_number_amplification.
Float32 MergeTreeCleanupThread::iterate()
{
    size_t cleaned_other = 0;
    size_t cleaned_part_like = 0;
    size_t cleaned_parts = 0;

    /// One settings snapshot is used for the whole iteration.
    auto storage_settings = storage.getSettings();

    /// The cleanup routines use relative_data_path which changes during rename, so we do it under share lock
    auto shared_lock
        = storage.lockForShare(RWLockImpl::NO_QUERY, (*storage_settings)[MergeTreeSetting::lock_acquire_timeout_for_background_operations]);

    if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(
            static_cast<double>((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_temporary_directories_interval_seconds])))
    {
        cleaned_part_like += storage.clearOldTemporaryDirectories(
            (*storage_settings)[MergeTreeSetting::temporary_directories_lifetime].totalSeconds());
    }

    if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred(
            static_cast<double>((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_parts_interval_seconds])))
    {
        cleaned_parts += storage.clearOldPartsFromFilesystem(/* force */ false, /* with_pause_point */ true);
        cleaned_other += storage.clearOldMutations();
        cleaned_part_like += storage.clearEmptyParts();
        cleaned_part_like += storage.clearUnusedPatchParts();
        cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts();
    }

    constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time
    Float32 cleaned_inserted_parts = static_cast<Float32>(cleaned_parts) / parts_number_amplification;
    return cleaned_inserted_parts + static_cast<Float32>(cleaned_part_like) + static_cast<Float32>(cleaned_other);
}
62+
63+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#pragma once
2+
3+
#include <Storages/MergeTree/IMergeTreeCleanupThread.h>
4+
#include <Common/Stopwatch.h>
5+
6+
namespace DB
7+
{
8+
9+
class StorageMergeTree;
10+
11+
class MergeTreeCleanupThread : public IMergeTreeCleanupThread
12+
{
13+
public:
14+
explicit MergeTreeCleanupThread(StorageMergeTree & storage_);
15+
16+
/// Shadows IMergeTreeCleanupThread::start() to restart cleanup timers
17+
/// before activating the background task. This ensures the thread waits
18+
/// a full interval after the manual cleanup done in startup().
19+
void start();
20+
21+
private:
22+
StorageMergeTree & storage;
23+
24+
AtomicStopwatch time_after_previous_cleanup_parts;
25+
AtomicStopwatch time_after_previous_cleanup_temporary_directories;
26+
27+
/// Returns a number that is directly proportional to the number of cleaned up objects
28+
Float32 iterate() override;
29+
};
30+
31+
}

0 commit comments

Comments
 (0)