diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 770ee9ca13a4f5..ab07528a39306e 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -101,6 +101,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list CONF_Strings(recycle_blacklist, ""); // Comma seprated list // IO worker thread pool concurrency: object list, delete CONF_mInt32(instance_recycler_worker_pool_size, "32"); +// Max scanned rowsets recycled for one tablet in one recycle_rowsets round. +CONF_mInt32(max_recycle_rowsets_per_tablet_batch, "10000"); // Max number of delete tasks per batch when recycling objects. // Each task deletes up to 1000 files. Controls memory usage during large-scale deletion. CONF_Int32(recycler_max_tasks_per_batch, "1000"); diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index e7ea909906b74c..4a8949b4a4a53f 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -4969,9 +4969,8 @@ int InstanceRecycler::recycle_rowsets() { if (config::enable_recycler_stats_metrics) { scan_and_statistics_rowsets(); } - // recycle_func and loop_done for scan and recycle - int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv), - std::move(loop_done)); + int ret = scan_recycle_rowsets_by_tablet(recyc_rs_key0, recyc_rs_key1, + std::move(handle_rowset_kv), std::move(loop_done)); worker_pool->stop(); @@ -4991,6 +4990,134 @@ int InstanceRecycler::recycle_rowsets() { return ret; } +static int decode_recycle_rowset_tablet_id(std::string_view key, int64_t* tablet_id) { + DCHECK(tablet_id != nullptr); + std::string_view key_without_prefix = key; + key_without_prefix.remove_prefix(1); + std::vector, int, int>> out; + if (decode_key(&key_without_prefix, &out) != 0) { + return -1; + } + if (out.size() < 5) { + return -1; + } + try { + *tablet_id = std::get(std::get<0>(out[3])); + } catch (const std::bad_variant_access&) { + return -1; + } + return 0; +} + +int InstanceRecycler::next_recycle_rowset_tablet_key(const std::string& instance_id, + int64_t tablet_id, std::string* next_key) { + DCHECK(next_key != nullptr); + if (tablet_id == std::numeric_limits::max()) { + return -1; + } + *next_key = recycle_rowset_key({instance_id, tablet_id + 1, ""}); + return 0; +} + +int InstanceRecycler::scan_recycle_rowsets_by_tablet( + std::string begin, std::string_view end, + std::function recycle_func, + std::function loop_done) { + LOG(INFO) << "begin scan_recycle_rowsets_by_tablet key_range=[" << hex(begin) << "," << hex(end) + << ")"; + int ret = 0; + int64_t cnt = 0; + int64_t num_skip_tablets = 0; + int get_range_retried = 0; + std::string err; + DORIS_CLOUD_DEFER_COPY(begin, end) { + LOG(INFO) << "finish scan_recycle_rowsets_by_tablet key_range=[" << hex(begin) << "," + << hex(end) << ") num_scanned=" << cnt << " num_skip_tablets=" << num_skip_tablets + << " get_range_retried=" << get_range_retried << " ret=" << ret << " err=" << err; + }; + + const size_t max_rowsets_per_tablet = + std::max(1, config::max_recycle_rowsets_per_tablet_batch); + std::unique_ptr it; + int64_t tablet_id = -1; + size_t num_tablet_rowsets = 0; + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { + if (get_range_retried > 1000) { + err = "txn_get exceeds max retry(1000), may not scan all keys"; + ret = -3; + return ret; + } + int get_ret = txn_get(txn_kv_.get(), begin, end, it); + if (get_ret != 0) { + LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end) + << ") num_scanned=" << cnt << " txn_get_ret=" << get_ret + << " get_range_retried=" << get_range_retried; + ++get_range_retried; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + continue; + } + if (!it->has_next()) { + LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")"; + break; + } + + bool begin_updated = false; + while (it->has_next()) { + ++cnt; + auto [k, v] = it->next(); + int64_t key_tablet_id = -1; + if (decode_recycle_rowset_tablet_id(k, &key_tablet_id) != 0) { + LOG_WARNING("failed to decode recycle rowset key").tag("key", hex(k)); + err = "decode recycle rowset key error"; + ret = -1; + } + + if (tablet_id < 0) { + tablet_id = key_tablet_id; + } else if (key_tablet_id != tablet_id) { + tablet_id = key_tablet_id; + num_tablet_rowsets = 0; + } + + if (ret == 0) { + // FIXME(gavin): if we want to continue scanning, the recycle_func should not return non-zero + if (recycle_func(k, v) != 0) { + err = "recycle_func error"; + ret = -1; + } + } + + if (++num_tablet_rowsets >= max_rowsets_per_tablet) { + if (next_recycle_rowset_tablet_key(instance_id_, key_tablet_id, &begin) != 0) { + begin = k; + begin.push_back('\x00'); + } else { + ++num_skip_tablets; + } + begin_updated = true; + break; + } + + if (!it->has_next()) { + begin = k; + VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); + } + } + if (ret != 0) { + break; + } + if (!begin_updated) { + begin.push_back('\x00'); + } + // FIXME(gavin): if we want to continue scanning, the loop_done should not return non-zero + if (loop_done && loop_done() != 0) { + err = "loop_done error"; + ret = -1; + } + } + return ret; +} + int InstanceRecycler::recycle_restore_jobs() { const std::string task_name = "recycle_restore_jobs"; int64_t num_scanned = 0; diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index e04a69e0ec57b1..69efaaf2532d94 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include "common/bvars.h" #include "meta-service/delete_bitmap_lock_white_list.h" @@ -447,6 +449,14 @@ class InstanceRecycler { std::function recycle_func, std::function loop_done = nullptr); + static int next_recycle_rowset_tablet_key(const std::string& instance_id, int64_t tablet_id, + std::string* next_key); + + int scan_recycle_rowsets_by_tablet( + std::string begin, std::string_view end, + std::function recycle_func, + std::function loop_done = nullptr); + // return 0 for success otherwise error int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb); diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index ffe2401862bb07..344b744fc70f70 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -327,6 +327,23 @@ static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, return 0; } +static int create_recycle_rowset_kv(TxnKv* txn_kv, const doris::RowsetMetaCloudPB& rowset) { + RecycleRowsetPB rowset_pb; + rowset_pb.set_creation_time(current_time); + rowset_pb.set_type(RecycleRowsetPB::COMPACT); + rowset_pb.mutable_rowset_meta()->CopyFrom(rowset); + std::string key = recycle_rowset_key({instance_id, rowset.tablet_id(), rowset.rowset_id_v2()}); + std::string val; + rowset_pb.SerializeToString(&val); + + std::unique_ptr txn; + if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { + return -1; + } + txn->put(key, val); + return txn->commit() == TxnErrorCode::TXN_OK ? 0 : -1; +} + static int create_tmp_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, const doris::RowsetMetaCloudPB& rowset, bool write_schema_kv, bool is_inverted_idx_v2 = false, @@ -1376,6 +1393,206 @@ TEST(RecyclerTest, recycle_rowsets) { check_delete_bitmap_file_size(accessor, tablet_id, 0); } +TEST(RecyclerTest, scan_recycle_rowsets_by_tablet_skips_to_next_tablet_on_batch_limit) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("scan_rowsets_by_tablet_limit"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + constexpr int64_t index_id = 10001; + constexpr int64_t first_tablet_id = 10002; + constexpr int64_t second_tablet_id = 10020; + for (int i = 0; i < 5; ++i) { + auto rowset = + create_rowset("scan_rowsets_by_tablet_limit", first_tablet_id, index_id, 0, schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + for (int i = 0; i < 2; ++i) { + auto rowset = create_rowset("scan_rowsets_by_tablet_limit", second_tablet_id, index_id, 0, + schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + + auto old_worker_pool_size = config::instance_recycler_worker_pool_size; + auto old_max_rowsets_per_tablet = config::max_recycle_rowsets_per_tablet_batch; + config::instance_recycler_worker_pool_size = 10; + config::max_recycle_rowsets_per_tablet_batch = 3; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = old_worker_pool_size; + config::max_recycle_rowsets_per_tablet_batch = old_max_rowsets_per_tablet; + }; + + std::vector scanned_tablets; + auto recycle_func = [&](std::string_view k, std::string_view) -> int { + std::string_view k1 = k; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + scanned_tablets.push_back(std::get(std::get<0>(out[3]))); + return 0; + }; + + ASSERT_EQ(recycler.scan_recycle_rowsets_by_tablet( + recycle_rowset_key({instance_id, 0, ""}), + recycle_rowset_key({instance_id, INT64_MAX, ""}), recycle_func), + 0); + EXPECT_EQ(scanned_tablets, + std::vector({first_tablet_id, first_tablet_id, first_tablet_id, + second_tablet_id, second_tablet_id})); +} + +TEST(RecyclerTest, scan_recycle_rowsets_by_tablet_accumulates_limit_across_range_gets) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("scan_rowsets_by_tablet_cross_range_limit"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + constexpr int64_t index_id = 10001; + constexpr int64_t first_tablet_id = 10002; + constexpr int64_t second_tablet_id = 10020; + for (int i = 0; i < 5; ++i) { + auto rowset = create_rowset("scan_rowsets_by_tablet_cross_range_limit", first_tablet_id, + index_id, 0, schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + for (int i = 0; i < 2; ++i) { + auto rowset = create_rowset("scan_rowsets_by_tablet_cross_range_limit", second_tablet_id, + index_id, 0, schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + + auto old_max_rowsets_per_tablet = config::max_recycle_rowsets_per_tablet_batch; + config::max_recycle_rowsets_per_tablet_batch = 3; + DORIS_CLOUD_DEFER { + config::max_recycle_rowsets_per_tablet_batch = old_max_rowsets_per_tablet; + SyncPoint::get_instance()->clear_all_call_backs(); + }; + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("memkv::Transaction::get", [](auto&& args) { + auto* limit = try_any_cast(args[0]); + *limit = 2; + }); + sp->enable_processing(); + + std::vector scanned_tablets; + auto recycle_func = [&](std::string_view k, std::string_view) -> int { + std::string_view k1 = k; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + scanned_tablets.push_back(std::get(std::get<0>(out[3]))); + return 0; + }; + + ASSERT_EQ(recycler.scan_recycle_rowsets_by_tablet( + recycle_rowset_key({instance_id, 0, ""}), + recycle_rowset_key({instance_id, INT64_MAX, ""}), recycle_func), + 0); + EXPECT_EQ(scanned_tablets, + std::vector({first_tablet_id, first_tablet_id, first_tablet_id, + second_tablet_id, second_tablet_id})); +} + +TEST(RecyclerTest, next_recycle_rowset_tablet_key_overwrites_existing_buffer) { + std::string next_key = recycle_rowset_key({instance_id, 10002, "rowset"}); + ASSERT_EQ(InstanceRecycler::next_recycle_rowset_tablet_key(instance_id, 10002, &next_key), 0); + + std::string_view k1 = next_key; + k1.remove_prefix(1); + std::vector, int, int>> out; + ASSERT_EQ(decode_key(&k1, &out), 0); + EXPECT_EQ(std::get(std::get<0>(out[3])), 10003); + EXPECT_TRUE(std::get(std::get<0>(out[4])).empty()); +} + +TEST(RecyclerTest, recycle_rowsets_tablet_batch_limit_recycles_remaining_in_next_round) { + config::retention_seconds = 0; + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_batch_limit"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_batch_limit"); + + auto old_worker_pool_size = config::instance_recycler_worker_pool_size; + auto old_max_rowsets_per_tablet = config::max_recycle_rowsets_per_tablet_batch; + auto old_enable_mark = config::enable_mark_delete_rowset_before_recycle; + config::instance_recycler_worker_pool_size = 1; + config::max_recycle_rowsets_per_tablet_batch = 3; + config::enable_mark_delete_rowset_before_recycle = false; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = old_worker_pool_size; + config::max_recycle_rowsets_per_tablet_batch = old_max_rowsets_per_tablet; + config::enable_mark_delete_rowset_before_recycle = old_enable_mark; + }; + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + constexpr int64_t index_id = 10001; + constexpr int64_t first_tablet_id = 10002; + constexpr int64_t second_tablet_id = 10020; + std::vector first_tablet_rowset_ids; + for (int i = 0; i < 5; ++i) { + auto rowset = + create_rowset("recycle_rowsets_batch_limit", first_tablet_id, index_id, 1, schema); + first_tablet_rowset_ids.push_back(rowset.rowset_id_v2()); + ASSERT_EQ(create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true), + 0); + } + for (int i = 0; i < 2; ++i) { + auto rowset = + create_rowset("recycle_rowsets_batch_limit", second_tablet_id, index_id, 1, schema); + ASSERT_EQ(create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true), + 0); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + for (int i = 0; i < 3; ++i) { + EXPECT_EQ(accessor->exists(segment_path(first_tablet_id, first_tablet_rowset_ids[i], 0)), + 1); + } + for (int i = 3; i < 5; ++i) { + EXPECT_EQ(accessor->exists(segment_path(first_tablet_id, first_tablet_rowset_ids[i], 0)), + 0); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + for (const auto& rowset_id : first_tablet_rowset_ids) { + EXPECT_EQ(accessor->exists(segment_path(first_tablet_id, rowset_id, 0)), 1); + } +} + TEST(RecyclerTest, recycle_rowsets_with_data_ref_count) { config::retention_seconds = 0; auto txn_kv = std::make_shared();