From f0f5693484c4d1a2b8b7dcd4173bd21a64979457 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 9 May 2026 15:31:20 +0800 Subject: [PATCH 1/2] Fix parquet row group column lookup crash (#63100) *** tablet id: 0 *** *** Aborted at 1778254939 (unix time) try "date -d @1778254939" if you are using GNU date *** *** Current BE git commitID: c6e6ecc45c5 *** *** SIGSEGV address not mapped to object (@0x13) received by PID 40355 (TID 77840 OR 0x7ef6079ff640) from PID 19; stack trace: *** 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/common/signal_handler.h:420 1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so 2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so 3# 0x00007F036C55C520 in /lib/x86_64-linux-gnu/libc.so.6 4# std::_Hash_bytes(void const*, unsigned long, unsigned long) in /mnt/hdd01/PERFORMANCE_ENV/be/lib/doris_be 5# std::__detail::_Map_base, std::allocator >, std::pair, std::allocator > const, unsigned int>, std::allocator, std::allocator > const, unsigned int> >, std::__detail::_Select1st, std::equal_to, std::allocator > >, std::hash, std::allocator > >, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits, true>::operator[](std::__cxx11::basic_string, std::allocator > const&) in /mnt/hdd01/PERFORMANCE_ENV/be/lib/doris_be 6# doris::RowGroupReader::_read_column_data(doris::Block*, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, unsigned long, unsigned long*, bool*, doris::FilterMap&) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/parquet/vparquet_group_reader.cpp:496 7# doris::RowGroupReader::next_batch(doris::Block*, unsigned long, unsigned long*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/parquet/vparquet_group_reader.cpp:404 8# doris::ParquetReader::get_next_block(doris::Block*, unsigned long*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/parquet/vparquet_reader.cpp:724 9# doris::IcebergTableReader::get_next_block_inner(doris::Block*, unsigned long*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/table/iceberg_reader.cpp:185 10# doris::TableFormatReader::get_next_block(doris::Block*, unsigned long*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/table/table_format_reader.h:80 11# doris::FileScanner::_get_block_wrapped(doris::RuntimeState*, doris::Block*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/file_scanner.cpp:474 12# doris::FileScanner::_get_block_impl(doris::RuntimeState*, doris::Block*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/file_scanner.cpp:407 13# doris::Scanner::get_block(doris::RuntimeState*, doris::Block*, bool*) in /mnt/hdd01/PERFORMANCE_ENV/be/lib/doris_be 14# doris::Scanner::get_block_after_projects(doris::RuntimeState*, doris::Block*, bool*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/scanner.cpp:91 15# doris::ScannerScheduler::_scanner_scan(std::shared_ptr, std::shared_ptr) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/scanner_scheduler.cpp:186 16# std::_Function_handler, std::shared_ptr)::$_0::operator()() const::{lambda()#1}>::_M_invoke(std::_Any_data const&) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292 17# doris::ScannerSplitRunner::process_for(std::chrono::duration >) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/scanner_scheduler.cpp:427 18# doris::PrioritizedSplitRunner::process() at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/task_executor/time_sharing/prioritized_split_runner.cpp:103 19# doris::TimeSharingTaskExecutor::_dispatch_thread() at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp:558 20# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_branch-4.1/doris/be/src/util/thread.cpp:461 21# start_thread at ./nptl/pthread_create.c:442 22# 0x00007F036C6408D0 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83 - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- .../format/parquet/vparquet_group_reader.cpp | 173 +++++++++++++----- be/src/format/parquet/vparquet_group_reader.h | 13 +- 2 files changed, 141 insertions(+), 45 deletions(-) diff --git a/be/src/format/parquet/vparquet_group_reader.cpp b/be/src/format/parquet/vparquet_group_reader.cpp index 5e6fdacf51a0a6..eea296889dc230 100644 --- a/be/src/format/parquet/vparquet_group_reader.cpp +++ b/be/src/format/parquet/vparquet_group_reader.cpp @@ -535,31 +535,31 @@ Status RowGroupReader::_read_column_data(Block* block, size_t batch_read_rows = 0; bool has_eof = false; for (auto& read_col_name : table_columns) { - auto& column_with_type_and_name = - block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name, &block_pos)); + auto reader_iter = _column_readers.find(read_col_name); + if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) { + return Status::InternalError("Column reader for '{}' not found in parquet row group", + read_col_name); + } + + auto& column_with_type_and_name = block->safe_get_by_position(block_pos); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; bool is_dict_filter = false; for (auto& _dict_filter_col : _dict_filter_cols) { if (_dict_filter_col.first == read_col_name) { MutableColumnPtr dict_column = ColumnInt32::create(); - if (!_col_name_to_block_idx->contains(read_col_name)) { - return Status::InternalError( - "Wrong read column '{}' in parquet file, block: {}", read_col_name, - block->dump_structure()); - } if (column_type->is_nullable()) { - block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = + block->get_by_position(block_pos).type = std::make_shared(std::make_shared()); block->replace_by_position( - (*_col_name_to_block_idx)[read_col_name], - ColumnNullable::create(std::move(dict_column), - ColumnUInt8::create(dict_column->size(), 0))); + block_pos, ColumnNullable::create( + std::move(dict_column), + ColumnUInt8::create(dict_column->size(), 0))); } else { - block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = - std::make_shared(); - block->replace_by_position((*_col_name_to_block_idx)[read_col_name], - std::move(dict_column)); + block->get_by_position(block_pos).type = std::make_shared(); + block->replace_by_position(block_pos, std::move(dict_column)); } is_dict_filter = true; break; @@ -570,10 +570,10 @@ Status RowGroupReader::_read_column_data(Block* block, bool col_eof = false; // Should reset _filter_map_index to 0 when reading next column. // select_vector.reset(); - _column_readers[read_col_name]->reset_filter_map_index(); + reader_iter->second->reset_filter_map_index(); while (!col_eof && col_read_rows < batch_size) { size_t loop_rows = 0; - RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data( + RETURN_IF_ERROR(reader_iter->second->read_column_data( column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name), filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter)); VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name @@ -703,19 +703,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re SCOPED_RAW_TIMER(&_predicate_filter_time); for (const auto& col : _lazy_read_ctx.predicate_columns.first) { // clean block to read predicate columns - block->get_by_position((*_col_name_to_block_idx)[col]) - .column->assume_mutable() - ->clear(); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, col, &block_pos)); + block->get_by_position(block_pos).column->assume_mutable()->clear(); } for (const auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_position((*_col_name_to_block_idx)[col.first]) - .column->assume_mutable() - ->clear(); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos)); + block->get_by_position(block_pos).column->assume_mutable()->clear(); } for (const auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_position((*_col_name_to_block_idx)[col.first]) - .column->assume_mutable() - ->clear(); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos)); + block->get_by_position(block_pos).column->assume_mutable()->clear(); } RETURN_IF_ERROR(_table_format_reader->clear_synthesized_columns(block)); RETURN_IF_ERROR(_table_format_reader->clear_generated_columns(block)); @@ -849,6 +849,96 @@ Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map, return Status::OK(); } +Status RowGroupReader::_fill_partition_columns( + Block* block, size_t rows, + const std::unordered_map>& + partition_columns) { + DataTypeSerDe::FormatOptions _text_formatOptions; + for (const auto& kv : partition_columns) { + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos)); + auto doris_column = block->get_by_position(block_pos).column; + // obtained from block*, it is a mutable object. + auto* col_ptr = const_cast(doris_column.get()); + const auto& [value, slot_desc] = kv.second; + auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); + Slice slice(value.data(), value.size()); + uint64_t num_deserialized = 0; + // Be careful when reading empty rows from parquet row groups. + if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows, + &num_deserialized, + _text_formatOptions) != Status::OK()) { + return Status::InternalError("Failed to fill partition column: {}={}", + slot_desc->col_name(), value); + } + if (num_deserialized != rows) { + return Status::InternalError( + "Failed to fill partition column: {}={} ." + "Number of rows expected to be written : {}, number of rows actually written : " + "{}", + slot_desc->col_name(), value, num_deserialized, rows); + } + } + return Status::OK(); +} + +Status RowGroupReader::_fill_missing_columns( + Block* block, size_t rows, + const std::unordered_map& missing_columns) { + for (const auto& kv : missing_columns) { + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos)); + if (kv.second == nullptr) { + // no default column, fill with null + auto mutable_column = block->get_by_position(block_pos).column->assume_mutable(); + auto* nullable_column = assert_cast(mutable_column.get()); + nullable_column->insert_many_defaults(rows); + } else { + // fill with default value + const auto& ctx = kv.second; + ColumnPtr result_column_ptr; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(block, result_column_ptr)); + if (result_column_ptr->use_count() == 1) { + // call resize because the first column of _src_block_ptr may not be filled by reader, + // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()` + // has only one row. + auto mutable_column = result_column_ptr->assume_mutable(); + mutable_column->resize(rows); + // result_column_ptr maybe a ColumnConst, convert it to a normal column + result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); + auto origin_column_type = block->get_by_position(block_pos).type; + bool is_nullable = origin_column_type->is_nullable(); + block->replace_by_position( + block_pos, is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); + } + } + } + return Status::OK(); +} + +Status RowGroupReader::_get_block_column_pos(const Block& block, const std::string& column_name, + uint32_t* position) const { + if (_col_name_to_block_idx == nullptr) { + return Status::InternalError( + "Column name to block index map is not set when reading parquet column '{}', block: " + "{}", + column_name, block.dump_structure()); + } + auto iter = _col_name_to_block_idx->find(column_name); + if (iter == _col_name_to_block_idx->end()) { + return Status::InternalError("Column '{}' not found in block index map, block: {}", + column_name, block.dump_structure()); + } + if (iter->second >= block.columns()) { + return Status::InternalError( + "Column '{}' maps to invalid block position {}, block columns: {}, block: {}", + column_name, iter->second, block.columns(), block.dump_structure()); + } + *position = iter->second; + return Status::OK(); +} + Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof, bool* modify_row_ids) { *modify_row_ids = false; @@ -1190,38 +1280,35 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { for (auto& dict_filter_cols : _dict_filter_cols) { - if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "Wrong read column '{}' in parquet file, block: {}", - dict_filter_cols.first, block->dump_structure()); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, dict_filter_cols.first, &block_pos)); + auto reader_iter = _column_readers.find(dict_filter_cols.first); + if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) { + return Status::InternalError("Column reader for '{}' not found in parquet row group", + dict_filter_cols.first); } - ColumnWithTypeAndName& column_with_type_and_name = - block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]); + ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(block_pos); const ColumnPtr& column = column_with_type_and_name.column; if (const auto* nullable_column = check_and_get_column(*column)) { const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); const auto* dict_column = assert_cast(nested_column.get()); DCHECK(dict_column); - auto string_column = DORIS_TRY( - _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( - dict_column)); + auto string_column = + DORIS_TRY(reader_iter->second->convert_dict_column_to_string_column(dict_column)); column_with_type_and_name.type = std::make_shared(std::make_shared()); block->replace_by_position( - (*_col_name_to_block_idx)[dict_filter_cols.first], - ColumnNullable::create(std::move(string_column), - nullable_column->get_null_map_column_ptr())); + block_pos, ColumnNullable::create(std::move(string_column), + nullable_column->get_null_map_column_ptr())); } else { const auto* dict_column = assert_cast(column.get()); - auto string_column = DORIS_TRY( - _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( - dict_column)); + auto string_column = + DORIS_TRY(reader_iter->second->convert_dict_column_to_string_column(dict_column)); column_with_type_and_name.type = std::make_shared(); - block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first], - std::move(string_column)); + block->replace_by_position(block_pos, std::move(string_column)); } } return Status::OK(); diff --git a/be/src/format/parquet/vparquet_group_reader.h b/be/src/format/parquet/vparquet_group_reader.h index 66b04c4fd3293c..54ae22b366bb24 100644 --- a/be/src/format/parquet/vparquet_group_reader.h +++ b/be/src/format/parquet/vparquet_group_reader.h @@ -239,6 +239,15 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { DorisUniqueBufferPtr& filter_map_data, size_t pre_read_rows) const; + Status _fill_partition_columns( + Block* block, size_t rows, + const std::unordered_map>& + partition_columns); + Status _fill_missing_columns( + Block* block, size_t rows, + const std::unordered_map& missing_columns); + Status _get_block_column_pos(const Block& block, const std::string& column_name, + uint32_t* position) const; Status _build_pos_delete_filter(size_t read_rows); Status _filter_block(Block* block, int column_to_keep, const std::vector& columns_to_filter); @@ -259,7 +268,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { io::FileReaderSPtr _file_reader; std::unordered_map> _column_readers; // table_column_name - const std::vector& _read_table_columns; + std::vector _read_table_columns; const int32_t _row_group_id; const tparquet::RowGroup& _row_group_meta; @@ -271,7 +280,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { // merge the row ranges generated from page index and position delete. RowRanges _read_ranges; - const LazyReadContext& _lazy_read_ctx; + LazyReadContext _lazy_read_ctx; int64_t _lazy_read_filtered_rows = 0; int64_t _predicate_filter_time = 0; int64_t _dict_filter_rewrite_time = 0; From 09adfa5b3b94854daee572c408c56c46bf6b38a3 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 9 May 2026 15:43:37 +0800 Subject: [PATCH 2/2] Fix parquet row group reader lifetime issues (#63101) --- .../format/parquet/vparquet_group_reader.cpp | 26 ++++++++++--------- be/src/format/parquet/vparquet_group_reader.h | 5 ++-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/be/src/format/parquet/vparquet_group_reader.cpp b/be/src/format/parquet/vparquet_group_reader.cpp index eea296889dc230..6863926ddba56d 100644 --- a/be/src/format/parquet/vparquet_group_reader.cpp +++ b/be/src/format/parquet/vparquet_group_reader.cpp @@ -100,8 +100,9 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, _filter_column_ids(filter_column_ids) {} RowGroupReader::~RowGroupReader() { - _column_readers.clear(); - _obj_pool->clear(); + if (_obj_pool != nullptr) { + _obj_pool->clear(); + } } Status RowGroupReader::init( @@ -554,9 +555,9 @@ Status RowGroupReader::_read_column_data(Block* block, block->get_by_position(block_pos).type = std::make_shared(std::make_shared()); block->replace_by_position( - block_pos, ColumnNullable::create( - std::move(dict_column), - ColumnUInt8::create(dict_column->size(), 0))); + block_pos, + ColumnNullable::create(std::move(dict_column), + ColumnUInt8::create(dict_column->size(), 0))); } else { block->get_by_position(block_pos).type = std::make_shared(); block->replace_by_position(block_pos, std::move(dict_column)); @@ -909,8 +910,8 @@ Status RowGroupReader::_fill_missing_columns( result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); auto origin_column_type = block->get_by_position(block_pos).type; bool is_nullable = origin_column_type->is_nullable(); - block->replace_by_position( - block_pos, is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); + block->replace_by_position(block_pos, is_nullable ? make_nullable(result_column_ptr) + : result_column_ptr); } } } @@ -921,7 +922,8 @@ Status RowGroupReader::_get_block_column_pos(const Block& block, const std::stri uint32_t* position) const { if (_col_name_to_block_idx == nullptr) { return Status::InternalError( - "Column name to block index map is not set when reading parquet column '{}', block: " + "Column name to block index map is not set when reading parquet column '{}', " + "block: " "{}", column_name, block.dump_structure()); } @@ -1294,8 +1296,8 @@ Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { const auto* dict_column = assert_cast(nested_column.get()); DCHECK(dict_column); - auto string_column = - DORIS_TRY(reader_iter->second->convert_dict_column_to_string_column(dict_column)); + auto string_column = DORIS_TRY( + reader_iter->second->convert_dict_column_to_string_column(dict_column)); column_with_type_and_name.type = std::make_shared(std::make_shared()); @@ -1304,8 +1306,8 @@ Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { nullable_column->get_null_map_column_ptr())); } else { const auto* dict_column = assert_cast(column.get()); - auto string_column = - DORIS_TRY(reader_iter->second->convert_dict_column_to_string_column(dict_column)); + auto string_column = DORIS_TRY( + reader_iter->second->convert_dict_column_to_string_column(dict_column)); column_with_type_and_name.type = std::make_shared(); block->replace_by_position(block_pos, std::move(string_column)); diff --git a/be/src/format/parquet/vparquet_group_reader.h b/be/src/format/parquet/vparquet_group_reader.h index 54ae22b366bb24..e9eb5370e02240 100644 --- a/be/src/format/parquet/vparquet_group_reader.h +++ b/be/src/format/parquet/vparquet_group_reader.h @@ -266,8 +266,6 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { Status _get_current_batch_row_id(size_t read_rows); io::FileReaderSPtr _file_reader; - std::unordered_map> - _column_readers; // table_column_name std::vector _read_table_columns; const int32_t _row_group_id; @@ -279,6 +277,9 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { std::shared_ptr _row_lineage_columns; // merge the row ranges generated from page index and position delete. RowRanges _read_ranges; + // ParquetColumnReader keeps a reference to _read_ranges, so readers must be destroyed first. + std::unordered_map> + _column_readers; // table_column_name LazyReadContext _lazy_read_ctx; int64_t _lazy_read_filtered_rows = 0;