Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ class IDataLakeMetadata : boost::noncopyable
throwNotImplemented(fmt::format("EXECUTE {}", command_name));
}

/// Whether this data-lake format implements TRUNCATE TABLE.
/// Default is false; formats that override truncate() below must also override this.
virtual bool supportsTruncate() const { return false; }

/// Remove all data from the table (TRUNCATE TABLE).
/// Default implementation throws NOT_IMPLEMENTED; see supportsTruncate().
/// The catalog pointer may be null when the table is not catalog-managed.
virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
{
    throwNotImplemented("truncate");
}

virtual void drop(ContextPtr) { }

protected:
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ DEFINE_ICEBERG_FIELD_ALIAS(partition_spec, partition-spec);
DEFINE_ICEBERG_FIELD_ALIAS(partition_specs, partition-specs);
DEFINE_ICEBERG_FIELD_ALIAS(spec_id, spec-id);
DEFINE_ICEBERG_FIELD_ALIAS(added_records, added-records);
DEFINE_ICEBERG_FIELD_ALIAS(deleted_records, deleted-records);
DEFINE_ICEBERG_FIELD_ALIAS(added_data_files, added-data-files);
DEFINE_ICEBERG_FIELD_ALIAS(deleted_data_files, deleted-data-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_delete_files, added-delete-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_position_delete_files, added-position-delete-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_position_deletes, added-position-deletes);
Expand Down
104 changes: 101 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergTableStateSnapshot.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h>
Expand Down Expand Up @@ -108,6 +109,7 @@ extern const int ICEBERG_SPECIFICATION_VIOLATION;
extern const int S3_ERROR;
extern const int TABLE_ALREADY_EXISTS;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_DATA;
}

namespace Setting
Expand Down Expand Up @@ -261,15 +263,17 @@ void IcebergMetadata::backgroundMetadataPrefetcherThread()
/// first, we fetch the latest metadata version and cache it;
/// as a part of the same method, we download metadata.json of the latest metadata version
/// and after parsing it, we fetch manifest lists, parse and cache them
auto ctx = Context::getGlobalContextInstance()->getBackgroundContext();
auto ctx = Context::createCopy(Context::getGlobalContextInstance());
auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(ctx, true);
if (actual_data_snapshot)
{
for (const auto & entry : actual_data_snapshot->manifest_list_entries)
{
/// second, we fetch, parse and cache each manifest file
auto manifest_file_ptr = getManifestFileEntriesHandle(
object_storage, persistent_components, ctx, log, entry, actual_table_state_snapshot.schema_id);
auto manifest_file_ptr = Iceberg::getManifestFile(
object_storage, persistent_components, ctx, log,
entry.manifest_file_path,
entry.manifest_file_byte_size);
}
}

Expand Down Expand Up @@ -595,6 +599,100 @@ void IcebergMetadata::mutate(
);
}

/// TRUNCATE TABLE for an Iceberg table.
/// Commits a new snapshot containing no data files on top of the current table
/// state (an Iceberg "overwrite" operation), writes a fresh metadata.json and an
/// empty manifest list, and — if the table is catalog-managed — commits the new
/// metadata location to the catalog.
/// Throws SUPPORT_IS_DISABLED unless allow_insert_into_iceberg is enabled, and
/// INCORRECT_DATA if the catalog rejects the metadata update.
void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id)
{
    if (!context->getSettingsRef()[Setting::allow_insert_into_iceberg].value)
        throw Exception(
            ErrorCodes::SUPPORT_IS_DISABLED,
            "Iceberg truncate requires the allow_insert_into_iceberg setting to be enabled.");

    /// Fetch the latest committed table state and parse its metadata.json.
    auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
    auto metadata_object = getMetadataJSONObject(
        actual_table_state_snapshot.metadata_file_path,
        object_storage,
        persistent_components.metadata_cache,
        context,
        log,
        persistent_components.metadata_compression_method,
        persistent_components.table_uuid);

    // Use -1 as the Iceberg spec sentinel for "no parent snapshot"
    // (distinct from snapshot ID 0 which is a valid snapshot).
    Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(-1);

    /// Normalize the configured table path to "/.../" form for FileNamesGenerator.
    auto config_path = persistent_components.table_path;
    if (!config_path.starts_with('/')) config_path = '/' + config_path;
    if (!config_path.ends_with('/')) config_path += "/";

    bool is_transactional = (catalog != nullptr && catalog->isTransactional());

    // Transactional catalogs (e.g. REST) require a fully-qualified blob URI
    // (scheme://bucket/path) so the catalog can resolve the metadata location
    // independently of any local path configuration. Non-transactional catalogs
    // use bare paths relative to the object storage root.
    FileNamesGenerator filename_generator;
    if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
    {
        String location = metadata_object->getValue<String>(Iceberg::f_location);
        if (!location.ends_with("/")) location += "/";
        filename_generator = FileNamesGenerator(
            location, config_path, is_transactional,
            persistent_components.metadata_compression_method, write_format);
    }
    else
    {
        filename_generator = FileNamesGenerator(
            config_path, config_path, false,
            persistent_components.metadata_compression_method, write_format);
    }

    /// New metadata file gets the next sequential version number.
    Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1;
    filename_generator.setVersion(new_metadata_version);

    auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();

    /// Generate the new (empty) snapshot: all "added" counters are zero and
    /// is_truncate=true makes MetadataGenerator emit an "overwrite" summary.
    auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata(
        filename_generator, metadata_name, parent_snapshot_id,
        /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0,
        /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0,
        std::nullopt, std::nullopt, /*is_truncate=*/true);

    /// Write the empty manifest list first, so the metadata.json committed below
    /// never points at a missing file.
    auto write_settings = context->getWriteSettings();
    auto buf = object_storage->writeObject(
        StoredObject(storage_manifest_list_name),
        WriteMode::Rewrite, std::nullopt,
        DBMS_DEFAULT_BUFFER_SIZE, write_settings);

    /// Empty manifest-entry list + use_previous_snapshots=false produces a
    /// manifest list with zero entries (the truncated state).
    generateManifestList(filename_generator, metadata_object, object_storage,
        context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false);
    buf->finalize();

    String metadata_content = dumpMetadataObjectToString(metadata_object);
    // NOTE(review): the "*" and "" positional arguments are opaque here —
    // confirm their meaning against writeMessageToFile's signature.
    writeMessageToFile(metadata_content, storage_metadata_name, object_storage,
        context, "*", "", persistent_components.metadata_compression_method);

    if (catalog)
    {
        // Transactional catalogs require a fully-qualified blob URI so the catalog
        // can resolve the metadata location independently of local path configuration.
        String catalog_filename = metadata_name;
        if (is_transactional)
        {
            // Build full URI from the table's location field (e.g. "s3://bucket/namespace.table")
            // combined with the relative metadata name.
            String location = metadata_object->getValue<String>(Iceberg::f_location);
            if (!location.ends_with("/")) location += "/";
            catalog_filename = location + metadata_name;
        }

        const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
        if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Failed to commit Iceberg truncate update to catalog.");
    }
}


void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
{
for (const auto & command : commands)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class IcebergMetadata : public IDataLakeMetadata
bool supportsUpdate() const override { return true; }
bool supportsWrites() const override { return true; }
bool supportsParallelInsert() const override { return true; }
/// Iceberg supports TRUNCATE TABLE: truncate() commits an empty snapshot.
bool supportsTruncate() const override { return true; }
void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;

IcebergHistory getHistory(ContextPtr local_context) const;

Expand Down
57 changes: 57 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,31 @@ void generateManifestFile(
writer.close();
}

// Avro uses zigzag encoding for integers to efficiently represent small negative
// numbers. Positive n maps to 2n, negative n maps to 2(-n)-1, keeping small
// magnitudes compact regardless of sign. The value is then serialized as a
// variable-length base-128 integer (little-endian), where the high bit of each
// byte signals whether more bytes follow.
// See: https://avro.apache.org/docs/1.11.1/specification/#binary-encoding
// Encode a signed 64-bit integer as an Avro long: zigzag-map the sign bit
// into the low bit (positive n -> 2n, negative n -> 2(-n)-1) so small
// magnitudes stay small, then emit the result as a little-endian base-128
// varint where the high bit of each byte marks a continuation.
// See: https://avro.apache.org/docs/1.11.1/specification/#binary-encoding
static void writeAvroLong(WriteBuffer & out, int64_t val)
{
    uint64_t zigzag = (static_cast<uint64_t>(val) << 1) ^ static_cast<uint64_t>(val >> 63);
    do
    {
        uint8_t group = zigzag & 0x7f;
        zigzag >>= 7;
        if (zigzag != 0)
            group |= 0x80; // more groups follow
        char encoded = static_cast<char>(group);
        out.write(&encoded, 1);
    } while (zigzag != 0);
}

// Encode a string as Avro bytes: a zigzag-varint length prefix followed by
// the raw payload bytes.
static void writeAvroBytes(WriteBuffer & out, const String & s)
{
    const auto byte_count = static_cast<int64_t>(s.size());
    writeAvroLong(out, byte_count);
    out.write(s.data(), s.size());
}

void generateManifestList(
const FileNamesGenerator & filename_generator,
Poco::JSON::Object::Ptr metadata,
Expand All @@ -451,6 +476,38 @@ void generateManifestList(
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown iceberg version {}", version);

// For empty manifest list (e.g. TRUNCATE), write a valid Avro container
// file manually so we can embed the full schema JSON with field-ids intact,
// without triggering the DataFileWriter constructor's eager writeHeader()
// which commits encoder state before we can override avro.schema.
if (manifest_entry_names.empty() && !use_previous_snapshots)
{
// For an empty manifest list (e.g. after TRUNCATE), we write a minimal valid
// Avro Object Container File manually rather than using avro::DataFileWriter.
// The reason: DataFileWriter calls writeHeader() eagerly in its constructor,
// committing the binary encoder state. Post-construction setMetadata() calls
// corrupt StreamWriter::next_ causing a NULL dereference on close(). Writing
// the OCF header directly ensures the full schema JSON (with Iceberg field-ids)
// is embedded intact — the Avro C++ library strips unknown field properties
// like field-id during schema node serialization.
// Avro OCF format: [magic(4)] [metadata_map] [sync_marker(16)] [no data blocks]
buf.write("Obj\x01", 4);

writeAvroLong(buf, 2); // 2 metadata entries
writeAvroBytes(buf, "avro.codec");
writeAvroBytes(buf, "null");
writeAvroBytes(buf, "avro.schema");
writeAvroBytes(buf, schema_representation); // full JSON with field-ids intact

writeAvroLong(buf, 0); // end of metadata map

static const char sync_marker[16] = {};
buf.write(sync_marker, 16);

buf.finalize();
return;
}

auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT

auto adapter = std::make_unique<OutputStreamWriteBufferAdapter>(buf);
Expand Down
21 changes: 18 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
Int64 added_delete_files,
Int64 num_deleted_rows,
std::optional<Int64> user_defined_snapshot_id,
std::optional<Int64> user_defined_timestamp)
std::optional<Int64> user_defined_timestamp,
bool is_truncate)
{
int format_version = metadata_object->getValue<Int32>(Iceberg::f_format_version);
Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object;
Expand All @@ -138,7 +139,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

auto parent_snapshot = getParentSnapshot(parent_snapshot_id);
Poco::JSON::Object::Ptr summary = new Poco::JSON::Object;
if (num_deleted_rows == 0)
if (is_truncate)
{
summary->set(Iceberg::f_operation, Iceberg::f_overwrite);
Int32 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_records)) : 0;
Int32 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_data_files)) : 0;

summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records));
summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files));
}
else if (num_deleted_rows == 0)
{
summary->set(Iceberg::f_operation, Iceberg::f_append);
summary->set(Iceberg::f_added_data_files, std::to_string(added_files));
Expand All @@ -158,7 +168,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

auto sum_with_parent_snapshot = [&](const char * field_name, Int64 snapshot_value)
{
Int64 prev_value = parent_snapshot ? parse<Int64>(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
if (is_truncate)
{
summary->set(field_name, std::to_string(0));
return;
}
Int64 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? parse<Int64>(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
summary->set(field_name, std::to_string(prev_value + snapshot_value));
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class MetadataGenerator
Int64 added_delete_files,
Int64 num_deleted_rows,
std::optional<Int64> user_defined_snapshot_id = std::nullopt,
std::optional<Int64> user_defined_timestamp = std::nullopt);
std::optional<Int64> user_defined_timestamp = std::nullopt,
bool is_truncate = false);

void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
void generateDropColumnMetadata(const String & column_name);
Expand Down
10 changes: 7 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ bool StorageObjectStorage::optimize(
void StorageObjectStorage::truncate(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
ContextPtr local_context,
TableExclusiveLockHolder & /* table_holder */)
{
const auto path = configuration->getRawPath();
Expand All @@ -613,8 +613,12 @@ void StorageObjectStorage::truncate(

if (configuration->isDataLakeConfiguration())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Truncate is not supported for data lake engine");
auto * data_lake_metadata = getExternalMetadata(local_context);
if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");

data_lake_metadata->truncate(local_context, catalog, getStorageID());
return;
}

if (path.hasGlobsIgnorePlaceholders())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python3

from pyiceberg.catalog import load_catalog
from helpers.config_cluster import minio_secret_key, minio_access_key
import uuid
import pyarrow as pa
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import LongType, StringType
from pyiceberg.partitioning import PartitionSpec

BASE_URL_LOCAL_RAW = "http://localhost:8182"
CATALOG_NAME = "demo"

def load_catalog_impl(started_cluster):
    """Build a pyiceberg REST catalog client wired to the cluster's MinIO instance."""
    minio_ip = started_cluster.get_instance_ip('minio')
    catalog_properties = {
        "uri": BASE_URL_LOCAL_RAW,
        "type": "rest",
        "s3.endpoint": f"http://{minio_ip}:9000",
        "s3.access-key-id": minio_access_key,
        "s3.secret-access-key": minio_secret_key,
    }
    return load_catalog(CATALOG_NAME, **catalog_properties)


def test_iceberg_truncate_restart(started_cluster_iceberg_no_spark):
    # Integration test: TRUNCATE TABLE on an Iceberg table via a REST catalog
    # must survive a server restart (i.e. the truncated state is committed to
    # the catalog, not just cached locally), and the table must remain writable
    # and readable afterwards.
    instance = started_cluster_iceberg_no_spark.instances["node1"]
    catalog = load_catalog_impl(started_cluster_iceberg_no_spark)

    # Unique namespace per run so repeated test invocations don't collide.
    namespace = f"clickhouse_truncate_restart_{uuid.uuid4().hex}"
    catalog.create_namespace(namespace)

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="val", field_type=StringType(), required=False),
    )
    table_name = "test_truncate_restart"
    catalog.create_table(
        identifier=f"{namespace}.{table_name}",
        schema=schema,
        location=f"s3://warehouse-rest/{namespace}.{table_name}",
        partition_spec=PartitionSpec(),
    )

    # ClickHouse exposes catalog tables as `namespace.table` (one identifier).
    ch_table_identifier = f"`{namespace}.{table_name}`"

    instance.query(f"DROP DATABASE IF EXISTS {namespace}")
    instance.query(
        f"""
CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}')
SETTINGS
    catalog_type='rest',
    warehouse='demo',
    storage_endpoint='http://minio:9000/warehouse-rest';
""",
        settings={"allow_database_iceberg": 1}
    )

    # 1. Insert initial data (via pyiceberg, bypassing ClickHouse) and truncate
    df = pa.Table.from_pylist([{"id": 1, "val": "A"}, {"id": 2, "val": "B"}])
    catalog.load_table(f"{namespace}.{table_name}").append(df)

    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 2

    instance.query(
        f"TRUNCATE TABLE {namespace}.{ch_table_identifier}",
        settings={"allow_experimental_insert_into_iceberg": 1}
    )
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0

    # 2. Restart ClickHouse and verify table is still readable (count = 0)
    instance.restart_clickhouse()
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0

    # 3. Insert new data after restart and verify it's readable
    new_df = pa.Table.from_pylist([{"id": 3, "val": "C"}])
    catalog.load_table(f"{namespace}.{table_name}").append(new_df)
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 1

    instance.query(f"DROP DATABASE {namespace}")
Loading