Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions cpp/src/arrow/compute/kernels/scalar_if_else.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <cstring>
#include <limits>
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_time.h"
Expand Down Expand Up @@ -688,6 +689,17 @@ struct IfElseFunctor<Type, enable_if_base_binary<Type>> {
using ArrayType = typename TypeTraits<Type>::ArrayType;
using BuilderType = typename TypeTraits<Type>::BuilderType;

static Status ValidateCapacityForOffsetType(int64_t data_bytes) {
int64_t max_offset = static_cast<int64_t>(std::numeric_limits<OffsetType>::max());
if (data_bytes > max_offset) {
return Status::CapacityError("Result may exceed offset capacity for this type: ",
data_bytes, " > ", max_offset,
". Convert inputs to a type that uses an int64 based "
"index such as a large_string");
}
return Status::OK();
}

// A - Array, S - Scalar, X = Array/Scalar

// SXX
Expand All @@ -712,9 +724,12 @@ struct IfElseFunctor<Type, enable_if_base_binary<Type>> {
const uint8_t* right_data = right.buffers[2].data;

// allocate data buffer conservatively
int64_t data_buff_alloc = left_offsets[left.length] - left_offsets[0] +
right_offsets[right.length] - right_offsets[0];
int64_t data_buff_alloc = (static_cast<int64_t>(left_offsets[left.length]) -
static_cast<int64_t>(left_offsets[0])) +
(static_cast<int64_t>(right_offsets[right.length]) -
static_cast<int64_t>(right_offsets[0]));

ARROW_RETURN_NOT_OK(ValidateCapacityForOffsetType(data_buff_alloc));
BuilderType builder(ctx->memory_pool());
ARROW_RETURN_NOT_OK(builder.Reserve(cond.length + 1));
ARROW_RETURN_NOT_OK(builder.ReserveData(data_buff_alloc));
Expand Down Expand Up @@ -745,18 +760,21 @@ struct IfElseFunctor<Type, enable_if_base_binary<Type>> {
ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], ctx->Allocate(offset_length));
std::memcpy(out_data->buffers[1]->mutable_data(), right_offsets, offset_length);

auto right_data_length = right_offsets[right.length] - right_offsets[0];
int64_t right_data_length = static_cast<int64_t>(right_offsets[right.length]) -
static_cast<int64_t>(right_offsets[0]);
ARROW_ASSIGN_OR_RAISE(out_data->buffers[2], ctx->Allocate(right_data_length));
std::memcpy(out_data->buffers[2]->mutable_data(), right_data, right_data_length);
return Status::OK();
}

std::string_view left_data = internal::UnboxScalar<Type>::Unbox(left);
auto left_size = static_cast<OffsetType>(left_data.size());

// allocate data buffer conservatively
int64_t data_buff_alloc =
left_size * cond.length + right_offsets[right.length] - right_offsets[0];
int64_t data_buff_alloc = static_cast<int64_t>(left_data.size()) * cond.length +
(static_cast<int64_t>(right_offsets[right.length]) -
static_cast<int64_t>(right_offsets[0]));
ARROW_RETURN_NOT_OK(ValidateCapacityForOffsetType(data_buff_alloc));
auto left_size = static_cast<OffsetType>(left_data.size());

BuilderType builder(ctx->memory_pool());
ARROW_RETURN_NOT_OK(builder.Reserve(cond.length + 1));
Expand Down Expand Up @@ -785,18 +803,22 @@ struct IfElseFunctor<Type, enable_if_base_binary<Type>> {
ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], ctx->Allocate(offset_length));
std::memcpy(out_data->buffers[1]->mutable_data(), left_offsets, offset_length);

auto left_data_length = left_offsets[left.length] - left_offsets[0];
int64_t left_data_length = static_cast<int64_t>(left_offsets[left.length]) -
static_cast<int64_t>(left_offsets[0]);
ARROW_RETURN_NOT_OK(ValidateCapacityForOffsetType(left_data_length));
ARROW_ASSIGN_OR_RAISE(out_data->buffers[2], ctx->Allocate(left_data_length));
std::memcpy(out_data->buffers[2]->mutable_data(), left_data, left_data_length);
return Status::OK();
}

std::string_view right_data = internal::UnboxScalar<Type>::Unbox(right);
auto right_size = static_cast<OffsetType>(right_data.size());

// allocate data buffer conservatively
int64_t data_buff_alloc =
right_size * cond.length + left_offsets[left.length] - left_offsets[0];
int64_t data_buff_alloc = static_cast<int64_t>(right_data.size()) * cond.length +
(static_cast<int64_t>(left_offsets[left.length]) -
static_cast<int64_t>(left_offsets[0]));
ARROW_RETURN_NOT_OK(ValidateCapacityForOffsetType(data_buff_alloc));
auto right_size = static_cast<OffsetType>(right_data.size());

BuilderType builder(ctx->memory_pool());
ARROW_RETURN_NOT_OK(builder.Reserve(cond.length + 1));
Expand All @@ -817,13 +839,15 @@ struct IfElseFunctor<Type, enable_if_base_binary<Type>> {
static Status Call(KernelContext* ctx, const ArraySpan& cond, const Scalar& left,
const Scalar& right, ExecResult* out) {
std::string_view left_data = internal::UnboxScalar<Type>::Unbox(left);
auto left_size = static_cast<OffsetType>(left_data.size());

std::string_view right_data = internal::UnboxScalar<Type>::Unbox(right);
auto right_size = static_cast<OffsetType>(right_data.size());

// allocate data buffer conservatively
int64_t data_buff_alloc = std::max(right_size, left_size) * cond.length;
int64_t data_buff_alloc =
static_cast<int64_t>(std::max(left_data.size(), right_data.size())) * cond.length;
ARROW_RETURN_NOT_OK(ValidateCapacityForOffsetType(data_buff_alloc));
auto left_size = static_cast<OffsetType>(left_data.size());
auto right_size = static_cast<OffsetType>(right_data.size());
BuilderType builder(ctx->memory_pool());
ARROW_RETURN_NOT_OK(builder.Reserve(cond.length + 1));
ARROW_RETURN_NOT_OK(builder.ReserveData(data_buff_alloc));
Expand Down
88 changes: 88 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_if_else_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <limits>
#include <numeric>

#include <gtest/gtest.h>
Expand Down Expand Up @@ -608,6 +609,93 @@ TYPED_TEST(TestIfElseBaseBinary, IfElseBaseBinaryRand) {
CheckIfElseOutput(cond, left, right, expected_data);
}

Result<std::shared_ptr<Array>> MakeAllocatedVarBinaryArray(
const std::shared_ptr<DataType>& type, int64_t data_length) {
ARROW_ASSIGN_OR_RAISE(auto values, AllocateBuffer(data_length));
if (type->id() == Type::STRING || type->id() == Type::BINARY) {
if (data_length > std::numeric_limits<int32_t>::max()) {
return Status::Invalid("data_length exceeds int32 offset range");
}
auto offsets =
Buffer::FromVector(std::vector<int32_t>{0, static_cast<int32_t>(data_length)});
return MakeArray(
ArrayData::Make(type, 1, {nullptr, std::move(offsets), std::move(values)}));
}
if (type->id() != Type::LARGE_STRING && type->id() != Type::LARGE_BINARY) {
return Status::TypeError("unsupported var-binary type for helper: ", *type);
}
auto offsets = Buffer::FromVector(std::vector<int64_t>{0, data_length});
return MakeArray(
ArrayData::Make(type, 1, {nullptr, std::move(offsets), std::move(values)}));
}

TEST_F(TestIfElseKernel, IfElseStringAndLargeStringAAAOverflowBehavior) {
constexpr int64_t kPerSide =
static_cast<int64_t>(std::numeric_limits<int32_t>::max() / 2) + 4096;
auto cond = ArrayFromJSON(boolean(), "[true]");

auto check = [&](const std::shared_ptr<DataType>& type, bool expect_capacity_error) {
ASSERT_OK_AND_ASSIGN(auto left, MakeAllocatedVarBinaryArray(type, kPerSide));
ASSERT_OK_AND_ASSIGN(auto right, MakeAllocatedVarBinaryArray(type, kPerSide));

auto maybe_out = CallFunction("if_else", {cond, left, right});
if (expect_capacity_error) {
ASSERT_TRUE(maybe_out.status().IsCapacityError()) << maybe_out.status();
ASSERT_THAT(
maybe_out.status().message(),
::testing::HasSubstr("Result may exceed offset capacity for this type"));
} else {
ASSERT_OK(maybe_out.status()) << maybe_out.status();
}
};

check(TypeTraits<StringType>::type_singleton(), true);
check(TypeTraits<LargeStringType>::type_singleton(), false);
}

TEST_F(TestIfElseKernel, IfElseBinaryCapacityErrorASA) {
constexpr int32_t capacity_limit = std::numeric_limits<int32_t>::max();

auto cond = ArrayFromJSON(boolean(), "[true]");
auto left = Datum(std::make_shared<BinaryScalar>("x"));
ASSERT_OK_AND_ASSIGN(
auto right, MakeAllocatedVarBinaryArray(TypeTraits<StringType>::type_singleton(),
capacity_limit));

EXPECT_RAISES_WITH_MESSAGE_THAT(
CapacityError,
::testing::HasSubstr("Result may exceed offset capacity for this type"),
CallFunction("if_else", {cond, left, right}));
}

TEST_F(TestIfElseKernel, IfElseBinaryCapacityErrorAAS) {
constexpr int32_t capacity_limit = std::numeric_limits<int32_t>::max();

auto cond = ArrayFromJSON(boolean(), "[false]");
ASSERT_OK_AND_ASSIGN(
auto left, MakeAllocatedVarBinaryArray(TypeTraits<StringType>::type_singleton(),
capacity_limit));
auto right = Datum(std::make_shared<BinaryScalar>("x"));

EXPECT_RAISES_WITH_MESSAGE_THAT(
CapacityError,
::testing::HasSubstr("Result may exceed offset capacity for this type"),
CallFunction("if_else", {cond, left, right}));
}

TEST_F(TestIfElseKernel, IfElseBinaryCapacityErrorASS) {
constexpr int64_t kLength = 3000;
std::string payload(1 << 20, 'x');
ASSERT_OK_AND_ASSIGN(auto cond, MakeArrayFromScalar(BooleanScalar(true), kLength));
auto left = Datum(std::make_shared<BinaryScalar>(payload));
auto right = Datum(std::make_shared<BinaryScalar>("x"));

EXPECT_RAISES_WITH_MESSAGE_THAT(
CapacityError,
::testing::HasSubstr("Result may exceed offset capacity for this type"),
CallFunction("if_else", {cond, left, right}));
}

TEST_F(TestIfElseKernel, IfElseFSBinary) {
auto type = fixed_size_binary(4);

Expand Down
Loading