Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
_rowset_meta->set_rowset_id(_context.rowset_id);
_rowset_meta->set_partition_id(_context.partition_id);
_rowset_meta->set_tablet_id(_context.tablet_id);
_rowset_meta->set_db_id(_context.db_id);
_rowset_meta->set_table_id(_context.table_id);
_rowset_meta->set_index_id(_context.index_id);
_rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
_rowset_meta->set_rowset_type(_context.rowset_type);
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "cloud/config.h"
#include "load/channel/tablets_channel.h"
#include "load/delta_writer/delta_writer.h"
#include "storage/tablet_info.h"

namespace doris {

Expand All @@ -42,6 +43,9 @@ std::unique_ptr<BaseDeltaWriter> CloudTabletsChannel::create_delta_writer(

Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response) {
if (_schema != nullptr && _schema->row_binlog_index_schema() != nullptr) {
return Status::NotSupported("cloud mode does not support binlog<row> now");
}
// FIXME(plat1ko): Too many duplicate code with `TabletsChannel`
SCOPED_TIMER(_add_batch_timer);
int64_t cur_seq = 0;
Expand Down
32 changes: 28 additions & 4 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,17 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
out->mutable_commit_tso()->CopyFrom(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
if (in.has_db_id()) {
out->set_db_id(in.db_id());
}
if (in.has_table_id()) {
out->set_table_id(in.table_id());
}
}

void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
Expand Down Expand Up @@ -208,11 +214,17 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
out->mutable_commit_tso()->CopyFrom(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
if (in.has_db_id()) {
out->set_db_id(in.db_id());
}
if (in.has_table_id()) {
out->set_table_id(in.table_id());
}
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
Expand Down Expand Up @@ -308,11 +320,17 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
out->mutable_commit_tso()->CopyFrom(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
if (in.has_db_id()) {
out->set_db_id(in.db_id());
}
if (in.has_table_id()) {
out->set_table_id(in.table_id());
}
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
Expand Down Expand Up @@ -397,11 +415,17 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
out->mutable_commit_tso()->CopyFrom(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
if (in.has_db_id()) {
out->set_db_id(in.db_id());
}
if (in.has_table_id()) {
out->set_table_id(in.table_id());
}
}

TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
Expand Down
15 changes: 13 additions & 2 deletions be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "service/backend_options.h"
#include "storage/index/ann/ann_topn_runtime.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet.h"
#include "storage/tablet/tablet_manager.h"
#include "util/to_string.h"

Expand Down Expand Up @@ -515,6 +516,8 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
}

bool enable_parallel_scan = state()->enable_parallel_scan();
bool read_row_binlog =
p._olap_scan_node.__isset.read_row_binlog && p._olap_scan_node.read_row_binlog;

// The flag of preagg's meaning is whether return pre agg data(or partial agg data)
// PreAgg ON: The storage layer returns partially aggregated data without additional processing. (Fast data reading)
Expand All @@ -524,7 +527,9 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
// PreAgg OFF: The storage layer must complete pre-aggregation and return fully aggregated data. (Slow data reading)
if (enable_parallel_scan && !p._should_run_serial &&
p._push_down_agg_type == TPushAggOp::NONE &&
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)
// binlog<row> need to be read in order
&& !read_row_binlog) {
// Filter out the "full scan" placeholder range (has_lower_bound == false)
// so that only ranges with real key bounds are forwarded to the parallel scanner.
std::vector<OlapScanRange*> key_ranges;
Expand Down Expand Up @@ -624,6 +629,7 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
_read_sources[scan_range_idx],
p._limit,
p._olap_scan_node.is_preaggregation,
read_row_binlog,
});
RETURN_IF_ERROR(scanner->init(state(), _conjuncts));
scanners->push_back(std::move(scanner));
Expand Down Expand Up @@ -793,6 +799,8 @@ Status OlapScanLocalState::prepare(RuntimeState* state) {
{0, _tablets[i].version},
{.skip_missing_versions = _state->skip_missing_version(),
.enable_fetch_rowsets_from_peers = config::enable_fetch_rowsets_from_peer_replicas,
.capture_row_binlog = olap_scan_node().__isset.read_row_binlog &&
olap_scan_node().read_row_binlog,
.enable_prefer_cached_rowset =
config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false,
.query_freshness_tolerance_ms =
Expand Down Expand Up @@ -866,7 +874,10 @@ void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param;
bool hit_cache = false;
if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache) {
// read binlog<row> scan should not participate in query cache.
if (olap_scan_node().__isset.read_row_binlog && olap_scan_node().read_row_binlog) {
hit_cache = false;
} else if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache) {
std::string cache_key;
int64_t version = 0;
auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
int child_node_id = pipeline->operators().back()->node_id();
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
!config::is_cloud_mode()) {
!_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
_sink = std::make_shared<OlapTableSinkV2OperatorX>(
pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
} else {
Expand Down
15 changes: 10 additions & 5 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param
_key_ranges(std::move(params.key_ranges)),
_tablet_reader_params({.tablet = std::move(params.tablet),
.tablet_schema {},
.reader_type = params.read_row_binlog ? ReaderType::READER_BINLOG
: ReaderType::READER_QUERY,
.aggregation = params.aggregation,
.version = {0, params.version},
.start_key {},
Expand Down Expand Up @@ -168,10 +170,13 @@ Status OlapScanner::_prepare_impl() {
_tablet_reader->set_preferred_block_size_bytes(_state->preferred_block_size_bytes());
{
TOlapScanNode& olap_scan_node = local_state->olap_scan_node();
TabletSchemaSPtr source_tablet_schema =
_tablet_reader_params.reader_type == ReaderType::READER_BINLOG
? tablet->row_binlog_tablet_schema()
: tablet->tablet_schema();

// Each scanner builds its own TabletSchema to avoid concurrent modification.
tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->copy_from(*tablet->tablet_schema());
tablet_schema->copy_from(*source_tablet_schema);
if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() &&
olap_scan_node.columns_desc[0].col_unique_id >= 0) {
tablet_schema->clear_columns();
Expand Down Expand Up @@ -204,6 +209,8 @@ Status OlapScanner::_prepare_impl() {
.skip_missing_versions = _state->skip_missing_version(),
.enable_fetch_rowsets_from_peers =
config::enable_fetch_rowsets_from_peer_replicas,
.capture_row_binlog =
_tablet_reader_params.reader_type == ReaderType::READER_BINLOG,
.enable_prefer_cached_rowset =
config::is_cloud_mode() ? _state->enable_prefer_cached_rowset()
: false,
Expand All @@ -215,7 +222,6 @@ Status OlapScanner::_prepare_impl() {
LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error();
return maybe_read_source.error();
}

read_source = std::move(maybe_read_source.value());

if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) {
Expand Down Expand Up @@ -246,7 +252,7 @@ Status OlapScanner::_prepare_impl() {
_tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>();

io::IOContext io_ctx {
.reader_type = ReaderType::READER_QUERY,
.reader_type = _tablet_reader_params.reader_type,
.expiration_time = tablet->ttl_seconds(),
.query_id = &_state->query_id(),
.file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
Expand Down Expand Up @@ -306,7 +312,6 @@ Status OlapScanner::_init_tablet_reader_params(
RETURN_IF_ERROR(_init_variant_columns());
RETURN_IF_ERROR(_init_return_columns());

_tablet_reader_params.reader_type = ReaderType::READER_QUERY;
_tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type();

// TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`,
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/scan/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class OlapScanner : public Scanner {
TabletReadSource read_source;
int64_t limit;
bool aggregation;
bool read_row_binlog = false;
};

OlapScanner(ScanLocalStateBase* parent, Params&& params);
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/scan/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,10 @@ Status ParallelScannerBuilder::_load() {
std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
        BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges,
        TabletReadSource&& read_source) {
    // Build one OlapScanner over the given tablet/version and key ranges,
    // handing it ownership of the already-captured read source.
    //
    // `Params::read_row_binlog` is intentionally left at its default (false):
    // binlog<row> data must be read in order, so the parallel scanner path is
    // never used for it (the scan operator disables parallel scan in that case).
    OlapScanner::Params params {
            _state, _scanner_profile.get(), key_ranges, std::move(tablet),
            version, std::move(read_source), _limit, _is_preaggregation,
    };
    return OlapScanner::create_shared(_parent, std::move(params));
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
}
}
CHECK_EQ(request_length, 0);
#ifndef BE_TEST
if (!_is_fetching && _current_volume < _low_water_level_mark()) {
RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
}
#endif
return Status::OK();
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/sink/autoinc_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ class AutoIncIDBuffer {
}
};

#ifdef BE_TEST
// Test-only helper (compiled under BE_TEST): directly injects a pre-allocated
// auto-increment ID range [start, start + length) into the buffer, bypassing
// the async fetch path so unit tests can seed IDs deterministically.
// Takes _latch to stay consistent with the concurrent consumers of _buffers
// and _current_volume.
void append_range_for_test(int64_t start, size_t length) {
std::lock_guard<std::mutex> lock {_latch};
_buffers.emplace_back(AutoIncRange {start, length});
// Keep the cached total of buffered IDs in sync with _buffers.
_current_volume += length;
}
#endif

private:
[[nodiscard]] size_t _prefetch_size() const {
return _batch_size * config::auto_inc_prefetch_size_ratio;
Expand Down
9 changes: 6 additions & 3 deletions be/src/information_schema/schema_rowsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
{"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
{"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
{"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
{"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true},
{"COMMIT_TSO", TYPE_VARCHAR, sizeof(StringRef), true},

};

Expand Down Expand Up @@ -269,10 +269,13 @@ Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
}
// COMMIT_TSO
{
std::vector<int64_t> srcs(fill_rowsets_num);
std::vector<std::string> commit_tsos(fill_rowsets_num);
std::vector<StringRef> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->commit_tso();
commit_tsos[i - fill_idx_begin] = rowset->commit_tso().to_string();
srcs[i - fill_idx_begin] = StringRef(commit_tsos[i - fill_idx_begin].c_str(),
commit_tsos[i - fill_idx_begin].size());
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas));
Expand Down
4 changes: 3 additions & 1 deletion be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ enum class ReaderType : uint8_t {
READER_COLD_DATA_COMPACTION = 5,
READER_SEGMENT_COMPACTION = 6,
READER_FULL_COMPACTION = 7,
UNKNOWN = 8
READER_BINLOG_COMPACTION = 8,
READER_BINLOG = 9,
UNKNOWN = 10
};

namespace io {
Expand Down
38 changes: 37 additions & 1 deletion be/src/load/channel/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "load/channel/load_channel.h"
#include "load/delta_writer/delta_writer.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_manager.h"
#include "storage/tablet_info.h"
#include "storage/txn/txn_manager.h"
#include "util/defer_op.h"
Expand Down Expand Up @@ -240,6 +241,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
wrequest.is_high_priority = _is_high_priority;
wrequest.table_schema_param = _schema;
wrequest.txn_expiration = params.txn_expiration(); // Required by CLOUD.
wrequest.write_file_cache = params.write_file_cache();
wrequest.storage_vault_id = params.storage_vault_id();

auto delta_writer = create_delta_writer(wrequest);
Expand All @@ -260,7 +262,41 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
}

std::unique_ptr<BaseDeltaWriter> TabletsChannel::create_delta_writer(const WriteRequest& request) {
    // Callers only ever hand us a plain DATA load request; GROUP / ROW_BINLOG
    // requests are synthesized below, never passed in.
    DCHECK(request.write_req_type == WriteRequestType::DATA);
    DCHECK(request.table_schema_param != nullptr);

    // Find the target index in the load's schema param and check whether it
    // has an attached binlog<row> index.
    int64_t row_binlog_index_id = 0;
    for (const auto* index_schema : request.table_schema_param->indexes()) {
        if (index_schema->index_id == request.index_id) {
            row_binlog_index_id = index_schema->row_binlog_id;
            break;
        }
    }
    if (row_binlog_index_id <= 0) {
        // No binlog<row> index attached: plain single-rowset delta writer.
        return std::make_unique<DeltaWriter>(_engine, request, _profile, _load_id);
    }

    const auto* row_binlog_index_schema = request.table_schema_param->row_binlog_index_schema();
    DCHECK(row_binlog_index_schema != nullptr);
    DCHECK(row_binlog_index_schema->index_id == row_binlog_index_id);

    // group_build_req is only for the group wrapper itself. It provides the group
    // semantics and metadata used by BaseDeltaWriter/GroupRowsetBuilder to expose
    // tablet_id, txn_id, partition_id, load_id and profile information, while the
    // concrete rowset builders use the sub requests below.
    WriteRequest group_build_req = request;
    group_build_req.write_req_type = WriteRequestType::GROUP;

    // Sub request that builds the ordinary data rowset.
    WriteRequest sub_data_req = request;
    sub_data_req.write_req_type = WriteRequestType::DATA;

    // Sub request that builds the binlog<row> rowset; it targets the binlog
    // index's own id/schema_hash rather than the data index's.
    WriteRequest sub_row_binlog_req = request;
    sub_row_binlog_req.write_req_type = WriteRequestType::ROW_BINLOG;
    sub_row_binlog_req.index_id = row_binlog_index_schema->index_id;
    sub_row_binlog_req.schema_hash = row_binlog_index_schema->schema_hash;

    return std::make_unique<DeltaWriter>(_engine, group_build_req, sub_data_req,
                                         sub_row_binlog_req, _profile, _load_id);
}

Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
Expand Down
Loading
Loading