Skip to content

Commit 2dfa55c

Browse files
authored
branch-4.0: [Opt](meta)persist segment rows in rowset meta pb (#59476) (#60058)
pick #59476
1 parent f15f007 commit 2dfa55c

27 files changed

Lines changed: 688 additions & 22 deletions

be/src/cloud/pb_convert.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
8080
out->set_txn_expiration(in.txn_expiration());
8181
out->set_segments_overlap_pb(in.segments_overlap_pb());
8282
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
83+
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
8384
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
8485
out->set_index_id(in.index_id());
8586
if (in.has_schema_version()) {
@@ -157,6 +158,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
157158
out->set_txn_expiration(in.txn_expiration());
158159
out->set_segments_overlap_pb(in.segments_overlap_pb());
159160
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
161+
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
160162
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
161163
out->set_index_id(in.index_id());
162164
if (in.has_schema_version()) {
@@ -246,6 +248,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
246248
out->set_txn_expiration(in.txn_expiration());
247249
out->set_segments_overlap_pb(in.segments_overlap_pb());
248250
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
251+
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
249252
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
250253
out->set_index_id(in.index_id());
251254
if (in.has_schema_version()) {
@@ -323,6 +326,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
323326
out->set_txn_expiration(in.txn_expiration());
324327
out->set_segments_overlap_pb(in.segments_overlap_pb());
325328
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
329+
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
326330
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
327331
out->set_index_id(in.index_id());
328332
if (in.has_schema_version()) {

be/src/common/config.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,12 @@ DEFINE_mInt32(trash_file_expire_time_sec, "0");
384384
// modify them upon necessity
385385
DEFINE_Int32(min_file_descriptor_number, "60000");
386386
DEFINE_mBool(disable_segment_cache, "false");
387+
// Enable checking segment rows consistency between rowset meta and segment footer
388+
DEFINE_mBool(enable_segment_rows_consistency_check, "false");
389+
DEFINE_mBool(enable_segment_rows_check_core, "false");
390+
// ATTENTION: For test only. In a test environment, there is no historical data,
391+
// so all rowset meta should have segment rows info.
392+
DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false");
387393
DEFINE_String(row_cache_mem_limit, "20%");
388394

389395
// Cache for storage page size

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,12 @@ DECLARE_mInt32(trash_file_expire_time_sec);
432432
// modify them upon necessity
433433
DECLARE_Int32(min_file_descriptor_number);
434434
DECLARE_mBool(disable_segment_cache);
435+
// Enable checking segment rows consistency between rowset meta and segment footer
436+
DECLARE_mBool(enable_segment_rows_consistency_check);
437+
DECLARE_mBool(enable_segment_rows_check_core);
438+
// ATTENTION: For test only. In a test environment, there is no historical data,
439+
// so all rowset meta should have segment rows info.
440+
DECLARE_mBool(fail_when_segment_rows_not_in_rowset_meta);
435441
DECLARE_String(row_cache_mem_limit);
436442

437443
// Cache for storage page size

be/src/olap/compaction.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
338338
auto seg_id = 0;
339339
bool segments_key_bounds_truncated {false};
340340
std::vector<KeyBoundsPB> segment_key_bounds;
341+
std::vector<uint32_t> num_segment_rows;
341342
for (auto rowset : _input_rowsets) {
342343
RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
343344
_output_rs_writer->rowset_id(), seg_id));
@@ -346,6 +347,10 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
346347
std::vector<KeyBoundsPB> key_bounds;
347348
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
348349
segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
350+
std::vector<uint32_t> input_segment_rows;
351+
rowset->get_num_segment_rows(&input_segment_rows);
352+
num_segment_rows.insert(num_segment_rows.end(), input_segment_rows.begin(),
353+
input_segment_rows.end());
349354
}
350355
// build output rowset
351356
RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
@@ -359,6 +364,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
359364
rowset_meta->set_rowset_state(VISIBLE);
360365
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
361366
rowset_meta->set_segments_key_bounds(segment_key_bounds);
367+
rowset_meta->set_num_segment_rows(num_segment_rows);
362368

363369
_output_rowset = _output_rs_writer->manual_build(rowset_meta);
364370

be/src/olap/parallel_scanner_builder.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ Status ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
221221
Status ParallelScannerBuilder::_load() {
222222
_total_rows = 0;
223223
size_t idx = 0;
224+
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
225+
? _state->query_options().enable_segment_cache
226+
: true;
224227
for (auto&& [tablet, version] : _tablets) {
225228
const auto tablet_id = tablet->tablet_id();
226229
_all_read_sources[tablet_id] = _read_sources[idx];
@@ -232,7 +235,8 @@ Status ParallelScannerBuilder::_load() {
232235

233236
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
234237
std::vector<uint32_t> segment_rows;
235-
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats));
238+
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache,
239+
&_builder_stats));
236240
auto segment_count = rowset->num_segments();
237241
for (int64_t i = 0; i != segment_count; i++) {
238242
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);

be/src/olap/rowset/beta_rowset.cpp

Lines changed: 86 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
#include <utility>
3030

3131
#include "beta_rowset.h"
32+
#include "cloud/config.h"
3233
#include "common/config.h"
3334
#include "common/logging.h"
3435
#include "common/status.h"
36+
#include "cpp/sync_point.h"
3537
#include "io/fs/file_reader.h"
3638
#include "io/fs/file_system.h"
3739
#include "io/fs/local_file_system.h"
@@ -71,24 +73,97 @@ Status BetaRowset::init() {
7173
return Status::OK(); // no op
7274
}
7375

76+
namespace {
77+
Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset,
78+
std::vector<uint32_t>* segment_rows, bool enable_segment_cache,
79+
OlapReaderStatistics* read_stats) {
80+
SegmentCacheHandle segment_cache_handle;
81+
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
82+
rowset, &segment_cache_handle, enable_segment_cache, false, read_stats));
83+
for (const auto& segment : segment_cache_handle.get_segments()) {
84+
segment_rows->emplace_back(segment->num_rows());
85+
}
86+
return Status::OK();
87+
}
88+
89+
Status check_segment_rows_consistency(const std::vector<uint32_t>& rows_from_meta,
90+
const std::vector<uint32_t>& rows_from_footer,
91+
int64_t tablet_id, const std::string& rowset_id) {
92+
DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size());
93+
for (size_t i = 0; i < rows_from_footer.size(); i++) {
94+
if (rows_from_footer[i] != rows_from_meta[i]) {
95+
auto msg = fmt::format(
96+
"segment rows mismatch between rowset meta and segment footer. "
97+
"segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}",
98+
i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id);
99+
if (config::enable_segment_rows_check_core) {
100+
CHECK(false) << msg;
101+
}
102+
return Status::InternalError(msg);
103+
}
104+
}
105+
return Status::OK();
106+
}
107+
} // namespace
108+
74109
Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows,
110+
bool enable_segment_cache,
75111
OlapReaderStatistics* read_stats) {
112+
#ifndef BE_TEST
76113
// `ROWSET_UNLOADING` is state for closed() called but owned by some readers.
77114
// So here `ROWSET_UNLOADING` is allowed.
78115
DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED);
79-
80-
RETURN_IF_ERROR(_load_segment_rows_once.call([this, read_stats] {
116+
#endif
117+
RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] {
81118
auto segment_count = num_segments();
82-
_segments_rows.resize(segment_count);
83-
for (int64_t i = 0; i != segment_count; ++i) {
84-
SegmentCacheHandle segment_cache_handle;
85-
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
86-
std::static_pointer_cast<BetaRowset>(shared_from_this()), i,
87-
&segment_cache_handle, false, false, read_stats));
88-
const auto& tmp_segments = segment_cache_handle.get_segments();
89-
_segments_rows[i] = tmp_segments[0]->num_rows();
119+
if (segment_count == 0) {
120+
return Status::OK();
90121
}
91-
return Status::OK();
122+
123+
if (!_rowset_meta->get_num_segment_rows().empty()) {
124+
if (_rowset_meta->get_num_segment_rows().size() == segment_count) {
125+
// use segment rows in rowset meta if eligible
126+
TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta");
127+
_segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(),
128+
_rowset_meta->get_num_segment_rows().cend());
129+
if (config::enable_segment_rows_consistency_check) {
130+
// verify segment rows from meta match segment footer
131+
std::vector<uint32_t> rows_from_footer;
132+
auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
133+
auto load_status = load_segment_rows_from_footer(
134+
self, &rows_from_footer, enable_segment_cache, read_stats);
135+
if (load_status.ok()) {
136+
return check_segment_rows_consistency(
137+
_segments_rows, rows_from_footer, _rowset_meta->tablet_id(),
138+
_rowset_meta->rowset_id().to_string());
139+
}
140+
}
141+
return Status::OK();
142+
} else {
143+
auto msg = fmt::format(
144+
"[verbose] corrupted segment rows info in rowset meta. "
145+
"segment count: {}, segment rows size: {}, tablet={}, rowset={}",
146+
segment_count, _rowset_meta->get_num_segment_rows().size(),
147+
_rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string());
148+
if (config::enable_segment_rows_check_core) {
149+
CHECK(false) << msg;
150+
}
151+
LOG_EVERY_SECOND(WARNING) << msg;
152+
}
153+
}
154+
if (config::fail_when_segment_rows_not_in_rowset_meta) {
155+
CHECK(false) << "[verbose] segment rows info not found in rowset meta. tablet="
156+
<< _rowset_meta->tablet_id()
157+
<< ", rowset=" << _rowset_meta->rowset_id().to_string()
158+
<< ", version=" << _rowset_meta->version()
159+
<< ", debug_string=" << _rowset_meta->debug_string()
160+
<< ", stack=" << Status::InternalError("error");
161+
}
162+
// otherwise, read it from segment footer
163+
TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer");
164+
auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
165+
return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache,
166+
read_stats);
92167
}));
93168
segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
94169
return Status::OK();

be/src/olap/rowset/beta_rowset.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class BetaRowset final : public Rowset {
9191
Status show_nested_index_file(rapidjson::Value* rowset_value,
9292
rapidjson::Document::AllocatorType& allocator);
9393

94-
Status get_segment_num_rows(std::vector<uint32_t>* segment_rows,
94+
Status get_segment_num_rows(std::vector<uint32_t>* segment_rows, bool enable_segment_cache,
9595
OlapReaderStatistics* read_stats);
9696

9797
protected:

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
250250
if (_read_context->record_rowids && _read_context->rowid_conversion) {
251251
// init segment rowid map for rowid conversion
252252
std::vector<uint32_t> segment_rows;
253-
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, _stats));
253+
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, should_use_cache, _stats));
254254
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
255255
segment_rows));
256256
}

be/src/olap/rowset/beta_rowset_writer.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
#include <mutex>
3030
#include <sstream>
3131
#include <utility>
32+
#include <vector>
3233

3334
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
35+
#include "common/cast_set.h"
3436
#include "common/compiler_util.h" // IWYU pragma: keep
3537
#include "common/config.h"
3638
#include "common/logging.h"
@@ -97,6 +99,9 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
9799
std::vector<KeyBoundsPB> segments_key_bounds;
98100
spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
99101
rowset_meta.set_segments_key_bounds(segments_key_bounds);
102+
std::vector<uint32_t> num_segment_rows;
103+
spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
104+
rowset_meta.set_num_segment_rows(num_segment_rows);
100105
}
101106

102107
} // namespace
@@ -776,6 +781,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
776781
_num_segment += cast_set<int32_t>(rowset->num_segments());
777782
// append key_bounds to current rowset
778783
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
784+
rowset->get_num_segment_rows(&_segment_num_rows);
779785
_segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();
780786

781787
// TODO update zonemap
@@ -955,21 +961,31 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
955961
int64_t total_data_size = 0;
956962
int64_t total_index_size = 0;
957963
std::vector<KeyBoundsPB> segments_encoded_key_bounds;
964+
std::vector<uint32_t> segment_rows;
958965
{
959966
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
960967
for (const auto& itr : _segid_statistics_map) {
961968
num_rows_written += itr.second.row_num;
962969
total_data_size += itr.second.data_size;
963970
total_index_size += itr.second.index_size;
964971
segments_encoded_key_bounds.push_back(itr.second.key_bounds);
972+
// segcompaction doesn't modify _segment_num_rows, so for load we need to get segment rows from _segid_statistics_map
973+
segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
965974
}
966975
}
976+
if (segment_rows.empty()) {
977+
// vertical compaction and linked schema change will not record segment statistics,
978+
// they record segment rows in _segment_num_rows instead
979+
RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
980+
}
981+
967982
for (auto& key_bound : _segments_encoded_key_bounds) {
968983
segments_encoded_key_bounds.push_back(key_bound);
969984
}
970985
if (_segments_key_bounds_truncated.has_value()) {
971986
rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
972987
}
988+
rowset_meta->set_num_segment_rows(segment_rows);
973989
// segment key bounds are empty in old version(before version 1.2.x). So we should not modify
974990
// the overlap property when key bounds are empty.
975991
// for mow table with cluster keys, the overlap is used for cluster keys,
@@ -990,6 +1006,13 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
9901006
"is: {}, _num_seg is: {}",
9911007
segments_encoded_key_bounds_size, segment_num);
9921008
}
1009+
if (segment_rows.size() != segment_num) {
1010+
return Status::InternalError(
1011+
"segment_rows size should equal to _num_seg, segment_rows size is: {}, "
1012+
"_num_seg is {}, tablet={}, rowset={}, txn={}",
1013+
segment_rows.size(), segment_num, _context.tablet_id,
1014+
_context.rowset_id.to_string(), _context.txn_id);
1015+
}
9931016
}
9941017

9951018
rowset_meta->set_num_segments(segment_num);

be/src/olap/rowset/rowset.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
272272
return Status::OK();
273273
}
274274

275+
void get_num_segment_rows(std::vector<uint32_t>* num_segment_rows) {
276+
_rowset_meta->get_num_segment_rows(num_segment_rows);
277+
}
278+
275279
// min key of the first segment
276280
bool first_key(std::string* min_key) {
277281
KeyBoundsPB key_bounds;

0 commit comments

Comments
 (0)