-
Notifications
You must be signed in to change notification settings - Fork 17
Added support for s3 tables #1617
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: antalya-26.1
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,110 @@ | ||
| #include "config.h" | ||
|
|
||
| #if USE_AVRO && USE_SSL && USE_AWS_S3 | ||
|
|
||
| #include <Databases/DataLake/AWSV4Signer.h> | ||
|
|
||
| #include <Common/Exception.h> | ||
| #include <Poco/Net/HTTPRequest.h> | ||
| #include <Poco/String.h> | ||
|
|
||
| #include <aws/core/auth/signer/AWSAuthV4Signer.h> | ||
| #include <aws/core/http/standard/StandardHttpRequest.h> | ||
| #include <aws/core/http/URI.h> | ||
| #include <aws/core/utils/memory/AWSMemory.h> | ||
|
|
||
| #include <sstream> | ||
| #include <utility> | ||
|
|
||
| namespace DB | ||
| { | ||
| namespace ErrorCodes | ||
| { | ||
| extern const int BAD_ARGUMENTS; | ||
| extern const int LOGICAL_ERROR; | ||
| } | ||
| } | ||
|
|
||
| namespace DataLake | ||
| { | ||
| namespace | ||
| { | ||
|
|
||
| Aws::Http::HttpMethod mapPocoMethodToAws(const String & method) | ||
| { | ||
| using Aws::Http::HttpMethod; | ||
| using Poco::Net::HTTPRequest; | ||
|
|
||
| static const std::pair<String, HttpMethod> supported_methods[] = { | ||
| {HTTPRequest::HTTP_GET, HttpMethod::HTTP_GET}, | ||
| {HTTPRequest::HTTP_POST, HttpMethod::HTTP_POST}, | ||
| {HTTPRequest::HTTP_PUT, HttpMethod::HTTP_PUT}, | ||
| {HTTPRequest::HTTP_DELETE, HttpMethod::HTTP_DELETE}, | ||
| {HTTPRequest::HTTP_HEAD, HttpMethod::HTTP_HEAD}, | ||
| {HTTPRequest::HTTP_PATCH, HttpMethod::HTTP_PATCH}, | ||
| }; | ||
|
|
||
| for (const auto & [poco_method, aws_method] : supported_methods) | ||
| if (method == poco_method) | ||
| return aws_method; | ||
|
|
||
| throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| void signRequestWithAWSV4( | ||
| const String & method, | ||
| const Poco::URI & uri, | ||
| const DB::HTTPHeaderEntries & extra_headers, | ||
| const String & payload, | ||
| Aws::Client::AWSAuthV4Signer & signer, | ||
| const String & region, | ||
| const String & service, | ||
| DB::HTTPHeaderEntries & out_headers) | ||
| { | ||
| const Aws::Http::URI aws_uri(uri.toString().c_str()); | ||
| Aws::Http::Standard::StandardHttpRequest request(aws_uri, mapPocoMethodToAws(method)); | ||
|
|
||
| for (const auto & h : extra_headers) | ||
| { | ||
| if (Poco::icompare(h.name, "authorization") == 0) | ||
| continue; | ||
| request.SetHeaderValue(Aws::String(h.name.c_str(), h.name.size()), Aws::String(h.value.c_str(), h.value.size())); | ||
| } | ||
|
|
||
| if (!payload.empty()) | ||
| { | ||
| auto body_stream = Aws::MakeShared<std::stringstream>("AWSV4Signer"); | ||
| body_stream->write(payload.data(), static_cast<std::streamsize>(payload.size())); | ||
| body_stream->seekg(0); | ||
| request.AddContentBody(body_stream); | ||
| } | ||
|
|
||
| static constexpr bool sign_body = true; | ||
| if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body)) | ||
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "AWS SigV4 signing failed"); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| bool has_authorization = false; | ||
| for (const auto & [key, value] : request.GetHeaders()) | ||
| { | ||
| if (Poco::icompare(key, "authorization") == 0 && !value.empty()) | ||
| has_authorization = true; | ||
| } | ||
| if (!has_authorization) | ||
| throw DB::Exception( | ||
| DB::ErrorCodes::BAD_ARGUMENTS, | ||
| "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request"); | ||
|
|
||
| out_headers.clear(); | ||
| for (const auto & [key, value] : request.GetHeaders()) | ||
| { | ||
| if (Poco::icompare(key, "host") == 0) | ||
| continue; | ||
| out_headers.emplace_back(String(key.c_str(), key.size()), String(value.c_str(), value.size())); | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| #pragma once | ||
|
|
||
| #include "config.h" | ||
|
|
||
| #if USE_AVRO && USE_SSL && USE_AWS_S3 | ||
|
|
||
| #include <Core/Types.h> | ||
| #include <IO/HTTPHeaderEntries.h> | ||
| #include <Poco/URI.h> | ||
|
|
||
| namespace Aws::Client | ||
| { | ||
| class AWSAuthV4Signer; | ||
| } | ||
|
|
||
| namespace DataLake | ||
| { | ||
|
|
||
| /// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer. | ||
| /// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts | ||
| /// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI). | ||
| void signRequestWithAWSV4( | ||
| const String & method, | ||
| const Poco::URI & uri, | ||
| const DB::HTTPHeaderEntries & extra_headers, | ||
| const String & payload, | ||
| Aws::Client::AWSAuthV4Signer & signer, | ||
| const String & region, | ||
| const String & service, | ||
| DB::HTTPHeaderEntries & out_headers); | ||
|
|
||
| } | ||
|
|
||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,9 @@ | |
| #include <Databases/DataLake/RestCatalog.h> | ||
| #include <Databases/DataLake/GlueCatalog.h> | ||
| #include <Databases/DataLake/PaimonRestCatalog.h> | ||
| #if USE_AWS_S3 && USE_SSL | ||
| #include <Databases/DataLake/S3TablesCatalog.h> | ||
| #endif | ||
| #include <DataTypes/DataTypeString.h> | ||
|
|
||
| #include <Storages/ObjectStorage/S3/Configuration.h> | ||
|
|
@@ -90,6 +93,7 @@ namespace Setting | |
| extern const SettingsBool allow_experimental_database_glue_catalog; | ||
| extern const SettingsBool allow_experimental_database_hms_catalog; | ||
| extern const SettingsBool allow_experimental_database_paimon_rest_catalog; | ||
| extern const SettingsBool allow_experimental_database_s3_tables; | ||
| extern const SettingsBool use_hive_partitioning; | ||
| extern const SettingsBool parallel_replicas_for_cluster_engines; | ||
| extern const SettingsString cluster_for_parallel_replicas; | ||
|
|
@@ -137,11 +141,12 @@ DatabaseDataLake::DatabaseDataLake( | |
|
|
||
| void DatabaseDataLake::validateSettings() | ||
| { | ||
| if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE) | ||
| if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE | ||
| || settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| { | ||
| if (settings[DatabaseDataLakeSetting::region].value.empty()) | ||
| throw Exception( | ||
| ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. " | ||
| ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue or S3 Tables catalog. " | ||
| "Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query"); | ||
| } | ||
| else if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) | ||
|
|
@@ -336,6 +341,23 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const | |
| } | ||
| break; | ||
| } | ||
| case DB::DatabaseDataLakeCatalogType::S3_TABLES: | ||
| { | ||
| #if USE_AWS_S3 && USE_SSL | ||
| catalog_impl = std::make_shared<DataLake::S3TablesCatalog>( | ||
| settings[DatabaseDataLakeSetting::warehouse].value, | ||
| url, | ||
| settings[DatabaseDataLakeSetting::region].value, | ||
| catalog_parameters, | ||
| settings[DatabaseDataLakeSetting::namespaces].value, | ||
| Context::getGlobalContextInstance()); | ||
| #else | ||
| throw Exception( | ||
| ErrorCodes::SUPPORT_IS_DISABLED, | ||
| "Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL"); | ||
| #endif | ||
| break; | ||
| } | ||
| } | ||
| return catalog_impl; | ||
| } | ||
|
|
@@ -368,6 +390,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( | |
| case DatabaseDataLakeCatalogType::ICEBERG_HIVE: | ||
| case DatabaseDataLakeCatalogType::ICEBERG_REST: | ||
| case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: | ||
| case DatabaseDataLakeCatalogType::S3_TABLES: | ||
| { | ||
| switch (type) | ||
| { | ||
|
|
@@ -962,9 +985,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory) | |
| throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); | ||
| } | ||
|
|
||
| if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST) | ||
| if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST | ||
| && catalog_type != DatabaseDataLakeCatalogType::S3_TABLES) | ||
| { | ||
| throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only"); | ||
| throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only"); | ||
| } | ||
|
|
||
| for (auto & engine_arg : engine_args) | ||
|
|
@@ -1050,6 +1074,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory) | |
| engine_func->name = "Paimon"; | ||
| break; | ||
| } | ||
| case DatabaseDataLakeCatalogType::S3_TABLES: | ||
| { | ||
| if (!args.create_query.attach | ||
| && !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables]) | ||
| { | ||
| throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, | ||
| "DatabaseDataLake with S3 Tables catalog is experimental. " | ||
| "To allow its usage, enable setting allow_experimental_database_s3_tables"); | ||
| } | ||
|
|
||
| engine_func->name = "Iceberg"; | ||
| break; | ||
| } | ||
| case DatabaseDataLakeCatalogType::NONE: | ||
| break; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Perhaps name it like
signHeadersas the output of this method is a list of headers and not a request?