diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index ab20ef00e452..6ba195bfa8c7 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7710,6 +7710,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'. )", EXPERIMENTAL) \ DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"( Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest' +)", EXPERIMENTAL) \ /* New experimental gate for the 's3tables' DataLakeCatalog type; default off. */ + DECLARE(Bool, allow_experimental_database_s3_tables, false, R"( +Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4) )", EXPERIMENTAL) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 643b5ccbb7cf..9cb8af42703d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -63,6 +63,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"optimize_read_in_window_order", true, false, "Disable this logic by default."}, {"correlated_subqueries_use_in_memory_buffer", false, true, "Use in-memory buffer for input of correlated subqueries by default."}, {"allow_experimental_database_paimon_rest_catalog", false, false, "New setting"}, /* History entry: old default == new default == false, so compatibility profiles see no behavior change. */ + {"allow_experimental_database_s3_tables", false, false, "New setting"}, {"allow_experimental_object_storage_queue_hive_partitioning", false, false, "New setting."}, {"type_json_use_partial_match_to_skip_paths_by_regexp", false, true, "Add new setting that allows to use partial match in regexp paths skip in JSON type parsing"}, {"max_insert_block_size_bytes", 0, 0, "New setting that allows to control the size of blocks in bytes during parsing of data in Row Input Format."}, diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 19d47df4a88d..2c9cc986e4f2 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -346,7 +346,8 @@
IMPLEMENT_SETTING_ENUM( {"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE}, {"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE}, {"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE}, - {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}}) + {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}, + {"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}}) IMPLEMENT_SETTING_ENUM( FileCachePolicy, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 05c3b26340f4..8160bf4ee9df 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -439,6 +439,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t ICEBERG_ONELAKE, ICEBERG_BIGLAKE, PAIMON_REST, + S3_TABLES, }; DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType) diff --git a/src/Databases/DataLake/AWSV4Signer.cpp b/src/Databases/DataLake/AWSV4Signer.cpp new file mode 100644 index 000000000000..b063031188a5 --- /dev/null +++ b/src/Databases/DataLake/AWSV4Signer.cpp @@ -0,0 +1,110 @@ /* NOTE(review): the '#include' targets and some template arguments in this new file are garbled (angle-bracket contents stripped during extraction); restore them from the original patch before applying. */ +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; +} +} + +namespace DataLake +{ +namespace +{ + /* Translate a Poco HTTP method name into the AWS SDK enum. Only the six verbs listed are supported for signing; anything else throws BAD_ARGUMENTS. */ +Aws::Http::HttpMethod mapPocoMethodToAws(const String & method) +{ + using Aws::Http::HttpMethod; + using Poco::Net::HTTPRequest; + + static const std::pair supported_methods[] = { + {HTTPRequest::HTTP_GET, HttpMethod::HTTP_GET}, + {HTTPRequest::HTTP_POST, HttpMethod::HTTP_POST}, + {HTTPRequest::HTTP_PUT, HttpMethod::HTTP_PUT}, + {HTTPRequest::HTTP_DELETE, HttpMethod::HTTP_DELETE}, + {HTTPRequest::HTTP_HEAD, HttpMethod::HTTP_HEAD}, + {HTTPRequest::HTTP_PATCH, HttpMethod::HTTP_PATCH}, + }; + + for (const auto & [poco_method, aws_method] : supported_methods) + if (method == poco_method) + return aws_method; + + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported
HTTP method for AWS SigV4 signing: {}", method); +} + +} + /* Build a temporary SDK StandardHttpRequest mirroring the Poco request (URI, headers, body), let AWSAuthV4Signer sign it, and copy the resulting headers back for the Poco-based HTTP client. */ +void signRequestWithAWSV4( + const String & method, + const Poco::URI & uri, + const DB::HTTPHeaderEntries & extra_headers, + const String & payload, + Aws::Client::AWSAuthV4Signer & signer, + const String & region, + const String & service, + DB::HTTPHeaderEntries & out_headers) +{ + const Aws::Http::URI aws_uri(uri.toString().c_str()); + Aws::Http::Standard::StandardHttpRequest request(aws_uri, mapPocoMethodToAws(method)); + /* Copy caller-supplied headers into the SDK request, dropping any pre-existing Authorization so the signer's output is authoritative. */ + for (const auto & h : extra_headers) + { + if (Poco::icompare(h.name, "authorization") == 0) + continue; + request.SetHeaderValue(Aws::String(h.name.c_str(), h.name.size()), Aws::String(h.value.c_str(), h.value.size())); + } + + if (!payload.empty()) + { + auto body_stream = Aws::MakeShared("AWSV4Signer"); + body_stream->write(payload.data(), static_cast(payload.size())); + body_stream->seekg(0); + request.AddContentBody(body_stream); + } + + static constexpr bool sign_body = true; + if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body)) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "AWS SigV4 signing failed"); + /* SignRequest can report success even when the credentials provider resolved nothing usable; a missing/empty Authorization header is treated as "no credentials" and surfaced as a user error. */ + bool has_authorization = false; + for (const auto & [key, value] : request.GetHeaders()) + { + if (Poco::icompare(key, "authorization") == 0 && !value.empty()) + has_authorization = true; + } + if (!has_authorization) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request"); + /* Host is intentionally excluded: ReadWriteBufferFromHTTP derives it from the URI. */ + out_headers.clear(); + for (const auto & [key, value] : request.GetHeaders()) + { + if (Poco::icompare(key, "host") == 0) + continue; + out_headers.emplace_back(String(key.c_str(), key.size()), String(value.c_str(), value.size())); + } +} + +} + +#endif diff --git a/src/Databases/DataLake/AWSV4Signer.h b/src/Databases/DataLake/AWSV4Signer.h new file mode 100644 index 000000000000..cdc42adaca5f --- /dev/null +++ b/src/Databases/DataLake/AWSV4Signer.h @@ -0,0 +1,34 @@ +#pragma once +
+#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include +#include + +namespace Aws::Client +{ +class AWSAuthV4Signer; +} + +namespace DataLake +{ + +/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer. +/// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts +/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI). +void signRequestWithAWSV4( + const String & method, + const Poco::URI & uri, + const DB::HTTPHeaderEntries & extra_headers, + const String & payload, + Aws::Client::AWSAuthV4Signer & signer, + const String & region, + const String & service, + DB::HTTPHeaderEntries & out_headers); + +} + +#endif diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 5bee6258e1c6..53b3d096cc4b 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -27,6 +27,9 @@ #include #include #include /* FIX(review): guard must match S3TablesCatalog.h, which only declares the class under USE_AVRO && USE_SSL && USE_AWS_S3; with only USE_AWS_S3 && USE_SSL a build without Avro would include nothing while the code below still references the class. NOTE(review): the '#include' target here is garbled (angle-bracket contents stripped); restore from the original patch. */ +#if USE_AVRO && USE_AWS_S3 && USE_SSL +#include +#endif #include #include @@ -90,6 +93,7 @@ namespace Setting extern const SettingsBool allow_experimental_database_glue_catalog; extern const SettingsBool allow_experimental_database_hms_catalog; extern const SettingsBool allow_experimental_database_paimon_rest_catalog; + extern const SettingsBool allow_experimental_database_s3_tables; extern const SettingsBool use_hive_partitioning; extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString cluster_for_parallel_replicas; @@ -137,11 +141,12 @@ DatabaseDataLake::DatabaseDataLake( void DatabaseDataLake::validateSettings() { /* S3 Tables, like Glue, is region-scoped: require `region` instead of `warehouse`. */ - if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE) + if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE + || settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES) { if
(settings[DatabaseDataLakeSetting::region].value.empty()) throw Exception( - ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. " + ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue or S3 Tables catalog. " "Please specify 'SETTINGS region=' in the CREATE DATABASE query"); } else if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) @@ -336,6 +341,23 @@ std::shared_ptr DatabaseDataLake::getCatalog() const } break; } + case DB::DatabaseDataLakeCatalogType::S3_TABLES: + { /* FIX(review): same guard alignment as the include above — S3TablesCatalog exists only when USE_AVRO is also enabled. */ +#if USE_AVRO && USE_AWS_S3 && USE_SSL + catalog_impl = std::make_shared( + settings[DatabaseDataLakeSetting::warehouse].value, + url, + settings[DatabaseDataLakeSetting::region].value, + catalog_parameters, + settings[DatabaseDataLakeSetting::namespaces].value, + Context::getGlobalContextInstance()); +#else + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "Amazon S3 Tables catalog requires ClickHouse built with USE_AVRO, USE_AWS_S3 and USE_SSL"); +#endif + break; + } } return catalog_impl; } @@ -368,6 +390,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( case DatabaseDataLakeCatalogType::ICEBERG_HIVE: case DatabaseDataLakeCatalogType::ICEBERG_REST: case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: + case DatabaseDataLakeCatalogType::S3_TABLES: { switch (type) { @@ -962,9 +985,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); } - if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST) + if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST + && catalog_type != DatabaseDataLakeCatalogType::S3_TABLES) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only"); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only"); } for (auto &
engine_arg : engine_args) @@ -1050,6 +1074,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory) engine_func->name = "Paimon"; break; } /* CREATE is gated by the experimental setting; ATTACH bypasses the gate so existing databases keep working after upgrade. */ + case DatabaseDataLakeCatalogType::S3_TABLES: + { + if (!args.create_query.attach + && !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "DatabaseDataLake with S3 Tables catalog is experimental. " + "To allow its usage, enable setting allow_experimental_database_s3_tables"); + } + /* Normalize the stored engine name to "Iceberg" — presumably so the persisted definition round-trips through the Iceberg engine path; confirm against SHOW CREATE behavior. */ + engine_func->name = "Iceberg"; + break; + } case DatabaseDataLakeCatalogType::NONE: break; } diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index e2170c038e52..a135580f29b6 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -257,6 +257,11 @@ bool TableMetadata::hasStorageCredentials() const return storage_credentials != nullptr; } /* True when catalog-specific metadata (used below for the Iceberg metadata file location) was populated. */ +bool TableMetadata::hasDataLakeSpecificProperties() const +{ + return data_lake_specific_metadata.has_value(); +} + std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const { std::string metadata_location = iceberg_metadata_file_location; diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 17170436898d..88bb84b0776c 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -148,7 +148,7 @@ class RestCatalog : public ICatalog, public DB::WithContext Poco::Net::HTTPBasicCredentials credentials{}; /* Made virtual so S3TablesCatalog can substitute SigV4-signed request construction. */ - DB::ReadWriteBufferFromHTTPPtr createReadBuffer( + virtual DB::ReadWriteBufferFromHTTPPtr createReadBuffer( const std::string & endpoint, const Poco::URI::QueryParameters & params = {}, const DB::HTTPHeaderEntries & headers = {}) const; @@ -183,7 +183,7 @@ class RestCatalog : public ICatalog, public DB::WithContext AccessToken retrieveAccessTokenOAuth() const; static void parseCatalogConfigurationSettings(const
Poco::JSON::Object::Ptr & object, Config & result); - void sendRequest( + virtual void sendRequest( const String & endpoint, Poco::JSON::Object::Ptr request_body, const String & method = Poco::Net::HTTPRequest::HTTP_POST, diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp new file mode 100644 index 000000000000..7ae022b9a2d1 --- /dev/null +++ b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -0,0 +1,283 @@ /* NOTE(review): '#include' targets and template arguments in this new file are garbled (angle-bracket contents stripped during extraction); restore from the original patch. */ +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace DB::ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +namespace DB::Setting +{ + extern const SettingsUInt64 s3_max_connections; + extern const SettingsUInt64 s3_max_redirects; + extern const SettingsUInt64 s3_retry_attempts; + extern const SettingsBool s3_slow_all_threads_after_network_error; + extern const SettingsBool enable_s3_requests_logging; + extern const SettingsUInt64 s3_connect_timeout_ms; + extern const SettingsUInt64 s3_request_timeout_ms; +} + +namespace DB::ServerSetting +{ + extern const ServerSettingsUInt64 s3_max_redirects; + extern const ServerSettingsUInt64 s3_retry_attempts; +} + +namespace DataLake +{ + /* Iceberg REST catalog that signs every request with SigV4 (service "s3tables") instead of bearer-token auth: the base RestCatalog is constructed with empty token/auth fields. */ +S3TablesCatalog::S3TablesCatalog( + const String & warehouse_, + const String & base_url_, + const String & region_, + const CatalogSettings & catalog_settings_, + const String & namespaces_, + DB::ContextPtr context_) + : RestCatalog(warehouse_, base_url_, "", "", false, namespaces_, context_) + , region(region_) + , signing_service("s3tables") +{ + if (region.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Tables catalog requires non-empty `region` setting"); + /* Environment/instance-profile credentials are allowed; explicit keys and an optional assume-role come from the catalog settings. */ + DB::S3::CredentialsConfiguration creds_config; + creds_config.use_environment_credentials = true; + creds_config.role_arn
= catalog_settings_.aws_role_arn; + creds_config.role_session_name = catalog_settings_.aws_role_session_name; + + const auto & server_settings = getContext()->getGlobalContext()->getServerSettings(); + const DB::Settings & global_settings = getContext()->getGlobalContext()->getSettingsRef(); + /* Server-level defaults, overridden only when the corresponding global setting was explicitly changed. */ + int s3_max_redirects = static_cast(server_settings[DB::ServerSetting::s3_max_redirects]); + if (global_settings.isChanged("s3_max_redirects")) + s3_max_redirects = static_cast(global_settings[DB::Setting::s3_max_redirects]); + + int s3_retry_attempts = static_cast(server_settings[DB::ServerSetting::s3_retry_attempts]); + if (global_settings.isChanged("s3_retry_attempts")) + s3_retry_attempts = static_cast(global_settings[DB::Setting::s3_retry_attempts]); + + bool s3_slow_all_threads_after_network_error = global_settings[DB::Setting::s3_slow_all_threads_after_network_error]; + bool s3_slow_all_threads_after_retryable_error = false; + bool enable_s3_requests_logging = global_settings[DB::Setting::enable_s3_requests_logging]; + + DB::S3::PocoHTTPClientConfiguration poco_config = DB::S3::ClientFactory::instance().createClientConfiguration( + region, + getContext()->getRemoteHostFilter(), + s3_max_redirects, + DB::S3::PocoHTTPClientConfiguration::RetryStrategy{.max_retries = static_cast(s3_retry_attempts)}, + s3_slow_all_threads_after_network_error, + s3_slow_all_threads_after_retryable_error, + enable_s3_requests_logging, + /* for_disk_s3 = */ false, + /* opt_disk_name = */ {}, + /* request_throttler = */ {}); + + Aws::Auth::AWSCredentials credentials(catalog_settings_.aws_access_key_id, catalog_settings_.aws_secret_access_key); + credentials_provider = DB::S3::getCredentialsProvider(poco_config, credentials, creds_config); + /* PayloadSigningPolicy::Always includes the body hash in the signature; urlEscapePath=false signs the path as-is. NOTE(review): confirm this matches the S3 Tables endpoint's canonicalization rules. */ + signer = std::make_unique( + credentials_provider, + "s3tables", + Aws::String(region.data(), region.size()), + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always, + /* urlEscapePath = */ false); + + config = loadConfig(); + + if
(config.prefix.empty()) + { /* loadConfig() returned no prefix: fall back to the URL-encoded warehouse name. */ + String encoded_warehouse; + Poco::URI::encode(warehouse_, "", encoded_warehouse); + config.prefix = encoded_warehouse; + } +} + /* List tables of all allowed namespaces in parallel on the Iceberg catalog thread pool; `tables` is guarded by `mutex`, and the runner joins (rethrowing the first error) before return. */ +DB::Names S3TablesCatalog::getTables() const +{ + auto namespaces = getNamespaces(""); + + auto & pool = getContext()->getIcebergCatalogThreadpool(); + DB::ThreadPoolCallbackRunnerLocal runner(pool, DB::ThreadName::DATALAKE_REST_CATALOG); + + DB::Names tables; + std::mutex mutex; + for (const auto & ns : namespaces) + { + if (!allowed_namespaces.isNamespaceAllowed(ns, /*nested*/ false)) + continue; + runner.enqueueAndKeepTrack( + [&, ns] /* `ns` captured by copy: the loop variable dies before the task may run. */ + { + auto tables_in_ns = RestCatalog::getTables(ns); + std::lock_guard lock(mutex); + std::move(tables_in_ns.begin(), tables_in_ns.end(), std::back_inserter(tables)); + }); + } + runner.waitForAllToFinishAndRethrowFirstError(); + return tables; +} + /* Base REST lookup plus S3 Tables specifics: inject catalog IAM credentials when no (or empty) vended credentials came back, default the S3 endpoint, and normalize the metadata location. */ +bool S3TablesCatalog::tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const +{ + if (!RestCatalog::tryGetTableMetadata(namespace_name, table_name, context_, result)) + return false; + + if (!result.requiresCredentials()) + return true; + /* Credentials are "missing" when absent, null, or present-but-empty S3 credentials. */ + bool need_credentials = !result.hasStorageCredentials() || !result.getStorageCredentials(); + if (!need_credentials) + { + auto creds = std::dynamic_pointer_cast(result.getStorageCredentials()); + if (creds && creds->isEmpty()) + need_credentials = true; + } + + if (need_credentials) + { + LOG_DEBUG(log, "S3 Tables: no vended credentials for {}.{}, injecting catalog IAM credentials", namespace_name, table_name); + auto aws_creds = credentials_provider->GetAWSCredentials(); + result.setStorageCredentials(std::make_shared( + String(aws_creds.GetAWSAccessKeyId().c_str(), aws_creds.GetAWSAccessKeyId().size()), + String(aws_creds.GetAWSSecretKey().c_str(), aws_creds.GetAWSSecretKey().size()), + String(aws_creds.GetSessionToken().c_str(), aws_creds.GetSessionToken().size()))); + } + + if
(result.getEndpoint().empty()) + { /* No s3.endpoint in the catalog response: default to the regional S3 endpoint. */ + String regional_endpoint = "https://s3." + region + ".amazonaws.com"; + LOG_DEBUG(log, "S3 Tables: no s3.endpoint for {}.{}, injecting regional endpoint: {}", namespace_name, table_name, regional_endpoint); + result.setEndpoint(regional_endpoint); + } + /* Rewrite an absolute metadata location ("scheme://bucket/key...") to the part after the first path slash. NOTE(review): this drops both scheme and bucket — confirm downstream resolves the remainder against the table bucket. */ + if (result.hasDataLakeSpecificProperties()) + { + auto props = result.getDataLakeSpecificProperties(); + if (props.has_value() && !props->iceberg_metadata_file_location.empty()) + { + const String & loc = props->iceberg_metadata_file_location; + auto scheme_end = loc.find("://"); + if (scheme_end != String::npos) + { + auto path_start = loc.find('/', scheme_end + 3); + if (path_start != String::npos) + props->iceberg_metadata_file_location = loc.substr(path_start + 1); + } + result.setDataLakeSpecificProperties(std::move(props)); + } + } + + return true; +} + /* No bearer/basic auth headers: every request is SigV4-signed instead. */ +DB::HTTPHeaderEntries S3TablesCatalog::getAuthHeaders(bool /* update_token */) const +{ + return {}; +} + /* GET variant of the REST read path; query parameters are attached to the URI before signing so the canonical request covers them. */ +DB::ReadWriteBufferFromHTTPPtr S3TablesCatalog::createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params, + const DB::HTTPHeaderEntries & headers) const +{ + const auto & context = getContext(); + + Poco::URI url(base_url / endpoint, /* enable_url_encoding */ false); + if (!params.empty()) + url.setQueryParameters(params); + + DB::HTTPHeaderEntries signed_headers; + signRequestWithAWSV4(Poco::Net::HTTPRequest::HTTP_GET, url, headers, "", *signer, region, signing_service, signed_headers); + + LOG_DEBUG(log, "Requesting: {}", url.toString()); + + return DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withSettings(getContext()->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&getContext()->getRemoteHostFilter()) + .withHeaders(signed_headers) + .withDelayInit(false) + .withSkipNotFound(false) + .create(credentials); +} + +void
S3TablesCatalog::sendRequest( + const String & endpoint, + Poco::JSON::Object::Ptr request_body, + const String & method, + bool ignore_result) const +{ /* Serialize the body once; the same bytes are both signed and sent, keeping the SigV4 payload hash valid. removeEscapedSlashes presumably matches server-side JSON canonicalization — confirm. */ + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + if (request_body) + request_body->stringify(oss); + const std::string body_str = DB::removeEscapedSlashes(oss.str()); + + DB::HTTPHeaderEntries extra_headers; + if (!body_str.empty()) + extra_headers.emplace_back("Content-Type", "application/json"); + + const auto & context = getContext(); + + Poco::URI url(endpoint, /* enable_url_encoding */ false); + + DB::HTTPHeaderEntries signed_headers; + signRequestWithAWSV4(method, url, extra_headers, body_str, *signer, region, signing_service, signed_headers); + + DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; + if (!body_str.empty()) + { + out_stream_callback = [body_str](std::ostream & os) { os << body_str; }; + } + + auto wb = DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withMethod(method) + .withSettings(context->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&context->getRemoteHostFilter()) + .withHeaders(signed_headers) + .withOutCallback(out_stream_callback) + .withSkipNotFound(false) + .create(credentials); + /* Even when the result is ignored the response must be drained so the connection can be reused. */ + String response_str; + if (!ignore_result) + readJSONObjectPossiblyInvalid(response_str, *wb); + else + wb->ignoreAll(); +} + +} + +#endif diff --git a/src/Databases/DataLake/S3TablesCatalog.h b/src/Databases/DataLake/S3TablesCatalog.h new file mode 100644 index 000000000000..2d6e54712e59 --- /dev/null +++ b/src/Databases/DataLake/S3TablesCatalog.h @@ -0,0 +1,68 @@ /* NOTE(review): '#include' targets and template arguments in this header are garbled (angle-bracket contents stripped during extraction); restore from the original patch. */ +#pragma once + +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include + +#include + +#include + +namespace Aws::Auth +{ +class AWSCredentialsProvider; +} + +namespace DataLake +{ + +/// Iceberg REST catalog for Amazon S3 Tables
(SigV4, signing name `s3tables`). +/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-open-source.html +class S3TablesCatalog final : public RestCatalog +{ +public: + S3TablesCatalog( + const String & warehouse_, + const String & base_url_, + const String & region_, + const DataLake::CatalogSettings & catalog_settings_, + const String & namespaces_, + DB::ContextPtr context_); + + DB::DatabaseDataLakeCatalogType getCatalogType() const override { return DB::DatabaseDataLakeCatalogType::S3_TABLES; } + + DB::Names getTables() const override; + + bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const override; + /* Overrides swap bearer-token auth for SigV4 signing on every request. */ + DB::ReadWriteBufferFromHTTPPtr createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params = {}, + const DB::HTTPHeaderEntries & headers = {}) const override; + + void sendRequest( + const String & endpoint, + Poco::JSON::Object::Ptr request_body, + const String & method = Poco::Net::HTTPRequest::HTTP_POST, + bool ignore_result = false) const override; + +protected: + DB::HTTPHeaderEntries getAuthHeaders(bool /* update_token */) const override; + +private: + const String region; + const String signing_service; + std::shared_ptr credentials_provider; + std::unique_ptr signer; +}; + +} + +#endif diff --git a/src/Databases/DataLake/StorageCredentials.h b/src/Databases/DataLake/StorageCredentials.h index ab09e3420889..5e261b9bb3a4 100644 --- a/src/Databases/DataLake/StorageCredentials.h +++ b/src/Databases/DataLake/StorageCredentials.h @@ -31,6 +31,8 @@ class S3Credentials final : public IStorageCredentials , session_token(session_token_) {} + /* "Empty" here means unusable: either the key id or the secret is missing. */ + bool isEmpty() const { return access_key_id.empty() || secret_access_key.empty(); } + void addCredentialsToEngineArgs(DB::ASTs & engine_args) const override { if (engine_args.size() != 1) diff --git a/src/Databases/enableAllExperimentalSettings.cpp
b/src/Databases/enableAllExperimentalSettings.cpp index 2ef37686b3d2..64bac904b3da 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -64,6 +64,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("allow_dynamic_type_in_join_keys", 1); context->setSetting("allow_experimental_alias_table_engine", 1); context->setSetting("allow_experimental_database_paimon_rest_catalog", 1); /* Keep the "enable everything experimental" test helper in sync with the new setting. */ + context->setSetting("allow_experimental_database_s3_tables", 1); context->setSetting("allow_experimental_object_storage_queue_hive_partitioning", 1); /// clickhouse-private settings