diff --git a/be/src/format/parquet/vparquet_group_reader.cpp b/be/src/format/parquet/vparquet_group_reader.cpp index 5e6fdacf51a0a6..6863926ddba56d 100644 --- a/be/src/format/parquet/vparquet_group_reader.cpp +++ b/be/src/format/parquet/vparquet_group_reader.cpp @@ -100,8 +100,9 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, _filter_column_ids(filter_column_ids) {} RowGroupReader::~RowGroupReader() { - _column_readers.clear(); - _obj_pool->clear(); + if (_obj_pool != nullptr) { + _obj_pool->clear(); + } } Status RowGroupReader::init( @@ -535,31 +536,31 @@ Status RowGroupReader::_read_column_data(Block* block, size_t batch_read_rows = 0; bool has_eof = false; for (auto& read_col_name : table_columns) { - auto& column_with_type_and_name = - block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name, &block_pos)); + auto reader_iter = _column_readers.find(read_col_name); + if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) { + return Status::InternalError("Column reader for '{}' not found in parquet row group", + read_col_name); + } + + auto& column_with_type_and_name = block->safe_get_by_position(block_pos); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; bool is_dict_filter = false; for (auto& _dict_filter_col : _dict_filter_cols) { if (_dict_filter_col.first == read_col_name) { MutableColumnPtr dict_column = ColumnInt32::create(); - if (!_col_name_to_block_idx->contains(read_col_name)) { - return Status::InternalError( - "Wrong read column '{}' in parquet file, block: {}", read_col_name, - block->dump_structure()); - } if (column_type->is_nullable()) { - block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = + block->get_by_position(block_pos).type = std::make_shared(std::make_shared()); block->replace_by_position( - (*_col_name_to_block_idx)[read_col_name], + block_pos, ColumnNullable::create(std::move(dict_column), ColumnUInt8::create(dict_column->size(), 0))); } else { - block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type = - std::make_shared(); - block->replace_by_position((*_col_name_to_block_idx)[read_col_name], - std::move(dict_column)); + block->get_by_position(block_pos).type = std::make_shared(); + block->replace_by_position(block_pos, std::move(dict_column)); } is_dict_filter = true; break; @@ -570,10 +571,10 @@ Status RowGroupReader::_read_column_data(Block* block, bool col_eof = false; // Should reset _filter_map_index to 0 when reading next column. // select_vector.reset(); - _column_readers[read_col_name]->reset_filter_map_index(); + reader_iter->second->reset_filter_map_index(); while (!col_eof && col_read_rows < batch_size) { size_t loop_rows = 0; - RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data( + RETURN_IF_ERROR(reader_iter->second->read_column_data( column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name), filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter)); VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name @@ -703,19 +704,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re SCOPED_RAW_TIMER(&_predicate_filter_time); for (const auto& col : _lazy_read_ctx.predicate_columns.first) { // clean block to read predicate columns - block->get_by_position((*_col_name_to_block_idx)[col]) - .column->assume_mutable() - ->clear(); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, col, &block_pos)); + block->get_by_position(block_pos).column->assume_mutable()->clear(); } for (const auto& col : _lazy_read_ctx.predicate_partition_columns) { - block->get_by_position((*_col_name_to_block_idx)[col.first]) - .column->assume_mutable() - ->clear(); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos)); + block->get_by_position(block_pos).column->assume_mutable()->clear(); } for (const auto& col : _lazy_read_ctx.predicate_missing_columns) { - block->get_by_position((*_col_name_to_block_idx)[col.first]) - .column->assume_mutable() - ->clear(); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos)); + block->get_by_position(block_pos).column->assume_mutable()->clear(); } RETURN_IF_ERROR(_table_format_reader->clear_synthesized_columns(block)); RETURN_IF_ERROR(_table_format_reader->clear_generated_columns(block)); @@ -849,6 +850,97 @@ Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map, return Status::OK(); } +Status RowGroupReader::_fill_partition_columns( + Block* block, size_t rows, + const std::unordered_map>& + partition_columns) { + DataTypeSerDe::FormatOptions _text_formatOptions; + for (const auto& kv : partition_columns) { + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos)); + auto doris_column = block->get_by_position(block_pos).column; + // obtained from block*, it is a mutable object. + auto* col_ptr = const_cast(doris_column.get()); + const auto& [value, slot_desc] = kv.second; + auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); + Slice slice(value.data(), value.size()); + uint64_t num_deserialized = 0; + // Be careful when reading empty rows from parquet row groups. + if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows, + &num_deserialized, + _text_formatOptions) != Status::OK()) { + return Status::InternalError("Failed to fill partition column: {}={}", + slot_desc->col_name(), value); + } + if (num_deserialized != rows) { + return Status::InternalError( + "Failed to fill partition column: {}={} ." + "Number of rows expected to be written : {}, number of rows actually written : " + "{}", + slot_desc->col_name(), value, num_deserialized, rows); + } + } + return Status::OK(); +} + +Status RowGroupReader::_fill_missing_columns( + Block* block, size_t rows, + const std::unordered_map& missing_columns) { + for (const auto& kv : missing_columns) { + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos)); + if (kv.second == nullptr) { + // no default column, fill with null + auto mutable_column = block->get_by_position(block_pos).column->assume_mutable(); + auto* nullable_column = assert_cast(mutable_column.get()); + nullable_column->insert_many_defaults(rows); + } else { + // fill with default value + const auto& ctx = kv.second; + ColumnPtr result_column_ptr; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(block, result_column_ptr)); + if (result_column_ptr->use_count() == 1) { + // call resize because the first column of _src_block_ptr may not be filled by reader, + // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()` + // has only one row. + auto mutable_column = result_column_ptr->assume_mutable(); + mutable_column->resize(rows); + // result_column_ptr maybe a ColumnConst, convert it to a normal column + result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); + auto origin_column_type = block->get_by_position(block_pos).type; + bool is_nullable = origin_column_type->is_nullable(); + block->replace_by_position(block_pos, is_nullable ? make_nullable(result_column_ptr) + : result_column_ptr); + } + } + } + return Status::OK(); +} + +Status RowGroupReader::_get_block_column_pos(const Block& block, const std::string& column_name, + uint32_t* position) const { + if (_col_name_to_block_idx == nullptr) { + return Status::InternalError( + "Column name to block index map is not set when reading parquet column '{}', " + "block: " + "{}", + column_name, block.dump_structure()); + } + auto iter = _col_name_to_block_idx->find(column_name); + if (iter == _col_name_to_block_idx->end()) { + return Status::InternalError("Column '{}' not found in block index map, block: {}", + column_name, block.dump_structure()); + } + if (iter->second >= block.columns()) { + return Status::InternalError( + "Column '{}' maps to invalid block position {}, block columns: {}, block: {}", + column_name, iter->second, block.columns(), block.dump_structure()); + } + *position = iter->second; + return Status::OK(); +} + Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof, bool* modify_row_ids) { *modify_row_ids = false; @@ -1190,13 +1282,14 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { for (auto& dict_filter_cols : _dict_filter_cols) { - if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "Wrong read column '{}' in parquet file, block: {}", - dict_filter_cols.first, block->dump_structure()); + uint32_t block_pos = 0; + RETURN_IF_ERROR(_get_block_column_pos(*block, dict_filter_cols.first, &block_pos)); + auto reader_iter = _column_readers.find(dict_filter_cols.first); + if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) { + return Status::InternalError("Column reader for '{}' not found in parquet row group", + dict_filter_cols.first); } - ColumnWithTypeAndName& column_with_type_and_name = - block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]); + ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(block_pos); const ColumnPtr& column = column_with_type_and_name.column; if (const auto* nullable_column = check_and_get_column(*column)) { const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); @@ -1204,24 +1297,20 @@ Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { DCHECK(dict_column); auto string_column = DORIS_TRY( - _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( - dict_column)); + reader_iter->second->convert_dict_column_to_string_column(dict_column)); column_with_type_and_name.type = std::make_shared(std::make_shared()); block->replace_by_position( - (*_col_name_to_block_idx)[dict_filter_cols.first], - ColumnNullable::create(std::move(string_column), - nullable_column->get_null_map_column_ptr())); + block_pos, ColumnNullable::create(std::move(string_column), + nullable_column->get_null_map_column_ptr())); } else { const auto* dict_column = assert_cast(column.get()); auto string_column = DORIS_TRY( - _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( - dict_column)); + reader_iter->second->convert_dict_column_to_string_column(dict_column)); column_with_type_and_name.type = std::make_shared(); - block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first], - std::move(string_column)); + block->replace_by_position(block_pos, std::move(string_column)); } } return Status::OK(); diff --git a/be/src/format/parquet/vparquet_group_reader.h b/be/src/format/parquet/vparquet_group_reader.h index 66b04c4fd3293c..e9eb5370e02240 100644 --- a/be/src/format/parquet/vparquet_group_reader.h +++ b/be/src/format/parquet/vparquet_group_reader.h @@ -239,6 +239,15 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { DorisUniqueBufferPtr& filter_map_data, size_t pre_read_rows) const; + Status _fill_partition_columns( + Block* block, size_t rows, + const std::unordered_map>& + partition_columns); + Status _fill_missing_columns( + Block* block, size_t rows, + const std::unordered_map& missing_columns); + Status _get_block_column_pos(const Block& block, const std::string& column_name, + uint32_t* position) const; Status _build_pos_delete_filter(size_t read_rows); Status _filter_block(Block* block, int column_to_keep, const std::vector& columns_to_filter); @@ -257,9 +266,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { Status _get_current_batch_row_id(size_t read_rows); io::FileReaderSPtr _file_reader; - std::unordered_map> - _column_readers; // table_column_name - const std::vector& _read_table_columns; + std::vector _read_table_columns; const int32_t _row_group_id; const tparquet::RowGroup& _row_group_meta; @@ -270,8 +277,11 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { std::shared_ptr _row_lineage_columns; // merge the row ranges generated from page index and position delete. RowRanges _read_ranges; + // ParquetColumnReader keeps a reference to _read_ranges, so readers must be destroyed first. + std::unordered_map> + _column_readers; // table_column_name - const LazyReadContext& _lazy_read_ctx; + LazyReadContext _lazy_read_ctx; int64_t _lazy_read_filtered_rows = 0; int64_t _predicate_filter_time = 0; int64_t _dict_filter_rewrite_time = 0;