Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
CONF_Strings(recycle_blacklist, ""); // Comma separated list
// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
// Max scanned rowsets recycled for one tablet in one recycle_rowsets round.
// The rowset scanner treats values less than 1 as 1.
CONF_mInt32(max_recycle_rowsets_per_tablet_batch, "10000");
// Max number of delete tasks per batch when recycling objects.
// Each task deletes up to 1000 files. Controls memory usage during large-scale deletion.
CONF_Int32(recycler_max_tasks_per_batch, "1000");
Expand Down
133 changes: 130 additions & 3 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4969,9 +4969,8 @@ int InstanceRecycler::recycle_rowsets() {
if (config::enable_recycler_stats_metrics) {
scan_and_statistics_rowsets();
}
// recycle_func and loop_done for scan and recycle
int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv),
std::move(loop_done));
int ret = scan_recycle_rowsets_by_tablet(recyc_rs_key0, recyc_rs_key1,
std::move(handle_rowset_kv), std::move(loop_done));

worker_pool->stop();

Expand All @@ -4991,6 +4990,134 @@ int InstanceRecycler::recycle_rowsets() {
return ret;
}

// Extracts the tablet id from an encoded recycle rowset key.
//
// The key is expected to carry a one-byte prefix followed by at least five
// encoded fields; the tablet id is read as an int64 from the fourth field.
//
// @param key        encoded key, e.g. as returned by a range scan
// @param tablet_id  output, written only on success; must not be null
// @return 0 on success; -1 if the key is empty, cannot be decoded, has fewer
//         than five fields, or its fourth field is not an int64
static int decode_recycle_rowset_tablet_id(std::string_view key, int64_t* tablet_id) {
    DCHECK(tablet_id != nullptr);
    // remove_prefix(1) on an empty view is undefined behavior, so reject it up front.
    if (key.empty()) {
        return -1;
    }
    std::string_view key_without_prefix = key;
    key_without_prefix.remove_prefix(1);
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
    if (decode_key(&key_without_prefix, &out) != 0) {
        return -1;
    }
    if (out.size() < 5) {
        return -1;
    }
    // get_if reports a type mismatch through a null pointer, avoiding exceptions
    // for what is an ordinary "malformed key" outcome.
    const int64_t* decoded_tablet_id = std::get_if<int64_t>(&std::get<0>(out[3]));
    if (decoded_tablet_id == nullptr) {
        return -1;
    }
    *tablet_id = *decoded_tablet_id;
    return 0;
}

// Builds the recycle rowset key for tablet `tablet_id + 1` with an empty rowset
// id and stores it in `*next_key`.
//
// @param instance_id  instance the key belongs to
// @param tablet_id    tablet whose successor key is wanted
// @param next_key     output, written only on success; must not be null
// @return 0 on success; -1 when `tablet_id` is the largest int64 value, in
//         which case `*next_key` is left untouched
int InstanceRecycler::next_recycle_rowset_tablet_key(const std::string& instance_id,
                                                     int64_t tablet_id, std::string* next_key) {
    DCHECK(next_key != nullptr);
    constexpr int64_t kLargestTabletId = std::numeric_limits<int64_t>::max();
    const bool has_successor = (tablet_id != kLargestTabletId);
    if (has_successor) {
        // Safe: tablet_id + 1 cannot overflow on this branch.
        *next_key = recycle_rowset_key({instance_id, tablet_id + 1, ""});
        return 0;
    }
    return -1;
}

// Scans recycle rowset kvs in [begin, end) and feeds them to `recycle_func`,
// capping how many rowsets of a single tablet are handled in one round.
//
// Kvs are fetched in batches via txn_get(). For every kv the tablet id is decoded
// from the key; once the number of kvs seen for the current tablet reaches
// config::max_recycle_rowsets_per_tablet_batch (clamped to at least 1), the rest
// of that tablet is skipped by moving `begin` to the first key of the next tablet.
// `loop_done`, if set, is invoked after each successfully processed batch.
//
// @param begin         inclusive start key of the scan
// @param end           exclusive end key of the scan
// @param recycle_func  called with (key, value) for each kv; non-zero stops the scan
// @param loop_done     optional, called after each batch; non-zero marks the scan failed
// @return 0 on success, -1 on decode/recycle_func/loop_done failure, -3 when
//         txn_get failed more than 1000 times
int InstanceRecycler::scan_recycle_rowsets_by_tablet(
        std::string begin, std::string_view end,
        std::function<int(std::string_view k, std::string_view v)> recycle_func,
        std::function<int()> loop_done) {
    LOG(INFO) << "begin scan_recycle_rowsets_by_tablet key_range=[" << hex(begin) << "," << hex(end)
              << ")";
    int ret = 0;
    int64_t cnt = 0;
    int64_t num_skip_tablets = 0;
    int get_range_retried = 0;
    std::string err;
    DORIS_CLOUD_DEFER_COPY(begin, end) {
        LOG(INFO) << "finish scan_recycle_rowsets_by_tablet key_range=[" << hex(begin) << ","
                  << hex(end) << ") num_scanned=" << cnt << " num_skip_tablets=" << num_skip_tablets
                  << " get_range_retried=" << get_range_retried << " ret=" << ret << " err=" << err;
    };

    const size_t max_rowsets_per_tablet =
            std::max<int32_t>(1, config::max_recycle_rowsets_per_tablet_batch);
    std::unique_ptr<RangeGetIterator> it;
    int64_t tablet_id = -1;
    size_t num_tablet_rowsets = 0;
    // True when the previous batch was abandoned early because a tablet hit its cap.
    // In that case kvs left unread in the iterator were dropped, so `it->more()` alone
    // cannot tell whether the range is exhausted: we must issue another range get
    // starting at the updated `begin`, otherwise the following tablets are never
    // visited in this round. The scan still terminates because the next range get
    // either returns nothing (break below) or makes forward progress.
    bool batch_truncated = false;
    while (it == nullptr /* may be not init */ ||
           ((it->more() || batch_truncated) && !stopped())) {
        if (get_range_retried > 1000) {
            err = "txn_get exceeds max retry(1000), may not scan all keys";
            ret = -3;
            return ret;
        }
        int get_ret = txn_get(txn_kv_.get(), begin, end, it);
        if (get_ret != 0) {
            LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end)
                         << ") num_scanned=" << cnt << " txn_get_ret=" << get_ret
                         << " get_range_retried=" << get_range_retried;
            ++get_range_retried;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            // Keep `batch_truncated` as is so that a pending re-scan is still retried.
            continue;
        }
        // A fresh batch has been fetched; truncation state now refers to this batch.
        batch_truncated = false;
        if (!it->has_next()) {
            LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")";
            break;
        }

        while (it->has_next()) {
            ++cnt;
            auto [k, v] = it->next();
            int64_t key_tablet_id = -1;
            if (decode_recycle_rowset_tablet_id(k, &key_tablet_id) != 0) {
                LOG_WARNING("failed to decode recycle rowset key").tag("key", hex(k));
                err = "decode recycle rowset key error";
                ret = -1;
            }

            // Restart the per-tablet counter whenever the scan crosses into another tablet.
            if (tablet_id < 0) {
                tablet_id = key_tablet_id;
            } else if (key_tablet_id != tablet_id) {
                tablet_id = key_tablet_id;
                num_tablet_rowsets = 0;
            }

            if (ret == 0) {
                // FIXME(gavin): if we want to continue scanning, the recycle_func should not return non-zero
                if (recycle_func(k, v) != 0) {
                    err = "recycle_func error";
                    ret = -1;
                }
            }

            if (++num_tablet_rowsets >= max_rowsets_per_tablet) {
                if (next_recycle_rowset_tablet_key(instance_id_, key_tablet_id, &begin) != 0) {
                    // No successor tablet key can be built; resume right after this key.
                    begin = k;
                    begin.push_back('\x00');
                } else {
                    ++num_skip_tablets;
                }
                batch_truncated = true;
                break;
            }

            if (!it->has_next()) {
                begin = k;
                VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k);
            }
        }
        if (ret != 0) {
            break;
        }
        if (!batch_truncated) {
            // Update to the smallest key greater than the last consumed one.
            begin.push_back('\x00');
        }
        // FIXME(gavin): if we want to continue scanning, the loop_done should not return non-zero
        if (loop_done && loop_done() != 0) {
            err = "loop_done error";
            ret = -1;
        }
    }
    return ret;
}

int InstanceRecycler::recycle_restore_jobs() {
const std::string task_name = "recycle_restore_jobs";
int64_t num_scanned = 0;
Expand Down
10 changes: 10 additions & 0 deletions cloud/src/recycler/recycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/bvars.h"
#include "meta-service/delete_bitmap_lock_white_list.h"
Expand Down Expand Up @@ -447,6 +449,14 @@ class InstanceRecycler {
std::function<int(std::string_view k, std::string_view v)> recycle_func,
std::function<int()> loop_done = nullptr);

// Stores in `next_key` the recycle rowset key of tablet `tablet_id + 1` with an
// empty rowset id. Returns 0 on success, -1 if `tablet_id` is the max int64 value
// (in which case `next_key` is not modified).
static int next_recycle_rowset_tablet_key(const std::string& instance_id, int64_t tablet_id,
std::string* next_key);

// Scans recycle rowset kvs in [begin, end), calling `recycle_func` for each kv and
// `loop_done` (if set) after each fetched batch. Once the number of kvs seen for a
// tablet reaches config::max_recycle_rowsets_per_tablet_batch, the scan jumps to
// the next tablet. Returns 0 for success otherwise error.
int scan_recycle_rowsets_by_tablet(
std::string begin, std::string_view end,
std::function<int(std::string_view k, std::string_view v)> recycle_func,
std::function<int()> loop_done = nullptr);

// return 0 for success otherwise error
int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);

Expand Down
Loading
Loading