diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index a0d418e05..ea5fa951a 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -105,6 +105,7 @@ function(resolve_arrow_dependency) set(ARROW_S3 ${ICEBERG_S3}) set(ARROW_JSON ON) set(ARROW_PARQUET ON) + set(ARROW_ENABLE_THREADING ON) set(ARROW_SIMD_LEVEL "NONE") set(ARROW_RUNTIME_SIMD_LEVEL "NONE") set(ARROW_POSITION_INDEPENDENT_CODE ON) @@ -167,8 +168,8 @@ function(resolve_arrow_dependency) endif() # Arrow's exported static target interface may reference system libraries - # (e.g. OpenSSL, CURL, ZLIB) that consumers need to find. - list(APPEND ICEBERG_SYSTEM_DEPENDENCIES ZLIB) + # (e.g. Threads, OpenSSL, CURL, ZLIB) that consumers need to find. + list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Threads ZLIB) if(ARROW_S3) list(APPEND ICEBERG_SYSTEM_DEPENDENCIES OpenSSL CURL) endif() diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 38d85534a..3008b970e 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -113,6 +113,7 @@ set(ICEBERG_SOURCES util/snapshot_util.cc util/string_util.cc util/struct_like_set.cc + util/task_group.cc util/temporal_util.cc util/timepoint.cc util/transform_util.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 678f30fbd..a95fcd69d 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -135,6 +135,7 @@ iceberg_sources = files( 'util/snapshot_util.cc', 'util/string_util.cc', 'util/struct_like_set.cc', + 'util/task_group.cc', 'util/temporal_util.cc', 'util/timepoint.cc', 'util/transform_util.cc', diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 765508705..01d17b299 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -19,9 +19,11 @@ #pragma once +#include #include #include #include +#include #include "iceberg/iceberg_export.h" @@ -126,4 +128,11 @@ 
DEFINE_ERROR_FUNCTION(ValidationFailed) #undef DEFINE_ERROR_FUNCTION +template +concept AsResult = std::derived_from, + Result::value_type>>; + +template +using ResultValueT = typename std::remove_cvref_t::value_type; + } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 20babf19c..37e32ad0e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -135,6 +135,7 @@ add_iceberg_test(util_test retry_util_test.cc string_util_test.cc struct_like_set_test.cc + task_group_test.cc temporal_util_test.cc transform_util_test.cc truncate_util_test.cc diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index dcfdb6b56..a954cce88 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -17,28 +17,39 @@ * under the License. */ +#include #include +#include +#include #include +#include +#include #include #include #include #include #include +#include #include +#include #include +#include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/constants.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/test/matchers.h" +#include "iceberg/util/executor.h" +#include "iceberg/util/task_group.h" namespace iceberg { struct ToArrowSchemaParam { std::shared_ptr iceberg_type; bool optional = true; - std::shared_ptr arrow_type; + std::shared_ptr<::arrow::DataType> arrow_type; }; class ToArrowSchemaTest : public ::testing::TestWithParam {}; @@ -89,17 +100,17 @@ INSTANTIATE_TEST_SUITE_P( ToArrowSchemaParam{.iceberg_type = iceberg::date(), .arrow_type = ::arrow::date32()}, ToArrowSchemaParam{.iceberg_type = iceberg::time(), - .arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO)}, + .arrow_type = ::arrow::time64(::arrow::TimeUnit::MICRO)}, ToArrowSchemaParam{.iceberg_type = iceberg::timestamp(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO)}, + .arrow_type = 
::arrow::timestamp(::arrow::TimeUnit::MICRO)}, ToArrowSchemaParam{ .iceberg_type = iceberg::timestamp_tz(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC")}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC")}, ToArrowSchemaParam{.iceberg_type = iceberg::timestamp_ns(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO)}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO)}, ToArrowSchemaParam{ .iceberg_type = iceberg::timestamptz_ns(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO, "UTC")}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC")}, ToArrowSchemaParam{.iceberg_type = iceberg::string(), .arrow_type = ::arrow::utf8()}, ToArrowSchemaParam{.iceberg_type = iceberg::binary(), @@ -234,7 +245,7 @@ TEST(ToArrowSchemaTest, MapType) { } struct FromArrowSchemaParam { - std::shared_ptr arrow_type; + std::shared_ptr<::arrow::DataType> arrow_type; bool optional = true; std::shared_ptr iceberg_type; }; @@ -288,17 +299,17 @@ INSTANTIATE_TEST_SUITE_P( .iceberg_type = iceberg::decimal(10, 2)}, FromArrowSchemaParam{.arrow_type = ::arrow::date32(), .iceberg_type = iceberg::date()}, - FromArrowSchemaParam{.arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO), + FromArrowSchemaParam{.arrow_type = ::arrow::time64(::arrow::TimeUnit::MICRO), .iceberg_type = iceberg::time()}, - FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO), + FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO), .iceberg_type = iceberg::timestamp()}, FromArrowSchemaParam{ - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"), + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"), .iceberg_type = std::make_shared()}, - FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO), + FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO), .iceberg_type = iceberg::timestamp_ns()}, FromArrowSchemaParam{ - .arrow_type = 
::arrow::timestamp(arrow::TimeUnit::NANO, "UTC"), + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"), .iceberg_type = iceberg::timestamptz_ns()}, FromArrowSchemaParam{.arrow_type = ::arrow::utf8(), .iceberg_type = iceberg::string()}, @@ -467,4 +478,50 @@ TEST(FromArrowSchemaTest, MapType) { ASSERT_EQ(value.type()->type_id(), TypeId::kInt); } +TEST(ArrowExecutorAdapterTest, RunsTaskGroupOnThreadPool) { +#ifndef ARROW_ENABLE_THREADING + GTEST_SKIP() << "Test requires ARROW_ENABLE_THREADING=ON"; +#endif + + class ArrowExecutorAdapter final : public Executor { + public: + explicit ArrowExecutorAdapter(::arrow::internal::Executor& executor) + : executor_(executor) {} + + Status Submit(ExecutorTask task) override { + ICEBERG_ARROW_RETURN_NOT_OK(executor_.Spawn(std::move(task))); + return {}; + } + + private: + ::arrow::internal::Executor& executor_; + }; + + auto thread_pool = ::arrow::internal::ThreadPool::Make(2).ValueOrDie(); + ArrowExecutorAdapter executor(*thread_pool); + + std::mutex mutex; + std::vector thread_ids; + + auto status = TaskGroup<>() + .SetExecutor(std::ref(executor)) + .Submit([&]() -> Status { + std::lock_guard lock(mutex); + thread_ids.push_back(std::this_thread::get_id()); + return {}; + }) + .Submit([&]() -> Status { + std::lock_guard lock(mutex); + thread_ids.push_back(std::this_thread::get_id()); + return {}; + }) + .Run(); + + EXPECT_THAT(status, IsOk()); + EXPECT_EQ(thread_ids.size(), 2); + EXPECT_NE(thread_ids[0], std::this_thread::get_id()); + EXPECT_NE(thread_ids[1], std::this_thread::get_id()); + EXPECT_TRUE(thread_pool->Shutdown().ok()); +} + } // namespace iceberg diff --git a/src/iceberg/test/executor.h b/src/iceberg/test/executor.h new file mode 100644 index 000000000..5b4de65b0 --- /dev/null +++ b/src/iceberg/test/executor.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/util/executor.h" + +namespace iceberg::test { + +class ThreadExecutor final : public Executor { + public: + explicit ThreadExecutor(Status submit_status = {}) + : submit_status_(std::move(submit_status)) {} + + ~ThreadExecutor() override { + for (auto& thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } + } + + Status Submit(ExecutorTask task) override { + submit_count_.fetch_add(1, std::memory_order_relaxed); + if (!submit_status_.has_value()) { + return std::unexpected(submit_status_.error()); + } + threads_.emplace_back(std::move(task)); + return {}; + } + + int submit_count() const { return submit_count_.load(std::memory_order_relaxed); } + + private: + Status submit_status_; + std::atomic submit_count_{0}; + std::vector threads_; +}; + +} // namespace iceberg::test diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index b21a264b1..226217ca7 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -99,6 +99,7 @@ iceberg_tests = { 'roaring_position_bitmap_test.cc', 'string_util_test.cc', 'struct_like_set_test.cc', + 'task_group_test.cc', 'temporal_util_test.cc', 'transform_util_test.cc', 
'truncate_util_test.cc', diff --git a/src/iceberg/test/retry.h b/src/iceberg/test/retry.h new file mode 100644 index 000000000..ce96ce163 --- /dev/null +++ b/src/iceberg/test/retry.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/util/retry_util.h" +#include "iceberg/util/retry_util_internal.h" + +namespace iceberg::test { + +using CommitFailedRetry = retry::OnlyRetryOn; + +using TransientIORetry = + retry::OnlyRetryOn; + +class FakeRetryEnvironment { + public: + using Duration = RetryTestHooks::Duration; + using TimePoint = RetryTestHooks::TimePoint; + + FakeRetryEnvironment() { + hooks_.now = [this]() { return now_; }; + hooks_.sleep_for = [this](Duration duration) { + sleep_durations_.push_back(duration); + now_ += duration; + }; + hooks_.jitter = [this](int32_t base_delay_ms) { + observed_base_delays_ms_.push_back(base_delay_ms); + return base_delay_ms + jitter_offset_ms_; + }; + } + + void Advance(Duration duration) { now_ += duration; } + + void SetJitterOffsetMs(int32_t jitter_offset_ms) { + jitter_offset_ms_ = jitter_offset_ms; + } + + const RetryTestHooks& hooks() const { return hooks_; } + + const std::vector& sleep_durations() const { return sleep_durations_; } + + const std::vector& observed_base_delays_ms() const { + return observed_base_delays_ms_; + } + + private: + RetryTestHooks hooks_; + TimePoint now_{}; + int32_t jitter_offset_ms_ = 0; + std::vector sleep_durations_; + std::vector observed_base_delays_ms_; +}; + +} // namespace iceberg::test diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc index ead221910..1b2f35578 100644 --- a/src/iceberg/test/retry_util_test.cc +++ b/src/iceberg/test/retry_util_test.cc @@ -27,6 +27,7 @@ #include "iceberg/result.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/retry.h" #include "iceberg/util/retry_util_internal.h" namespace iceberg { @@ -40,50 +41,16 @@ struct NonResultReturningTask { int operator()() const { return 1; } }; +using test::CommitFailedRetry; +using test::FakeRetryEnvironment; +using test::TransientIORetry; + static_assert(detail::RetryTask); static_assert(!detail::RetryTask); 
-static_assert(requires(RetryRunner runner, ResultReturningTask task) { +static_assert(requires(RetryRunner runner, ResultReturningTask task) { { runner.Run(task) } -> std::same_as>; }); - -class FakeRetryEnvironment { - public: - using Duration = RetryTestHooks::Duration; - using TimePoint = RetryTestHooks::TimePoint; - - FakeRetryEnvironment() { - hooks_.now = [this]() { return now_; }; - hooks_.sleep_for = [this](Duration duration) { - sleep_durations_.push_back(duration); - now_ += duration; - }; - hooks_.jitter = [this](int32_t base_delay_ms) { - observed_base_delays_ms_.push_back(base_delay_ms); - return base_delay_ms + jitter_offset_ms_; - }; - } - - void Advance(Duration duration) { now_ += duration; } - - void SetJitterOffsetMs(int32_t jitter_offset_ms) { - jitter_offset_ms_ = jitter_offset_ms; - } - - const RetryTestHooks& hooks() const { return hooks_; } - - const std::vector& sleep_durations() const { return sleep_durations_; } - - const std::vector& observed_base_delays_ms() const { - return observed_base_delays_ms_; - } - - private: - RetryTestHooks hooks_; - TimePoint now_{}; - int32_t jitter_offset_ms_ = 0; - std::vector sleep_durations_; - std::vector observed_base_delays_ms_; -}; +static_assert(retry::NoRetry::kMode == retry::RetryPolicyMode::kNoRetry); } // namespace @@ -91,11 +58,10 @@ TEST(RetryRunnerTest, SuccessOnFirstAttempt) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -113,11 +79,10 @@ TEST(RetryRunnerTest, RetryOnceThenSucceed) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - 
.OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -138,11 +103,10 @@ TEST(RetryRunnerTest, MaxAttemptsExhausted) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -159,11 +123,10 @@ TEST(RetryRunnerTest, OnlyRetryOnFilter) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -180,11 +143,10 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -201,41 +163,15 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { EXPECT_EQ(attempts, 3); } -TEST(RetryRunnerTest, OnlyRetryOnTakesPrecedenceOverStopRetryOn) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) - .StopRetryOn(ErrorKind::kCommitFailed) - .Run( - [&]() -> Result { - 
++call_count; - if (call_count == 1) { - return CommitFailed("transient"); - } - return 100; - }, - &attempts); - - EXPECT_THAT(result, IsOk()); - EXPECT_EQ(*result, 100); - EXPECT_EQ(call_count, 2); - EXPECT_EQ(attempts, 2); -} - TEST(RetryRunnerTest, StopRetryOnMatchingError) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 5, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .StopRetryOn(ErrorKind::kCommitStateUnknown) + auto result = RetryRunner>( + RetryConfig{.num_retries = 5, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -252,11 +188,11 @@ TEST(RetryRunnerTest, StopRetryOnNonMatchingErrorAllowsRetry) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .StopRetryOn({ErrorKind::kCommitStateUnknown}) + auto result = RetryRunner>( + RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -277,11 +213,11 @@ TEST(RetryRunnerTest, ZeroRetriesAllowsUnsetPolicyAndSkipsBackoffValidation) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 0, - .min_wait_ms = 0, - .max_wait_ms = 0, - .total_timeout_ms = 5000, - .scale_factor = 0.5}) + auto result = RetryRunner(RetryConfig{.num_retries = 0, + .min_wait_ms = 0, + .max_wait_ms = 0, + .total_timeout_ms = 5000, + .scale_factor = 0.5}) .Run( [&]() -> Result { ++call_count; @@ -298,10 +234,10 @@ TEST(RetryRunnerTest, NegativeRetriesFailsBeforeTaskRuns) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = -1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) + auto result = RetryRunner(RetryConfig{.num_retries = -1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 
5000}) .Run( [&]() -> Result { ++call_count; @@ -360,8 +296,7 @@ TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(test_case.config) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(test_case.config) .Run( [&]() -> Result { ++call_count; @@ -377,36 +312,14 @@ TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) { } } -TEST(RetryRunnerTest, UnsetRetryPolicyFailsBeforeTaskRuns) { +TEST(RetryRunnerTest, NoRetryWithRetries) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .Run( - [&]() -> Result { - ++call_count; - return CommitFailed("fail"); - }, - &attempts); - - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("Retry policy must be explicitly configured")); - EXPECT_EQ(call_count, 0); - EXPECT_EQ(attempts, 0); -} - -TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(std::initializer_list{}) + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -416,30 +329,7 @@ TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) { EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); EXPECT_THAT(result, - HasErrorMessage("Retry policy must include at least one error kind")); - EXPECT_EQ(call_count, 0); - EXPECT_EQ(attempts, 0); -} - -TEST(RetryRunnerTest, EmptyStopRetryOnPolicyFailsBeforeTaskRuns) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 
5000}) - .StopRetryOn({}) - .Run( - [&]() -> Result { - ++call_count; - return CommitFailed("fail"); - }, - &attempts); - - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, - HasErrorMessage("Retry policy must include at least one error kind")); + HasErrorMessage("Retry policy must be enabled when num_retries > 0")); EXPECT_EQ(call_count, 0); EXPECT_EQ(attempts, 0); } @@ -450,11 +340,10 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 20, - .max_wait_ms = 20, - .total_timeout_ms = 15}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 20, + .max_wait_ms = 20, + .total_timeout_ms = 15}) .Run( [&]() -> Result { ++call_count; @@ -478,11 +367,10 @@ TEST(RetryRunnerTest, TotalTimeoutStopsWhenDelayEqualsRemainingBudget) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 10, - .max_wait_ms = 10, - .total_timeout_ms = 20}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 20}) .Run( [&]() -> Result { ++call_count; @@ -504,11 +392,10 @@ TEST(RetryRunnerTest, NonPositiveTotalTimeoutDisablesDeadline) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 10, - .max_wait_ms = 10, - .total_timeout_ms = 0}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 0}) .Run( [&]() -> Result { ++call_count; @@ -537,11 +424,10 @@ TEST(RetryRunnerTest, RetryDelayDoesNotExceedMaxWaitAfterJitter) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 1, - 
.min_wait_ms = 10, - .max_wait_ms = 10, - .total_timeout_ms = 0}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 0}) .Run( [&]() -> Result { ++call_count; @@ -603,24 +489,26 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { int call_count = 0; int32_t attempts = 0; - auto result = - RetryRunner(RetryConfig{.num_retries = 5, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable}) - .Run( - [&]() -> Result { - ++call_count; - if (call_count == 1) { - return CommitFailed("conflict"); - } - if (call_count == 2) { - return ServiceUnavailable("server busy"); - } - return 77; - }, - &attempts); + using CommitOrUnavailable = + retry::RetryPolicy; + + auto result = RetryRunner(RetryConfig{.num_retries = 5, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("conflict"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return 77; + }, + &attempts); EXPECT_THAT(result, IsOk()); EXPECT_EQ(*result, 77); @@ -628,4 +516,50 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { EXPECT_EQ(attempts, 3); } +TEST(RetryRunnerTest, RetriesTransientIO) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Status { + ++call_count; + if (call_count == 1) { + return IOError("read failed"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return {}; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +TEST(RetryRunnerTest, DoesNotRetryNotFound) { + int call_count = 0; + int32_t attempts = 0; + + auto result = 
RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Status { + ++call_count; + return NotFound("missing file"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kNotFound)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + } // namespace iceberg diff --git a/src/iceberg/test/task_group_test.cc b/src/iceberg/test/task_group_test.cc new file mode 100644 index 000000000..2f0da3de2 --- /dev/null +++ b/src/iceberg/test/task_group_test.cc @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/util/task_group.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/result.h" +#include "iceberg/test/executor.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/retry.h" +#include "iceberg/util/functional.h" + +namespace iceberg { + +namespace { + +RetryConfig FastRetryConfig(int32_t num_retries = 2) { + return RetryConfig{.num_retries = num_retries, + .min_wait_ms = 1, + .max_wait_ms = 1, + .total_timeout_ms = 0}; +} + +} // namespace + +TEST(TaskGroupCompileTest, TaskConcepts) { + auto move_only_mutable_lambda = [value = std::make_unique(1)]() mutable -> Status { + return *value == 1 ? Status{} : IOError("unexpected value"); + }; + using MoveOnlyMutableLambda = decltype(move_only_mutable_lambda); + + auto move_only_const_lambda = [value = std::make_unique(1)]() -> Status { + return *value == 1 ? Status{} : IOError("unexpected value"); + }; + using MoveOnlyConstLambda = decltype(move_only_const_lambda); + + auto copyable_mutable_lambda = [attempt = 0]() mutable -> Status { + return ++attempt > 0 ? 
Status{} : Status{}; + }; + using CopyableMutableLambda = decltype(copyable_mutable_lambda); + + static_assert(!std::copy_constructible>); + static_assert(!std::default_initializable>); + static_assert(std::move_constructible>); + + static_assert(internal::OnceStatusTask); + static_assert(!internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); + static_assert(!internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); + + static_assert(!internal::RepeatableStatusTask); + static_assert(internal::RepeatableStatusTask); + static_assert(internal::RepeatableStatusTask); + + static_assert(!internal::RetryableStatusTask); + static_assert(!internal::RetryableStatusTask); + static_assert(internal::RetryableStatusTask); + static_assert(!internal::RetryableStatusTask); + static_assert(internal::RetryableStatusTask); + static_assert(internal::RetryableStatusTask); +} + +TEST(FnOnceTest, SupportsMoveOnlyCapture) { + auto value = std::make_unique(41); + FnOnce task([value = std::move(value)]() { return *value + 1; }); + + EXPECT_EQ(std::move(task)(), 42); +} + +TEST(TaskGroupTest, UsesExecutor) { + test::ThreadExecutor executor; + TaskGroup<> group; + bool ran = false; + + group.SetExecutor(std::ref(executor)); + group.Submit([&]() -> Status { + ran = true; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_TRUE(ran); + EXPECT_EQ(executor.submit_count(), 1); +} + +TEST(TaskGroupTest, ReturnsSubmitError) { + test::ThreadExecutor executor(ServiceUnavailable("executor busy")); + TaskGroup<> group; + + group.SetExecutor(std::ref(executor)); + group.Submit([]() -> Status { return {}; }); + + EXPECT_THAT(std::move(group).Run(), IsError(ErrorKind::kServiceUnavailable)); + EXPECT_EQ(executor.submit_count(), 1); +} + +TEST(TaskGroupTest, DirectMoveOnlyTask) { + TaskGroup<> group; + auto value = std::make_unique(7); + int observed = 0; + + group.Submit([value = std::move(value), &observed]() 
mutable -> Status {
+    observed = *value;
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(observed, 7);
+}
+
+// Resetting the executor with std::nullopt: the task still runs exactly once
+// and the executor that was set first receives no submissions.
+TEST(TaskGroupTest, ClearsExecutor) {
+  test::ThreadExecutor executor;
+  TaskGroup<> group;
+  int call_count = 0;
+
+  group.SetExecutor(std::ref(executor));
+  group.SetExecutor(std::nullopt);
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(call_count, 1);
+  EXPECT_EQ(executor.submit_count(), 0);
+}
+
+// Submit() can be chained on a temporary TaskGroup and finished with Run();
+// both queued tasks are executed.
+TEST(TaskGroupTest, FluentSubmit) {
+  int call_count = 0;
+
+  auto status = TaskGroup<>()
+                    .Submit([&]() -> Status {
+                      ++call_count;
+                      return {};
+                    })
+                    .Submit([&]() -> Status {
+                      ++call_count;
+                      return {};
+                    })
+                    .Run();
+
+  EXPECT_THAT(status, IsOk());
+  EXPECT_EQ(call_count, 2);
+}
+
+// Without an executor a failing task does not stop later tasks. The combined
+// status carries the kind of the first error and mentions every failure.
+TEST(TaskGroupTest, DirectAggregatesErrors) {
+  TaskGroup<> group;
+  int call_count = 0;
+
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return IOError("first failure");
+  });
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return ValidationFailed("second failure");
+  });
+
+  auto status = std::move(group).Run();
+  EXPECT_THAT(status, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors"));
+  EXPECT_THAT(status, HasErrorMessage("first failure"));
+  EXPECT_THAT(status, HasErrorMessage("second failure"));
+  EXPECT_EQ(call_count, 2);
+}
+
+// With an executor configured, every task is handed to it exactly once.
+// The counter is atomic because the tasks may run on executor threads.
+TEST(TaskGroupTest, ParallelSubmitsAll) {
+  test::ThreadExecutor executor;
+  TaskGroup<> group;
+  std::atomic call_count = 0;
+
+  group.SetExecutor(std::ref(executor));
+  group.Submit([&]() -> Status {
+    call_count.fetch_add(1, std::memory_order_relaxed);
+    return {};
+  });
+  group.Submit([&]() -> Status {
+    call_count.fetch_add(1, std::memory_order_relaxed);
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(call_count.load(std::memory_order_relaxed), 2);
+  EXPECT_EQ(executor.submit_count(), 2);
+}
+
+// Same aggregation expectations as DirectAggregatesErrors, but with the tasks
+// routed through an executor.
+TEST(TaskGroupTest, ParallelAggregatesErrors) {
+  test::ThreadExecutor executor;
+  TaskGroup<> group;
+  std::atomic call_count = 0;
+
+  group.SetExecutor(std::ref(executor));
+  group.Submit([&]() -> Status {
+    call_count.fetch_add(1, std::memory_order_relaxed);
+    return IOError("first failure");
+  });
+  group.Submit([&]() -> Status {
+    call_count.fetch_add(1, std::memory_order_relaxed);
+    return ValidationFailed("second failure");
+  });
+
+  auto status = std::move(group).Run();
+  EXPECT_THAT(status, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors"));
+  EXPECT_THAT(status, HasErrorMessage("first failure"));
+  EXPECT_THAT(status, HasErrorMessage("second failure"));
+  EXPECT_EQ(call_count.load(std::memory_order_relaxed), 2);
+  EXPECT_EQ(executor.submit_count(), 2);
+}
+
+// The executor is built from a ServiceUnavailable status (presumably what its
+// Submit() then returns -- confirm in test::ThreadExecutor). Both submissions
+// are attempted, no task body runs, and both submission errors are reported.
+TEST(TaskGroupTest, ParallelSubmitErrors) {
+  test::ThreadExecutor executor(ServiceUnavailable("executor busy"));
+  TaskGroup<> group;
+  int call_count = 0;
+
+  group.SetExecutor(std::ref(executor));
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return {};
+  });
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return {};
+  });
+
+  auto status = std::move(group).Run();
+  EXPECT_THAT(status, IsError(ErrorKind::kServiceUnavailable));
+  EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors"));
+  EXPECT_THAT(status, HasErrorMessage("executor busy"));
+  EXPECT_EQ(call_count, 0);
+  EXPECT_EQ(executor.submit_count(), 2);
+}
+
+// A task that fails once with IOError is invoked a second time and succeeds;
+// the fake retry environment records exactly one sleep.
+TEST(TaskGroupTest, RetriesTasks) {
+  test::FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks retry_hooks(fake_retry.hooks());
+  TaskGroup group{FastRetryConfig()};
+  int call_count = 0;
+
+  group.Submit([&]() -> Status {
+    ++call_count;
+    if (call_count == 1) {
+      return IOError("transient read failure");
+    }
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(call_count, 2);
+  EXPECT_EQ(fake_retry.sleep_durations(),
+            std::vector(
+                {test::FakeRetryEnvironment::Duration(1)}));
+}
+
+// The attempt counter lives in the task's own mutable capture, so the second
+// attempt only succeeds if the retry re-invokes the same stored callable
+// instead of a fresh copy.
+TEST(TaskGroupTest, RetryReusesTaskState) {
+  test::FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks retry_hooks(fake_retry.hooks());
+  TaskGroup group{FastRetryConfig()};
+
+  group.Submit([attempt = 0]() mutable -> Status {
+    ++attempt;
+    if (attempt == 1) {
+      return IOError("transient read failure");
+    }
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(fake_retry.sleep_durations(),
+            std::vector(
+                {test::FakeRetryEnvironment::Duration(1)}));
+}
+
+// A task that captures a std::unique_ptr (and is therefore move-only) can be
+// submitted to a retrying group and invoked twice.
+TEST(TaskGroupTest, RetryAcceptsMoveOnlyRepeatableTask) {
+  test::FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks retry_hooks(fake_retry.hooks());
+  TaskGroup group{FastRetryConfig()};
+  int call_count = 0;
+  auto value = std::make_unique(7);
+
+  group.Submit([value = std::move(value), &call_count]() -> Status {
+    ++call_count;
+    if (call_count == 1) {
+      return IOError("transient read failure");
+    }
+    return *value == 7 ? Status{} : IOError("unexpected value");
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(call_count, 2);
+  EXPECT_EQ(fake_retry.sleep_durations(),
+            std::vector(
+                {test::FakeRetryEnvironment::Duration(1)}));
+}
+
+// A default-constructed group (no explicit RetryConfig) runs a succeeding
+// task exactly once.
+TEST(TaskGroupTest, DefaultRetryConfig) {
+  TaskGroup group;
+  int call_count = 0;
+
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(call_count, 1);
+}
+
+// A NotFound error is reported after a single attempt, i.e. the group's retry
+// policy does not retry that kind.
+TEST(TaskGroupTest, DoesNotRetryNotFound) {
+  TaskGroup group{FastRetryConfig()};
+  int call_count = 0;
+
+  group.Submit([&]() -> Status {
+    ++call_count;
+    return NotFound("missing manifest");
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsError(ErrorKind::kNotFound));
+  EXPECT_EQ(call_count, 1);
+}
+
+// Retries happen inside the submitted task: the first task body runs twice,
+// yet the executor sees only one submission per task.
+TEST(TaskGroupTest, RetryUsesExecutor) {
+  test::FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks retry_hooks(fake_retry.hooks());
+  test::ThreadExecutor executor;
+  TaskGroup group{FastRetryConfig()};
+  std::atomic first_task_calls = 0;
+  std::atomic second_task_calls = 0;
+
+  group.SetExecutor(std::ref(executor));
+  group.Submit([&]() -> Status {
+    auto call_count = first_task_calls.fetch_add(1, std::memory_order_relaxed) + 1;
+    if (call_count == 1) {
+      return ServiceUnavailable("server busy");
+    }
+    return {};
+  });
+  group.Submit([&]() -> Status {
+    second_task_calls.fetch_add(1, std::memory_order_relaxed);
+    return {};
+  });
+
+  EXPECT_THAT(std::move(group).Run(), IsOk());
+  EXPECT_EQ(first_task_calls.load(std::memory_order_relaxed), 2);
+  EXPECT_EQ(second_task_calls.load(std::memory_order_relaxed), 1);
+  EXPECT_EQ(executor.submit_count(), 2);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/executor.h b/src/iceberg/util/executor.h
new file mode 100644
index 000000000..01aa83d7a
--- /dev/null
+++ b/src/iceberg/util/executor.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/functional.h"
+
+namespace iceberg {
+
+/// \brief Unit of work handed to Executor::Submit(). FnOnce is move-only and is
+/// meant to be invoked at most once.
+using ExecutorTask = FnOnce;
+
+/// \brief Schedules iceberg-cpp internal planning tasks.
+///
+/// Public APIs that accept an executor remain synchronous: the calling thread may block
+/// while waiting for submitted tasks to finish. Callers must ensure the executor can
+/// continue making progress while the caller is blocked. Calling those APIs from one of
+/// the same bounded executor's worker threads can deadlock unless the executor supports
+/// nested blocking work.
+///
+/// When an executor is configured, planning callbacks may be called concurrently. Any
+/// shared mutable state captured by those callbacks must be synchronized by the caller.
+class ICEBERG_EXPORT Executor {
+ public:
+  virtual ~Executor() = default;
+
+  /// \brief Schedule a task for execution.
+  ///
+  /// \param task Move-only callable; ownership passes to the executor.
+  /// \return An error if the task was not accepted for execution. TaskGroup records
+  /// such an error and does not wait for that task.
+  ///
+  /// NOTE(review): TaskGroup blocks until every accepted task has been invoked, so
+  /// an implementation is presumably expected to run each accepted task exactly once
+  /// and never to run a task whose submission it reported as failed -- confirm and
+  /// state this contract explicitly.
+  virtual Status Submit(ExecutorTask task) = 0;
+};
+
+/// \brief Optional, non-owning handle to an Executor (callers pass
+/// `std::ref(executor)`). An empty value means work runs on the calling thread; see
+/// TaskGroup::Run(). The referenced executor must stay alive while it is in use.
+using OptionalExecutor = std::optional>;
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/functional.h b/src/iceberg/util/functional.h
new file mode 100644
index 000000000..c5ab2164b
--- /dev/null
+++ b/src/iceberg/util/functional.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +// Borrowed the file from Apache Arrow: +// https://github.com/apache/arrow/blob/main/cpp/src/arrow/util/functional.h + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +namespace internal { + +template +concept RvalueInvocable = std::constructible_from, Fn> && + std::move_constructible> && + std::is_invocable_r_v&&, Args...>; + +} // namespace internal + +template +class FnOnce; + +template +class ICEBERG_TEMPLATE_CLASS_EXPORT FnOnce { + public: + template + requires(!std::same_as, FnOnce> && + internal::RvalueInvocable) + explicit FnOnce(Fn&& fn) : impl_(std::make_unique>(std::forward(fn))) {} + + FnOnce(FnOnce&&) noexcept = default; + FnOnce& operator=(FnOnce&&) noexcept = default; + FnOnce(const FnOnce&) = delete; + FnOnce& operator=(const FnOnce&) = delete; + + R operator()(Args... args) && { + return std::move(*impl_).Invoke(std::forward(args)...); + } + + private: + struct Impl { + virtual ~Impl() = default; + virtual R Invoke(Args&&... args) && = 0; + }; + + template + struct ImplFor final : Impl { + explicit ImplFor(Fn&& fn) : fn_(std::forward(fn)) {} + R Invoke(Args&&... args) && override { + return std::invoke(std::move(fn_), std::forward(args)...); + } + std::remove_cvref_t fn_; + }; + + std::unique_ptr impl_; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/lazy.h b/src/iceberg/util/lazy.h index be7bcd41d..b31cbc786 100644 --- a/src/iceberg/util/lazy.h +++ b/src/iceberg/util/lazy.h @@ -34,20 +34,14 @@ namespace iceberg { template class Lazy { - template - struct Trait; - template - struct Trait { - using ReturnType = R::value_type; - }; + static R ExtractReturnType(R (*)(Args...)); // only declaration, never defined - using T = Trait::ReturnType; + using T = ResultValueT; public: template - requires std::invocable && - std::same_as, Result> + requires std::invocable Result> Get(Args&&... 
args) const { std::call_once( flag_, [this, &args...]() { value_ = InitFunc(std::forward(args)...); }); diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index d70855016..f3a45e599 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -26,8 +26,10 @@ install_headers( 'decimal.h', 'endian.h', 'error_collector.h', + 'executor.h', 'formattable.h', 'formatter.h', + 'functional.h', 'int128.h', 'lazy.h', 'location_util.h', @@ -37,6 +39,7 @@ install_headers( 'retry_util.h', 'string_util.h', 'struct_like_set.h', + 'task_group.h', 'temporal_util.h', 'timepoint.h', 'transform_util.h', diff --git a/src/iceberg/util/retry_util.cc b/src/iceberg/util/retry_util.cc index d6e0d509e..63cfc61ee 100644 --- a/src/iceberg/util/retry_util.cc +++ b/src/iceberg/util/retry_util.cc @@ -31,6 +31,7 @@ #include "iceberg/util/retry_util_internal.h" namespace iceberg { + namespace { const RetryTestHooks*& ActiveRetryTestHooks() { @@ -79,7 +80,7 @@ void SetActiveRetryTestHooks(const RetryTestHooks* hooks) { ActiveRetryTestHooks() = hooks; } -Status RetryRunner::ValidateConfig() const { +Status detail::RetryRunnerBase::ValidateConfig() const { if (config_.num_retries < 0) { return InvalidArgument("num_retries must be non-negative, got {}", config_.num_retries); @@ -103,48 +104,24 @@ Status RetryRunner::ValidateConfig() const { return InvalidArgument("scale_factor must be finite and at least 1.0, got {}", config_.scale_factor); } - if (retry_policy_mode_ == RetryPolicyMode::kUnset) { - return InvalidArgument( - "Retry policy must be explicitly configured with OnlyRetryOn(...) or " - "StopRetryOn(...) 
when num_retries > 0");
-  }
-  if (retry_error_kinds_.empty()) {
-    return InvalidArgument("Retry policy must include at least one error kind");
-  }
-
   return {};
 }

-std::optional RetryRunner::ComputeDeadline() const {
+// Absolute deadline for the whole retry sequence, or nullopt when
+// total_timeout_ms is not positive (no overall time limit).
+std::optional
+detail::RetryRunnerBase::ComputeDeadline() const {
   if (config_.total_timeout_ms <= 0) {
     return std::nullopt;
   }
   return RetryNow() + Duration(config_.total_timeout_ms);
 }

-bool RetryRunner::HasTimedOut(const std::optional& deadline) const {
+// True only when a deadline exists and RetryNow() has reached or passed it.
+bool detail::RetryRunnerBase::HasTimedOut(
+    const std::optional& deadline) const {
   return deadline.has_value() && RetryNow() >= *deadline;
 }

-bool RetryRunner::ShouldRetry(ErrorKind kind) const {
-  const bool policy_contains_kind = std::ranges::contains(retry_error_kinds_, kind);
-  switch (retry_policy_mode_) {
-    case RetryPolicyMode::kOnlyRetryOn:
-      return policy_contains_kind;
-    case RetryPolicyMode::kStopRetryOn:
-      return !policy_contains_kind;
-    case RetryPolicyMode::kUnset:
-      return false;
-  }
-  return false;
-}
-
-bool RetryRunner::CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
-                           const std::optional& deadline) const {
-  return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind);
-}
-
-std::optional RetryRunner::RetryDelayWithinBudget(
+std::optional
+detail::RetryRunnerBase::RetryDelayWithinBudget(
     int32_t attempt, const std::optional& deadline) const {
   const auto delay = Duration(CalculateDelay(attempt));
   if (!deadline.has_value()) {
@@ -164,8 +141,8 @@ std::optional RetryRunner::RetryDelayWithinBudget(
   return delay;
 }

-bool RetryRunner::WaitForNextAttempt(int32_t attempt,
-                                     const std::optional& deadline) const {
+bool detail::RetryRunnerBase::WaitForNextAttempt(
+    int32_t attempt, const std::optional& deadline) const {
   const auto delay = RetryDelayWithinBudget(attempt, deadline);
   if (!delay.has_value()) {
     return false;
@@ -175,7 +152,7 @@ bool RetryRunner::WaitForNextAttempt(int32_t attempt,
   return !HasTimedOut(deadline);
 }

-int32_t RetryRunner::CalculateDelay(int32_t attempt) const {
+int32_t detail::RetryRunnerBase::CalculateDelay(int32_t attempt) const {
   const double base_delay =
       config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1);
   const int32_t delay_ms = static_cast(
diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h
index 83e2cd5c8..656213976 100644
--- a/src/iceberg/util/retry_util.h
+++ b/src/iceberg/util/retry_util.h
@@ -20,38 +20,22 @@
 #pragma once

 #include
-#include
 #include
 #include
-#include
 #include
 #include
 #include
-#include

 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
+#include "iceberg/util/macros.h"

 namespace iceberg {

 namespace detail {

-template
-struct IsResult : std::false_type {};
-
-template
-struct IsResult> : std::true_type {};
-
-template
-concept ResultType = IsResult>::value;
-
+// Callable whose result type satisfies AsResult, i.e. is (or derives from) a
+// Result.
 template
-concept RetryTask = requires(F& f) {
-  { std::invoke(f) } -> ResultType;
-};
-
-template
-using RetryTaskResult = std::remove_cvref_t>;
+concept RetryTask = AsResult>;

 }  // namespace detail

@@ -69,76 +53,104 @@ struct ICEBERG_EXPORT RetryConfig {
   double scale_factor = 2.0;
 };

-/// \brief Utility class for running tasks with retry logic
-///
-/// When retries are enabled (`num_retries > 0`), callers must explicitly configure
-/// retry policy with `OnlyRetryOn(...)` or `StopRetryOn(...)`.
-class ICEBERG_EXPORT RetryRunner {
- public:
-  /// \brief Construct a RetryRunner with the given configuration
-  explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {}
+namespace detail {

-  /// \brief Specify error types that should trigger a retry.
-  ///
-  /// When set, only errors matching one of these kinds will be retried.
-  /// All other errors will stop retries immediately.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& OnlyRetryOn(std::initializer_list error_kinds) {
-    retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn;
-    retry_error_kinds_ = std::vector(error_kinds);
-    return *this;
-  }
+/// \brief Policy-independent part of RetryRunner.
+///
+/// Holds the RetryConfig and declares the validation and timing helpers that are
+/// defined out of line in retry_util.cc, so the RetryRunner class template only adds
+/// the policy-specific pieces. Everything is protected: the class is meant to be
+/// used as a base, not on its own.
+class ICEBERG_EXPORT RetryRunnerBase {
+ protected:
+  explicit RetryRunnerBase(RetryConfig config) : config_(std::move(config)) {}

-  /// \brief Specify a single error type that should trigger a retry.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return OnlyRetryOn({error_kind}); }
+  using Clock = std::chrono::steady_clock;
+  using Duration = std::chrono::milliseconds;
+  using TimePoint = Clock::time_point;

-  /// \brief Specify error types that should stop retries immediately.
-  ///
-  /// When set, errors matching one of these kinds will not be retried.
-  /// All other errors will be retried.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& StopRetryOn(std::initializer_list error_kinds) {
-    if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) {
-      return *this;
-    }
+  /// \brief Validate retry counts and timing bounds.
+  Status ValidateConfig() const;
+  /// \brief Deadline for the whole retry sequence; nullopt when
+  /// `total_timeout_ms <= 0`.
+  std::optional ComputeDeadline() const;
+  /// \brief True when a deadline is set and has been reached.
+  bool HasTimedOut(const std::optional& deadline) const;
+  /// \brief Delay to apply before the next attempt, or nullopt when no delay is
+  /// available for it (callers then stop retrying).
+  std::optional RetryDelayWithinBudget(
+      int32_t attempt, const std::optional& deadline) const;
+  /// \brief Wait before the next attempt. A false return tells the caller to stop
+  /// retrying and report the last result.
+  bool WaitForNextAttempt(int32_t attempt,
+                          const std::optional& deadline) const;
+  /// \brief Calculate delay with exponential backoff and jitter
+  int32_t CalculateDelay(int32_t attempt) const;
+
+  RetryConfig config_;
+};
+
+}  // namespace detail
+
+namespace retry {
+
+/// \brief How a RetryPolicy interprets its list of error kinds.
+enum class RetryPolicyMode {
+  // Never retry; the kind list must be empty.
+  kNoRetry,
+  // Retry only errors whose kind is in the list.
+  kOnlyRetryOn,
+  // Retry every error except those whose kind is in the list.
+  kStopRetryOn,
+};

-    retry_policy_mode_ = RetryPolicyMode::kStopRetryOn;
-    retry_error_kinds_ = std::vector(error_kinds);
-    return *this;
+/// \brief Compile-time retry policy: a mode plus the error kinds it applies to.
+///
+/// The static_asserts reject a kNoRetry policy that lists kinds and any other
+/// policy that lists none.
+template
+struct RetryPolicy {
+  static_assert(Mode != RetryPolicyMode::kNoRetry || sizeof...(Kinds) == 0,
+                "NoRetry must not include error kinds");
+  static_assert(Mode == RetryPolicyMode::kNoRetry || sizeof...(Kinds) > 0,
+                "RetryPolicy must include at least one error kind");
+
+  static constexpr RetryPolicyMode kMode = Mode;
+  /// True for every mode except kNoRetry.
+  static constexpr bool kEnabled = Mode != RetryPolicyMode::kNoRetry;
+
+  /// \brief Whether an error of the given kind should be retried under this policy.
+  static constexpr bool ShouldRetry(ErrorKind kind) {
+    if constexpr (Mode == RetryPolicyMode::kNoRetry) {
+      return false;
+    } else if constexpr (Mode == RetryPolicyMode::kOnlyRetryOn) {
+      return ((kind == Kinds) || ...);
+    } else {
+      return !((kind == Kinds) || ...);
+    }
   }
+};

-  /// \brief Specify a single error type that should stop retries immediately.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& StopRetryOn(ErrorKind error_kind) { return StopRetryOn({error_kind}); }
+/// \brief Policy that disables retries.
+using NoRetry = RetryPolicy;
+
+/// \brief Policy for retrying only the listed error kinds.
+/// NOTE(review): expected to select RetryPolicyMode::kOnlyRetryOn -- confirm.
+template
+using OnlyRetryOn = RetryPolicy;
+
+/// \brief Policy for retrying everything except the listed error kinds.
+/// NOTE(review): expected to select RetryPolicyMode::kStopRetryOn -- confirm.
+template
+using StopRetryOn = RetryPolicy;
+
+/// \brief Trait backing the Policy concept: false by default, true for the
+/// specialization below.
+template
+inline constexpr bool kIsRetryPolicy = false;
+
+template
+inline constexpr bool kIsRetryPolicy> = true;
+
+/// \brief Constrains a template parameter to a retry policy type, as recognized by
+/// kIsRetryPolicy.
+template
+concept Policy = kIsRetryPolicy>;
+
+}  // namespace retry
+
+/// \brief Utility class for running tasks with retry logic
+///
+/// When retries are enabled (`num_retries > 0`), RetryPolicy must be an enabled
+/// policy such as `retry::OnlyRetryOn<...>` or `retry::StopRetryOn<...>`.
+template
+class RetryRunner : private detail::RetryRunnerBase {
+ public:
+  /// \brief Construct a RetryRunner with the given configuration
+  explicit RetryRunner(RetryConfig config = {})
+      : detail::RetryRunnerBase(std::move(config)) {}

   /// \brief Run a task that returns a Result
   ///
-  /// When `num_retries > 0`, the retry policy must be configured explicitly via
-  /// `OnlyRetryOn(...)` or `StopRetryOn(...)`.
+  /// When `num_retries > 0`, RetryPolicy must allow retrying matching errors.
   ///
   /// TODO: Replace attempt_counter with a metrics reporter once it is available.
-  template
-    requires detail::RetryTask
-  auto Run(F&& task, int32_t* attempt_counter = nullptr) -> detail::RetryTaskResult {
-    using TaskResult = detail::RetryTaskResult;
-
-    const auto validation = ValidateConfig();
-    if (!validation.has_value()) {
-      return TaskResult(std::unexpected(validation.error()));
-    }
+  template
+  auto Run(F&& task, int32_t* attempt_counter = nullptr)
+      -> std::remove_cvref_t> {
+    // An invalid configuration is reported before the task is called even once.
+    ICEBERG_RETURN_UNEXPECTED(ValidatePolicyConfig());

-    const auto deadline = ComputeDeadline();
+    const auto deadline = this->ComputeDeadline();
     int32_t attempt = 0;
-    const int32_t max_attempts = config_.num_retries + 1;
+    const int32_t max_attempts = this->config_.num_retries + 1;

     while (true) {
       ++attempt;
@@ -155,57 +167,42 @@ class ICEBERG_EXPORT RetryRunner {
         return result;
       }

-      if (!WaitForNextAttempt(attempt, deadline)) {
+      if (!this->WaitForNextAttempt(attempt, deadline)) {
         return result;
       }
     }
   }

  private:
-  enum class RetryPolicyMode {
-    // No retry policy was selected; invalid when retries are enabled.
-    kUnset,
-    // Retry only errors listed in retry_error_kinds_.
-    kOnlyRetryOn,
-    // Retry all errors except those listed in retry_error_kinds_.
-    kStopRetryOn,
-  };
+  using TimePoint = detail::RetryRunnerBase::TimePoint;

-  using Clock = std::chrono::steady_clock;
-  using Duration = std::chrono::milliseconds;
-  using TimePoint = Clock::time_point;
-
-  /// \brief Validate retry counts, timing bounds, and the selected retry policy.
-  Status ValidateConfig() const;
-  std::optional ComputeDeadline() const;
-  bool HasTimedOut(const std::optional& deadline) const;
+  /// \brief Run the base-class config validation, then reject the combination of
+  /// `num_retries > 0` with a policy whose kEnabled is false.
+  Status ValidatePolicyConfig() const {
+    auto validation = this->ValidateConfig();
+    if (!validation.has_value()) {
+      return validation;
+    }
+    if (this->config_.num_retries > 0 && !RetryPolicy::kEnabled) {
+      return InvalidArgument("Retry policy must be enabled when num_retries > 0");
+    }
+    return {};
+  }

-  /// \brief Check if the given error kind should trigger a retry.
-  bool ShouldRetry(ErrorKind kind) const;
+  /// \brief True when attempts remain, the deadline has not passed, and the policy
+  /// retries errors of this kind.
   bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
-                const std::optional& deadline) const;
-  std::optional RetryDelayWithinBudget(
-      int32_t attempt, const std::optional& deadline) const;
-  bool WaitForNextAttempt(int32_t attempt,
-                          const std::optional& deadline) const;
-  /// \brief Calculate delay with exponential backoff and jitter
-  int32_t CalculateDelay(int32_t attempt) const;
-
-  RetryConfig config_;
-  RetryPolicyMode retry_policy_mode_ = RetryPolicyMode::kUnset;
-  std::vector retry_error_kinds_;
+                const std::optional& deadline) const {
+    return attempt < max_attempts && !this->HasTimedOut(deadline) &&
+           RetryPolicy::ShouldRetry(kind);
+  }
 };

 /// \brief Helper function to create a RetryRunner with table commit configuration
-ICEBERG_EXPORT inline RetryRunner MakeCommitRetryRunner(int32_t num_retries,
-                                                        int32_t min_wait_ms,
-                                                        int32_t max_wait_ms,
-                                                        int32_t total_timeout_ms) {
-  return RetryRunner(RetryConfig{.num_retries = num_retries,
-                                 .min_wait_ms = min_wait_ms,
-                                 .max_wait_ms = max_wait_ms,
-                                 .total_timeout_ms = total_timeout_ms})
-      .OnlyRetryOn(ErrorKind::kCommitFailed);
+///
+/// The four arguments are copied into the RetryConfig fields of the same names; the
+/// retry policy is fixed by the returned runner's type.
+ICEBERG_EXPORT inline auto MakeCommitRetryRunner(int32_t num_retries, int32_t min_wait_ms,
+                                                 int32_t max_wait_ms,
+                                                 int32_t total_timeout_ms) {
+  return RetryRunner>(
+      RetryConfig{.num_retries = num_retries,
+                  .min_wait_ms = min_wait_ms,
+                  .max_wait_ms = max_wait_ms,
+                  .total_timeout_ms = total_timeout_ms});
 }

 }  // namespace iceberg
diff --git a/src/iceberg/util/task_group.cc b/src/iceberg/util/task_group.cc
new file mode 100644
index 000000000..cbac305fd
--- /dev/null
+++ b/src/iceberg/util/task_group.cc
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/task_group.h"
+
+#include
+#include
+#include
+#include
+
+#include "iceberg/util/macros.h"
+
+namespace iceberg::internal {
+
+namespace {
+
+// Collapses the errors collected from a group of tasks into one Status:
+//   - no errors: OK;
+//   - one error: that error, unchanged;
+//   - several: an error with the kind of the first one and a message that states
+//     the count and lists every individual message on its own line.
+Status AggregateTaskErrors(std::vector errors) {
+  if (errors.empty()) {
+    return {};
+  }
+  if (errors.size() == 1) {
+    return std::unexpected(std::move(errors.front()));
+  }
+
+  ErrorKind kind = errors.front().kind;
+  std::string message = std::format("Task group failed with {} errors:", errors.size());
+  for (const auto& error : errors) {
+    message += std::format("\n - {}", error.message);
+  }
+  return std::unexpected(Error{.kind = kind, .message = std::move(message)});
+}
+
+// Hands `task` to `executor`, wrapped so that its Status is delivered through a
+// promise/future pair. Returns the future to wait on, or the executor's error when
+// the submission itself fails.
+//
+// NOTE(review): if an executor accepts the task but destroys it without running
+// it, or if the task throws, the promise is destroyed unfulfilled and
+// future.get() in RunTasksParallel throws std::future_error (broken_promise)
+// instead of returning a Status. Confirm that executors and tasks are required
+// never to do this, or handle it here.
+Result> SubmitTask(Executor& executor, FnOnce task) {
+  std::promise promise;
+  auto future = promise.get_future();
+
+  ExecutorTask executor_task(
+      [promise = std::move(promise), task = std::move(task)]() mutable {
+        promise.set_value(std::move(task)());
+      });
+
+  ICEBERG_RETURN_UNEXPECTED(executor.Submit(std::move(executor_task)));
+
+  return future;
+}
+
+}  // namespace
+
+// Runs every task in order on the calling thread. A failing task does not stop the
+// remaining ones; all failures are reported together.
+Status RunTasksSingleThreaded(std::vector> tasks) {
+  std::vector errors;
+  for (auto& task : tasks) {
+    auto status = std::move(task)();
+    if (!status.has_value()) {
+      errors.push_back(std::move(status.error()));
+    }
+  }
+  return AggregateTaskErrors(std::move(errors));
+}
+
+// Submits every task to `executor`, then blocks until each accepted task has
+// produced its Status. A task whose submission fails is recorded as an error and
+// is not waited for; submission continues with the remaining tasks.
+//
+// Submission errors are collected before task errors, so when both occur the
+// aggregated kind is that of the first submission error.
+Status RunTasksParallel(Executor& executor, std::vector> tasks) {
+  std::vector> futures;
+  futures.reserve(tasks.size());
+
+  std::vector errors;
+  for (auto& task : tasks) {
+    auto future = SubmitTask(executor, std::move(task));
+    if (!future.has_value()) {
+      errors.push_back(std::move(future.error()));
+      continue;
+    }
+    futures.push_back(std::move(future.value()));
+  }
+
+  // Wait for every accepted task, even after a failure, so none is still running
+  // when this function returns.
+  for (auto& future : futures) {
+    auto status = future.get();
+    if (!status.has_value()) {
+      errors.push_back(std::move(status.error()));
+    }
+  }
+
+  return AggregateTaskErrors(std::move(errors));
+}
+
+}  // namespace iceberg::internal
diff --git a/src/iceberg/util/task_group.h b/src/iceberg/util/task_group.h
new file mode 100644
index 000000000..5263816e3
--- /dev/null
+++ b/src/iceberg/util/task_group.h
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/executor.h"
+#include "iceberg/util/functional.h"
+#include "iceberg/util/retry_util.h"
+
+namespace iceberg {
+
+namespace internal {
+
+/// \brief Constraint on tasks submitted to a TaskGroup that does not retry: the task
+/// only has to satisfy RvalueInvocable, so it may be a callable that is consumed by
+/// its single invocation.
+template
+concept OnceStatusTask = RvalueInvocable;
+
+/// \brief Constraint on tasks that a retrying TaskGroup may invoke more than once.
+///
+/// NOTE(review): the second alternative additionally requires copy-constructibility;
+/// document which call form each alternative is meant to cover.
+template
+concept RepeatableStatusTask =
+    std::is_invocable_r_v ||
+    (std::copy_constructible && std::is_invocable_r_v);
+
+/// \brief Constraint on tasks submitted to a retrying TaskGroup: the task must be
+/// storable by value and repeatable.
+template
+concept RetryableStatusTask = std::constructible_from, F> &&
+                              RepeatableStatusTask>;
+
+/// \brief Run all tasks in order on the calling thread and aggregate their errors.
+ICEBERG_EXPORT Status RunTasksSingleThreaded(std::vector> tasks);
+
+/// \brief Submit all tasks to `executor`, wait for the accepted ones, and aggregate
+/// submission and task errors.
+ICEBERG_EXPORT Status RunTasksParallel(Executor& executor,
+                                       std::vector> tasks);
+
+}  // namespace internal
+
+/// \brief Collects Status-returning tasks and runs them together.
+///
+/// Tasks are queued with Submit() and nothing executes until Run(). Run() executes
+/// them on the calling thread, or through an Executor when one has been set, and
+/// returns a single Status that aggregates all failures. When kRetryEnabled is true,
+/// each task is wrapped in a RetryRunner built from the group's RetryConfig.
+template
+class ICEBERG_TEMPLATE_CLASS_EXPORT TaskGroup {
+ private:
+  static constexpr bool kRetryEnabled = !std::same_as;
+
+  // Placeholder stored instead of a RetryConfig when retries are disabled.
+  struct Empty {};
+
+  using RetryConfigStorage = std::conditional_t;
+
+ public:
+  TaskGroup() = default;
+
+  /// \brief Construct a retrying group; only available when kRetryEnabled.
+  explicit TaskGroup(RetryConfig retry_config)
+    requires(kRetryEnabled)
+      : retry_config_(std::move(retry_config)) {}
+
+  /// \brief Set the executor used by Run(), or clear it with std::nullopt.
+  ///
+  /// Returns the group with the value category it was called on, so calls can be
+  /// chained on lvalues and temporaries alike.
+  auto&& SetExecutor(this auto&& self, OptionalExecutor executor) {
+    self.executor_ = std::move(executor);
+    return std::forward(self);
+  }
+
+  /// \brief Queue a task. Nothing is executed until Run().
+  ///
+  /// Without retries the task is stored as given. With retries it is stored inside
+  /// a wrapper that holds a copy of the RetryConfig and the task by value, and that
+  /// passes the same stored task to RetryRunner::Run(), so state kept inside the
+  /// task persists across attempts.
+  template
+    requires((!kRetryEnabled && internal::OnceStatusTask) ||
+             (kRetryEnabled && internal::RetryableStatusTask))
+  auto&& Submit(this auto&& self, F&& task) {
+    self.tasks_.emplace_back([&] {
+      if constexpr (!kRetryEnabled) {
+        return std::forward(task);
+      } else {
+        return [retry_config = self.retry_config_,
+                task = std::forward(task)]() mutable -> Status {
+          return RetryRunner(retry_config).Run(task);
+        };
+      }
+    }());
+    return std::forward(self);
+  }
+
+  /// \brief Run every queued task and return the aggregated Status.
+  ///
+  /// Consumes the group, so it must be called on an rvalue:
+  /// `std::move(group).Run()`. The queued tasks are moved out.
+  Status Run() && {
+    if (!executor_.has_value()) {
+      return internal::RunTasksSingleThreaded(std::move(tasks_));
+    }
+    return internal::RunTasksParallel(executor_->get(), std::move(tasks_));
+  }
+
+ private:
+  std::vector> tasks_;
+  OptionalExecutor executor_;
+  // Holds Empty when retries are disabled; the attribute lets that placeholder
+  // share storage with other members.
+  [[no_unique_address]] RetryConfigStorage retry_config_;
+};
+
+}  // namespace iceberg