Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -151,6 +152,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -305,6 +308,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down
27 changes: 27 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ DEFINE_mInt32(trash_file_expire_time_sec, "0");
// modify them upon necessity
DEFINE_Int32(min_file_descriptor_number, "60000");
DEFINE_mBool(disable_segment_cache, "false");
// Enable checking segment rows consistency between rowset meta and segment footer
DEFINE_mBool(enable_segment_rows_consistency_check, "false");
DEFINE_mBool(enable_segment_rows_check_core, "false");
// ATTENTION: For test only. In a test environment there is no historical data,
// so every rowset meta should have segment rows info.
DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false");
DEFINE_String(row_cache_mem_limit, "20%");

// Cache for storage page size
Expand Down Expand Up @@ -1472,6 +1478,21 @@ DEFINE_mInt64(string_overflow_size, "4294967295"); // std::numic_limits<uint32_t
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
// The max thread num for BufferedReaderPrefetchThreadPool
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");

DEFINE_mBool(enable_segment_prefetch_verbose_log, "false");
// The thread num for SegmentPrefetchThreadPool
DEFINE_Int64(segment_prefetch_thread_pool_thread_num_min, "32");
DEFINE_Int64(segment_prefetch_thread_pool_thread_num_max, "2000");

DEFINE_mInt32(segment_file_cache_consume_rowids_batch_size, "8000");
// Enable segment file cache block prefetch for query
DEFINE_mBool(enable_query_segment_file_cache_prefetch, "false");
// Number of blocks to prefetch ahead in segment iterator for query
DEFINE_mInt32(query_segment_file_cache_prefetch_block_size, "2");
// Enable segment file cache block prefetch for compaction
DEFINE_mBool(enable_compaction_segment_file_cache_prefetch, "false");
// Number of blocks to prefetch ahead in segment iterator for compaction
DEFINE_mInt32(compaction_segment_file_cache_prefetch_block_size, "2");
// The min thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
Expand Down Expand Up @@ -1634,6 +1655,12 @@ DEFINE_Validator(aws_credentials_provider_version, [](const std::string& config)
return config == "v1" || config == "v2";
});

// Concurrency stats dump configuration
// Boolean switch for the concurrency stats dump; defaults to "false".
DEFINE_mBool(enable_concurrency_stats_dump, "false");
// Dump interval; defaults to "100" (milliseconds, going by the `_ms` suffix).
DEFINE_mInt32(concurrency_stats_dump_interval_ms, "100");
// Validator for the interval above: the predicate returns true only for values >= 10,
// so smaller intervals do not pass validation.
DEFINE_Validator(concurrency_stats_dump_interval_ms,
[](const int32_t config) -> bool { return config >= 10; });

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
25 changes: 25 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ DECLARE_mInt32(trash_file_expire_time_sec);
// modify them upon necessity
DECLARE_Int32(min_file_descriptor_number);
DECLARE_mBool(disable_segment_cache);
// Enable checking segment rows consistency between rowset meta and segment footer
DECLARE_mBool(enable_segment_rows_consistency_check);
DECLARE_mBool(enable_segment_rows_check_core);
// ATTENTION: For test only. In a test environment there is no historical data,
// so every rowset meta should have segment rows info.
DECLARE_mBool(fail_when_segment_rows_not_in_rowset_meta);
DECLARE_String(row_cache_mem_limit);

// Cache for storage page size
Expand Down Expand Up @@ -1550,6 +1556,21 @@ DECLARE_mInt64(string_overflow_size);
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
// The max thread num for BufferedReaderPrefetchThreadPool
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);

DECLARE_mBool(enable_segment_prefetch_verbose_log);
// The thread num for SegmentPrefetchThreadPool
DECLARE_Int64(segment_prefetch_thread_pool_thread_num_min);
DECLARE_Int64(segment_prefetch_thread_pool_thread_num_max);

DECLARE_mInt32(segment_file_cache_consume_rowids_batch_size);
// Enable segment file cache block prefetch for query
DECLARE_mBool(enable_query_segment_file_cache_prefetch);
// Number of blocks to prefetch ahead in segment iterator for query
DECLARE_mInt32(query_segment_file_cache_prefetch_block_size);
// Enable segment file cache block prefetch for compaction
DECLARE_mBool(enable_compaction_segment_file_cache_prefetch);
// Number of blocks to prefetch ahead in segment iterator for compaction
DECLARE_mInt32(compaction_segment_file_cache_prefetch_block_size);
// The min thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
Expand Down Expand Up @@ -1694,6 +1715,10 @@ DECLARE_mBool(read_cluster_cache_opt_verbose_log);

DECLARE_mString(aws_credentials_provider_version);

// Concurrency stats dump configuration
DECLARE_mBool(enable_concurrency_stats_dump);
DECLARE_mInt32(concurrency_stats_dump_interval_ms);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "util/concurrency_stats.h"
#include "util/runtime_profile.h"
#include "util/stack_util.h"
#include "util/stopwatch.hpp"
Expand Down Expand Up @@ -809,7 +810,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o
DCHECK(stats != nullptr);
MonotonicStopWatch sw;
sw.start();
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
std::lock_guard cache_lock(_mutex);
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
stats->lock_wait_timer += sw.elapsed_time();
FileBlocks file_blocks;
int64_t duration = 0;
Expand Down
92 changes: 76 additions & 16 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "service/backend_options.h"
#include "util/bit_util.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/concurrency_stats.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -283,6 +284,8 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockS
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
size_t already_read = 0;
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);

const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
DCHECK(io_ctx);
Expand Down Expand Up @@ -385,8 +388,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
cache_context.tablet_id = tablet_id.value_or(0);
MonotonicStopWatch sw;
sw.start();

ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
FileBlocksHolder holder =
_cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();

stats.cache_get_or_set_timer += sw.elapsed_time();
std::vector<FileBlockSPtr> empty_blocks;
for (auto& block : holder.file_blocks) {
Expand Down Expand Up @@ -431,23 +438,28 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
RETURN_IF_ERROR(
_execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx));

for (auto& block : empty_blocks) {
if (block->state() == FileBlock::State::SKIP_CACHE) {
continue;
}
SCOPED_RAW_TIMER(&stats.local_write_timer);
char* cur_ptr = buffer.get() + block->range().left - empty_start;
size_t block_size = block->range().size();
Status st = block->append(Slice(cur_ptr, block_size));
if (st.ok()) {
st = block->finalize();
}
if (!st.ok()) {
LOG_EVERY_N(WARNING, 100) << "Write data to file cache failed. err=" << st.msg();
} else {
_insert_file_reader(block);
{
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
for (auto& block : empty_blocks) {
if (block->state() == FileBlock::State::SKIP_CACHE) {
continue;
}
SCOPED_RAW_TIMER(&stats.local_write_timer);
char* cur_ptr = buffer.get() + block->range().left - empty_start;
size_t block_size = block->range().size();
Status st = block->append(Slice(cur_ptr, block_size));
if (st.ok()) {
st = block->finalize();
}
if (!st.ok()) {
LOG_EVERY_N(WARNING, 100)
<< "Write data to file cache failed. err=" << st.msg();
} else {
_insert_file_reader(block);
}
stats.bytes_write_into_file_cache += block_size;
}
stats.bytes_write_into_file_cache += block_size;
}
// copy from memory directly
size_t right_offset = offset + bytes_req - 1;
Expand Down Expand Up @@ -486,6 +498,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
static int64_t max_wait_time = 10;
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time);
if (block_state != FileBlock::State::DOWNLOADED) {
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
do {
SCOPED_RAW_TIMER(&stats.remote_wait_timer);
SCOPED_RAW_TIMER(&stats.remote_read_timer);
Expand All @@ -512,6 +526,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
} else {
size_t file_offset = current_offset - left;
SCOPED_RAW_TIMER(&stats.local_read_timer);
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
st = block->read(Slice(result.data + (current_offset - offset), read_size),
file_offset);
indirect_read_bytes += read_size;
Expand Down Expand Up @@ -595,4 +611,48 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
g_skip_cache_sum << read_stats.skip_cache;
}

void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) {
if (offset >= this->size() || size == 0) {
return;
}

size = std::min(size, this->size() - offset);

ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool();
if (pool == nullptr) {
return;
}

IOContext dryrun_ctx;
if (io_ctx != nullptr) {
dryrun_ctx = *io_ctx;
}
dryrun_ctx.is_dryrun = true;
dryrun_ctx.query_id = nullptr;
dryrun_ctx.file_cache_stats = nullptr;
dryrun_ctx.file_reader_stats = nullptr;

LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
<< fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}",
offset, size, path().filename().native());
std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this();
auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() {
auto self = weak_this.lock();
if (self == nullptr) {
return;
}
size_t bytes_read;
Slice dummy_buffer((char*)nullptr, size);
(void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx);
LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
<< fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}",
offset, size, self->path().filename().native());
});

if (!st.ok()) {
VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size
<< " error=" << st.to_string();
}
}

} // namespace doris::io
15 changes: 14 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namespace doris::io {
struct IOContext;
struct FileCacheStatistics;

class CachedRemoteFileReader final : public FileReader {
class CachedRemoteFileReader final : public FileReader,
public std::enable_shared_from_this<CachedRemoteFileReader> {
public:
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts);

Expand All @@ -55,6 +56,18 @@ class CachedRemoteFileReader final : public FileReader {

static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);

// Asynchronously prefetch a range of file cache blocks.
// This method triggers read file cache in dryrun mode to warm up the cache
// without actually reading the data into user buffers.
//
// Parameters:
// offset: Starting offset in the file
// size: Number of bytes to prefetch
// io_ctx: IO context (can be nullptr, will create a dryrun context internally)
//
// Note: This is a best-effort operation. Errors are logged but not returned.
void prefetch_range(size_t offset, size_t size, const IOContext* io_ctx = nullptr);

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
#include "util/concurrency_stats.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -131,6 +132,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
return Status::InternalError("init s3 client error");
}

SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read);

int retry_count = 0;
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
auto seg_id = 0;
bool segments_key_bounds_truncated {false};
std::vector<KeyBoundsPB> segment_key_bounds;
std::vector<uint32_t> num_segment_rows;
for (auto rowset : _input_rowsets) {
RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
_output_rs_writer->rowset_id(), seg_id));
Expand All @@ -346,6 +347,10 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
std::vector<KeyBoundsPB> key_bounds;
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
std::vector<uint32_t> input_segment_rows;
rowset->get_num_segment_rows(&input_segment_rows);
num_segment_rows.insert(num_segment_rows.end(), input_segment_rows.begin(),
input_segment_rows.end());
}
// build output rowset
RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
Expand All @@ -359,6 +364,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
rowset_meta->set_rowset_state(VISIBLE);
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
rowset_meta->set_segments_key_bounds(segment_key_bounds);
rowset_meta->set_num_segment_rows(num_segment_rows);

_output_rowset = _output_rs_writer->manual_build(rowset_meta);

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ struct OlapReaderStatistics {
int64_t segment_iterator_init_timer_ns = 0;
int64_t segment_iterator_init_return_column_iterators_timer_ns = 0;
int64_t segment_iterator_init_index_iterators_timer_ns = 0;
int64_t segment_iterator_init_segment_prefetchers_timer_ns = 0;

int64_t segment_create_column_readers_timer_ns = 0;
int64_t segment_load_index_timer_ns = 0;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerS
Status ParallelScannerBuilder::_load() {
_total_rows = 0;
size_t idx = 0;
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
? _state->query_options().enable_segment_cache
: true;
for (auto&& [tablet, version] : _tablets) {
const auto tablet_id = tablet->tablet_id();
_all_read_sources[tablet_id] = _read_sources[idx];
Expand All @@ -233,7 +236,8 @@ Status ParallelScannerBuilder::_load() {

auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats));
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache,
&_builder_stats));
auto segment_count = rowset->num_segments();
for (int64_t i = 0; i != segment_count; i++) {
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
Expand Down
Loading