Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6899,6 +6899,14 @@ Possible values:
- `` (empty value) - use session timezone

Default value is `UTC`.
)", 0) \
DECLARE(UInt64, export_merge_tree_part_max_bytes_per_file, 0, R"(
Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit.
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
)", 0) \
DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"(
Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit.
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
)", 0) \
\
/* ####################################################### */ \
Expand Down
4 changes: 3 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/PartLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"part_type", std::make_shared<DataTypeString>(), "The type of the part. Possible values: Wide and Compact."},
{"disk_name", std::make_shared<DataTypeString>(), "The disk name data part lies on."},
{"path_on_disk", std::make_shared<DataTypeString>(), "Absolute path to the folder with data part files."},
{"remote_file_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "In case of an export operation to remote storages, the file paths a given export generated"},

{"rows", std::make_shared<DataTypeUInt64>(), "The number of rows in the data part."},
{"size_in_bytes", std::make_shared<DataTypeUInt64>(), "Size of the data part on disk in bytes."},
Expand Down Expand Up @@ -187,6 +188,12 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(disk_name);
columns[i++]->insert(path_on_disk);

Array remote_file_paths_array;
remote_file_paths_array.reserve(remote_file_paths.size());
for (const auto & remote_file_path : remote_file_paths)
remote_file_paths_array.push_back(remote_file_path);
columns[i++]->insert(remote_file_paths_array);

columns[i++]->insert(rows);
columns[i++]->insert(bytes_compressed_on_disk);

Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/PartLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct PartLogElement
String partition;
String disk_name;
String path_on_disk;
std::vector<String> remote_file_paths;

MergeTreeDataPartType part_type;

Expand Down
4 changes: 3 additions & 1 deletion src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,10 @@ It is currently only implemented in StorageObjectStorage.
virtual SinkToStoragePtr import(
const std::string & /* file_name */,
Block & /* block_with_partition_values */,
std::string & /* destination_file_path */,
std::function<void(const std::string &)> /* new_file_path_callback */,
bool /* overwrite_if_exists */,
std::size_t /* max_bytes_per_file */,
std::size_t /* max_rows_per_file */,
const std::optional<FormatSettings> & /* format_settings */,
ContextPtr /* context */)
{
Expand Down
17 changes: 11 additions & 6 deletions src/Storages/MergeTree/ExportList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ExportsListElement::ExportsListElement(
const StorageID & destination_table_id_,
UInt64 part_size_,
const String & part_name_,
const String & target_file_name_,
const std::vector<String> & destination_file_paths_,
UInt64 total_rows_to_read_,
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
Expand All @@ -18,7 +18,7 @@ ExportsListElement::ExportsListElement(
, destination_table_id(destination_table_id_)
, part_size(part_size_)
, part_name(part_name_)
, destination_file_path(target_file_name_)
, destination_file_paths(destination_file_paths_)
, total_rows_to_read(total_rows_to_read_)
, total_size_bytes_compressed(total_size_bytes_compressed_)
, total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
Expand All @@ -40,16 +40,21 @@ ExportInfo ExportsListElement::getInfo() const
res.destination_database = destination_table_id.database_name;
res.destination_table = destination_table_id.table_name;
res.part_name = part_name;
res.destination_file_path = destination_file_path;
res.rows_read = rows_read;

{
std::shared_lock lock(destination_file_paths_mutex);
res.destination_file_paths = destination_file_paths;
}

res.rows_read = rows_read.load(std::memory_order_relaxed);
res.total_rows_to_read = total_rows_to_read;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
res.bytes_read_uncompressed = bytes_read_uncompressed;
res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);
res.memory_usage = getMemoryUsage();
res.peak_memory_usage = getPeakMemoryUsage();
res.create_time = create_time;
res.elapsed = elapsed;
res.elapsed = watch.elapsedSeconds();
return res;
}

Expand Down
15 changes: 9 additions & 6 deletions src/Storages/MergeTree/ExportList.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Common/ThreadStatus.h>
#include <Poco/URI.h>
#include <boost/noncopyable.hpp>
#include <shared_mutex>

namespace CurrentMetrics
{
Expand All @@ -23,7 +24,7 @@ struct ExportInfo
String destination_database;
String destination_table;
String part_name;
String destination_file_path;
std::vector<String> destination_file_paths;
UInt64 rows_read;
UInt64 total_rows_to_read;
UInt64 total_size_bytes_compressed;
Expand All @@ -41,24 +42,26 @@ struct ExportsListElement : private boost::noncopyable
const StorageID destination_table_id;
const UInt64 part_size;
const String part_name;
String destination_file_path;
UInt64 rows_read {0};

/// Guarded by destination_file_paths_mutex: hold that mutex while accessing this vector.
std::vector<String> destination_file_paths;
std::atomic<UInt64> rows_read {0};
UInt64 total_rows_to_read {0};
UInt64 total_size_bytes_compressed {0};
UInt64 total_size_bytes_uncompressed {0};
UInt64 bytes_read_uncompressed {0};
std::atomic<UInt64> bytes_read_uncompressed {0};
time_t create_time {0};
Float64 elapsed {0};

Stopwatch watch;
ThreadGroupPtr thread_group;
mutable std::shared_mutex destination_file_paths_mutex;

ExportsListElement(
const StorageID & source_table_id_,
const StorageID & destination_table_id_,
UInt64 part_size_,
const String & part_name_,
const String & destination_file_path_,
const std::vector<String> & destination_file_paths_,
UInt64 total_rows_to_read_,
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
Expand Down
Loading
Loading