Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmake_modules/IcebergThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ function(resolve_arrow_dependency)
set(ARROW_S3 ${ICEBERG_S3})
set(ARROW_JSON ON)
set(ARROW_PARQUET ON)
set(ARROW_ENABLE_THREADING ON)
set(ARROW_SIMD_LEVEL "NONE")
set(ARROW_RUNTIME_SIMD_LEVEL "NONE")
set(ARROW_POSITION_INDEPENDENT_CODE ON)
Expand Down Expand Up @@ -167,8 +168,8 @@ function(resolve_arrow_dependency)
endif()

# Arrow's exported static target interface may reference system libraries
# (e.g. OpenSSL, CURL, ZLIB) that consumers need to find.
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES ZLIB)
# (e.g. Threads, OpenSSL, CURL, ZLIB) that consumers need to find.
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Threads ZLIB)
if(ARROW_S3)
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES OpenSSL CURL)
endif()
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ set(ICEBERG_SOURCES
util/snapshot_util.cc
util/string_util.cc
util/struct_like_set.cc
util/task_group.cc
util/temporal_util.cc
util/timepoint.cc
util/transform_util.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ iceberg_sources = files(
'util/snapshot_util.cc',
'util/string_util.cc',
'util/struct_like_set.cc',
'util/task_group.cc',
'util/temporal_util.cc',
'util/timepoint.cc',
'util/transform_util.cc',
Expand Down
9 changes: 9 additions & 0 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#pragma once

#include <concepts>
#include <expected>
#include <format>
#include <string>
#include <type_traits>

#include "iceberg/iceberg_export.h"

Expand Down Expand Up @@ -126,4 +128,11 @@ DEFINE_ERROR_FUNCTION(ValidationFailed)

#undef DEFINE_ERROR_FUNCTION

template <typename T>
concept AsResult = std::derived_from<std::remove_cvref_t<T>,
Result<typename std::remove_cvref_t<T>::value_type>>;

template <AsResult T>
using ResultValueT = typename std::remove_cvref_t<T>::value_type;

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ add_iceberg_test(util_test
retry_util_test.cc
string_util_test.cc
struct_like_set_test.cc
task_group_test.cc
temporal_util_test.cc
transform_util_test.cc
truncate_util_test.cc
Expand Down
81 changes: 69 additions & 12 deletions src/iceberg/test/arrow_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,39 @@
* under the License.
*/

#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <arrow/api.h>
#include <arrow/c/bridge.h>
#include <arrow/extension/uuid.h>
#include <arrow/result.h>
#include <arrow/type_fwd.h>
#include <arrow/util/config.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/util/thread_pool.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/constants.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/test/matchers.h"
#include "iceberg/util/executor.h"
#include "iceberg/util/task_group.h"

namespace iceberg {

struct ToArrowSchemaParam {
std::shared_ptr<Type> iceberg_type;
bool optional = true;
std::shared_ptr<arrow::DataType> arrow_type;
std::shared_ptr<::arrow::DataType> arrow_type;
};

class ToArrowSchemaTest : public ::testing::TestWithParam<ToArrowSchemaParam> {};
Expand Down Expand Up @@ -89,17 +100,17 @@ INSTANTIATE_TEST_SUITE_P(
ToArrowSchemaParam{.iceberg_type = iceberg::date(),
.arrow_type = ::arrow::date32()},
ToArrowSchemaParam{.iceberg_type = iceberg::time(),
.arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO)},
.arrow_type = ::arrow::time64(::arrow::TimeUnit::MICRO)},
ToArrowSchemaParam{.iceberg_type = iceberg::timestamp(),
.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO)},
.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO)},
ToArrowSchemaParam{
.iceberg_type = iceberg::timestamp_tz(),
.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC")},
.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC")},
ToArrowSchemaParam{.iceberg_type = iceberg::timestamp_ns(),
.arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO)},
.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO)},
ToArrowSchemaParam{
.iceberg_type = iceberg::timestamptz_ns(),
.arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO, "UTC")},
.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC")},
ToArrowSchemaParam{.iceberg_type = iceberg::string(),
.arrow_type = ::arrow::utf8()},
ToArrowSchemaParam{.iceberg_type = iceberg::binary(),
Expand Down Expand Up @@ -234,7 +245,7 @@ TEST(ToArrowSchemaTest, MapType) {
}

struct FromArrowSchemaParam {
std::shared_ptr<arrow::DataType> arrow_type;
std::shared_ptr<::arrow::DataType> arrow_type;
bool optional = true;
std::shared_ptr<Type> iceberg_type;
};
Expand Down Expand Up @@ -288,17 +299,17 @@ INSTANTIATE_TEST_SUITE_P(
.iceberg_type = iceberg::decimal(10, 2)},
FromArrowSchemaParam{.arrow_type = ::arrow::date32(),
.iceberg_type = iceberg::date()},
FromArrowSchemaParam{.arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO),
FromArrowSchemaParam{.arrow_type = ::arrow::time64(::arrow::TimeUnit::MICRO),
.iceberg_type = iceberg::time()},
FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO),
FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO),
.iceberg_type = iceberg::timestamp()},
FromArrowSchemaParam{
.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"),
.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"),
.iceberg_type = std::make_shared<TimestampTzType>()},
FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO),
FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO),
.iceberg_type = iceberg::timestamp_ns()},
FromArrowSchemaParam{
.arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO, "UTC"),
.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"),
.iceberg_type = iceberg::timestamptz_ns()},
FromArrowSchemaParam{.arrow_type = ::arrow::utf8(),
.iceberg_type = iceberg::string()},
Expand Down Expand Up @@ -467,4 +478,50 @@ TEST(FromArrowSchemaTest, MapType) {
ASSERT_EQ(value.type()->type_id(), TypeId::kInt);
}

TEST(ArrowExecutorAdapterTest, RunsTaskGroupOnThreadPool) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires ARROW_ENABLE_THREADING=ON";
#endif

class ArrowExecutorAdapter final : public Executor {
public:
explicit ArrowExecutorAdapter(::arrow::internal::Executor& executor)
: executor_(executor) {}

Status Submit(ExecutorTask task) override {
ICEBERG_ARROW_RETURN_NOT_OK(executor_.Spawn(std::move(task)));
return {};
}

private:
::arrow::internal::Executor& executor_;
};

auto thread_pool = ::arrow::internal::ThreadPool::Make(2).ValueOrDie();
ArrowExecutorAdapter executor(*thread_pool);

std::mutex mutex;
std::vector<std::thread::id> thread_ids;

auto status = TaskGroup<>()
.SetExecutor(std::ref(executor))
.Submit([&]() -> Status {
std::lock_guard lock(mutex);
thread_ids.push_back(std::this_thread::get_id());
return {};
})
.Submit([&]() -> Status {
std::lock_guard lock(mutex);
thread_ids.push_back(std::this_thread::get_id());
return {};
})
.Run();

EXPECT_THAT(status, IsOk());
EXPECT_EQ(thread_ids.size(), 2);
EXPECT_NE(thread_ids[0], std::this_thread::get_id());
EXPECT_NE(thread_ids[1], std::this_thread::get_id());
EXPECT_TRUE(thread_pool->Shutdown().ok());
}

} // namespace iceberg
62 changes: 62 additions & 0 deletions src/iceberg/test/executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <atomic>
#include <thread>
#include <utility>
#include <vector>

#include "iceberg/result.h"
#include "iceberg/util/executor.h"

namespace iceberg::test {

class ThreadExecutor final : public Executor {
public:
explicit ThreadExecutor(Status submit_status = {})
: submit_status_(std::move(submit_status)) {}

~ThreadExecutor() override {
for (auto& thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
}

Status Submit(ExecutorTask task) override {
submit_count_.fetch_add(1, std::memory_order_relaxed);
if (!submit_status_.has_value()) {
return std::unexpected(submit_status_.error());
}
threads_.emplace_back(std::move(task));
return {};
}

int submit_count() const { return submit_count_.load(std::memory_order_relaxed); }

private:
Status submit_status_;
std::atomic<int> submit_count_{0};
std::vector<std::thread> threads_;
};

} // namespace iceberg::test
1 change: 1 addition & 0 deletions src/iceberg/test/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ iceberg_tests = {
'roaring_position_bitmap_test.cc',
'string_util_test.cc',
'struct_like_set_test.cc',
'task_group_test.cc',
'temporal_util_test.cc',
'transform_util_test.cc',
'truncate_util_test.cc',
Expand Down
76 changes: 76 additions & 0 deletions src/iceberg/test/retry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <cstdint>
#include <vector>

#include "iceberg/result.h"
#include "iceberg/util/retry_util.h"
#include "iceberg/util/retry_util_internal.h"

namespace iceberg::test {

using CommitFailedRetry = retry::OnlyRetryOn<ErrorKind::kCommitFailed>;

using TransientIORetry =
retry::OnlyRetryOn<ErrorKind::kIOError, ErrorKind::kServiceUnavailable,
ErrorKind::kInternalServerError>;

class FakeRetryEnvironment {
public:
using Duration = RetryTestHooks::Duration;
using TimePoint = RetryTestHooks::TimePoint;

FakeRetryEnvironment() {
hooks_.now = [this]() { return now_; };
hooks_.sleep_for = [this](Duration duration) {
sleep_durations_.push_back(duration);
now_ += duration;
};
hooks_.jitter = [this](int32_t base_delay_ms) {
observed_base_delays_ms_.push_back(base_delay_ms);
return base_delay_ms + jitter_offset_ms_;
};
}

void Advance(Duration duration) { now_ += duration; }

void SetJitterOffsetMs(int32_t jitter_offset_ms) {
jitter_offset_ms_ = jitter_offset_ms;
}

const RetryTestHooks& hooks() const { return hooks_; }

const std::vector<Duration>& sleep_durations() const { return sleep_durations_; }

const std::vector<int32_t>& observed_base_delays_ms() const {
return observed_base_delays_ms_;
}

private:
RetryTestHooks hooks_;
TimePoint now_{};
int32_t jitter_offset_ms_ = 0;
std::vector<Duration> sleep_durations_;
std::vector<int32_t> observed_base_delays_ms_;
};

} // namespace iceberg::test
Loading
Loading