diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile index 88a27efe335..183bed4c6c8 100644 --- a/ci/docker/ubuntu-22.04-cpp.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp.dockerfile @@ -120,6 +120,7 @@ RUN apt-get update -y -q && \ rsync \ tzdata \ uuid-runtime \ + unzip \ wget \ xz-utils && \ apt-get clean && \ diff --git a/ci/docker/ubuntu-24.04-cpp.dockerfile b/ci/docker/ubuntu-24.04-cpp.dockerfile index 0347d452d7b..7703046c75c 100644 --- a/ci/docker/ubuntu-24.04-cpp.dockerfile +++ b/ci/docker/ubuntu-24.04-cpp.dockerfile @@ -122,6 +122,7 @@ RUN apt-get update -y -q && \ tzdata \ tzdata-legacy \ uuid-runtime \ + unzip \ wget && \ apt-get clean && \ rm -rf /var/lib/apt/lists* diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh index 88239a0bd1e..4f292b860b4 100755 --- a/ci/scripts/cpp_test.sh +++ b/ci/scripts/cpp_test.sh @@ -180,10 +180,36 @@ fi if [ "${ARROW_FUZZING}" == "ON" ]; then # Fuzzing regression tests + + # This will display any errors generated during fuzzing. These errors are + # usually not bugs (most fuzz files are invalid and hence generate errors + # when trying to read them), which is why they are hidden by default when + # fuzzing. + export ARROW_FUZZING_VERBOSITY=1 # Some fuzz regression files may trigger huge memory allocations, # let the allocator return null instead of aborting. export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1" - export ARROW_FUZZING_VERBOSITY=1 + + # 1. Generate seed corpuses + "${source_dir}/build-support/fuzzing/generate_corpuses.sh" "${binary_output_dir}" + + # 2. Run fuzz targets on seed corpus entries + function run_fuzz_target_on_seed_corpus() { + fuzz_target_basename=$1 + corpus_dir=${binary_output_dir}/${fuzz_target_basename}_seed_corpus + mkdir -p "${corpus_dir}" + rm -f "${corpus_dir}"/* + unzip "${binary_output_dir}"/"${fuzz_target_basename}"_seed_corpus.zip -d "${corpus_dir}" + "${binary_output_dir}"/"${fuzz_target_basename}" -rss_limit_mb=4000 "${corpus_dir}"/* + } + run_fuzz_target_on_seed_corpus arrow-csv-fuzz + run_fuzz_target_on_seed_corpus arrow-ipc-file-fuzz + run_fuzz_target_on_seed_corpus arrow-ipc-stream-fuzz + run_fuzz_target_on_seed_corpus arrow-ipc-tensor-stream-fuzz + run_fuzz_target_on_seed_corpus parquet-arrow-fuzz + run_fuzz_target_on_seed_corpus parquet-encoding-fuzz + + # 3. Run fuzz targets on regression files from arrow-testing # Run golden IPC integration files: these should ideally load without errors, # though some very old ones carry invalid data (such as decimal values # larger than their advertised precision). diff --git a/cpp/build-support/fuzzing/generate_corpuses.sh b/cpp/build-support/fuzzing/generate_corpuses.sh index 6ebc86ffad8..273c2a20d04 100755 --- a/cpp/build-support/fuzzing/generate_corpuses.sh +++ b/cpp/build-support/fuzzing/generate_corpuses.sh @@ -56,7 +56,7 @@ rm -rf ${CORPUS_DIR} ${OUT}/arrow-ipc-generate-tensor-fuzz-corpus -stream ${CORPUS_DIR} ${ARROW_CPP}/build-support/fuzzing/pack_corpus.py ${CORPUS_DIR} ${OUT}/arrow-ipc-tensor-stream-fuzz_seed_corpus.zip -# Parquet +# Parquet file-level fuzzer rm -rf ${CORPUS_DIR} ${OUT}/parquet-arrow-generate-fuzz-corpus ${CORPUS_DIR} @@ -65,6 +65,12 @@ cp ${ARROW_CPP}/submodules/parquet-testing/data/*.parquet ${CORPUS_DIR} cp ${ARROW_CPP}/submodules/parquet-testing/bad_data/*.parquet ${CORPUS_DIR} ${ARROW_CPP}/build-support/fuzzing/pack_corpus.py ${CORPUS_DIR} ${OUT}/parquet-arrow-fuzz_seed_corpus.zip +# Parquet encoding fuzzer + +rm -rf ${CORPUS_DIR} +${OUT}/parquet-generate-encoding-fuzz-corpus ${CORPUS_DIR} +${ARROW_CPP}/build-support/fuzzing/pack_corpus.py ${CORPUS_DIR} ${OUT}/parquet-encoding-fuzz_seed_corpus.zip + # CSV rm -rf ${PANDAS_DIR} diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h index 55bc1eeb1d2..832b686b316 100644 --- a/cpp/src/arrow/util/macros.h +++ b/cpp/src/arrow/util/macros.h @@ -183,28 +183,18 @@ #endif // ---------------------------------------------------------------------- +// Macros to enforce struct member packing -// macros to disable padding -// these macros are portable across different compilers and platforms -//[https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flatbuffers.h#L1355] -#if !defined(MANUALLY_ALIGNED_STRUCT) -# if defined(_MSC_VER) -# define MANUALLY_ALIGNED_STRUCT(alignment) \ - __pragma(pack(1)); \ - struct __declspec(align(alignment)) -# define STRUCT_END(name, size) \ - __pragma(pack()); \ - static_assert(sizeof(name) == size, "compiler breaks packing rules") -# elif defined(__GNUC__) || defined(__clang__) -# define MANUALLY_ALIGNED_STRUCT(alignment) \ - _Pragma("pack(1)") struct __attribute__((aligned(alignment))) -# define STRUCT_END(name, size) \ - _Pragma("pack()") static_assert(sizeof(name) == size, \ - "compiler breaks packing rules") -# else -# error Unknown compiler, please define structure alignment macros -# endif -#endif // !defined(MANUALLY_ALIGNED_STRUCT) +#if defined(__GNUC__) +# define ARROW_PACKED_START(KEYWORD, ...) KEYWORD [[gnu::packed]] __VA_ARGS__ +# define ARROW_PACKED_END +#elif defined(_MSC_VER) +# define ARROW_PACKED_START(KEYWORD, ...) _Pragma("pack(push, 1)") KEYWORD __VA_ARGS__ +# define ARROW_PACKED_END _Pragma("pack(pop)") +#else +# define ARROW_PACKED_START(KEYWORD, ...) KEYWORD __VA_ARGS__ +# define ARROW_PACKED_END +#endif // ---------------------------------------------------------------------- // Convenience macro disabling a particular UBSan check in a function diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 0c0bd989662..b707b21e601 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -151,6 +151,7 @@ endif() # Library config set(PARQUET_SRCS + arrow/fuzz_encoding_internal.cc arrow/fuzz_internal.cc arrow/path_internal.cc arrow/reader.cc diff --git a/cpp/src/parquet/arrow/CMakeLists.txt b/cpp/src/parquet/arrow/CMakeLists.txt index 3913d5fe3e8..b51f7a6d105 100644 --- a/cpp/src/parquet/arrow/CMakeLists.txt +++ b/cpp/src/parquet/arrow/CMakeLists.txt @@ -19,13 +19,19 @@ arrow_install_all_headers("parquet/arrow") if(ARROW_BUILD_FUZZING_UTILITIES) add_executable(parquet-arrow-generate-fuzz-corpus generate_fuzz_corpus.cc) + add_executable(parquet-generate-encoding-fuzz-corpus generate_encoding_fuzz_corpus.cc) if(ARROW_BUILD_STATIC) target_link_libraries(parquet-arrow-generate-fuzz-corpus parquet_static arrow_testing_static) + target_link_libraries(parquet-generate-encoding-fuzz-corpus parquet_static + arrow_testing_static) else() target_link_libraries(parquet-arrow-generate-fuzz-corpus parquet_shared arrow_testing_shared) + target_link_libraries(parquet-generate-encoding-fuzz-corpus parquet_shared + arrow_testing_shared) endif() endif() add_parquet_fuzz_target(fuzz PREFIX "parquet-arrow") +add_parquet_fuzz_target(encoding_fuzz PREFIX "parquet") diff --git a/cpp/src/parquet/arrow/encoding_fuzz.cc b/cpp/src/parquet/arrow/encoding_fuzz.cc new file mode 100644 index 00000000000..64095795e28 --- /dev/null +++ b/cpp/src/parquet/arrow/encoding_fuzz.cc @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/status.h" +#include "arrow/util/fuzz_internal.h" +#include "parquet/arrow/fuzz_encoding_internal.h" + +extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { + auto status = + parquet::fuzzing::internal::FuzzEncoding(data, static_cast(size)); + arrow::internal::LogFuzzStatus(status, data, static_cast(size)); + return 0; +} diff --git a/cpp/src/parquet/arrow/fuzz_encoding_internal.cc b/cpp/src/parquet/arrow/fuzz_encoding_internal.cc new file mode 100644 index 00000000000..f3537054ca0 --- /dev/null +++ b/cpp/src/parquet/arrow/fuzz_encoding_internal.cc @@ -0,0 +1,444 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/arrow/fuzz_encoding_internal.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/buffer_builder.h" +#include "arrow/compare.h" +#include "arrow/io/memory.h" +#include "arrow/type.h" +#include "arrow/util/fuzz_internal.h" +#include "arrow/util/logging.h" +#include "arrow/util/string.h" +#include "parquet/encoding.h" +#include "parquet/schema.h" +#include "parquet/visit_type_inline.h" + +namespace parquet::fuzzing::internal { + +using ::arrow::Array; +using ::arrow::ArrayData; +using ::arrow::BufferBuilder; +using ::arrow::DataType; +using ::arrow::MemoryPool; +using ::arrow::Result; +using ::arrow::Status; +using ::arrow::TypedBufferBuilder; +using ::parquet::arrow::FileReader; + +ColumnDescriptor MakeColumnDescriptor(Type::type type, int type_length) { + // Repetition and max def/rep levels can take dummy values as they are not directly + // used by encoders and decoders. + auto node = schema::PrimitiveNode::Make("", Repetition::OPTIONAL, type, + ConvertedType::NONE, type_length); + return ColumnDescriptor(node, /*max_definition_level=*/1, /*max_repetition_level=*/0); +} + +namespace { + +constexpr auto kPackedEncodingHeaderSize = 64; + +ARROW_PACKED_START(struct, PackedEncodingHeader) { + ARROW_PACKED_START(struct, Header) { + uint8_t source_encoding_id; + uint8_t roundtrip_encoding_id; + uint8_t type_id; + int32_t type_length; + int32_t num_values; + }; + ARROW_PACKED_END + + static_assert(sizeof(Header) == 3 * 1 + 2 * 4); + using Reserved = std::array; + + Header header; + Reserved reserved; +}; +ARROW_PACKED_END + +static_assert(sizeof(PackedEncodingHeader) == kPackedEncodingHeaderSize); + +} // namespace + +FuzzEncodingHeader::FuzzEncodingHeader(Encoding::type source_encoding, + Encoding::type roundtrip_encoding, Type::type type, + int type_length, int num_values) + : source_encoding(source_encoding), + roundtrip_encoding(roundtrip_encoding), + type(type), + type_length(type_length), + num_values(num_values) {} + +FuzzEncodingHeader::FuzzEncodingHeader(Encoding::type source_encoding, + Encoding::type roundtrip_encoding, + const ColumnDescriptor* descr, int num_values) + : FuzzEncodingHeader(source_encoding, roundtrip_encoding, descr->physical_type(), + descr->type_length(), num_values) {} + +std::string FuzzEncodingHeader::Serialize() const { + PackedEncodingHeader packed{}; + packed.header.source_encoding_id = static_cast(source_encoding); + packed.header.roundtrip_encoding_id = static_cast(roundtrip_encoding); + packed.header.type_id = static_cast(type); + packed.header.type_length = type_length; + packed.header.num_values = num_values; + return std::string(reinterpret_cast(&packed), kPackedEncodingHeaderSize); +} + +::arrow::Result FuzzEncodingHeader::Parse( + std::span payload) { + auto invalid_payload = []() { + return Status::Invalid("Invalid fuzz encoding payload"); + }; + + if (payload.size() < kPackedEncodingHeaderSize) { + return invalid_payload(); + } + PackedEncodingHeader packed; + std::memcpy(&packed, payload.data(), kPackedEncodingHeaderSize); + // We are strict in what we accept because we don't want the fuzzer to go + // explore pointless variations. + if (packed.reserved != + PackedEncodingHeader::Reserved{}) { // reserved bytes should be zero + return invalid_payload(); + } + const auto& ph = packed.header; + if (ph.source_encoding_id >= static_cast(Encoding::UNDEFINED) || + ph.roundtrip_encoding_id >= static_cast(Encoding::UNDEFINED) || + ph.type_id >= static_cast(Type::UNDEFINED)) { + return invalid_payload(); + } + FuzzEncodingHeader header(static_cast(ph.source_encoding_id), + static_cast(ph.roundtrip_encoding_id), + static_cast(ph.type_id), ph.type_length, + ph.num_values); + if ((header.type == Type::FIXED_LEN_BYTE_ARRAY) ? (header.type_length <= 0) + : (header.type_length != -1)) { + return invalid_payload(); + } + if (header.num_values < 0) { + return invalid_payload(); + } + return ParseResult{header, payload.subspan(kPackedEncodingHeaderSize)}; +} + +namespace { + +// Just to use std::vector while avoiding std::vector +using BooleanSlot = std::array; + +template +struct TypedFuzzEncoding { + static constexpr Type::type kType = DType::type_num; + + using c_type = + std::conditional_t; + using EncoderType = typename EncodingTraits::Encoder; + using DecoderType = typename EncodingTraits::Decoder; + using Accumulator = EncodingTraits::Accumulator; + + TypedFuzzEncoding(Encoding::type source_encoding, Encoding::type roundtrip_encoding, + const ColumnDescriptor* descr, int num_values, + std::span encoded_data) + : source_encoding_(source_encoding), + roundtrip_encoding_(roundtrip_encoding), + descr_(descr), + num_values_(num_values), + encoded_data_(encoded_data) {} + + Result> Decode(Encoding::type encoding, + std::span encoded_data, + int chunk_size) { + std::vector values; + int total_values = 0; + + BEGIN_PARQUET_CATCH_EXCEPTIONS + auto decoder = MakeDecoder(encoding); + // NOTE: In real API usage, the `num_values` given to SetData() is read from + // the data page header and can include a number of nulls, so it's merely an + // upper bound for the number of physical values. + // However, Decode() calls are not supposed to ask more than the actual number + // of physical values. + decoder->SetData(num_values_, encoded_data.data(), + static_cast(encoded_data.size())); + while (total_values < num_values_) { + const int read_size = std::min(num_values_ - total_values, chunk_size); + values.resize(total_values + read_size); + int values_read; + if constexpr (kType == Type::BOOLEAN) { + values_read = decoder->Decode( + reinterpret_cast(values.data() + total_values), read_size); + } else { + values_read = decoder->Decode(values.data() + total_values, read_size); + } + total_values += values_read; + if (values_read < read_size) { + values.resize(total_values); + break; + } + } + END_PARQUET_CATCH_EXCEPTIONS + + return values; + } + + Result> DecodeArrow(Encoding::type encoding, + std::span encoded_data) { + ARROW_ASSIGN_OR_RAISE(auto arrow_type, ArrowType()); + auto decoder = MakeDecoder(encoding); + decoder->SetData(num_values_, encoded_data.data(), + static_cast(encoded_data.size())); + + if constexpr (kType == Type::BYTE_ARRAY) { + Accumulator acc; + acc.builder = std::make_unique<::arrow::BinaryBuilder>(pool()); + BEGIN_PARQUET_CATCH_EXCEPTIONS + decoder->DecodeArrowNonNull(num_values_, &acc); + END_PARQUET_CATCH_EXCEPTIONS + ARROW_CHECK_EQ(acc.chunks.size(), 0); + return acc.builder->Finish(); + } else { + Accumulator builder(arrow_type, pool()); + BEGIN_PARQUET_CATCH_EXCEPTIONS + decoder->DecodeArrowNonNull(num_values_, &builder); + END_PARQUET_CATCH_EXCEPTIONS + return builder.Finish(); + } + } + + Status Fuzz() { + const std::vector chunk_sizes = {(num_values_ + 1), 1 << 8, 1 << 12}; + + // Decode using source encoding + if constexpr (arrow_supported()) { + // Read as Arrow directly and use that as reference + ARROW_ASSIGN_OR_RAISE(reference_array_, + DecodeArrow(source_encoding_, encoded_data_)); + ARROW_CHECK_OK(reference_array_->ValidateFull()); + } else { + // Persist raw reference values, they shouldn't carry embedded pointers + // to short-lived decoder buffers. + static_assert(kType != Type::FIXED_LEN_BYTE_ARRAY && kType != Type::BYTE_ARRAY); + ARROW_ASSIGN_OR_RAISE(reference_values_, + Decode(source_encoding_, encoded_data_, num_values_)); + } + + // Re-encode and re-decode using roundtrip encoding + { + auto encoder = MakeEncoder(roundtrip_encoding_); + BEGIN_PARQUET_CATCH_EXCEPTIONS + if constexpr (arrow_supported()) { + encoder->Put(*reference_array_); + auto reencoded_buffer = encoder->FlushValues(); + auto reencoded_data = reencoded_buffer->template span_as(); + auto array = DecodeArrow(roundtrip_encoding_, reencoded_data).ValueOrDie(); + ARROW_CHECK_OK(array->ValidateFull()); + ARROW_CHECK_OK(CompareAgainstReference(array)); + // Compare with reading raw values + for (const int chunk_size : chunk_sizes) { + auto values = + Decode(roundtrip_encoding_, reencoded_data, chunk_size).ValueOrDie(); + ARROW_CHECK_OK(CompareAgainstReference(values)); + } + } else { + encoder->Put(reference_values_); + auto reencoded_buffer = encoder->FlushValues(); + auto reencoded_data = reencoded_buffer->template span_as(); + auto values = + Decode(roundtrip_encoding_, reencoded_data, /*chunk_size=*/num_values_ + 1) + .ValueOrDie(); + ARROW_CHECK_OK(CompareAgainstReference(values)); + // Try other chunk sizes + for (const int chunk_size : chunk_sizes) { + auto values = + Decode(roundtrip_encoding_, reencoded_data, chunk_size).ValueOrDie(); + ARROW_CHECK_OK(CompareAgainstReference(values)); + } + } + END_PARQUET_CATCH_EXCEPTIONS + } + + return Status::OK(); + } + + protected: + Result> MakeArrow(std::span values) { + ARROW_ASSIGN_OR_RAISE(auto arrow_type, ArrowType()); + const int64_t length = static_cast(values.size()); + + if constexpr (kType == Type::FIXED_LEN_BYTE_ARRAY) { + // Use a buffer builder instead of FixedSizeBinaryBuilder to avoid the cost + // of generating a trivial validity bitmap bit by bit. + const int32_t byte_width = descr_->type_length(); + BufferBuilder data_builder(pool()); + RETURN_NOT_OK(data_builder.Reserve(length * byte_width)); + for (const FLBA item : values) { + data_builder.UnsafeAppend(item.ptr, byte_width); + } + ARROW_ASSIGN_OR_RAISE(auto data_buffer, data_builder.Finish()); + auto data = ArrayData::Make(arrow_type, length, {nullptr, data_buffer}, + /*null_count=*/0); + return ::arrow::MakeArray(data); + } else if constexpr (kType == Type::BYTE_ARRAY) { + TypedBufferBuilder offsets_builder(pool()); + BufferBuilder data_builder(pool()); + int64_t total_data_size = 0; + for (const ByteArray item : values) { + total_data_size += item.len; + } + RETURN_NOT_OK(offsets_builder.Reserve(length + 1)); + RETURN_NOT_OK(data_builder.Reserve(total_data_size)); + int32_t offset = 0; + for (const ByteArray item : values) { + offsets_builder.UnsafeAppend(offset); + data_builder.UnsafeAppend(item.ptr, static_cast(item.len)); + offset += static_cast(item.len); + } + offsets_builder.UnsafeAppend(offset); + ARROW_CHECK_EQ(offset, total_data_size); + ARROW_ASSIGN_OR_RAISE(auto offsets_buffer, offsets_builder.Finish()); + ARROW_ASSIGN_OR_RAISE(auto data_buffer, data_builder.Finish()); + auto data = + ArrayData::Make(arrow_type, length, {nullptr, offsets_buffer, data_buffer}, + /*null_count=*/0); + return ::arrow::MakeArray(data); + } else if constexpr (kType == Type::BOOLEAN) { + // Convert C++ bools into validity bitmap + ::arrow::BooleanBuilder builder(pool()); + auto bool_data = reinterpret_cast(values.data()); + RETURN_NOT_OK(builder.AppendValues(bool_data, bool_data + values.size())); + return builder.Finish(); + } else { + auto data_buffer = std::make_shared( + reinterpret_cast(values.data()), length * sizeof(c_type)); + auto data = ArrayData::Make(arrow_type, length, {nullptr, data_buffer}, + /*null_count=*/0); + return ::arrow::MakeArray(data); + } + } + + Status CompareAgainstReference(const std::shared_ptr& array) { + std::stringstream ss; + auto options = ::arrow::EqualOptions{}.nans_equal(true).diff_sink(&ss); + if (!reference_array_->Equals(array, options)) { + return Status::Invalid("Arrays unequal: ", ss.str()); + } + return Status::OK(); + } + + Status CompareAgainstReference(std::span values) { + if constexpr (arrow_supported()) { + ARROW_CHECK_OK(CompareAgainstReference(reference_array_)); + } else { + static_assert(kType == Type::INT96); + if (reference_values_.size() != values.size() || + memcmp(reference_values_.data(), values.data(), values.size_bytes()) != 0) { + return Status::Invalid("Values unequal"); + } + } + return Status::OK(); + } + + Result> ArrowType() const { + switch (kType) { + case Type::BOOLEAN: + return ::arrow::boolean(); + case Type::INT32: + return ::arrow::int32(); + case Type::INT64: + return ::arrow::int64(); + case Type::FLOAT: + return ::arrow::float32(); + case Type::DOUBLE: + return ::arrow::float64(); + case Type::BYTE_ARRAY: + return ::arrow::binary(); + case Type::FIXED_LEN_BYTE_ARRAY: + return ::arrow::fixed_size_binary(descr_->type_length()); + default: + return Status::NotImplemented("Physical type does not have Arrow equivalent"); + } + } + + std::shared_ptr MakeDecoder(Encoding::type encoding) { + auto decoder = std::dynamic_pointer_cast( + std::shared_ptr(::parquet::MakeDecoder(kType, encoding, descr_, pool()))); + ARROW_CHECK_NE(decoder, nullptr); + return decoder; + } + + std::shared_ptr MakeEncoder(Encoding::type encoding) { + auto encoder = + std::dynamic_pointer_cast(std::shared_ptr(::parquet::MakeEncoder( + kType, encoding, /*use_dictionary=*/false, descr_, pool()))); + ARROW_CHECK_NE(encoder, nullptr); + return encoder; + } + + MemoryPool* pool() { return ::arrow::internal::fuzzing_memory_pool(); } + + static constexpr bool arrow_supported() { return kType != Type::INT96; } + + const Encoding::type source_encoding_, roundtrip_encoding_; + const ColumnDescriptor* descr_; + const int num_values_; + const std::span encoded_data_; + + std::shared_ptr reference_array_; + // Only for INT96 as there is no strictly equivalent Arrow type + std::vector reference_values_; +}; + +} // namespace + +Status FuzzEncoding(const uint8_t* data, int64_t size) { + constexpr auto kInt32Max = std::numeric_limits::max(); + + ARROW_ASSIGN_OR_RAISE(const auto parse_result, + FuzzEncodingHeader::Parse(std::span(data, size))); + auto& [header, encoded_data] = parse_result; + if (encoded_data.size() > static_cast(kInt32Max)) { + // Unlikely but who knows? + return Status::Invalid("Fuzz payload too large"); + } + const auto descr = MakeColumnDescriptor(header.type, header.type_length); + + BEGIN_PARQUET_CATCH_EXCEPTIONS + + auto typed_fuzz = [&](auto* dtype) { + using DType = std::decay_t; + TypedFuzzEncoding typed_fuzz{header.source_encoding, header.roundtrip_encoding, + &descr, header.num_values, encoded_data}; + return typed_fuzz.Fuzz(); + }; + RETURN_NOT_OK(VisitType(header.type, typed_fuzz)); + + END_PARQUET_CATCH_EXCEPTIONS + return Status::OK(); +} + +} // namespace parquet::fuzzing::internal diff --git a/cpp/src/parquet/arrow/fuzz_encoding_internal.h b/cpp/src/parquet/arrow/fuzz_encoding_internal.h new file mode 100644 index 00000000000..92d6b2834cc --- /dev/null +++ b/cpp/src/parquet/arrow/fuzz_encoding_internal.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type_fwd.h" +#include "arrow/util/macros.h" +#include "parquet/platform.h" +#include "parquet/types.h" + +namespace parquet::fuzzing::internal { + +// +// Helper APIs for Parquet encoding roundtrip fuzzing +// + +/// Custom header to workaround the lack of parametric fuzzing on OSS-Fuzz +/// +/// (see https://github.com/google/oss-fuzz/issues/14437) +/// +/// The idea is to prefix the fuzzer payload with a fixed-size header encoding +/// the fuzzer parameters. We also ensure that the seed corpus encompasses all +/// parameter variations. +/// +/// In the fuzzer payload, the fixed-size header is followed by the actual fuzz +/// data. That fuzz data is supposed to represent `num_values` of type `type` +/// encoded with the `source_encoding` (of course this may be wrong if the fuzzer +/// mutated the payload). +struct FuzzEncodingHeader { + /// The encoding the fuzz payload body is encoded with + Encoding::type source_encoding; + /// The encoding to roundtrip the decoded values with + Encoding::type roundtrip_encoding; + /// The type and type length (byte width) - the latter only for FIXED_LEN_BYTE_ARRAY + Type::type type; + int32_t type_length; + /// The number of encoded values + int32_t num_values; + + FuzzEncodingHeader(Encoding::type source_encoding, Encoding::type roundtrip_encoding, + Type::type type, int type_length, int num_values); + FuzzEncodingHeader(Encoding::type source_encoding, Encoding::type roundtrip_encoding, + const ColumnDescriptor* descr, int num_values); + + /// Serialize as a fixed size header for fuzz corpus generation + std::string Serialize() const; + + using ParseResult = std::pair>; + + /// Parse header from a fuzzer payload. + /// Returns a pair of (decoded header, fuzzer body). + static ::arrow::Result Parse(std::span payload); +}; + +/// Fuzz a payload encoded as explained in FuzzEncodingHeader +PARQUET_EXPORT ::arrow::Status FuzzEncoding(const uint8_t* data, int64_t size); + +PARQUET_EXPORT ColumnDescriptor MakeColumnDescriptor(Type::type type, + int type_length = -1); + +} // namespace parquet::fuzzing::internal diff --git a/cpp/src/parquet/arrow/fuzz_internal.cc b/cpp/src/parquet/arrow/fuzz_internal.cc index 8618a85fcca..384749dd48d 100644 --- a/cpp/src/parquet/arrow/fuzz_internal.cc +++ b/cpp/src/parquet/arrow/fuzz_internal.cc @@ -33,6 +33,7 @@ #include "parquet/bloom_filter_reader.h" #include "parquet/page_index.h" #include "parquet/properties.h" +#include "parquet/visit_type_inline.h" namespace parquet::fuzzing::internal { @@ -129,35 +130,10 @@ Status FuzzReadColumnIndex(const ColumnIndex* index, const ColumnDescriptor* des index->non_null_page_indices(); index->encoded_min_values(); index->encoded_max_values(); - switch (descr->physical_type()) { - case Type::BOOLEAN: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::INT32: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::INT64: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::INT96: - st &= FuzzReadTypedColumnIndex( - dynamic_cast*>(index)); - break; - case Type::FLOAT: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::DOUBLE: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::FIXED_LEN_BYTE_ARRAY: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::BYTE_ARRAY: - st &= FuzzReadTypedColumnIndex(dynamic_cast(index)); - break; - case Type::UNDEFINED: - break; - } + VisitType(descr->physical_type(), [&](auto* dtype) { + using DType = std::decay_t; + st &= FuzzReadTypedColumnIndex(dynamic_cast*>(index)); + }); END_PARQUET_CATCH_EXCEPTIONS return st; } diff --git a/cpp/src/parquet/arrow/fuzz_internal.h b/cpp/src/parquet/arrow/fuzz_internal.h index 3d1d6a17258..6c571378b50 100644 --- a/cpp/src/parquet/arrow/fuzz_internal.h +++ b/cpp/src/parquet/arrow/fuzz_internal.h @@ -30,6 +30,10 @@ namespace parquet::fuzzing::internal { +// +// Helper APIs for full file Parquet fuzzing +// + struct EncryptionKey { ::arrow::util::SecureString key; std::string key_metadata; diff --git a/cpp/src/parquet/arrow/generate_encoding_fuzz_corpus.cc b/cpp/src/parquet/arrow/generate_encoding_fuzz_corpus.cc new file mode 100644 index 00000000000..9616fe5d69d --- /dev/null +++ b/cpp/src/parquet/arrow/generate_encoding_fuzz_corpus.cc @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A command line executable that generates a bunch of valid Parquet files +// containing example record batches. Those are used as fuzzing seeds +// to make fuzzing more efficient. + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/io/file.h" +#include "arrow/result.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging_internal.h" +#include "parquet/arrow/fuzz_encoding_internal.h" +#include "parquet/encoding.h" +#include "parquet/schema.h" + +namespace arrow { + +using internal::checked_cast; + +using Encoding = ::parquet::Encoding; +using ParquetType = ::parquet::Type; +using ::parquet::Int96; +using ::parquet::fuzzing::internal::FuzzEncodingHeader; + +using EncodingVector = std::vector; + +struct FullParquetType { + ParquetType::type type; + int type_length = -1; + + static FullParquetType FromArrow(const DataType& type) { + switch (type.id()) { + case Type::BOOL: + return {ParquetType::BOOLEAN}; + case Type::INT32: + return {ParquetType::INT32}; + case Type::INT64: + return {ParquetType::INT64}; + case Type::FLOAT: + return {ParquetType::FLOAT}; + case Type::DOUBLE: + return {ParquetType::DOUBLE}; + case Type::BINARY: + case Type::STRING: + return {ParquetType::BYTE_ARRAY}; + case Type::FIXED_SIZE_BINARY: + return {ParquetType::FIXED_LEN_BYTE_ARRAY, + checked_cast(type).byte_width()}; + default: + Status::TypeError("Unsupported Arrow type").Abort(); + } + } + + std::string ToString() const { return ::parquet::TypeToString(type, type_length); } +}; + +struct DataWithEncodings { + FullParquetType pq_type; + std::shared_ptr values; + EncodingVector encodings; +}; + +Result> SampleData() { + auto rag = random::RandomArrayGenerator(/*seed=*/42); + std::vector all_data; + + auto push_array = [&](std::shared_ptr arr, EncodingVector encodings = {}) { + auto pq_type = FullParquetType::FromArrow(*arr->type()); + if (encodings.empty()) { + encodings = ::parquet::SupportedEncodings(pq_type.type); + } + all_data.emplace_back(pq_type, arr, std::move(encodings)); + }; + + for (const auto size : {100, 10000}) { + push_array(rag.Int32(size, /*min=*/0, /*max=*/10)); + push_array(rag.Int32(size, /*min=*/std::numeric_limits::min(), + /*max=*/std::numeric_limits::max())); + push_array(rag.Int64(size, /*min=*/0, /*max=*/0)); + push_array(rag.Int64(size, /*min=*/0, /*max=*/10'000)); + push_array(rag.Int64(size, /*min=*/std::numeric_limits::min(), + /*max=*/std::numeric_limits::max())); + for (const double true_probability : {0.01, 0.5, 0.9}) { + push_array(rag.Boolean(size, true_probability)); + } + push_array(rag.Float32(size, /*min=*/-1.0f, /*max=*/1.0f)); + push_array(rag.Float32(size, /*min=*/std::numeric_limits::lowest(), + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0.0, /*nan_probability=*/1e-2)); + push_array(rag.Float64(size, /*min=*/-1.0, /*max=*/1.0)); + push_array(rag.Float64(size, /*min=*/std::numeric_limits::lowest(), + /*max=*/std::numeric_limits::max(), + /*null_probability=*/0.0, /*nan_probability=*/1e-2)); + for (const int32_t byte_width : {1, 3, 16}) { + const auto fsb_size = size / byte_width; + push_array(rag.FixedSizeBinary(fsb_size, byte_width)); + } + for (const int32_t max_binary_length : {1, 30}) { + const auto binary_size = size / max_binary_length; + push_array(rag.BinaryWithRepeats(binary_size, /*unique=*/binary_size / 2, + /*min_length=*/0, + /*max_length=*/max_binary_length)); + } + } + + return all_data; +} + +using Int96Vector = std::vector; + +Result> SampleInt96() { + auto rag = random::RandomArrayGenerator(/*seed=*/42); + std::vector all_data; + + for (const auto size : {50, 5000}) { + // The max value is chosen so that the nanoseconds component remains + // smaller than kNanosecondsPerDay. + auto ints = rag.Int32(size * 3, /*min=*/0, /*max=*/20000); + const Int96* int96_data = reinterpret_cast( + checked_cast(*ints).raw_values()); + all_data.push_back(Int96Vector(int96_data, int96_data + size)); + } + return all_data; +} + +Status DoMain(const std::string& out_dir) { + ARROW_ASSIGN_OR_RAISE(auto dir_fn, internal::PlatformFilename::FromString(out_dir)); + RETURN_NOT_OK(internal::CreateDir(dir_fn)); + + int sample_num = 1; + auto sample_file_name = [&](const std::string& name = "") -> std::string { + std::stringstream ss; + if (!name.empty()) { + ss << name << "-"; + } + ss << sample_num++ << ".pq"; + return std::move(ss).str(); + }; + + auto write_sample = [&](Encoding::type source_encoding, + Encoding::type roundtrip_encoding, FullParquetType type, + const BufferVector& buffers) -> Status { + std::stringstream ss; + ss << type.ToString() << "-" << ::parquet::EncodingToString(source_encoding) << "-" + << ::parquet::EncodingToString(roundtrip_encoding); + ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_file_name(ss.str()))); + std::cerr << sample_fn.ToString() << std::endl; + ARROW_ASSIGN_OR_RAISE(auto file, io::FileOutputStream::Open(sample_fn.ToString())); + for (const auto& buf : buffers) { + RETURN_NOT_OK(file->Write(buf)); + } + return file->Close(); + }; + + ARROW_ASSIGN_OR_RAISE(auto all_data, SampleData()); + for (const auto& data : all_data) { + const auto descr = parquet::fuzzing::internal::MakeColumnDescriptor( + data.pq_type.type, data.pq_type.type_length); + for (const auto roundtrip_encoding : data.encodings) { + // We always add PLAIN as a source encoding because the fuzzer might be able + // to explore more search space using this encoding. + for (const auto source_encoding : std::set{Encoding::PLAIN, roundtrip_encoding}) { + BufferVector buffers; + FuzzEncodingHeader header( + source_encoding, roundtrip_encoding, &descr, /*num_values=*/ + static_cast(data.values->length() - data.values->null_count())); + + buffers.push_back(Buffer::FromString(header.Serialize())); + BEGIN_PARQUET_CATCH_EXCEPTIONS + auto encoder = ::parquet::MakeEncoder(data.pq_type.type, source_encoding, + /*use_dictionary=*/false, &descr); + encoder->Put(*data.values); + buffers.push_back(encoder->FlushValues()); + END_PARQUET_CATCH_EXCEPTIONS + RETURN_NOT_OK( + write_sample(source_encoding, roundtrip_encoding, data.pq_type, buffers)); + } + } + } + + // Special-case INT96 as there is no direct Arrow equivalent to encode. + ARROW_ASSIGN_OR_RAISE(auto int96_data, SampleInt96()); + for (const auto& data : int96_data) { + const auto pq_type = FullParquetType{ParquetType::INT96}; + const auto descr = parquet::fuzzing::internal::MakeColumnDescriptor( + pq_type.type, pq_type.type_length); + const auto source_encoding = Encoding::PLAIN; + const auto roundtrip_encoding = Encoding::PLAIN; + + BufferVector buffers; + FuzzEncodingHeader header(source_encoding, roundtrip_encoding, &descr, /*num_values=*/ + static_cast(data.size())); + buffers.push_back(Buffer::FromString(header.Serialize())); + BEGIN_PARQUET_CATCH_EXCEPTIONS + auto encoder = ::parquet::MakeEncoder(pq_type.type, source_encoding, + /*use_dictionary=*/false, &descr); + dynamic_cast<::parquet::Int96Encoder*>(encoder.get())->Put(data); + buffers.push_back(encoder->FlushValues()); + END_PARQUET_CATCH_EXCEPTIONS + RETURN_NOT_OK(write_sample(source_encoding, roundtrip_encoding, pq_type, buffers)); + } + + return Status::OK(); +} + +ARROW_NORETURN void Usage() { + std::cerr << "Usage: parquet-generate-encoding-fuzz-corpus " + << "" << std::endl; + std::exit(2); +} + +int Main(int argc, char** argv) { + if (argc != 2) { + Usage(); + } + auto out_dir = std::string(argv[1]); + + Status st = DoMain(out_dir); + if (!st.ok()) { + std::cerr << st.ToString() << std::endl; + return 1; + } + return 0; +} + +} // namespace arrow + +int main(int argc, char** argv) { return ::arrow::Main(argc, argv); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index a77323d29fa..a60af69aec9 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -37,7 +37,6 @@ #include "arrow/util/async_generator.h" #include "arrow/util/bit_util.h" #include "arrow/util/future.h" -#include "arrow/util/fuzz_internal.h" #include "arrow/util/iterator.h" #include "arrow/util/logging_internal.h" #include "arrow/util/parallel.h" diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc index 6c213f18f5e..4500a72f01d 100644 --- a/cpp/src/parquet/decoder.cc +++ b/cpp/src/parquet/decoder.cc @@ -2447,4 +2447,28 @@ std::unique_ptr MakeDictDecoder(Type::type type_num, } } // namespace detail + +std::vector SupportedEncodings(Type::type physical_type) { + switch (physical_type) { + case Type::BOOLEAN: + return {Encoding::PLAIN, Encoding::RLE}; + case Type::INT32: + case Type::INT64: + return {Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED, + Encoding::BYTE_STREAM_SPLIT}; + case Type::INT96: + return {Encoding::PLAIN}; + case Type::FLOAT: + case Type::DOUBLE: + return {Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT}; + case Type::FIXED_LEN_BYTE_ARRAY: + return {Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT, Encoding::DELTA_BYTE_ARRAY}; + case Type::BYTE_ARRAY: + return {Encoding::PLAIN, Encoding::DELTA_LENGTH_BYTE_ARRAY, + Encoding::DELTA_BYTE_ARRAY}; + default: + throw ParquetException("Invalid physical type"); + } +} + } // namespace parquet diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index edfebad5ab4..0a1cfc6112e 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -116,9 +116,9 @@ class EncoderImpl : virtual public Encoder { const Encoding::type encoding_; MemoryPool* pool_; - /// Type length from descr + // Type length from descr const int type_length_; - /// Number of unencoded bytes written to the encoder. Used for ByteArray type only. + // Number of unencoded bytes written to the encoder. Used for ByteArray type only. int64_t unencoded_byte_array_data_bytes_ = 0; }; diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index d80bf0edcae..12ee8896797 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -455,4 +455,10 @@ std::unique_ptr::Decoder> MakeTypedDecoder( return std::unique_ptr(dynamic_cast(base.release())); } +/// Return the list of supported encodings for the given physical type +/// +/// Only non-dictionary encodings are returned. +PARQUET_EXPORT +std::vector SupportedEncodings(Type::type physical_type); + } // namespace parquet diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 3eb1940a4d5..5115516f9fc 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -57,6 +58,50 @@ namespace bit_util = arrow::bit_util; namespace parquet::test { +// Validate that `func` succeeds on supported (Type, Encoding) combinations, and +// raises on unsupported ones. +void TestSupportedEncodingsConsistentWith( + std::function func) { + // Try all possible types and encodings + for (int int_type = 0; int_type < static_cast(Type::UNDEFINED); ++int_type) { + const auto type = static_cast(int_type); + const auto supported_encodings = SupportedEncodings(type); + ARROW_SCOPED_TRACE("Type = ", TypeToString(type)); + const auto descr = + ColumnDescriptor(schema::PrimitiveNode::Make("col", Repetition::REQUIRED, type, + ConvertedType::NONE, /*length=*/2), + /*max_definition_level=*/0, /*max_repetition_level=*/0); + + for (int int_encoding = 0; int_encoding < static_cast(Encoding::UNDEFINED); + ++int_encoding) { + const auto encoding = static_cast(int_encoding); + ARROW_SCOPED_TRACE("Encoding = ", EncodingToString(encoding)); + if (std::find(supported_encodings.begin(), supported_encodings.end(), encoding) != + supported_encodings.end()) { + ASSERT_NO_THROW(func(type, encoding, descr)); + } else { + ASSERT_THROW(func(type, encoding, descr), ParquetException); + } + } + } +} + +TEST(SupportedEncodings, TestMakeDecoder) { + auto make_decoder = [](Type::type type, Encoding::type encoding, + const ColumnDescriptor& descr) { + ARROW_UNUSED(MakeDecoder(type, encoding, &descr)); + }; + TestSupportedEncodingsConsistentWith(make_decoder); +} + +TEST(SupportedEncodings, TestMakeEncoder) { + auto make_encoder = [](Type::type type, Encoding::type encoding, + const ColumnDescriptor& descr) { + ARROW_UNUSED(MakeEncoder(type, encoding, /*use_dictionary=*/false, &descr)); + }; + TestSupportedEncodingsConsistentWith(make_encoder); +} + TEST(VectorBooleanTest, TestEncodeBoolDecode) { // PARQUET-454 const int nvalues = 10000; diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc index 2e5f6fe37c4..9b5435e026f 100644 --- a/cpp/src/parquet/statistics.cc +++ b/cpp/src/parquet/statistics.cc @@ -38,6 +38,7 @@ #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/schema.h" +#include "parquet/visit_type_inline.h" using arrow::default_memory_pool; using arrow::MemoryPool; @@ -1102,28 +1103,14 @@ std::shared_ptr Statistics::Make( int64_t distinct_count, bool has_min_max, bool has_null_count, bool has_distinct_count, std::optional is_min_value_exact, std::optional is_max_value_exact, ::arrow::MemoryPool* pool) { -#define MAKE_STATS(CAP_TYPE, KLASS) \ - case Type::CAP_TYPE: \ - return std::make_shared>( \ - descr, encoded_min, encoded_max, num_values, null_count, distinct_count, \ - has_min_max, has_null_count, has_distinct_count, is_min_value_exact, \ - is_max_value_exact, pool) - - switch (descr->physical_type()) { - MAKE_STATS(BOOLEAN, BooleanType); - MAKE_STATS(INT32, Int32Type); - MAKE_STATS(INT64, Int64Type); - MAKE_STATS(INT96, Int96Type); - MAKE_STATS(FLOAT, FloatType); - MAKE_STATS(DOUBLE, DoubleType); - MAKE_STATS(BYTE_ARRAY, ByteArrayType); - MAKE_STATS(FIXED_LEN_BYTE_ARRAY, FLBAType); - default: - break; - } -#undef MAKE_STATS - DCHECK(false) << "Cannot reach here"; - return nullptr; + return VisitType(descr->physical_type(), + [&](auto* type) -> std::shared_ptr { + using DType = std::decay_t; + return std::make_shared>( + descr, encoded_min, encoded_max, num_values, null_count, + distinct_count, has_min_max, has_null_count, has_distinct_count, + is_min_value_exact, is_max_value_exact, pool); + }); } } // namespace parquet diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 7e8a18fc94d..ad4df5119e7 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -691,11 +692,13 @@ constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000); constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000); constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000); -MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; }; -STRUCT_END(Int96, 12); +ARROW_PACKED_START(struct, Int96) { std::array value; }; +ARROW_PACKED_END +static_assert(sizeof(Int96) == 12, "Int96 not packed to 12 bytes"); +static_assert(alignof(Int96) <= 4, "Int96 alignment too large"); inline bool operator==(const Int96& left, const Int96& right) { - return std::equal(left.value, left.value + 3, right.value); + return left.value == right.value; } inline bool operator!=(const Int96& left, const Int96& right) { return !(left == right); } @@ -752,7 +755,7 @@ static inline int64_t Int96GetSeconds(const parquet::Int96& i96) { static inline std::string Int96ToString(const Int96& a) { std::ostringstream result; - std::copy(a.value, a.value + 3, std::ostream_iterator(result, " ")); + std::copy(a.value.begin(), a.value.end(), std::ostream_iterator(result, " ")); return result.str(); } diff --git a/cpp/src/parquet/visit_type_inline.h b/cpp/src/parquet/visit_type_inline.h new file mode 100644 index 00000000000..44388661465 --- /dev/null +++ b/cpp/src/parquet/visit_type_inline.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/util/macros.h" +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet { + +#define TYPE_VISIT_INLINE(TYPE_VALUE) \ + case Type::TYPE_VALUE: { \ + const PhysicalType* concrete_ptr = NULLPTR; \ + return std::forward(visitor)(concrete_ptr, std::forward(args)...); \ + } + +/// \brief Call `visitor` with a nullptr of the corresponding concrete type class +/// \tparam ARGS Additional arguments, if any, will be passed to the visitor after +/// the type argument +/// +/// The intent is for this to be called on a generic lambda +/// that may internally use `if constexpr` or similar constructs. +template +auto VisitType(Type::type type, VISITOR&& visitor, ARGS&&... args) + -> decltype(std::forward(visitor)(std::declval*>(), + args...)) { + switch (type) { + TYPE_VISIT_INLINE(INT32) + TYPE_VISIT_INLINE(INT64) + TYPE_VISIT_INLINE(INT96) + TYPE_VISIT_INLINE(FLOAT) + TYPE_VISIT_INLINE(DOUBLE) + TYPE_VISIT_INLINE(FIXED_LEN_BYTE_ARRAY) + TYPE_VISIT_INLINE(BYTE_ARRAY) + TYPE_VISIT_INLINE(BOOLEAN) + default: + throw ParquetException("Invalid Type::type"); + } +} + +#undef TYPE_VISIT_INLINE + +} // namespace parquet diff --git a/cpp/src/parquet/xxhasher.cc b/cpp/src/parquet/xxhasher.cc index fbb32e1d4b9..1914e6e84d2 100644 --- a/cpp/src/parquet/xxhasher.cc +++ b/cpp/src/parquet/xxhasher.cc @@ -58,8 +58,7 @@ uint64_t XxHasher::Hash(const FLBA* value, uint32_t len) const { } uint64_t XxHasher::Hash(const Int96* value) const { - return XXH64(reinterpret_cast(value->value), sizeof(value->value), - kParquetBloomXxHashSeed); + return XXH64(value->value.data(), sizeof(value->value), kParquetBloomXxHashSeed); } uint64_t XxHasher::Hash(std::string_view value) const { @@ -89,8 +88,8 @@ void XxHasher::Hashes(const double* values, int num_values, uint64_t* hashes) co void XxHasher::Hashes(const Int96* values, int num_values, uint64_t* hashes) const { for (int i = 0; i < num_values; ++i) { - hashes[i] = XXH64(reinterpret_cast(values[i].value), - sizeof(values[i].value), kParquetBloomXxHashSeed); + hashes[i] = + XXH64(values[i].value.data(), sizeof(values[i].value), kParquetBloomXxHashSeed); } } diff --git a/docs/source/developers/cpp/fuzzing.rst b/docs/source/developers/cpp/fuzzing.rst index 4df5455de22..f4e4914cebc 100644 --- a/docs/source/developers/cpp/fuzzing.rst +++ b/docs/source/developers/cpp/fuzzing.rst @@ -29,6 +29,7 @@ fuzz testing on several parts of the Arrow C++ feature set, currently: * the IPC stream reader * the IPC file reader * the Parquet file reader +* the Parquet encoders and decoders * the CSV file reader We welcome any contribution to expand the scope of fuzz testing and cover