diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index a3c9fd951ae646..711bc15840583c 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include "cloud/cloud_meta_mgr.h" @@ -540,6 +541,28 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread std::unique_lock lock {_new_tablet->get_sync_meta_lock()}; std::unique_lock wlock(_new_tablet->get_header_lock()); + // Mirror MS behavior: delete rowsets in [2, alter_version] before adding + // SC output rowsets to avoid stale compaction rowsets remaining visible. + { + int64_t alter_ver = sc_job->alter_version(); + std::vector to_delete; + for (auto& [v, rs] : _new_tablet->rowset_map()) { + if (v.first >= 2 && v.second <= alter_ver) { + to_delete.push_back(rs); + } + } + if (!to_delete.empty()) { + LOG_INFO( + "schema change: delete {} local rowsets in [2, {}] before adding SC " + "output, tablet_id={}, versions=[{}]", + to_delete.size(), alter_ver, _new_tablet->tablet_id(), + fmt::join(to_delete | std::views::transform([](const auto& rs) { + return rs->version().to_string(); + }), + ", ")); + _new_tablet->delete_rowsets_for_schema_change(to_delete, wlock); + } + } _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false); _new_tablet->set_cumulative_layer_point(_output_cumulative_point); _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 19bc5b9279126a..db620468fcb16b 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -682,6 +682,34 @@ void CloudTablet::delete_rowsets(const std::vector& to_delete, _tablet_meta->modify_rs_metas({}, rs_metas, false); } +void CloudTablet::delete_rowsets_for_schema_change(const std::vector& to_delete, + std::unique_lock&) { + if (to_delete.empty()) { + return; + } + std::vector rs_metas; + rs_metas.reserve(to_delete.size()); + for (auto&& rs : to_delete) { + rs_metas.push_back(rs->rowset_meta()); + _rs_version_map.erase(rs->version()); + // Remove edge from version graph so that the greedy capture algorithm + // won't prefer the wider stale compaction rowset over individual SC + // output rowsets (e.g. [818-822] vs [818],[819],...,[822]). + _timestamped_version_tracker.delete_version(rs->version()); + } + + // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the + // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map) + // because SC output will create new rowsets with identical version ranges; + // a later compaction could put those into stale as well, causing two stale + // paths to reference the same version key -- when one path is cleaned first, + // the other hits a DCHECK(false) in delete_expired_stale_rowsets(). + _tablet_meta->modify_rs_metas({}, rs_metas, true); + + // Schedule for direct cache cleanup. MS has already recycled these rowsets. + add_unused_rowsets(to_delete); +} + uint64_t CloudTablet::delete_expired_stale_rowsets() { if (config::enable_mow_verbose_log) { LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id()); diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index b27bd7c5b55d89..5def9eabea266e 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -159,6 +159,13 @@ class CloudTablet final : public BaseTablet { void delete_rowsets(const std::vector& to_delete, std::unique_lock& meta_lock); + // Like delete_rowsets, but also removes edges from the version graph. + // Used by schema change to prevent the greedy capture algorithm from + // preferring stale compaction rowsets over individual SC output rowsets. + // MUST hold EXCLUSIVE `_meta_lock`. + void delete_rowsets_for_schema_change(const std::vector& to_delete, + std::unique_lock& meta_lock); + // When the tablet is dropped, we need to recycle cached data: // 1. The data in file cache // 2. The memory in tablet cache diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp index 904dc2e3fdf1e0..356bbc2e040d42 100644 --- a/be/test/cloud/cloud_tablet_test.cpp +++ b/be/test/cloud/cloud_tablet_test.cpp @@ -997,4 +997,252 @@ TEST_F(CloudTabletWarmUpStateTest, TestWarmedUpOverridesNotWarmedUp) { EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id())); } +class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test { +public: + CloudTabletDeleteRowsetsForSchemaChangeTest() : _engine(CloudStorageEngine(EngineOptions {})) {} + + void SetUp() override { + _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + _tablet = + std::make_shared(_engine, std::make_shared(*_tablet_meta)); + } + void TearDown() override {} + + RowsetSharedPtr create_rowset(Version version) { + auto rs_meta = std::make_shared(); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_version(version); + rs_meta->set_rowset_id(_engine.next_rowset_id()); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; + } + +protected: + TabletMetaSharedPtr _tablet_meta; + std::shared_ptr _tablet; + CloudStorageEngine _engine; +}; + +// Simulate the DORIS-25014 scenario: +// - New tablet has compacted rowset [2-6] from compaction during SC +// - SC produces individual output rowsets [2],[3],[4],[5],[6] +// - Without the fix, add_rowsets fails to remove [2-6] because +// [2].contains([2-6]) = false +// - With delete_rowsets_for_schema_change, the stale compaction rowset is +// removed from both _rs_version_map and version graph before add_rowsets +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompactionRowset) { + // Setup: add placeholder [0-1] and compacted rowset [2-6] + auto rs_placeholder = create_rowset(Version(0, 1)); + auto rs_compacted = create_rowset(Version(2, 6)); + ASSERT_NE(rs_placeholder, nullptr); + ASSERT_NE(rs_compacted, nullptr); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false); + } + // Verify initial state + ASSERT_EQ(_tablet->rowset_map().size(), 2); + ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6))); + + // SC produces individual rowsets + std::vector sc_output; + for (int v = 2; v <= 6; v++) { + auto rs = create_rowset(Version(v, v)); + ASSERT_NE(rs, nullptr); + sc_output.push_back(rs); + } + + // Simulate delete_rowsets_for_schema_change + add_rowsets + int64_t alter_version = 6; + { + std::unique_lock wlock(_tablet->get_header_lock()); + // Collect rowsets in [2, alter_version] + std::vector to_delete; + for (auto& [v, rs] : _tablet->rowset_map()) { + if (v.first >= 2 && v.second <= alter_version) { + to_delete.push_back(rs); + } + } + ASSERT_EQ(to_delete.size(), 1); // only [2-6] + ASSERT_EQ(to_delete[0]->version(), Version(2, 6)); + + _tablet->delete_rowsets_for_schema_change(to_delete, wlock); + + // [2-6] should be removed from rs_version_map + ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6))); + // Should NOT go to stale (to avoid stale path conflicts), but to unused + ASSERT_FALSE(_tablet->has_stale_rowsets()); + ASSERT_TRUE(_tablet->need_remove_unused_rowsets()); + + _tablet->add_rowsets(std::move(sc_output), false, wlock, false); + } + + // Verify: individual SC rowsets are now in rs_version_map + ASSERT_EQ(_tablet->rowset_map().size(), 6); // [0-1] + 5 individual + for (int v = 2; v <= 6; v++) { + ASSERT_TRUE(_tablet->rowset_map().count(Version(v, v))) + << "Missing version " << v << "-" << v; + } + ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6))); + + // Verify: capture_consistent_versions works correctly (no stale edges) + auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 6), {}); + ASSERT_TRUE(versions_result.has_value()) << versions_result.error(); + auto& versions = versions_result.value(); + ASSERT_EQ(versions.size(), 6); // [0-1] + [2],[3],[4],[5],[6] + ASSERT_EQ(versions[0], Version(0, 1)); + for (int i = 0; i < 5; i++) { + ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i)); + } +} + +// Test that delete_rowsets_for_schema_change with empty input is a no-op +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) { + auto rs = create_rowset(Version(0, 1)); + ASSERT_NE(rs, nullptr); + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs}, false, wlock, false); + } + ASSERT_EQ(_tablet->rowset_map().size(), 1); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->delete_rowsets_for_schema_change({}, wlock); + } + ASSERT_EQ(_tablet->rowset_map().size(), 1); + ASSERT_FALSE(_tablet->has_stale_rowsets()); +} + +// Test with multiple compaction rowsets spanning different version ranges +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestMultipleCompactionRowsets) { + auto rs_placeholder = create_rowset(Version(0, 1)); + auto rs_comp1 = create_rowset(Version(2, 5)); + auto rs_comp2 = create_rowset(Version(6, 10)); + auto rs_post = create_rowset(Version(11, 11)); // after alter_version, should NOT be deleted + ASSERT_NE(rs_placeholder, nullptr); + ASSERT_NE(rs_comp1, nullptr); + ASSERT_NE(rs_comp2, nullptr); + ASSERT_NE(rs_post, nullptr); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs_placeholder, rs_comp1, rs_comp2, rs_post}, false, wlock, false); + } + ASSERT_EQ(_tablet->rowset_map().size(), 4); + + // SC output: individual rowsets for versions 2-10 + std::vector sc_output; + for (int v = 2; v <= 10; v++) { + auto rs = create_rowset(Version(v, v)); + ASSERT_NE(rs, nullptr); + sc_output.push_back(rs); + } + + int64_t alter_version = 10; + { + std::unique_lock wlock(_tablet->get_header_lock()); + std::vector to_delete; + for (auto& [v, rs] : _tablet->rowset_map()) { + if (v.first >= 2 && v.second <= alter_version) { + to_delete.push_back(rs); + } + } + ASSERT_EQ(to_delete.size(), 2); // [2-5] and [6-10] + + _tablet->delete_rowsets_for_schema_change(to_delete, wlock); + + // Post-alter rowset should survive + ASSERT_TRUE(_tablet->rowset_map().count(Version(11, 11))); + ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 5))); + ASSERT_FALSE(_tablet->rowset_map().count(Version(6, 10))); + + _tablet->add_rowsets(std::move(sc_output), false, wlock, false); + } + + // Verify: [0-1], [2],[3],...,[10], [11-11] + ASSERT_EQ(_tablet->rowset_map().size(), 11); + + // Verify capture + auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 11), {}); + ASSERT_TRUE(versions_result.has_value()) << versions_result.error(); + auto& versions = versions_result.value(); + ASSERT_EQ(versions.size(), 11); + ASSERT_EQ(versions[0], Version(0, 1)); + for (int i = 0; i < 9; i++) { + ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i)); + } + ASSERT_EQ(versions[10], Version(11, 11)); +} + +// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then +// compaction creates a new stale path with overlapping version keys. When +// one stale path is cleaned, the other hits DCHECK(false) because the +// version is already removed from _stale_rs_version_map. +// With the fix (bypassing stale tracking), this should not happen. +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestNoStalePathConflictWithCompaction) { + // Setup: [0-1] placeholder, [2-6] compaction product during SC + auto rs_placeholder = create_rowset(Version(0, 1)); + auto rs_compacted = create_rowset(Version(2, 6)); + ASSERT_NE(rs_placeholder, nullptr); + ASSERT_NE(rs_compacted, nullptr); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false); + } + + // SC output: individual rowsets [2],[3],[4],[5],[6] + std::vector sc_output; + for (int v = 2; v <= 6; v++) { + sc_output.push_back(create_rowset(Version(v, v))); + } + + // Step 1: delete_rowsets_for_schema_change + add SC output + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->delete_rowsets_for_schema_change({rs_compacted}, wlock); + _tablet->add_rowsets(std::move(sc_output), false, wlock, false); + } + // Stale should be empty — SC delete bypasses stale tracking + ASSERT_FALSE(_tablet->has_stale_rowsets()); + + // Step 2: compaction merges SC output [2],[3],[4],[5],[6] -> [2-6] + auto rs_new_compacted = create_rowset(Version(2, 6)); + std::vector compaction_input; + { + std::unique_lock wlock(_tablet->get_header_lock()); + for (auto& [v, rs] : _tablet->rowset_map()) { + if (v.first >= 2 && v.second <= 6) { + compaction_input.push_back(rs); + } + } + ASSERT_EQ(compaction_input.size(), 5); + // Normal compaction delete_rowsets — this WILL use stale tracking + _tablet->delete_rowsets(compaction_input, wlock); + _tablet->add_rowsets({rs_new_compacted}, false, wlock, false); + } + // Now stale has the compaction inputs + ASSERT_TRUE(_tablet->has_stale_rowsets()); + + // Step 3: delete_expired_stale_rowsets — this is where CI crashed + // With old code: stale path from SC and compaction both reference [2-6] key, + // causing DCHECK(false). With fix: only compaction stale path exists, no conflict. + config::tablet_rowset_stale_sweep_time_sec = 0; // expire immediately + ASSERT_NO_FATAL_FAILURE(_tablet->delete_expired_stale_rowsets()); + + // Verify final state: [0-1] and [2-6] active, no stale left + ASSERT_EQ(_tablet->rowset_map().size(), 2); + ASSERT_TRUE(_tablet->rowset_map().count(Version(0, 1))); + ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6))); + ASSERT_FALSE(_tablet->has_stale_rowsets()); +} + } // namespace doris