Skip to content

Commit db43b6b

Browse files
author
Daniel Q. Kim
committed
feature (iceberg): Implement TRUNCATE TABLE for Iceberg Engine (REST catalog support)
Forward-port of Iceberg TRUNCATE TABLE from antalya-26.1 to antalya-26.3. Adapted to antalya-26.3 API changes in RestCatalog constructor, DatabaseDataLakeSetting, and related interfaces. See original implementation details in antalya-26.1 PR #1529.
1 parent 26410f1 commit db43b6b

File tree

9 files changed

+276
-10
lines changed

9 files changed

+276
-10
lines changed

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ class IDataLakeMetadata : boost::noncopyable
168168
throwNotImplemented(fmt::format("EXECUTE {}", command_name));
169169
}
170170

171+
virtual bool supportsTruncate() const { return false; }
172+
virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
173+
{
174+
throwNotImplemented("truncate");
175+
}
176+
171177
virtual void drop(ContextPtr) { }
172178

173179
protected:

src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ DEFINE_ICEBERG_FIELD_ALIAS(partition_spec, partition-spec);
113113
DEFINE_ICEBERG_FIELD_ALIAS(partition_specs, partition-specs);
114114
DEFINE_ICEBERG_FIELD_ALIAS(spec_id, spec-id);
115115
DEFINE_ICEBERG_FIELD_ALIAS(added_records, added-records);
116+
DEFINE_ICEBERG_FIELD_ALIAS(deleted_records, deleted-records);
116117
DEFINE_ICEBERG_FIELD_ALIAS(added_data_files, added-data-files);
118+
DEFINE_ICEBERG_FIELD_ALIAS(deleted_data_files, deleted-data-files);
117119
DEFINE_ICEBERG_FIELD_ALIAS(added_delete_files, added-delete-files);
118120
DEFINE_ICEBERG_FIELD_ALIAS(added_position_delete_files, added-position-delete-files);
119121
DEFINE_ICEBERG_FIELD_ALIAS(added_position_deletes, added-position-deletes);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
6868
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergTableStateSnapshot.h>
6969
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
70+
#include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h>
7071
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
7172
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
7273
#include <Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h>
@@ -108,6 +109,7 @@ extern const int ICEBERG_SPECIFICATION_VIOLATION;
108109
extern const int S3_ERROR;
109110
extern const int TABLE_ALREADY_EXISTS;
110111
extern const int SUPPORT_IS_DISABLED;
112+
extern const int INCORRECT_DATA;
111113
}
112114

113115
namespace Setting
@@ -261,15 +263,17 @@ void IcebergMetadata::backgroundMetadataPrefetcherThread()
261263
/// first, we fetch the latest metadata version and cache it;
262264
/// as a part of the same method, we download metadata.json of the latest metadata version
263265
/// and after parsing it, we fetch manifest lists, parse and cache them
264-
auto ctx = Context::getGlobalContextInstance()->getBackgroundContext();
266+
auto ctx = Context::createCopy(Context::getGlobalContextInstance());
265267
auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(ctx, true);
266268
if (actual_data_snapshot)
267269
{
268270
for (const auto & entry : actual_data_snapshot->manifest_list_entries)
269271
{
270272
/// second, we fetch, parse and cache each manifest file
271-
auto manifest_file_ptr = getManifestFileEntriesHandle(
272-
object_storage, persistent_components, ctx, log, entry, actual_table_state_snapshot.schema_id);
273+
auto manifest_file_ptr = Iceberg::getManifestFile(
274+
object_storage, persistent_components, ctx, log,
275+
entry.manifest_file_path,
276+
entry.manifest_file_byte_size);
273277
}
274278
}
275279

@@ -595,6 +599,100 @@ void IcebergMetadata::mutate(
595599
);
596600
}
597601

602+
/// TRUNCATE TABLE for Iceberg: commits a new snapshot whose manifest list is
/// empty, so all previous data files become unreferenced by the current state.
/// Writes the new manifest list and metadata.json to object storage and, when
/// a catalog is attached, commits the new metadata location to it.
void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id)
{
    if (!context->getSettingsRef()[Setting::allow_insert_into_iceberg].value)
        throw Exception(
            ErrorCodes::SUPPORT_IS_DISABLED,
            "Iceberg truncate requires the allow_insert_into_iceberg setting to be enabled.");

    auto [data_snapshot, table_state] = getRelevantState(context);
    auto metadata_json = getMetadataJSONObject(
        table_state.metadata_file_path,
        object_storage,
        persistent_components.metadata_cache,
        context,
        log,
        persistent_components.metadata_compression_method,
        persistent_components.table_uuid);

    /// -1 is the Iceberg sentinel for "no parent snapshot"
    /// (snapshot ID 0 is a valid snapshot, so it cannot serve as the sentinel).
    const Int64 parent_snapshot_id = table_state.snapshot_id.value_or(-1);

    /// Normalize the configured table path to the "/path/.../" form.
    auto table_dir = persistent_components.table_path;
    if (!table_dir.starts_with('/'))
        table_dir = '/' + table_dir;
    if (!table_dir.ends_with('/'))
        table_dir += "/";

    const bool is_transactional = catalog != nullptr && catalog->isTransactional();

    /// Transactional catalogs (e.g. REST) require a fully-qualified blob URI
    /// (scheme://bucket/path) so the catalog can resolve the metadata location
    /// independently of any local path configuration. Non-transactional catalogs
    /// use bare paths relative to the object storage root.
    FileNamesGenerator filename_generator;
    if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
    {
        String table_location = metadata_json->getValue<String>(Iceberg::f_location);
        if (!table_location.ends_with("/"))
            table_location += "/";
        filename_generator = FileNamesGenerator(
            table_location, table_dir, is_transactional,
            persistent_components.metadata_compression_method, write_format);
    }
    else
    {
        filename_generator = FileNamesGenerator(
            table_dir, table_dir, false,
            persistent_components.metadata_compression_method, write_format);
    }

    const Int32 next_metadata_version = table_state.metadata_version + 1;
    filename_generator.setVersion(next_metadata_version);

    auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();

    /// Generate the truncate snapshot: zero added files/records, is_truncate=true
    /// makes the generator record an "overwrite" operation with everything deleted.
    auto [new_snapshot, manifest_list_name, storage_manifest_list_name]
        = MetadataGenerator(metadata_json).generateNextMetadata(
            filename_generator, metadata_name, parent_snapshot_id,
            /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0,
            /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0,
            std::nullopt, std::nullopt, /*is_truncate=*/true);

    auto write_settings = context->getWriteSettings();
    auto manifest_list_buf = object_storage->writeObject(
        StoredObject(storage_manifest_list_name),
        WriteMode::Rewrite, std::nullopt,
        DBMS_DEFAULT_BUFFER_SIZE, write_settings);

    /// Empty entry list + use_previous_snapshots=false produces a manifest list
    /// that references no data files at all.
    generateManifestList(
        filename_generator, metadata_json, object_storage,
        context, {}, new_snapshot, 0, *manifest_list_buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false);
    manifest_list_buf->finalize();

    writeMessageToFile(
        dumpMetadataObjectToString(metadata_json), storage_metadata_name, object_storage,
        context, "*", "", persistent_components.metadata_compression_method);

    if (catalog)
    {
        String catalog_filename = metadata_name;
        if (is_transactional)
        {
            /// Build the full URI from the table's location field
            /// (e.g. "s3://bucket/namespace.table") plus the relative metadata name.
            String table_location = metadata_json->getValue<String>(Iceberg::f_location);
            if (!table_location.ends_with("/"))
                table_location += "/";
            catalog_filename = table_location + metadata_name;
        }

        const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
        if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Failed to commit Iceberg truncate update to catalog.");
    }
}
694+
695+
598696
void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
599697
{
600698
for (const auto & command : commands)

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ class IcebergMetadata : public IDataLakeMetadata
8383
bool supportsUpdate() const override { return true; }
8484
bool supportsWrites() const override { return true; }
8585
bool supportsParallelInsert() const override { return true; }
86+
bool supportsTruncate() const override { return true; }
87+
void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;
8688

8789
IcebergHistory getHistory(ContextPtr local_context) const;
8890

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,31 @@ void generateManifestFile(
430430
writer.close();
431431
}
432432

433+
// Avro uses zigzag encoding for integers to efficiently represent small negative
434+
// numbers. Positive n maps to 2n, negative n maps to 2(-n)-1, keeping small
435+
// magnitudes compact regardless of sign. The value is then serialized as a
436+
// variable-length base-128 integer (little-endian), where the high bit of each
437+
// byte signals whether more bytes follow.
438+
// See: https://avro.apache.org/docs/1.11.1/specification/#binary-encoding
439+
static void writeAvroLong(WriteBuffer & out, int64_t val)
440+
{
441+
uint64_t n = (static_cast<uint64_t>(val) << 1) ^ static_cast<uint64_t>(val >> 63);
442+
while (n & ~0x7fULL)
443+
{
444+
char c = static_cast<char>((n & 0x7f) | 0x80);
445+
out.write(&c, 1);
446+
n >>= 7;
447+
}
448+
char c = static_cast<char>(n);
449+
out.write(&c, 1);
450+
}
451+
452+
// Avro "bytes"/"string" encoding: a zigzag-varint length prefix followed by the
// raw payload bytes.
static void writeAvroBytes(WriteBuffer & out, const String & s)
{
    const auto payload_size = static_cast<int64_t>(s.size());
    writeAvroLong(out, payload_size);
    out.write(s.data(), s.size());
}
457+
433458
void generateManifestList(
434459
const FileNamesGenerator & filename_generator,
435460
Poco::JSON::Object::Ptr metadata,
@@ -451,6 +476,38 @@ void generateManifestList(
451476
else
452477
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown iceberg version {}", version);
453478

479+
// For empty manifest list (e.g. TRUNCATE), write a valid Avro container
480+
// file manually so we can embed the full schema JSON with field-ids intact,
481+
// without triggering the DataFileWriter constructor's eager writeHeader()
482+
// which commits encoder state before we can override avro.schema.
483+
if (manifest_entry_names.empty() && !use_previous_snapshots)
484+
{
485+
// For an empty manifest list (e.g. after TRUNCATE), we write a minimal valid
486+
// Avro Object Container File manually rather than using avro::DataFileWriter.
487+
// The reason: DataFileWriter calls writeHeader() eagerly in its constructor,
488+
// committing the binary encoder state. Post-construction setMetadata() calls
489+
// corrupt StreamWriter::next_ causing a NULL dereference on close(). Writing
490+
// the OCF header directly ensures the full schema JSON (with Iceberg field-ids)
491+
// is embedded intact — the Avro C++ library strips unknown field properties
492+
// like field-id during schema node serialization.
493+
// Avro OCF format: [magic(4)] [metadata_map] [sync_marker(16)] [no data blocks]
494+
buf.write("Obj\x01", 4);
495+
496+
writeAvroLong(buf, 2); // 2 metadata entries
497+
writeAvroBytes(buf, "avro.codec");
498+
writeAvroBytes(buf, "null");
499+
writeAvroBytes(buf, "avro.schema");
500+
writeAvroBytes(buf, schema_representation); // full JSON with field-ids intact
501+
502+
writeAvroLong(buf, 0); // end of metadata map
503+
504+
static const char sync_marker[16] = {};
505+
buf.write(sync_marker, 16);
506+
507+
buf.finalize();
508+
return;
509+
}
510+
454511
auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT
455512

456513
auto adapter = std::make_unique<OutputStreamWriteBufferAdapter>(buf);

src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
114114
Int64 added_delete_files,
115115
Int64 num_deleted_rows,
116116
std::optional<Int64> user_defined_snapshot_id,
117-
std::optional<Int64> user_defined_timestamp)
117+
std::optional<Int64> user_defined_timestamp,
118+
bool is_truncate)
118119
{
119120
int format_version = metadata_object->getValue<Int32>(Iceberg::f_format_version);
120121
Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object;
@@ -138,7 +139,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
138139

139140
auto parent_snapshot = getParentSnapshot(parent_snapshot_id);
140141
Poco::JSON::Object::Ptr summary = new Poco::JSON::Object;
141-
if (num_deleted_rows == 0)
142+
if (is_truncate)
143+
{
144+
summary->set(Iceberg::f_operation, Iceberg::f_overwrite);
145+
Int32 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_records)) : 0;
146+
Int32 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_data_files)) : 0;
147+
148+
summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records));
149+
summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files));
150+
}
151+
else if (num_deleted_rows == 0)
142152
{
143153
summary->set(Iceberg::f_operation, Iceberg::f_append);
144154
summary->set(Iceberg::f_added_data_files, std::to_string(added_files));
@@ -158,7 +168,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
158168

159169
auto sum_with_parent_snapshot = [&](const char * field_name, Int64 snapshot_value)
160170
{
161-
Int64 prev_value = parent_snapshot ? parse<Int64>(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
171+
if (is_truncate)
172+
{
173+
summary->set(field_name, std::to_string(0));
174+
return;
175+
}
176+
Int64 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? parse<Int64>(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
162177
summary->set(field_name, std::to_string(prev_value + snapshot_value));
163178
};
164179

src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ class MetadataGenerator
3030
Int64 added_delete_files,
3131
Int64 num_deleted_rows,
3232
std::optional<Int64> user_defined_snapshot_id = std::nullopt,
33-
std::optional<Int64> user_defined_timestamp = std::nullopt);
33+
std::optional<Int64> user_defined_timestamp = std::nullopt,
34+
bool is_truncate = false);
3435

3536
void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
3637
void generateDropColumnMetadata(const String & column_name);

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ bool StorageObjectStorage::optimize(
599599
void StorageObjectStorage::truncate(
600600
const ASTPtr & /* query */,
601601
const StorageMetadataPtr & /* metadata_snapshot */,
602-
ContextPtr /* context */,
602+
ContextPtr local_context,
603603
TableExclusiveLockHolder & /* table_holder */)
604604
{
605605
const auto path = configuration->getRawPath();
@@ -613,8 +613,12 @@ void StorageObjectStorage::truncate(
613613

614614
if (configuration->isDataLakeConfiguration())
615615
{
616-
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
617-
"Truncate is not supported for data lake engine");
616+
auto * data_lake_metadata = getExternalMetadata(local_context);
617+
if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
618+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");
619+
620+
data_lake_metadata->truncate(local_context, catalog, getStorageID());
621+
return;
618622
}
619623

620624
if (path.hasGlobsIgnorePlaceholders())
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#!/usr/bin/env python3
2+
3+
from pyiceberg.catalog import load_catalog
4+
from helpers.config_cluster import minio_secret_key, minio_access_key
5+
import uuid
6+
import pyarrow as pa
7+
from pyiceberg.schema import Schema, NestedField
8+
from pyiceberg.types import LongType, StringType
9+
from pyiceberg.partitioning import PartitionSpec
10+
11+
BASE_URL_LOCAL_RAW = "http://localhost:8182"
12+
CATALOG_NAME = "demo"
13+
14+
def load_catalog_impl(started_cluster):
    """Build a pyiceberg REST catalog client for the test cluster.

    Connects to the REST catalog exposed on the host and points S3 access at
    the cluster's MinIO instance using the shared test credentials.
    """
    minio_ip = started_cluster.get_instance_ip("minio")
    catalog_properties = {
        "uri": BASE_URL_LOCAL_RAW,
        "type": "rest",
        "s3.endpoint": f"http://{minio_ip}:9000",
        "s3.access-key-id": minio_access_key,
        "s3.secret-access-key": minio_secret_key,
    }
    return load_catalog(CATALOG_NAME, **catalog_properties)
25+
26+
27+
def test_iceberg_truncate_restart(started_cluster_iceberg_no_spark):
    """End-to-end TRUNCATE TABLE check against an Iceberg REST-catalog table.

    Verifies that truncation empties the table, that the truncated table is
    still readable after a ClickHouse restart, and that it remains writable
    (new appends are visible) afterwards.
    """
    node = started_cluster_iceberg_no_spark.instances["node1"]
    catalog = load_catalog_impl(started_cluster_iceberg_no_spark)

    # Unique namespace per run so repeated test invocations don't collide.
    namespace = f"clickhouse_truncate_restart_{uuid.uuid4().hex}"
    catalog.create_namespace(namespace)

    iceberg_schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="val", field_type=StringType(), required=False),
    )
    table_name = "test_truncate_restart"
    catalog.create_table(
        identifier=f"{namespace}.{table_name}",
        schema=iceberg_schema,
        location=f"s3://warehouse-rest/{namespace}.{table_name}",
        partition_spec=PartitionSpec(),
    )

    ch_table_identifier = f"`{namespace}.{table_name}`"

    def row_count():
        # Current row count of the Iceberg table as seen from ClickHouse.
        return int(
            node.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()
        )

    node.query(f"DROP DATABASE IF EXISTS {namespace}")
    node.query(
        f"""
        CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}')
        SETTINGS
            catalog_type='rest',
            warehouse='demo',
            storage_endpoint='http://minio:9000/warehouse-rest';
        """,
        settings={"allow_database_iceberg": 1},
    )

    # 1. Insert initial data through pyiceberg, then truncate from ClickHouse.
    initial_rows = pa.Table.from_pylist([{"id": 1, "val": "A"}, {"id": 2, "val": "B"}])
    catalog.load_table(f"{namespace}.{table_name}").append(initial_rows)

    assert row_count() == 2

    node.query(
        f"TRUNCATE TABLE {namespace}.{ch_table_identifier}",
        settings={"allow_experimental_insert_into_iceberg": 1},
    )
    assert row_count() == 0

    # 2. Restart ClickHouse and verify the truncated table is still readable.
    node.restart_clickhouse()
    assert row_count() == 0

    # 3. Append new data after the restart and verify it is visible.
    post_restart_rows = pa.Table.from_pylist([{"id": 3, "val": "C"}])
    catalog.load_table(f"{namespace}.{table_name}").append(post_restart_rows)
    assert row_count() == 1

    node.query(f"DROP DATABASE {namespace}")

0 commit comments

Comments
 (0)