Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 130 additions & 41 deletions be/src/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
_filter_column_ids(filter_column_ids) {}

// Tears down the per-row-group state held by this reader.
RowGroupReader::~RowGroupReader() {
    // Release the column readers first, then the object pool.
    _column_readers.clear();
    // Only touch the pool when it is set. Calling _obj_pool->clear() unconditionally
    // before this check would dereference a null pointer and clear the pool twice.
    if (_obj_pool != nullptr) {
        _obj_pool->clear();
    }
}

Status RowGroupReader::init(
Expand Down Expand Up @@ -535,31 +536,31 @@ Status RowGroupReader::_read_column_data(Block* block,
size_t batch_read_rows = 0;
bool has_eof = false;
for (auto& read_col_name : table_columns) {
auto& column_with_type_and_name =
block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]);
uint32_t block_pos = 0;
RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name, &block_pos));
auto reader_iter = _column_readers.find(read_col_name);
if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) {
return Status::InternalError("Column reader for '{}' not found in parquet row group",
read_col_name);
}

auto& column_with_type_and_name = block->safe_get_by_position(block_pos);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
bool is_dict_filter = false;
for (auto& _dict_filter_col : _dict_filter_cols) {
if (_dict_filter_col.first == read_col_name) {
MutableColumnPtr dict_column = ColumnInt32::create();
if (!_col_name_to_block_idx->contains(read_col_name)) {
return Status::InternalError(
"Wrong read column '{}' in parquet file, block: {}", read_col_name,
block->dump_structure());
}
if (column_type->is_nullable()) {
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
block->get_by_position(block_pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
(*_col_name_to_block_idx)[read_col_name],
block_pos,
ColumnNullable::create(std::move(dict_column),
ColumnUInt8::create(dict_column->size(), 0)));
} else {
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
std::make_shared<DataTypeInt32>();
block->replace_by_position((*_col_name_to_block_idx)[read_col_name],
std::move(dict_column));
block->get_by_position(block_pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(block_pos, std::move(dict_column));
}
is_dict_filter = true;
break;
Expand All @@ -570,10 +571,10 @@ Status RowGroupReader::_read_column_data(Block* block,
bool col_eof = false;
// Should reset _filter_map_index to 0 when reading next column.
// select_vector.reset();
_column_readers[read_col_name]->reset_filter_map_index();
reader_iter->second->reset_filter_map_index();
while (!col_eof && col_read_rows < batch_size) {
size_t loop_rows = 0;
RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
RETURN_IF_ERROR(reader_iter->second->read_column_data(
column_ptr, column_type, _table_info_node_ptr->get_children_node(read_col_name),
filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter));
VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
Expand Down Expand Up @@ -703,19 +704,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
SCOPED_RAW_TIMER(&_predicate_filter_time);
for (const auto& col : _lazy_read_ctx.predicate_columns.first) {
// clean block to read predicate columns
block->get_by_position((*_col_name_to_block_idx)[col])
.column->assume_mutable()
->clear();
uint32_t block_pos = 0;
RETURN_IF_ERROR(_get_block_column_pos(*block, col, &block_pos));
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
for (const auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_position((*_col_name_to_block_idx)[col.first])
.column->assume_mutable()
->clear();
uint32_t block_pos = 0;
RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos));
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
for (const auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_position((*_col_name_to_block_idx)[col.first])
.column->assume_mutable()
->clear();
uint32_t block_pos = 0;
RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, &block_pos));
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
RETURN_IF_ERROR(_table_format_reader->clear_synthesized_columns(block));
RETURN_IF_ERROR(_table_format_reader->clear_generated_columns(block));
Expand Down Expand Up @@ -849,6 +850,97 @@ Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
return Status::OK();
}

// Fills the partition columns of `block` from their textual partition values.
//
// block:             destination block; each partition column is located through
//                    _get_block_column_pos().
// rows:              number of rows that must be written into every partition column.
// partition_columns: column name -> (partition value as text, slot descriptor whose data
//                    type provides the serde used for deserialization).
//
// Returns the lookup error if a column cannot be resolved to a block position, or
// InternalError if deserialization fails or writes a different number of rows than `rows`.
Status RowGroupReader::_fill_partition_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns) {
    DataTypeSerDe::FormatOptions text_format_options;
    for (const auto& kv : partition_columns) {
        uint32_t block_pos = 0;
        RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
        auto doris_column = block->get_by_position(block_pos).column;
        // obtained from block*, it is a mutable object.
        auto* col_ptr = const_cast<IColumn*>(doris_column.get());
        const auto& [value, slot_desc] = kv.second;
        auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        // Be careful when reading empty rows from parquet row groups.
        if (text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
                                                           &num_deserialized,
                                                           text_format_options) != Status::OK()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            // Argument order follows the message: expected count (`rows`) first, then the
            // count actually written (`num_deserialized`).
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually written : "
                    "{}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}

// Fills the columns listed in `missing_columns` inside `block`.
//
// block:           destination block; each column is located through _get_block_column_pos().
// rows:            number of rows to produce for every missing column.
// missing_columns: column name -> default-value expression context; a null context means
//                  the column has no default and is filled with nulls.
//
// Returns the first error from the position lookup or from evaluating a default expression.
Status RowGroupReader::_fill_missing_columns(
        Block* block, size_t rows,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    for (const auto& [col_name, default_ctx] : missing_columns) {
        uint32_t pos = 0;
        RETURN_IF_ERROR(_get_block_column_pos(*block, col_name, &pos));

        if (default_ctx == nullptr) {
            // No default expression: fill with null. The column is treated as ColumnNullable.
            auto target = block->get_by_position(pos).column->assume_mutable();
            assert_cast<ColumnNullable*>(target.get())->insert_many_defaults(rows);
            continue;
        }

        // Evaluate the default-value expression against the block.
        ColumnPtr evaluated;
        RETURN_IF_ERROR(default_ctx->execute(block, evaluated));
        if (evaluated->use_count() != 1) {
            // The result is only installed when this function holds the sole reference.
            continue;
        }

        // Resize explicitly: the first column of the source block may not have been filled
        // by the reader yet, so the block's row count can be wrong and the column produced
        // by execute() may contain a single row only.
        auto resizable = evaluated->assume_mutable();
        resizable->resize(rows);
        // The result may be a ColumnConst; expand it into a regular column.
        evaluated = evaluated->convert_to_full_column_if_const();

        // Keep the nullability of the column type already registered in the block.
        const bool keep_nullable = block->get_by_position(pos).type->is_nullable();
        block->replace_by_position(pos, keep_nullable ? make_nullable(evaluated) : evaluated);
    }
    return Status::OK();
}

// Resolves `column_name` to its position inside `block` via _col_name_to_block_idx.
//
// block:       block the position must be valid for; also dumped into error messages.
// column_name: name to look up in the name -> block index map.
// position:    out-parameter; written only when the lookup succeeds.
//
// Returns InternalError when the map is not set, when the name is not in the map, or when
// the mapped position is not smaller than block.columns(); Status::OK() otherwise.
Status RowGroupReader::_get_block_column_pos(const Block& block, const std::string& column_name,
                                             uint32_t* position) const {
    if (_col_name_to_block_idx == nullptr) {
        return Status::InternalError(
                "Column name to block index map is not set when reading parquet column '{}', "
                "block: "
                "{}",
                column_name, block.dump_structure());
    }

    const auto& name_to_idx = *_col_name_to_block_idx;
    const auto found = name_to_idx.find(column_name);
    if (found == name_to_idx.end()) {
        return Status::InternalError("Column '{}' not found in block index map, block: {}",
                                     column_name, block.dump_structure());
    }

    // Reject stale or inconsistent mappings before the caller indexes into the block.
    const auto mapped_pos = found->second;
    if (mapped_pos >= block.columns()) {
        return Status::InternalError(
                "Column '{}' maps to invalid block position {}, block columns: {}, block: {}",
                column_name, mapped_pos, block.columns(), block.dump_structure());
    }

    *position = mapped_pos;
    return Status::OK();
}

Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof,
bool* modify_row_ids) {
*modify_row_ids = false;
Expand Down Expand Up @@ -1190,38 +1282,35 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,

Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
for (auto& dict_filter_cols : _dict_filter_cols) {
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Wrong read column '{}' in parquet file, block: {}",
dict_filter_cols.first, block->dump_structure());
uint32_t block_pos = 0;
RETURN_IF_ERROR(_get_block_column_pos(*block, dict_filter_cols.first, &block_pos));
auto reader_iter = _column_readers.find(dict_filter_cols.first);
if (reader_iter == _column_readers.end() || reader_iter->second == nullptr) {
return Status::InternalError("Column reader for '{}' not found in parquet row group",
dict_filter_cols.first);
}
ColumnWithTypeAndName& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]);
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(block_pos);
const ColumnPtr& column = column_with_type_and_name.column;
if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
const auto* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
DCHECK(dict_column);

auto string_column = DORIS_TRY(
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column));
reader_iter->second->convert_dict_column_to_string_column(dict_column));

column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
(*_col_name_to_block_idx)[dict_filter_cols.first],
ColumnNullable::create(std::move(string_column),
nullable_column->get_null_map_column_ptr()));
block_pos, ColumnNullable::create(std::move(string_column),
nullable_column->get_null_map_column_ptr()));
} else {
const auto* dict_column = assert_cast<const ColumnInt32*>(column.get());
auto string_column = DORIS_TRY(
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column));
reader_iter->second->convert_dict_column_to_string_column(dict_column));

column_with_type_and_name.type = std::make_shared<DataTypeString>();
block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
std::move(string_column));
block->replace_by_position(block_pos, std::move(string_column));
}
}
return Status::OK();
Expand Down
18 changes: 14 additions & 4 deletions be/src/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider {
DorisUniqueBufferPtr<uint8_t>& filter_map_data,
size_t pre_read_rows) const;

Status _fill_partition_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns);
Status _fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns);
Status _get_block_column_pos(const Block& block, const std::string& column_name,
uint32_t* position) const;
Status _build_pos_delete_filter(size_t read_rows);
Status _filter_block(Block* block, int column_to_keep,
const std::vector<uint32_t>& columns_to_filter);
Expand All @@ -257,9 +266,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider {
Status _get_current_batch_row_id(size_t read_rows);

io::FileReaderSPtr _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_column_readers; // table_column_name
const std::vector<std::string>& _read_table_columns;
std::vector<std::string> _read_table_columns;

const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
Expand All @@ -270,8 +277,11 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider {
std::shared_ptr<RowLineageColumns> _row_lineage_columns;
// merge the row ranges generated from page index and position delete.
RowRanges _read_ranges;
// ParquetColumnReader keeps a reference to _read_ranges, so readers must be destroyed first.
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_column_readers; // table_column_name

const LazyReadContext& _lazy_read_ctx;
LazyReadContext _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
int64_t _predicate_filter_time = 0;
int64_t _dict_filter_rewrite_time = 0;
Expand Down
Loading