diff --git a/native/Cargo.lock b/native/Cargo.lock index 4adf9ed06e..a47d004afe 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -17,17 +17,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" -[[package]] -name = "ahash" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" -dependencies = [ - "getrandom 0.2.17", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.12" @@ -201,7 +190,7 @@ version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65ca404ea6191e06bf30956394173337fa9c35f445bd447fe6c21ab944e1a23c" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow-buffer", "arrow-data", "arrow-schema", @@ -358,7 +347,7 @@ version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c96d8a1c180b44ecf2e66c9a2f2bbcb8b1b6f14e165ce46ac8bde211a363411b" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow-array", "arrow-buffer", "arrow-data", @@ -569,7 +558,7 @@ dependencies = [ "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -601,9 +590,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.15.3" +version = "1.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e84ce723ab67259cfeb9877c6a639ee9eb7a27b28123abd71db7f0d5d0cc9d86" +checksum = "7b7b6141e96a8c160799cc2d5adecd5cbbe5054cb8c7c4af53da0f83bb7ad256" dependencies = [ "aws-lc-sys", "zeroize", @@ -611,9 +600,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.36.0" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a442ece363113bd4bd4c8b18977a7798dd4d3c3383f34fb61936960e8f4ad8" +checksum = "5c34dda4df7017c8db52132f0f8a2e0f8161649d15723ed63fc00c82d0f2081a" dependencies = [ "cc", "cmake", @@ -630,7 +619,7 @@ dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -654,7 +643,7 @@ dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", @@ -677,7 +666,7 @@ dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", @@ -700,7 +689,7 @@ dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-query", @@ -722,7 +711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69e523e1c4e8e7e8ff219d732988e22bfeae8a1cafdbe6d9eca1546fa080be7c" dependencies = [ "aws-credential-types", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -739,9 +728,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.7" +version = "1.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ee19095c7c4dda59f1697d028ce704c24b2d33c6718790c7f1d5a3015b4107c" +checksum = 
"52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" dependencies = [ "futures-util", "pin-project-lite", @@ -769,11 +758,32 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-smithy-http" +version = "0.63.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + [[package]] name = "aws-smithy-http-client" -version = "1.1.5" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e62db736db19c488966c8d787f52e6270be565727236fd5579eaa301e7bc4a" +checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -804,18 +814,18 @@ dependencies = [ [[package]] name = "aws-smithy-observability" -version = "0.2.0" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef1fcbefc7ece1d70dcce29e490f269695dfca2d2bacdeaf9e5c3f799e4e6a42" +checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.9" +version = "0.60.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae5d689cf437eae90460e944a58b5668530d433b4ff85789e69d2f2a556e057d" +checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" dependencies = [ "aws-smithy-types", "urlencoding", @@ -823,12 +833,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.8" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb5b6167fcdf47399024e81ac08e795180c576a20e4d4ce67949f9a88ae37dc1" +checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" dependencies = [ "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.63.3", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", @@ -839,6 +849,7 @@ dependencies = [ "http 1.4.0", "http-body 0.4.6", "http-body 1.0.1", + "http-body-util", "pin-project-lite", "pin-utils", "tokio", @@ -847,9 +858,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.10.0" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efce7aaaf59ad53c5412f14fc19b2d5c6ab2c3ec688d272fd31f76ec12f44fb0" +checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -864,9 +875,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.3.6" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65f172bcb02424eb94425db8aed1b6d583b5104d4d5ddddf22402c661a320048" +checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" dependencies = [ "base64-simd", "bytes", @@ -1024,18 +1035,6 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" -[[package]] -name = "bitvec" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" -dependencies = [ - "funty", - "radium", - "tap", - "wyz", -] - [[package]] name = "blake2" version = "0.10.6" @@ -1081,6 +1080,16 @@ dependencies = [ "piper", ] +[[package]] +name = "bnum" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f781dba93de3a5ef6dc5b17c9958b208f6f3f021623b360fb605ea51ce443f10" +dependencies = [ + "serde", + "serde-big-array", +] + [[package]] name = "bon" version = "3.8.2" @@ -1106,29 +1115,6 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "borsh" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1da5ab77c1437701eeff7c88d968729e7766172279eab0676857b3d63af7a6f" -dependencies = [ - "borsh-derive", - "cfg_aliases", -] - -[[package]] -name = "borsh-derive" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0686c856aa6aac0c4498f936d7d6a02df690f614c03e4d906d1018062b5c5e2c" -dependencies = [ - "once_cell", - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 2.0.114", -] - [[package]] name = "brotli" version = "8.0.2" @@ -1156,33 +1142,11 @@ version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" -[[package]] -name = "bytecheck" -version = "0.6.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" -dependencies = [ - "bytecheck_derive", - "ptr_meta", - "simdutf8", -] - -[[package]] -name = "bytecheck_derive" -version = "0.6.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "bytemuck" -version = "1.24.0" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" [[package]] name = "byteorder" @@ -1242,9 +1206,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.54" +version = "1.2.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" +checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" dependencies = [ "find-msvc-tools", "jobserver", @@ -1343,18 +1307,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.54" +version = "4.5.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394" +checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.54" +version = "4.5.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00" +checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" dependencies = [ "anstyle", "clap_lex", @@ -1935,7 +1899,7 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c10f7659e96127d25e8366be7c8be4109595d6a2c3eac70421f380a7006a1b0" 
dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "arrow-ipc", "chrono", @@ -2196,7 +2160,7 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c25210520a9dcf9c2b2cbbce31ebd4131ef5af7fc60ee92b266dc7d159cb305" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-doc", @@ -2217,7 +2181,7 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f4a66f3b87300bb70f4124b55434d2ae3fe80455f3574701d0348da040b55d" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-expr-common", @@ -2327,7 +2291,7 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c30cc8012e9eedcb48bbe112c6eff4ae5ed19cf3003cb0f505662e88b7014c5d" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-expr", @@ -2364,7 +2328,7 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90da43e1ec550b172f34c87ec68161986ced70fd05c8d2a2add66eef9c276f03" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-expr-common", @@ -2396,7 +2360,7 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0acf0ad6b6924c6b1aa7d213b181e012e2d3ec0a64ff5b10ee6282ab0f8532ac" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "arrow-ord", "arrow-schema", @@ -2650,6 +2614,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3" +dependencies = [ + "serde", + "serde_core", + "typeid", +] + [[package]] name = "errno" version = "0.3.14" @@ -2697,6 +2672,18 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fastnum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4089ab2dfd45d8ddc92febb5ca80644389d5ebb954f40231274a3f18341762e2" +dependencies = [ + "bnum", + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -2705,9 +2692,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "findshlibs" @@ -2789,12 +2776,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" -[[package]] -name = "funty" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" - [[package]] name = "futures" version = "0.3.31" @@ -3000,9 +2981,6 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -dependencies = [ - "ahash 0.7.8", -] [[package]] name = "hashbrown" @@ -3010,7 +2988,7 @@ version = "0.14.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ - "ahash 0.8.12", + "ahash", "allocator-api2", ] @@ -3198,14 +3176,13 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "base64", "bytes", "futures-channel", - "futures-core", "futures-util", "http 1.4.0", "http-body 1.0.1", @@ -3222,9 +3199,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.64" +version = "0.1.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -3247,7 +3224,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.8.0" -source = "git+https://github.com/apache/iceberg-rust?rev=ee21563#ee21563c2032948f636eae84870f317a0b299a05" +source = "git+https://github.com/mbutrovich/iceberg-rust?branch=cache_parquet_metadata#84a5723ea4e42e0163c71a1e22c05b185f155a4b" dependencies = [ "anyhow", "apache-avro", @@ -3269,13 +3246,13 @@ dependencies = [ "chrono", "derive_builder", "expect-test", + "fastnum", "flate2", "fnv", "futures", "itertools 0.13.0", "moka", "murmur3", - "num-bigint", "once_cell", "opendal", "ordered-float 4.6.0", @@ -3284,7 +3261,6 @@ dependencies = [ "reqsign", "reqwest", "roaring", - "rust_decimal", "serde", "serde_bytes", "serde_derive", @@ -3294,6 +3270,7 @@ dependencies = [ "strum", "tokio", "typed-builder", + "typetag", "url", "uuid", "zstd", @@ -3436,7 +3413,7 @@ version = "0.11.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" dependencies = [ - "ahash 0.8.12", + "ahash", "indexmap 2.13.0", "is-terminal", "itoa", @@ -3454,6 +3431,15 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "inventory" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e" +dependencies = [ + "rustversion", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -3711,9 +3697,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libmimalloc-sys" @@ -3892,9 +3878,9 @@ checksum = "dce6dd36094cac388f119d2e9dc82dc730ef91c32a6222170d630e5414b956e6" [[package]] name = "moka" -version = "0.12.12" +version = "0.12.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a" +checksum = "b4ac832c50ced444ef6be0767a008b02c106a909ba79d1d830501e94b96f6b7e" dependencies = [ "async-lock", "crossbeam-channel", @@ -3979,9 +3965,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" [[package]] name = "num-format" @@ -4141,9 +4127,9 @@ dependencies = [ [[package]] name = "openssl-probe" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "ordered-float" @@ -4214,7 +4200,7 @@ version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f6a2926a30477c0b95fea6c28c3072712b139337a242c2cc64817bdc20a8854" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow-array", "arrow-buffer", "arrow-cast", @@ -4434,15 +4420,15 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" [[package]] name = "portable-atomic-util" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" dependencies = [ "portable-atomic", ] @@ -4514,9 +4500,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.105" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" dependencies = [ "unicode-ident", ] @@ -4596,26 +4582,6 @@ dependencies = [ "prost", ] -[[package]] -name = "ptr_meta" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" -dependencies = [ - "ptr_meta_derive", -] - -[[package]] -name = "ptr_meta_derive" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "quad-rand" version = "0.2.3" @@ -4708,9 +4674,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.43" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a" +checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" dependencies = [ "proc-macro2", ] @@ -4721,12 +4687,6 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" -[[package]] -name = "radium" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" - [[package]] name = "rand" version = "0.8.5" @@ -4876,15 +4836,6 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" -[[package]] 
-name = "rend" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" -dependencies = [ - "bytecheck", -] - [[package]] name = "reqsign" version = "0.16.5" @@ -4980,35 +4931,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rkyv" -version = "0.7.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" -dependencies = [ - "bitvec", - "bytecheck", - "bytes", - "hashbrown 0.12.3", - "ptr_meta", - "rend", - "rkyv_derive", - "seahash", - "tinyvec", - "uuid", -] - -[[package]] -name = "rkyv_derive" -version = "0.7.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "roaring" version = "0.11.3" @@ -5058,22 +4980,6 @@ dependencies = [ "ordered-multimap", ] -[[package]] -name = "rust_decimal" -version = "1.40.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61f703d19852dbf87cbc513643fa81428361eb6940f1ac14fd58155d295a3eb0" -dependencies = [ - "arrayvec", - "borsh", - "bytes", - "num-traits", - "rand 0.8.5", - "rkyv", - "serde", - "serde_json", -] - [[package]] name = "rustc-demangle" version = "0.1.27" @@ -5229,9 +5135,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ "dyn-clone", "ref-cast", @@ -5245,12 +5151,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "seahash" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" - [[package]] name = "security-framework" version = "3.5.1" @@ -5296,6 +5196,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde-value" version = "0.7.0" @@ -5384,7 +5293,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.13.0", "schemars 0.9.0", - "schemars 1.2.0", + "schemars 1.2.1", "serde_core", "serde_json", "serde_with_macros", @@ -5468,15 +5377,15 @@ checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" [[package]] name = "siphasher" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "smallvec" @@ -5492,9 +5401,9 @@ checksum = 
"1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "socket2" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" dependencies = [ "libc", "windows-sys 0.60.2", @@ -5646,12 +5555,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" -[[package]] -name = "tap" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" - [[package]] name = "tempfile" version = "3.24.0" @@ -5759,9 +5662,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.45" +version = "0.3.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd" +checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" dependencies = [ "deranged", "itoa", @@ -5774,15 +5677,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.25" +version = "0.2.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd" +checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" dependencies = [ "num-conv", "time-core", @@ -6024,6 +5927,12 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typemap-ors" version = "1.0.0" @@ -6039,6 +5948,30 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "typetag" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be2212c8a9b9bcfca32024de14998494cf9a5dfa59ea1b829de98bac374b86bf" +dependencies = [ + "erased-serde", + "inventory", + "once_cell", + "serde", + "typetag-impl", +] + +[[package]] +name = "typetag-impl" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27a7a9b72ba121f6f1f6c3632b85604cac41aedb5ddc70accbebb6cac83de846" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "unicode-ident" version = "1.0.22" @@ -6627,15 +6560,6 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" -[[package]] -name = "wyz" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" -dependencies = [ - "tap", -] - [[package]] name = "xmlparser" version = "0.13.6" @@ -6676,18 +6600,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.33" +version = "0.8.37" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" +checksum = "7456cf00f0685ad319c5b1693f291a650eaf345e941d082fc4e03df8a03996ac" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.33" +version = "0.8.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" +checksum = "1328722bbf2115db7e19d69ebcc15e795719e2d66b60827c6a69a117365e37a0" dependencies = [ "proc-macro2", "quote", @@ -6762,9 +6686,9 @@ checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" [[package]] name = "zmij" -version = "1.0.16" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfcd145825aace48cff44a8844de64bf75feec3080e0aa5cdbde72961ae51a65" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" [[package]] name = "zstd" diff --git a/native/Cargo.toml b/native/Cargo.toml index 216057f9bd..1fef259a73 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -54,7 +54,7 @@ object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"] url = "2.2" aws-config = "1.8.10" aws-credential-types = "1.2.9" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "ee21563" } +iceberg = { git = "https://github.com/mbutrovich/iceberg-rust", branch = "cache_parquet_metadata" } [profile.release] debug = true diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index 2f639e9f70..bc20592e90 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -44,6 +44,7 @@ use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper}; use datafusion_comet_spark_expr::EvalMode; +use iceberg::scan::FileScanTask; /// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables. 
/// @@ -58,8 +59,8 @@ pub struct IcebergScanExec { plan_properties: PlanProperties, /// Catalog-specific configuration for FileIO catalog_properties: HashMap, - /// Pre-planned file scan tasks, grouped by partition - file_task_groups: Vec>, + /// Pre-planned file scan tasks + tasks: Vec, /// Metrics metrics: ExecutionPlanMetricsSet, } @@ -69,11 +70,10 @@ impl IcebergScanExec { metadata_location: String, schema: SchemaRef, catalog_properties: HashMap, - file_task_groups: Vec>, + tasks: Vec, ) -> Result { let output_schema = schema; - let num_partitions = file_task_groups.len(); - let plan_properties = Self::compute_properties(Arc::clone(&output_schema), num_partitions); + let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1); let metrics = ExecutionPlanMetricsSet::new(); @@ -82,7 +82,7 @@ impl IcebergScanExec { output_schema, plan_properties, catalog_properties, - file_task_groups, + tasks, metrics, }) } @@ -127,19 +127,10 @@ impl ExecutionPlan for IcebergScanExec { fn execute( &self, - partition: usize, + _partition: usize, context: Arc, ) -> DFResult { - if partition < self.file_task_groups.len() { - let tasks = &self.file_task_groups[partition]; - self.execute_with_tasks(tasks.clone(), partition, context) - } else { - Err(DataFusionError::Execution(format!( - "IcebergScanExec: Partition index {} out of range (only {} task groups available)", - partition, - self.file_task_groups.len() - ))) - } + self.execute_with_tasks(self.tasks.clone(), context) } fn metrics(&self) -> Option { @@ -152,15 +143,14 @@ impl IcebergScanExec { /// deletes via iceberg-rust's ArrowReader. fn execute_with_tasks( &self, - tasks: Vec, - partition: usize, + tasks: Vec, context: Arc, ) -> DFResult { let output_schema = Arc::clone(&self.output_schema); let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?; let batch_size = context.session_config().batch_size(); - let metrics = IcebergScanMetrics::new(&self.metrics, partition); + let metrics = IcebergScanMetrics::new(&self.metrics); let num_tasks = tasks.len(); metrics.num_splits.add(num_tasks); @@ -221,10 +211,10 @@ struct IcebergScanMetrics { } impl IcebergScanMetrics { - fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + fn new(metrics: &ExecutionPlanMetricsSet) -> Self { Self { - baseline: BaselineMetrics::new(metrics, partition), - num_splits: MetricBuilder::new(metrics).counter("num_splits", partition), + baseline: BaselineMetrics::new(metrics, 0), + num_splits: MetricBuilder::new(metrics).counter("num_splits", 0), } } } @@ -311,11 +301,11 @@ where impl DisplayAs for IcebergScanExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - let num_tasks: usize = self.file_task_groups.iter().map(|g| g.len()).sum(); write!( f, "IcebergScanExec: metadata_location={}, num_tasks={}", - self.metadata_location, num_tasks + self.metadata_location, + self.tasks.len() ) } } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 44ff20a44f..12db052394 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1132,33 +1132,28 @@ impl PhysicalPlanner { )) } OpStruct::IcebergScan(scan) => { - let required_schema: SchemaRef = - convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); + // Extract common data and single partition's file tasks + // Per-partition injection happens in Scala before sending to native + let common = scan + .common + .as_ref() + .ok_or_else(|| 
GeneralError("IcebergScan missing common data".into()))?; - let catalog_properties: HashMap = scan + let required_schema = + convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); + let catalog_properties: HashMap = common .catalog_properties .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(); - - let metadata_location = scan.metadata_location.clone(); - - debug_assert!( - !scan.file_partitions.is_empty(), - "IcebergScan must have at least one file partition. This indicates a bug in Scala serialization." - ); - - let tasks = parse_file_scan_tasks( - scan, - &scan.file_partitions[self.partition as usize].file_scan_tasks, - )?; - let file_task_groups = vec![tasks]; + let metadata_location = common.metadata_location.clone(); + let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?; let iceberg_scan = IcebergScanExec::new( metadata_location, required_schema, catalog_properties, - file_task_groups, + tasks, )?; Ok(( @@ -2743,15 +2738,14 @@ fn partition_data_to_struct( /// Each task contains a residual predicate that is used for row-group level filtering /// during Parquet scanning. /// -/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing -/// of schemas, partition specs, partition types, name mappings, and other repeated data. -fn parse_file_scan_tasks( - proto_scan: &spark_operator::IcebergScan, +/// This function uses deduplication pools from the IcebergScanCommon to avoid redundant +/// parsing of schemas, partition specs, partition types, name mappings, and other repeated data. +fn parse_file_scan_tasks_from_common( + proto_common: &spark_operator::IcebergScanCommon, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - // Build caches upfront: for 10K tasks with 1 schema, this parses the schema - // once instead of 10K times, eliminating redundant JSON deserialization - let schema_cache: Vec> = proto_scan + // Parse each unique schema once, not once per task + let schema_cache: Vec> = proto_common .schema_pool .iter() .map(|json| { @@ -2764,7 +2758,7 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; - let partition_spec_cache: Vec>> = proto_scan + let partition_spec_cache: Vec>> = proto_common .partition_spec_pool .iter() .map(|json| { @@ -2774,7 +2768,7 @@ fn parse_file_scan_tasks( }) .collect(); - let name_mapping_cache: Vec>> = proto_scan + let name_mapping_cache: Vec>> = proto_common .name_mapping_pool .iter() .map(|json| { @@ -2784,7 +2778,7 @@ fn parse_file_scan_tasks( }) .collect(); - let delete_files_cache: Vec> = proto_scan + let delete_files_cache: Vec> = proto_common .delete_files_pool .iter() .map(|list| { @@ -2796,7 +2790,7 @@ fn parse_file_scan_tasks( "EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes, other => { return Err(GeneralError(format!( - "Invalid delete content type '{}'. 
This indicates a bug in Scala serialization.", + "Invalid delete content type '{}'", other ))) } @@ -2817,7 +2811,6 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; - // Partition data pool is in protobuf messages let results: Result, _> = proto_tasks .iter() .map(|proto_task| { @@ -2851,7 +2844,7 @@ fn parse_file_scan_tasks( }; let bound_predicate = if let Some(idx) = proto_task.residual_idx { - proto_scan + proto_common .residual_pool .get(idx as usize) .and_then(convert_spark_expr_to_predicate) @@ -2871,24 +2864,22 @@ fn parse_file_scan_tasks( }; let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - // Get partition data from protobuf pool - let partition_data_proto = proto_scan + let partition_data_proto = proto_common .partition_data_pool .get(partition_data_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid partition_data_idx: {} (pool size: {})", partition_data_idx, - proto_scan.partition_data_pool.len() + proto_common.partition_data_pool.len() )) })?; - // Convert protobuf PartitionData to iceberg Struct match partition_data_to_struct(partition_data_proto) { Ok(s) => Some(s), Err(e) => { return Err(ExecutionError::GeneralError(format!( - "Failed to deserialize partition data from protobuf: {}", + "Failed to deserialize partition data: {}", e ))) } @@ -2907,14 +2898,14 @@ fn parse_file_scan_tasks( .and_then(|idx| name_mapping_cache.get(idx as usize)) .and_then(|opt| opt.clone()); - let project_field_ids = proto_scan + let project_field_ids = proto_common .project_field_ids_pool .get(proto_task.project_field_ids_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid project_field_ids_idx: {} (pool size: {})", proto_task.project_field_ids_idx, - proto_scan.project_field_ids_pool.len() + proto_common.project_field_ids_pool.len() )) })? .field_ids diff --git a/native/proto/src/lib.rs b/native/proto/src/lib.rs index 6dfe546ac8..a55657b7af 100644 --- a/native/proto/src/lib.rs +++ b/native/proto/src/lib.rs @@ -34,6 +34,7 @@ pub mod spark_partitioning { // Include generated modules from .proto files. #[allow(missing_docs)] +#[allow(clippy::large_enum_variant)] pub mod spark_operator { include!(concat!("generated", "/spark.spark_operator.rs")); } diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 73c087cf36..78f118e6db 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -156,28 +156,34 @@ message PartitionData { repeated PartitionValue values = 1; } -message IcebergScan { - // Schema to read - repeated SparkStructField required_schema = 1; - +// Common data shared by all partitions in split mode (sent once, captured in closure) +message IcebergScanCommon { // Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.) 
- map catalog_properties = 2; - - // Pre-planned file scan tasks grouped by Spark partition - repeated IcebergFilePartition file_partitions = 3; + map catalog_properties = 1; // Table metadata file path for FileIO initialization - string metadata_location = 4; + string metadata_location = 2; + + // Schema to read + repeated SparkStructField required_schema = 3; - // Deduplication pools - shared data referenced by index from tasks - repeated string schema_pool = 5; - repeated string partition_type_pool = 6; - repeated string partition_spec_pool = 7; - repeated string name_mapping_pool = 8; - repeated ProjectFieldIdList project_field_ids_pool = 9; - repeated PartitionData partition_data_pool = 10; - repeated DeleteFileList delete_files_pool = 11; - repeated spark.spark_expression.Expr residual_pool = 12; + // Deduplication pools (must contain all entries for cross-partition deduplication) + repeated string schema_pool = 4; + repeated string partition_type_pool = 5; + repeated string partition_spec_pool = 6; + repeated string name_mapping_pool = 7; + repeated ProjectFieldIdList project_field_ids_pool = 8; + repeated PartitionData partition_data_pool = 9; + repeated DeleteFileList delete_files_pool = 10; + repeated spark.spark_expression.Expr residual_pool = 11; +} + +message IcebergScan { + // Common data shared across partitions (pools, metadata, catalog props) + IcebergScanCommon common = 1; + + // Single partition's file scan tasks + repeated IcebergFileScanTask file_scan_tasks = 2; } // Helper message for deduplicating field ID lists @@ -190,11 +196,6 @@ message DeleteFileList { repeated IcebergDeleteFile delete_files = 1; } -// Groups FileScanTasks for a single Spark partition -message IcebergFilePartition { - repeated IcebergFileScanTask file_scan_tasks = 1; -} - // Iceberg FileScanTask containing data file, delete files, and residual filter message IcebergFileScanTask { // Data file path (e.g., s3://bucket/warehouse/db/table/data/00000-0-abc.parquet) diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index 2d772063e4..c5b6554054 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -734,7 +734,7 @@ case class CometIcebergNativeScanMetadata( table: Any, metadataLocation: String, nameMapping: Option[String], - tasks: java.util.List[_], + @transient tasks: java.util.List[_], scanSchema: Any, tableSchema: Any, globalFieldIdMapping: Map[String, Int], diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 68a63b6ae8..a3b3208b02 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -28,12 +28,13 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper} import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SubqueryAdaptiveBroadcastExec} +import org.apache.spark.sql.execution.InSubqueryExec import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan @@ -51,11 +52,15 @@ import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet} import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported} import org.apache.comet.serde.operator.CometNativeScan import org.apache.comet.shims.CometTypeShim +import org.apache.comet.shims.ShimSubqueryBroadcast /** * Spark physical optimizer rule for replacing Spark scans with Comet scans. */ -case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim { +case class CometScanRule(session: SparkSession) + extends Rule[SparkPlan] + with CometTypeShim + with ShimSubqueryBroadcast { import CometScanRule._ @@ -327,10 +332,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com case _ if scanExec.scan.getClass.getName == "org.apache.iceberg.spark.source.SparkBatchQueryScan" => - if (scanExec.runtimeFilters.exists(isDynamicPruningFilter)) { - return withInfo(scanExec, "Dynamic Partition Pruning is not supported") - } - val fallbackReasons = new ListBuffer[String]() // Native Iceberg scan requires both configs to be enabled @@ -621,10 +622,47 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com !hasUnsupportedDeletes } + // Check that all DPP subqueries use InSubqueryExec which we know how to handle. + // Future Spark versions might introduce new subquery types we haven't tested. + val dppSubqueriesSupported = { + val unsupportedSubqueries = scanExec.runtimeFilters.collect { + case DynamicPruningExpression(e) if !e.isInstanceOf[InSubqueryExec] => + e.getClass.getSimpleName + } + // Check for multi-index DPP which we don't support yet. + // SPARK-46946 changed SubqueryAdaptiveBroadcastExec from index: Int to indices: Seq[Int] + // as a preparatory refactor for future features (Null Safe Equality DPP, multiple + // equality predicates). Currently indices always has one element, but future Spark + // versions might use multiple indices. + val multiIndexDpp = scanExec.runtimeFilters.exists { + case DynamicPruningExpression(e: InSubqueryExec) => + e.plan match { + case sab: SubqueryAdaptiveBroadcastExec => + getSubqueryBroadcastIndices(sab).length > 1 + case _ => false + } + case _ => false + } + if (unsupportedSubqueries.nonEmpty) { + fallbackReasons += + s"Unsupported DPP subquery types: ${unsupportedSubqueries.mkString(", ")}. " + + "CometIcebergNativeScanExec only supports InSubqueryExec for DPP" + false + } else if (multiIndexDpp) { + // See SPARK-46946 for context on multi-index DPP + fallbackReasons += + "Multi-index DPP (indices.length > 1) is not yet supported. " + + "See SPARK-46946 for context." 
+ false + } else { + true + } + } + if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles && allSupportedFilesystems && partitionTypesSupported && complexTypePredicatesSupported && transformFunctionsSupported && - deleteFileTypesSupported) { + deleteFileTypesSupported && dppSubqueriesSupported) { CometBatchScanExec( scanExec.clone().asInstanceOf[BatchScanExec], runtimeFilters = scanExec.runtimeFilters, diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 0ad82af8f8..957f621032 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -28,10 +28,11 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceRDD, DataSourceRDDPartition} import org.apache.spark.sql.types._ import org.apache.comet.ConfigEntry -import org.apache.comet.iceberg.IcebergReflection +import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField} @@ -309,7 +310,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit contentScanTaskClass: Class[_], fileScanTaskClass: Class[_], taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder, - icebergScanBuilder: OperatorOuterClass.IcebergScan.Builder, + commonBuilder: OperatorOuterClass.IcebergScanCommon.Builder, partitionTypeToPoolIndex: mutable.HashMap[String, Int], partitionSpecToPoolIndex: mutable.HashMap[String, Int], partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = { @@ -334,7 +335,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val specIdx = partitionSpecToPoolIndex.getOrElseUpdate( partitionSpecJson, { val idx = partitionSpecToPoolIndex.size - icebergScanBuilder.addPartitionSpecPool(partitionSpecJson) + commonBuilder.addPartitionSpecPool(partitionSpecJson) idx }) taskBuilder.setPartitionSpecIdx(specIdx) @@ -415,7 +416,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val typeIdx = partitionTypeToPoolIndex.getOrElseUpdate( partitionTypeJson, { val idx = partitionTypeToPoolIndex.size - icebergScanBuilder.addPartitionTypePool(partitionTypeJson) + commonBuilder.addPartitionTypePool(partitionTypeJson) idx }) taskBuilder.setPartitionTypeIdx(typeIdx) @@ -470,7 +471,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val partitionDataIdx = partitionDataToPoolIndex.getOrElseUpdate( partitionDataKey, { val idx = partitionDataToPoolIndex.size - icebergScanBuilder.addPartitionDataPool(partitionDataProto) + commonBuilder.addPartitionDataPool(partitionDataProto) idx }) taskBuilder.setPartitionDataIdx(partitionDataIdx) @@ -671,17 +672,59 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } /** - * Serializes a CometBatchScanExec wrapping an Iceberg SparkBatchQueryScan to protobuf. + * Converts a CometBatchScanExec to a minimal placeholder IcebergScan operator. 
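With the blanket Dynamic Partition Pruning fallback removed above, partition-dependent serialization has to wait until DPP subqueries resolve: `convert` now emits only a placeholder carrying `metadata_location` (matched later in `PlanDataInjector`), and `serializePartitions` builds the pools and per-partition tasks at execution time, returning the `(commonBytes, perPartitionBytes)` pair described below. A hedged sketch of that split using simplified stand-in types (not the generated protobuf classes):

```rust
use std::collections::HashMap;

// Simplified mirrors of the IcebergScanCommon / IcebergScan messages
// (illustrative fields only, not the generated prost types).
#[derive(Clone, Debug, Default)]
struct ScanCommon {
    metadata_location: String,
    catalog_properties: HashMap<String, String>,
    schema_pool: Vec<String>, // the real message also carries spec/name-mapping/residual pools
}

#[derive(Clone, Debug, Default)]
struct ScanPartition {
    // Per-partition file scan tasks; each task references pool entries by index.
    task_data_file_paths: Vec<String>,
}

// Planning time: the placeholder carries only the metadata location,
// which is later used to match the operator when partition data is injected.
fn planning_time_placeholder(metadata_location: &str) -> ScanCommon {
    ScanCommon {
        metadata_location: metadata_location.to_string(),
        ..Default::default()
    }
}

// Execution time, after DPP has pruned partitions: pair the shared common payload
// with one payload per surviving partition -- roughly the (commonBytes,
// perPartitionBytes) split returned by serializePartitions().
fn payload_for_partition(
    common: &ScanCommon,
    partitions: &[ScanPartition],
    partition_index: usize,
) -> Option<(ScanCommon, ScanPartition)> {
    partitions
        .get(partition_index)
        .map(|p| (common.clone(), p.clone()))
}

fn main() {
    let common = planning_time_placeholder("s3://bucket/warehouse/db/t/metadata/v3.json");
    let partitions = vec![ScanPartition {
        task_data_file_paths: vec!["s3://bucket/warehouse/db/t/data/00000-0-abc.parquet".into()],
    }];
    assert!(payload_for_partition(&common, &partitions, 0).is_some());
    assert!(payload_for_partition(&common, &partitions, 1).is_none());
}
```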
* - * Uses pre-extracted metadata from CometScanRule to avoid redundant reflection operations. All - * reflection and validation was done during planning, so serialization failures here would - * indicate a programming error rather than an expected fallback condition. + * Returns a placeholder operator with only metadata_location for matching during partition + * injection. All other fields (catalog properties, required schema, pools, partition data) are + * set by serializePartitions() at execution time after DPP resolves. */ override def convert( scan: CometBatchScanExec, builder: Operator.Builder, childOp: Operator*): Option[OperatorOuterClass.Operator] = { + + val metadata = scan.nativeIcebergScanMetadata.getOrElse { + throw new IllegalStateException( + "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is None. " + + "Metadata should have been extracted in CometScanRule.") + } + val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder() + val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() + + // Only set metadata_location - used for matching in PlanDataInjector. + // All other fields (catalog_properties, required_schema, pools) are set by + // serializePartitions() at execution time, so setting them here would be wasted work. + commonBuilder.setMetadataLocation(metadata.metadataLocation) + + icebergScanBuilder.setCommon(commonBuilder.build()) + // partition field intentionally empty - will be populated at execution time + + builder.clearChildren() + Some(builder.setIcebergScan(icebergScanBuilder).build()) + } + + /** + * Serializes partitions from inputRDD at execution time. + * + * Called after doPrepare() has resolved DPP subqueries. Builds pools and per-partition data in + * one pass from the DPP-filtered partitions. + * + * @param scanExec + * The BatchScanExec whose inputRDD contains the DPP-filtered partitions + * @param output + * The output attributes for the scan + * @param metadata + * Pre-extracted Iceberg metadata from CometScanRule + * @return + * Tuple of (commonBytes, perPartitionBytes) for native execution + */ + def serializePartitions( + scanExec: BatchScanExec, + output: Seq[Attribute], + metadata: CometIcebergNativeScanMetadata): (Array[Byte], Array[Array[Byte]]) = { + + val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() // Deduplication structures - map unique values to pool indices val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]() @@ -689,300 +732,225 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val partitionSpecToPoolIndex = mutable.HashMap[String, Int]() val nameMappingToPoolIndex = mutable.HashMap[String, Int]() val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]() - val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 bytes -> pool index + val partitionDataToPoolIndex = mutable.HashMap[String, Int]() val deleteFilesToPoolIndex = mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]() val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]() - var totalTasks = 0 + val perPartitionBuilders = mutable.ArrayBuffer[OperatorOuterClass.IcebergScan]() - // Get pre-extracted metadata from planning phase - // If metadata is None, this is a programming error - metadata should have been extracted - // in CometScanRule before creating CometBatchScanExec - val metadata = scan.nativeIcebergScanMetadata.getOrElse { - throw new IllegalStateException( - "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is None. 
" + - "Metadata should have been extracted in CometScanRule.") - } - - // Use pre-extracted metadata (no reflection needed) - icebergScanBuilder.setMetadataLocation(metadata.metadataLocation) + var totalTasks = 0 + commonBuilder.setMetadataLocation(metadata.metadataLocation) metadata.catalogProperties.foreach { case (key, value) => - icebergScanBuilder.putCatalogProperties(key, value) + commonBuilder.putCatalogProperties(key, value) } - // Set required_schema from output - scan.output.foreach { attr => + output.foreach { attr => val field = SparkStructField .newBuilder() .setName(attr.name) .setNullable(attr.nullable) serializeDataType(attr.dataType).foreach(field.setDataType) - icebergScanBuilder.addRequiredSchema(field.build()) + commonBuilder.addRequiredSchema(field.build()) } - // Extract FileScanTasks from the InputPartitions in the RDD - try { - scan.wrapped.inputRDD match { - case rdd: org.apache.spark.sql.execution.datasources.v2.DataSourceRDD => - val partitions = rdd.partitions - partitions.foreach { partition => - val partitionBuilder = OperatorOuterClass.IcebergFilePartition.newBuilder() + // Load Iceberg classes once (avoid repeated class loading in loop) + // scalastyle:off classforname + val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) + val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) + val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) + val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER) + val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA) + // scalastyle:on classforname - val inputPartitions = partition - .asInstanceOf[org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition] - .inputPartitions + // Cache method lookups (avoid repeated getMethod in loop) + val fileMethod = contentScanTaskClass.getMethod("file") + val startMethod = contentScanTaskClass.getMethod("start") + val lengthMethod = contentScanTaskClass.getMethod("length") + val residualMethod = contentScanTaskClass.getMethod("residual") + val taskSchemaMethod = fileScanTaskClass.getMethod("schema") + val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) + toJsonMethod.setAccessible(true) + + // Access inputRDD - safe now, DPP is resolved + scanExec.inputRDD match { + case rdd: DataSourceRDD => + val partitions = rdd.partitions + partitions.foreach { partition => + val partitionBuilder = OperatorOuterClass.IcebergScan.newBuilder() + + val inputPartitions = partition + .asInstanceOf[DataSourceRDDPartition] + .inputPartitions + + inputPartitions.foreach { inputPartition => + val inputPartClass = inputPartition.getClass - inputPartitions.foreach { inputPartition => - val inputPartClass = inputPartition.getClass + try { + val taskGroupMethod = inputPartClass.getDeclaredMethod("taskGroup") + taskGroupMethod.setAccessible(true) + val taskGroup = taskGroupMethod.invoke(inputPartition) - try { - val taskGroupMethod = inputPartClass.getDeclaredMethod("taskGroup") - taskGroupMethod.setAccessible(true) - val taskGroup = taskGroupMethod.invoke(inputPartition) + val taskGroupClass = taskGroup.getClass + val tasksMethod = taskGroupClass.getMethod("tasks") + val tasksCollection = + tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]] - val taskGroupClass = taskGroup.getClass - val tasksMethod = taskGroupClass.getMethod("tasks") - val tasksCollection = - tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]] + 
tasksCollection.asScala.foreach { task => + totalTasks += 1 - tasksCollection.asScala.foreach { task => - totalTasks += 1 + val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder() - try { - val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder() - - // scalastyle:off classforname - val contentScanTaskClass = - Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) - val fileScanTaskClass = - Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) - val contentFileClass = - Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) - // scalastyle:on classforname - - val fileMethod = contentScanTaskClass.getMethod("file") - val dataFile = fileMethod.invoke(task) - - val filePathOpt = - IcebergReflection.extractFileLocation(contentFileClass, dataFile) - - filePathOpt match { - case Some(filePath) => - taskBuilder.setDataFilePath(filePath) - case None => - val msg = - "Iceberg reflection failure: Cannot extract file path from data file" - logError(msg) - throw new RuntimeException(msg) - } + val dataFile = fileMethod.invoke(task) - val startMethod = contentScanTaskClass.getMethod("start") - val start = startMethod.invoke(task).asInstanceOf[Long] - taskBuilder.setStart(start) - - val lengthMethod = contentScanTaskClass.getMethod("length") - val length = lengthMethod.invoke(task).asInstanceOf[Long] - taskBuilder.setLength(length) - - try { - // Equality deletes require the full table schema to resolve field IDs, - // even for columns not in the projection. Schema evolution requires - // using the snapshot's schema to correctly read old data files. - // These requirements conflict, so we choose based on delete presence. - - val taskSchemaMethod = fileScanTaskClass.getMethod("schema") - val taskSchema = taskSchemaMethod.invoke(task) - - val deletes = - IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass) - val hasDeletes = !deletes.isEmpty - - // Schema to pass to iceberg-rust's FileScanTask. - // This is used by RecordBatchTransformer for field type lookups (e.g., in - // constants_map) and default value generation. The actual projection is - // controlled by project_field_ids. - // - // Schema selection logic: - // 1. If hasDeletes=true: Use taskSchema (file-specific schema) because - // delete files reference specific schema versions and we need exact schema - // matching for MOR. - // 2. Else if scanSchema contains columns not in tableSchema: Use scanSchema - // because this is a VERSION AS OF query reading a historical snapshot with - // different schema (e.g., after column drop, scanSchema has old columns - // that tableSchema doesn't) - // 3. 
Else: Use tableSchema because scanSchema is the query OUTPUT schema - // (e.g., for aggregates like "SELECT count(*)", scanSchema only has - // aggregate fields and doesn't contain partition columns needed by - // constants_map) - val schema: AnyRef = - if (hasDeletes) { - taskSchema - } else { - // Check if scanSchema has columns that tableSchema doesn't have - // (VERSION AS OF case) - val scanSchemaFieldIds = IcebergReflection - .buildFieldIdMapping(metadata.scanSchema) - .values - .toSet - val tableSchemaFieldIds = IcebergReflection - .buildFieldIdMapping(metadata.tableSchema) - .values - .toSet - val hasHistoricalColumns = - scanSchemaFieldIds.exists(id => !tableSchemaFieldIds.contains(id)) - - if (hasHistoricalColumns) { - // VERSION AS OF: scanSchema has columns that current table doesn't have - metadata.scanSchema.asInstanceOf[AnyRef] - } else { - // Regular query: use tableSchema for partition field lookups - metadata.tableSchema.asInstanceOf[AnyRef] - } - } - - // scalastyle:off classforname - val schemaParserClass = - Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER) - val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA) - // scalastyle:on classforname - val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) - toJsonMethod.setAccessible(true) - - // Use object identity for deduplication: Iceberg Schema objects are immutable - // and reused across tasks, making identity-based deduplication safe - val schemaIdx = schemaToPoolIndex.getOrElseUpdate( - schema, { - val idx = schemaToPoolIndex.size - val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String] - icebergScanBuilder.addSchemaPool(schemaJson) - idx - }) - taskBuilder.setSchemaIdx(schemaIdx) - - // Build field ID mapping from the schema we're using - val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema) - - // Extract project_field_ids for scan.output columns. - // For schema evolution: try task schema first, then fall back to - // global scan schema (pre-extracted in metadata). 
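The three-way schema choice spelled out in the comments above reduces to a small decision rule. A minimal standalone sketch, with hypothetical names and a plain type parameter; the real code operates on reflected Iceberg Schema objects and field-id sets built by IcebergReflection.buildFieldIdMapping:

    object SchemaChoiceSketch {
      // Deletes win, then historical (VERSION AS OF) columns, then the current table schema.
      def chooseSchema[S](
          hasDeletes: Boolean,
          taskSchema: S,
          scanSchema: S,
          tableSchema: S,
          scanSchemaFieldIds: Set[Int],
          tableSchemaFieldIds: Set[Int]): S = {
        if (hasDeletes) {
          taskSchema // MOR: delete files reference a specific schema version
        } else if (scanSchemaFieldIds.exists(id => !tableSchemaFieldIds.contains(id))) {
          scanSchema // scan schema has columns the current table no longer has
        } else {
          tableSchema // keeps partition columns needed by constants_map
        }
      }

      def main(args: Array[String]): Unit = {
        // hasDeletes = false, and field id 9 exists only in the scan schema, so scanSchema wins.
        println(chooseSchema(false, "task", "scan", "table", Set(1, 2, 9), Set(1, 2, 3)))
      }
    }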
- val projectFieldIds = scan.output.flatMap { attr => - nameToFieldId - .get(attr.name) - .orElse(metadata.globalFieldIdMapping.get(attr.name)) - .orElse { - logWarning( - s"Column '${attr.name}' not found in task or scan schema," + - "skipping projection") - None - } - } - - // Deduplicate project field IDs - val projectFieldIdsIdx = projectFieldIdsToPoolIndex.getOrElseUpdate( - projectFieldIds, { - val idx = projectFieldIdsToPoolIndex.size - val listBuilder = OperatorOuterClass.ProjectFieldIdList.newBuilder() - projectFieldIds.foreach(id => listBuilder.addFieldIds(id)) - icebergScanBuilder.addProjectFieldIdsPool(listBuilder.build()) - idx - }) - taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx) - } catch { - case e: Exception => - val msg = - "Iceberg reflection failure: " + - "Failed to extract schema from FileScanTask: " + - s"${e.getMessage}" - logError(msg) - throw new RuntimeException(msg, e) - } + val filePathOpt = + IcebergReflection.extractFileLocation(contentFileClass, dataFile) - // Deduplicate delete files - val deleteFilesList = - extractDeleteFilesList(task, contentFileClass, fileScanTaskClass) - if (deleteFilesList.nonEmpty) { - val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate( - deleteFilesList, { - val idx = deleteFilesToPoolIndex.size - val listBuilder = OperatorOuterClass.DeleteFileList.newBuilder() - deleteFilesList.foreach(df => listBuilder.addDeleteFiles(df)) - icebergScanBuilder.addDeleteFilesPool(listBuilder.build()) - idx - }) - taskBuilder.setDeleteFilesIdx(deleteFilesIdx) - } + filePathOpt match { + case Some(filePath) => + taskBuilder.setDataFilePath(filePath) + case None => + val msg = + "Iceberg reflection failure: Cannot extract file path from data file" + logError(msg) + throw new RuntimeException(msg) + } - // Extract and deduplicate residual expression - val residualExprOpt = - try { - val residualMethod = contentScanTaskClass.getMethod("residual") - val residualExpr = residualMethod.invoke(task) - - val catalystExpr = convertIcebergExpression(residualExpr, scan.output) - - catalystExpr.flatMap { expr => - exprToProto(expr, scan.output, binding = false) - } - } catch { - case e: Exception => - logWarning( - "Failed to extract residual expression from FileScanTask: " + - s"${e.getMessage}") - None - } - - residualExprOpt.foreach { residualExpr => - val residualIdx = residualToPoolIndex.getOrElseUpdate( - Some(residualExpr), { - val idx = residualToPoolIndex.size - icebergScanBuilder.addResidualPool(residualExpr) - idx - }) - taskBuilder.setResidualIdx(residualIdx) + val start = startMethod.invoke(task).asInstanceOf[Long] + taskBuilder.setStart(start) + + val length = lengthMethod.invoke(task).asInstanceOf[Long] + taskBuilder.setLength(length) + + val taskSchema = taskSchemaMethod.invoke(task) + + val deletes = + IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass) + val hasDeletes = !deletes.isEmpty + + val schema: AnyRef = + if (hasDeletes) { + taskSchema + } else { + val scanSchemaFieldIds = IcebergReflection + .buildFieldIdMapping(metadata.scanSchema) + .values + .toSet + val tableSchemaFieldIds = IcebergReflection + .buildFieldIdMapping(metadata.tableSchema) + .values + .toSet + val hasHistoricalColumns = + scanSchemaFieldIds.exists(id => !tableSchemaFieldIds.contains(id)) + + if (hasHistoricalColumns) { + metadata.scanSchema.asInstanceOf[AnyRef] + } else { + metadata.tableSchema.asInstanceOf[AnyRef] } + } - // Serialize partition spec and data (field definitions, transforms, values) - serializePartitionData( - task, - 
contentScanTaskClass, - fileScanTaskClass, - taskBuilder, - icebergScanBuilder, - partitionTypeToPoolIndex, - partitionSpecToPoolIndex, - partitionDataToPoolIndex) - - // Deduplicate name mapping - metadata.nameMapping.foreach { nm => - val nmIdx = nameMappingToPoolIndex.getOrElseUpdate( - nm, { - val idx = nameMappingToPoolIndex.size - icebergScanBuilder.addNameMappingPool(nm) - idx - }) - taskBuilder.setNameMappingIdx(nmIdx) + val schemaIdx = schemaToPoolIndex.getOrElseUpdate( + schema, { + val idx = schemaToPoolIndex.size + val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String] + commonBuilder.addSchemaPool(schemaJson) + idx + }) + taskBuilder.setSchemaIdx(schemaIdx) + + val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema) + + val projectFieldIds = output.flatMap { attr => + nameToFieldId + .get(attr.name) + .orElse(metadata.globalFieldIdMapping.get(attr.name)) + .orElse { + logWarning(s"Column '${attr.name}' not found in task or scan schema, " + + "skipping projection") + None } + } - partitionBuilder.addFileScanTasks(taskBuilder.build()) + val projectFieldIdsIdx = projectFieldIdsToPoolIndex.getOrElseUpdate( + projectFieldIds, { + val idx = projectFieldIdsToPoolIndex.size + val listBuilder = OperatorOuterClass.ProjectFieldIdList.newBuilder() + projectFieldIds.foreach(id => listBuilder.addFieldIds(id)) + commonBuilder.addProjectFieldIdsPool(listBuilder.build()) + idx + }) + taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx) + + val deleteFilesList = + extractDeleteFilesList(task, contentFileClass, fileScanTaskClass) + if (deleteFilesList.nonEmpty) { + val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate( + deleteFilesList, { + val idx = deleteFilesToPoolIndex.size + val listBuilder = OperatorOuterClass.DeleteFileList.newBuilder() + deleteFilesList.foreach(df => listBuilder.addDeleteFiles(df)) + commonBuilder.addDeleteFilesPool(listBuilder.build()) + idx + }) + taskBuilder.setDeleteFilesIdx(deleteFilesIdx) + } + + val residualExprOpt = + try { + val residualExpr = residualMethod.invoke(task) + val catalystExpr = convertIcebergExpression(residualExpr, output) + catalystExpr.flatMap { expr => + exprToProto(expr, output, binding = false) + } + } catch { + case e: Exception => + logWarning( + "Failed to extract residual expression from FileScanTask: " + + s"${e.getMessage}") + None } + + residualExprOpt.foreach { residualExpr => + val residualIdx = residualToPoolIndex.getOrElseUpdate( + Some(residualExpr), { + val idx = residualToPoolIndex.size + commonBuilder.addResidualPool(residualExpr) + idx + }) + taskBuilder.setResidualIdx(residualIdx) + } + + serializePartitionData( + task, + contentScanTaskClass, + fileScanTaskClass, + taskBuilder, + commonBuilder, + partitionTypeToPoolIndex, + partitionSpecToPoolIndex, + partitionDataToPoolIndex) + + metadata.nameMapping.foreach { nm => + val nmIdx = nameMappingToPoolIndex.getOrElseUpdate( + nm, { + val idx = nameMappingToPoolIndex.size + commonBuilder.addNameMappingPool(nm) + idx + }) + taskBuilder.setNameMappingIdx(nmIdx) } + + partitionBuilder.addFileScanTasks(taskBuilder.build()) } } - - val builtPartition = partitionBuilder.build() - icebergScanBuilder.addFilePartitions(builtPartition) } - case _ => - } - } catch { - case e: Exception => - // CometScanRule already validated this scan should use native execution. - // Failure here is a programming error, not a graceful fallback scenario. 
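The schema, project-field-id, delete-file, residual, and name-mapping pools above all use the same getOrElseUpdate interning pattern; a minimal sketch with hypothetical names, not part of the patch:

    import scala.collection.mutable

    object PoolDedupSketch {
      def main(args: Array[String]): Unit = {
        val pool = mutable.ArrayBuffer[String]()          // stands in for addSchemaPool(...) etc.
        val valueToIndex = mutable.HashMap[String, Int]() // stands in for schemaToPoolIndex etc.

        def intern(value: String): Int =
          valueToIndex.getOrElseUpdate(
            value, {
              val idx = valueToIndex.size // index of the entry about to be appended
              pool += value
              idx
            })

        val perTaskValues = Seq("schemaA", "schemaB", "schemaA", "schemaA")
        val indices = perTaskValues.map(intern)
        println(s"pool=${pool.toList} indices=$indices") // 2 pool entries, 4 tasks referencing them
      }
    }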
- throw new IllegalStateException( - s"Native Iceberg scan serialization failed unexpectedly: ${e.getMessage}", - e) + + perPartitionBuilders += partitionBuilder.build() + } + case _ => + throw new IllegalStateException("Expected DataSourceRDD from BatchScanExec") } // Log deduplication summary @@ -999,7 +967,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val avgDedup = if (totalTasks == 0) { "0.0" } else { - // Filter out empty pools - they shouldn't count as 100% dedup val nonEmptyPools = allPoolSizes.filter(_ > 0) if (nonEmptyPools.isEmpty) { "0.0" @@ -1009,8 +976,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } } - // Calculate partition data pool size in bytes (protobuf format) - val partitionDataPoolBytes = icebergScanBuilder.getPartitionDataPoolList.asScala + val partitionDataPoolBytes = commonBuilder.getPartitionDataPoolList.asScala .map(_.getSerializedSize) .sum @@ -1021,8 +987,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit s"$partitionDataPoolBytes bytes (protobuf)") } - builder.clearChildren() - Some(builder.setIcebergScan(icebergScanBuilder).build()) + val commonBytes = commonBuilder.build().toByteArray + val perPartitionBytes = perPartitionBuilders.map(_.toByteArray).toArray + + (commonBytes, perPartitionBytes) } override def createExec(nativeOp: Operator, op: CometBatchScanExec): CometNativeExec = { @@ -1035,10 +1003,11 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit "Metadata should have been extracted in CometScanRule.") } - // Extract metadataLocation from the native operator - val metadataLocation = nativeOp.getIcebergScan.getMetadataLocation + // Extract metadataLocation from the native operator's common data + val metadataLocation = nativeOp.getIcebergScan.getCommon.getMetadataLocation - // Create the CometIcebergNativeScanExec using the companion object's apply method + // Pass BatchScanExec reference for deferred serialization (DPP support) + // Serialization happens at execution time after doPrepare() resolves DPP subqueries CometIcebergNativeScanExec(nativeOp, op.wrapped, op.session, metadataLocation, metadata) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala index 2fd7f12c24..63a67e82f2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala @@ -19,39 +19,204 @@ package org.apache.spark.sql.comet -import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.spark.rdd.{RDD, RDDOperationScope} +import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.ScalarSubquery import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration + +import org.apache.comet.CometExecIterator +import org.apache.comet.serde.OperatorOuterClass + +/** + * Partition that carries per-partition planning data, avoiding closure capture of all partitions. + */ +private[spark] class CometExecPartition( + override val index: Int, + val inputPartitions: Array[Partition], + val planDataByKey: Map[String, Array[Byte]]) + extends Partition /** - * A RDD that executes Spark SQL query in Comet native execution to generate ColumnarBatch. + * Unified RDD for Comet native execution. 
+ * + * Solves the closure capture problem: instead of capturing all partitions' data in the closure + * (which gets serialized to every task), each Partition object carries only its own data. + * + * Handles three cases: + * - With inputs + per-partition data: injects planning data into operator tree + * - With inputs + no per-partition data: just zips inputs (no injection overhead) + * - No inputs: uses numPartitions to create partitions + * + * NOTE: This RDD does not handle DPP (InSubqueryExec), which is resolved in + * CometIcebergNativeScanExec.serializedPartitionData before this RDD is created. It also handles + * ScalarSubquery expressions by registering them with CometScalarSubquery before execution. */ private[spark] class CometExecRDD( sc: SparkContext, - partitionNum: Int, - var f: (Seq[Iterator[ColumnarBatch]], Int, Int) => Iterator[ColumnarBatch]) - extends RDD[ColumnarBatch](sc, Nil) { + inputRDDs: Seq[RDD[ColumnarBatch]], + commonByKey: Map[String, Array[Byte]], + @transient perPartitionByKey: Map[String, Array[Array[Byte]]], + serializedPlan: Array[Byte], + defaultNumPartitions: Int, + numOutputCols: Int, + nativeMetrics: CometMetricNode, + subqueries: Seq[ScalarSubquery], + broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, + encryptedFilePaths: Seq[String] = Seq.empty) + extends RDD[ColumnarBatch](sc, inputRDDs.map(rdd => new OneToOneDependency(rdd))) { - override def compute(s: Partition, context: TaskContext): Iterator[ColumnarBatch] = { - f(Seq.empty, partitionNum, s.index) + // Determine partition count: from inputs if available, otherwise from parameter + private val numPartitions: Int = if (inputRDDs.nonEmpty) { + inputRDDs.head.partitions.length + } else if (perPartitionByKey.nonEmpty) { + perPartitionByKey.values.head.length + } else { + defaultNumPartitions } + // Validate all per-partition arrays have the same length to prevent + // ArrayIndexOutOfBoundsException in getPartitions (e.g., from broadcast scans with + // different partition counts after DPP filtering) + require( + perPartitionByKey.values.forall(_.length == numPartitions), + s"All per-partition arrays must have length $numPartitions, but found: " + + perPartitionByKey.map { case (key, arr) => s"$key -> ${arr.length}" }.mkString(", ")) + override protected def getPartitions: Array[Partition] = { - Array.tabulate(partitionNum)(i => - new Partition { - override def index: Int = i - }) + (0 until numPartitions).map { idx => + val inputParts = inputRDDs.map(_.partitions(idx)).toArray + val planData = perPartitionByKey.map { case (key, arr) => key -> arr(idx) } + new CometExecPartition(idx, inputParts, planData) + }.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val partition = split.asInstanceOf[CometExecPartition] + + val inputs = inputRDDs.zip(partition.inputPartitions).map { case (rdd, part) => + rdd.iterator(part, context) + } + + // Only inject if we have per-partition planning data + val actualPlan = if (commonByKey.nonEmpty) { + val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlan) + val injected = + PlanDataInjector.injectPlanData(basePlan, commonByKey, partition.planDataByKey) + PlanDataInjector.serializeOperator(injected) + } else { + serializedPlan + } + + val it = new CometExecIterator( + CometExec.newIterId, + inputs, + numOutputCols, + actualPlan, + nativeMetrics, + numPartitions, + partition.index, + broadcastedHadoopConfForEncryption, + encryptedFilePaths) + + // Register 
ScalarSubqueries so native code can look them up + subqueries.foreach(sub => CometScalarSubquery.setSubquery(it.id, sub)) + + Option(context).foreach { ctx => + ctx.addTaskCompletionListener[Unit] { _ => + it.close() + subqueries.foreach(sub => CometScalarSubquery.removeSubquery(it.id, sub)) + } + } + + it + } + + // Duplicates logic from Spark's ZippedPartitionsBaseRDD.getPreferredLocations + override def getPreferredLocations(split: Partition): Seq[String] = { + if (inputRDDs.isEmpty) return Nil + + val idx = split.index + val prefs = inputRDDs.map(rdd => rdd.preferredLocations(rdd.partitions(idx))) + // Prefer nodes where all inputs are local; fall back to any input's preferred location + val intersection = prefs.reduce((a, b) => a.intersect(b)) + if (intersection.nonEmpty) intersection else prefs.flatten.distinct } } object CometExecRDD { - def apply(sc: SparkContext, partitionNum: Int)( - f: (Seq[Iterator[ColumnarBatch]], Int, Int) => Iterator[ColumnarBatch]) - : RDD[ColumnarBatch] = - withScope(sc) { - new CometExecRDD(sc, partitionNum, f) - } - private[spark] def withScope[U](sc: SparkContext)(body: => U): U = - RDDOperationScope.withScope[U](sc)(body) + /** + * Creates an RDD for standalone Iceberg scan (no parent native operators). + */ + def apply( + sc: SparkContext, + commonData: Array[Byte], + perPartitionData: Array[Array[Byte]], + numOutputCols: Int, + nativeMetrics: CometMetricNode): CometExecRDD = { + + // Standalone mode needs a placeholder plan for PlanDataInjector to fill in. + // PlanDataInjector correlates common/partition data by key (metadata_location for Iceberg). + val common = OperatorOuterClass.IcebergScanCommon.parseFrom(commonData) + val metadataLocation = common.getMetadataLocation + + val placeholderCommon = OperatorOuterClass.IcebergScanCommon + .newBuilder() + .setMetadataLocation(metadataLocation) + .build() + val placeholderScan = OperatorOuterClass.IcebergScan + .newBuilder() + .setCommon(placeholderCommon) + .build() + val placeholderPlan = OperatorOuterClass.Operator + .newBuilder() + .setIcebergScan(placeholderScan) + .build() + .toByteArray + + new CometExecRDD( + sc, + inputRDDs = Seq.empty, + commonByKey = Map(metadataLocation -> commonData), + perPartitionByKey = Map(metadataLocation -> perPartitionData), + serializedPlan = placeholderPlan, + defaultNumPartitions = perPartitionData.length, + numOutputCols = numOutputCols, + nativeMetrics = nativeMetrics, + subqueries = Seq.empty) + } + + /** + * Creates an RDD for native execution with optional per-partition planning data. 
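The locality rule in getPreferredLocations above, borrowed from ZippedPartitionsBaseRDD, amounts to a small merge over each input's preferred hosts; a hypothetical standalone helper (not part of the patch) makes the fallback explicit:

    object PreferredLocationsSketch {
      // Prefer hosts where every input partition is local; otherwise fall back to
      // the union of each input's preferred hosts.
      def merge(prefsPerInput: Seq[Seq[String]]): Seq[String] =
        if (prefsPerInput.isEmpty) Nil
        else {
          val common = prefsPerInput.reduce((a, b) => a.intersect(b))
          if (common.nonEmpty) common else prefsPerInput.flatten.distinct
        }

      def main(args: Array[String]): Unit = {
        println(merge(Seq(Seq("hostA", "hostB"), Seq("hostB", "hostC")))) // List(hostB)
        println(merge(Seq(Seq("hostA"), Seq("hostC"))))                   // List(hostA, hostC)
      }
    }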
+ */ + // scalastyle:off + def apply( + sc: SparkContext, + inputRDDs: Seq[RDD[ColumnarBatch]], + commonByKey: Map[String, Array[Byte]], + perPartitionByKey: Map[String, Array[Array[Byte]]], + serializedPlan: Array[Byte], + numPartitions: Int, + numOutputCols: Int, + nativeMetrics: CometMetricNode, + subqueries: Seq[ScalarSubquery], + broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, + encryptedFilePaths: Seq[String] = Seq.empty): CometExecRDD = { + // scalastyle:on + + new CometExecRDD( + sc, + inputRDDs, + commonByKey, + perPartitionByKey, + serializedPlan, + numPartitions, + numOutputCols, + nativeMetrics, + subqueries, + broadcastedHadoopConfForEncryption, + encryptedFilePaths) + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 223ae4fbb7..207d8555f0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -21,18 +21,23 @@ package org.apache.spark.sql.comet import scala.jdk.CollectionConverters._ +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, DynamicPruningExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.{InSubqueryExec, SubqueryAdaptiveBroadcastExec} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.AccumulatorV2 import com.google.common.base.Objects import org.apache.comet.iceberg.CometIcebergNativeScanMetadata import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.operator.CometIcebergNativeScan +import org.apache.comet.shims.ShimSubqueryBroadcast /** * Native Iceberg scan operator that delegates file reading to iceberg-rust. @@ -41,6 +46,10 @@ import org.apache.comet.serde.OperatorOuterClass.Operator * execution. Iceberg's catalog and planning run in Spark to produce FileScanTasks, which are * serialized to protobuf for the native side to execute using iceberg-rust's FileIO and * ArrowReader. This provides better performance than reading through Spark's abstraction layers. + * + * Supports Dynamic Partition Pruning (DPP) by deferring partition serialization to execution + * time. The doPrepare() method waits for DPP subqueries to resolve, then lazy + * serializedPartitionData serializes the DPP-filtered partitions from inputRDD. 
*/ case class CometIcebergNativeScanExec( override val nativeOp: Operator, @@ -48,16 +57,128 @@ case class CometIcebergNativeScanExec( @transient override val originalPlan: BatchScanExec, override val serializedPlanOpt: SerializedPlan, metadataLocation: String, - numPartitions: Int, @transient nativeIcebergScanMetadata: CometIcebergNativeScanMetadata) - extends CometLeafExec { + extends CometLeafExec + with ShimSubqueryBroadcast { override val supportsColumnar: Boolean = true override val nodeName: String = "CometIcebergNativeScan" - override lazy val outputPartitioning: Partitioning = - UnknownPartitioning(numPartitions) + /** + * Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar(). Only + * kicks off async work - doesn't wait for results (that happens in serializedPartitionData). + */ + override protected def doPrepare(): Unit = { + originalPlan.runtimeFilters.foreach { + case DynamicPruningExpression(e: InSubqueryExec) => + e.plan.prepare() + case _ => + } + super.doPrepare() + } + + /** + * Lazy partition serialization - computed after doPrepare() resolves DPP. + * + * DPP (Dynamic Partition Pruning) Flow: + * + * {{{ + * Planning time: + * CometIcebergNativeScanExec created + * - serializedPartitionData not evaluated (lazy) + * - No partition serialization yet + * + * Execution time: + * 1. Spark calls prepare() on the plan tree + * - doPrepare() calls e.plan.prepare() for each DPP filter + * - Broadcast exchange starts materializing + * + * 2. Spark calls doExecuteColumnar() + * - Accesses perPartitionData + * - Forces serializedPartitionData evaluation (here) + * - Waits for DPP values (updateResult or reflection) + * - Calls serializePartitions with DPP-filtered inputRDD + * - Only matching partitions are serialized + * }}} + */ + @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = { + // Ensure DPP subqueries are resolved before accessing inputRDD. + originalPlan.runtimeFilters.foreach { + case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => + e.plan match { + case sab: SubqueryAdaptiveBroadcastExec => + // SubqueryAdaptiveBroadcastExec.executeCollect() throws, so we call + // child.executeCollect() directly. We use the index from SAB to find the + // right buildKey, then locate that key's column in child.output. + val rows = sab.child.executeCollect() + val indices = getSubqueryBroadcastIndices(sab) + + // SPARK-46946 changed index: Int to indices: Seq[Int] as a preparatory refactor + // for future features (Null Safe Equality DPP, multiple equality predicates). + // Currently indices always has one element. CometScanRule checks for multi-index + // DPP and falls back, so this assertion should never fail. + assert( + indices.length == 1, + s"Multi-index DPP not supported: indices=$indices. 
See SPARK-46946.") + val buildKeyIndex = indices.head + val buildKey = sab.buildKeys(buildKeyIndex) + + // Find column index in child.output by matching buildKey's exprId + val colIndex = buildKey match { + case attr: Attribute => + sab.child.output.indexWhere(_.exprId == attr.exprId) + // DPP may cast partition column to match join key type + case Cast(attr: Attribute, _, _, _) => + sab.child.output.indexWhere(_.exprId == attr.exprId) + case _ => buildKeyIndex + } + if (colIndex < 0) { + throw new IllegalStateException( + s"DPP build key '$buildKey' not found in ${sab.child.output.map(_.name)}") + } + + setInSubqueryResult(e, rows.map(_.get(colIndex, e.child.dataType))) + case _ => + e.updateResult() + } + case _ => + } + + CometIcebergNativeScan.serializePartitions(originalPlan, output, nativeIcebergScanMetadata) + } + + /** + * Sets InSubqueryExec's private result field via reflection. + * + * Reflection is required because: + * - SubqueryAdaptiveBroadcastExec.executeCollect() throws UnsupportedOperationException + * - InSubqueryExec has no public setter for result, only updateResult() which calls + * executeCollect() + * - We can't replace e.plan since it's a val + */ + private def setInSubqueryResult(e: InSubqueryExec, result: Array[_]): Unit = { + val fields = e.getClass.getDeclaredFields + // Field name is mangled by Scala compiler, e.g. "org$apache$...$InSubqueryExec$$result" + val resultField = fields + .find(f => f.getName.endsWith("$result") && !f.getName.contains("Broadcast")) + .getOrElse { + throw new IllegalStateException( + s"Cannot find 'result' field in ${e.getClass.getName}. " + + "Spark version may be incompatible with Comet's DPP implementation.") + } + resultField.setAccessible(true) + resultField.set(e, result) + } + + def commonData: Array[Byte] = serializedPartitionData._1 + def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2 + + // numPartitions for execution - derived from actual DPP-filtered partitions + // Only accessed during execution, not planning + def numPartitions: Int = perPartitionData.length + + override lazy val outputPartitioning: Partitioning = UnknownPartitioning(numPartitions) override lazy val outputOrdering: Seq[SortOrder] = Nil @@ -95,17 +216,26 @@ case class CometIcebergNativeScanExec( } } - private val capturedMetricValues: Seq[MetricValue] = { - originalPlan.metrics - .filterNot { case (name, _) => - // Filter out metrics that are now runtime metrics incremented on the native side - name == "numOutputRows" || name == "numDeletes" || name == "numSplits" - } - .map { case (name, metric) => - val mappedType = mapMetricType(name, metric.metricType) - MetricValue(name, metric.value, mappedType) - } - .toSeq + @transient private lazy val capturedMetricValues: Seq[MetricValue] = { + // Guard against null originalPlan (from doCanonicalize) + if (originalPlan == null) { + Seq.empty + } else { + // Force serializedPartitionData evaluation first - this triggers serializePartitions which + // accesses inputRDD, which triggers Iceberg planning and populates metrics + val _ = serializedPartitionData + + originalPlan.metrics + .filterNot { case (name, _) => + // Filter out metrics that are now runtime metrics incremented on the native side + name == "numOutputRows" || name == "numDeletes" || name == "numSplits" + } + .map { case (name, metric) => + val mappedType = mapMetricType(name, metric.metricType) + MetricValue(name, metric.value, mappedType) + } + .toSeq + } } /** @@ -146,62 +276,78 @@ case class CometIcebergNativeScanExec( 
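setInSubqueryResult above hinges on locating a possibly compiler-mangled private field by its suffix; a self-contained demo of the same technique on a hypothetical class (deliberately not Spark's InSubqueryExec):

    object PrivateFieldReflectionSketch {
      class Holder {
        private var result: Array[Int] = Array.empty
        def current: Seq[Int] = result.toSeq
      }

      def main(args: Array[String]): Unit = {
        val holder = new Holder
        val field = holder.getClass.getDeclaredFields
          .find(_.getName.endsWith("result")) // mangled names still end in the Scala field name
          .getOrElse(throw new IllegalStateException("result field not found"))
        field.setAccessible(true)
        field.set(holder, Array(1, 2, 3))
        println(holder.current) // the injected values
      }
    }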
baseMetrics ++ icebergMetrics + ("num_splits" -> numSplitsMetric) } + /** Executes using CometExecRDD - planning data is computed lazily on first access. */ + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val nativeMetrics = CometMetricNode.fromCometPlan(this) + CometExecRDD(sparkContext, commonData, perPartitionData, output.length, nativeMetrics) + } + + /** + * Override convertBlock to preserve @transient fields. The parent implementation uses + * makeCopy() which loses transient fields. + */ + override def convertBlock(): CometIcebergNativeScanExec = { + // Serialize the native plan if not already done + val newSerializedPlan = if (serializedPlanOpt.isEmpty) { + val bytes = CometExec.serializeNativePlan(nativeOp) + SerializedPlan(Some(bytes)) + } else { + serializedPlanOpt + } + + // Create new instance preserving transient fields + CometIcebergNativeScanExec( + nativeOp, + output, + originalPlan, + newSerializedPlan, + metadataLocation, + nativeIcebergScanMetadata) + } + override protected def doCanonicalize(): CometIcebergNativeScanExec = { CometIcebergNativeScanExec( nativeOp, output.map(QueryPlan.normalizeExpressions(_, output)), - originalPlan.doCanonicalize(), + null, // Don't need originalPlan for canonicalization SerializedPlan(None), metadataLocation, - numPartitions, - nativeIcebergScanMetadata) + null + ) // Don't need metadata for canonicalization } - override def stringArgs: Iterator[Any] = - Iterator(output, s"$metadataLocation, ${originalPlan.scan.description()}", numPartitions) + override def stringArgs: Iterator[Any] = { + // Use metadata task count to avoid triggering serializedPartitionData during planning + val hasMeta = nativeIcebergScanMetadata != null && nativeIcebergScanMetadata.tasks != null + val taskCount = if (hasMeta) nativeIcebergScanMetadata.tasks.size() else 0 + val scanDesc = if (originalPlan != null) originalPlan.scan.description() else "canonicalized" + // Include runtime filters (DPP) in string representation + val runtimeFiltersStr = if (originalPlan != null && originalPlan.runtimeFilters.nonEmpty) { + s", runtimeFilters=${originalPlan.runtimeFilters.mkString("[", ", ", "]")}" + } else { + "" + } + Iterator(output, s"$metadataLocation, $scanDesc$runtimeFiltersStr", taskCount) + } override def equals(obj: Any): Boolean = { obj match { case other: CometIcebergNativeScanExec => this.metadataLocation == other.metadataLocation && this.output == other.output && - this.serializedPlanOpt == other.serializedPlanOpt && - this.numPartitions == other.numPartitions + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } } override def hashCode(): Int = - Objects.hashCode( - metadataLocation, - output.asJava, - serializedPlanOpt, - numPartitions: java.lang.Integer) + Objects.hashCode(metadataLocation, output.asJava, serializedPlanOpt) } object CometIcebergNativeScanExec { - /** - * Creates a CometIcebergNativeScanExec from a Spark BatchScanExec. 
- * - * Determines the number of partitions from Iceberg's output partitioning: - * - KeyGroupedPartitioning: Use Iceberg's partition count - * - Other cases: Use the number of InputPartitions from Iceberg's planning - * - * @param nativeOp - * The serialized native operator - * @param scanExec - * The original Spark BatchScanExec - * @param session - * The SparkSession - * @param metadataLocation - * Path to table metadata file - * @param nativeIcebergScanMetadata - * Pre-extracted Iceberg metadata from planning phase - * @return - * A new CometIcebergNativeScanExec - */ + /** Creates a CometIcebergNativeScanExec with deferred partition serialization. */ def apply( nativeOp: Operator, scanExec: BatchScanExec, @@ -209,21 +355,12 @@ object CometIcebergNativeScanExec { metadataLocation: String, nativeIcebergScanMetadata: CometIcebergNativeScanMetadata): CometIcebergNativeScanExec = { - // Determine number of partitions from Iceberg's output partitioning - val numParts = scanExec.outputPartitioning match { - case p: KeyGroupedPartitioning => - p.numPartitions - case _ => - scanExec.inputRDD.getNumPartitions - } - val exec = CometIcebergNativeScanExec( nativeOp, scanExec.output, scanExec, SerializedPlan(None), metadataLocation, - numParts, nativeIcebergScanMetadata) scanExec.logicalLink.foreach(exec.setLogicalLink) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala deleted file mode 100644 index fdf8bf393d..0000000000 --- a/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.comet - -import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.spark.rdd.{RDD, RDDOperationScope, ZippedPartitionsBaseRDD, ZippedPartitionsPartition} -import org.apache.spark.sql.vectorized.ColumnarBatch - -/** - * Similar to Spark `ZippedPartitionsRDD[1-4]` classes, this class is used to zip partitions of - * the multiple RDDs into a single RDD. Spark `ZippedPartitionsRDD[1-4]` classes only support at - * most 4 RDDs. This class is used to support more than 4 RDDs. This ZipPartitionsRDD is used to - * zip the input sources of the Comet physical plan. So it only zips partitions of ColumnarBatch. 
- */ -private[spark] class ZippedPartitionsRDD( - sc: SparkContext, - var f: (Seq[Iterator[ColumnarBatch]], Int, Int) => Iterator[ColumnarBatch], - var zipRdds: Seq[RDD[ColumnarBatch]], - preservesPartitioning: Boolean = false) - extends ZippedPartitionsBaseRDD[ColumnarBatch](sc, zipRdds, preservesPartitioning) { - - // We need to get the number of partitions in `compute` but `getNumPartitions` is not available - // on the executors. So we need to capture it here. - private val numParts: Int = this.getNumPartitions - - override def compute(s: Partition, context: TaskContext): Iterator[ColumnarBatch] = { - val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions - val iterators = - zipRdds.zipWithIndex.map(pair => pair._1.iterator(partitions(pair._2), context)) - f(iterators, numParts, s.index) - } - - override def clearDependencies(): Unit = { - super.clearDependencies() - zipRdds = null - f = null - } -} - -object ZippedPartitionsRDD { - def apply(sc: SparkContext, rdds: Seq[RDD[ColumnarBatch]])( - f: (Seq[Iterator[ColumnarBatch]], Int, Int) => Iterator[ColumnarBatch]) - : RDD[ColumnarBatch] = - withScope(sc) { - new ZippedPartitionsRDD(sc, f, rdds) - } - - private[spark] def withScope[U](sc: SparkContext)(body: => U): U = - RDDOperationScope.withScope[U](sc)(body) -} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 6f33467efe..9fe5d730ca 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -25,7 +25,6 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ -import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -59,6 +58,126 @@ import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregat import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType} import org.apache.comet.serde.operator.CometSink +/** + * Trait for injecting per-partition planning data into operator nodes. + * + * Implementations handle specific operator types (e.g., Iceberg scans, Delta scans). + */ +private[comet] trait PlanDataInjector { + + /** Check if this injector can handle the given operator. */ + def canInject(op: Operator): Boolean + + /** Extract the key used to look up planning data for this operator. */ + def getKey(op: Operator): Option[String] + + /** Inject common + partition data into the operator node. */ + def inject(op: Operator, commonBytes: Array[Byte], partitionBytes: Array[Byte]): Operator +} + +/** + * Registry and utilities for injecting per-partition planning data into operator trees. + */ +private[comet] object PlanDataInjector { + + // Registry of injectors for different operator types + private val injectors: Seq[PlanDataInjector] = Seq( + IcebergPlanDataInjector + // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc. + ) + + /** + * Injects planning data into an Operator tree by finding nodes that need injection and applying + * the appropriate injector. + * + * Supports joins over multiple tables by matching each operator with its corresponding data + * based on a key (e.g., metadata_location for Iceberg). 
+ */ + def injectPlanData( + op: Operator, + commonByKey: Map[String, Array[Byte]], + partitionByKey: Map[String, Array[Byte]]): Operator = { + val builder = op.toBuilder + + // Try each injector to see if it can handle this operator + for (injector <- injectors if injector.canInject(op)) { + injector.getKey(op) match { + case Some(key) => + (commonByKey.get(key), partitionByKey.get(key)) match { + case (Some(commonBytes), Some(partitionBytes)) => + val injectedOp = injector.inject(op, commonBytes, partitionBytes) + // Copy the injected operator's fields to our builder + builder.clear() + builder.mergeFrom(injectedOp) + case _ => + throw new CometRuntimeException(s"Missing planning data for key: $key") + } + case None => // No key, skip injection + } + } + + // Recursively process children + builder.clearChildren() + op.getChildrenList.asScala.foreach { child => + builder.addChildren(injectPlanData(child, commonByKey, partitionByKey)) + } + + builder.build() + } + + def serializeOperator(op: Operator): Array[Byte] = { + val size = op.getSerializedSize + val bytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(bytes) + op.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() + bytes + } +} + +/** + * Injector for Iceberg scan operators. + */ +private[comet] object IcebergPlanDataInjector extends PlanDataInjector { + import java.nio.ByteBuffer + import java.util.concurrent.ConcurrentHashMap + + // Cache parsed IcebergScanCommon by content to avoid repeated deserialization + // ByteBuffer wrapper provides content-based equality and hashCode + // TODO: This is a static singleton on the executor, should we cap the size (proper LRU cache?) + private val commonCache = + new ConcurrentHashMap[ByteBuffer, OperatorOuterClass.IcebergScanCommon]() + + override def canInject(op: Operator): Boolean = + op.hasIcebergScan && + op.getIcebergScan.getFileScanTasksCount == 0 && + op.getIcebergScan.hasCommon + + override def getKey(op: Operator): Option[String] = + Some(op.getIcebergScan.getCommon.getMetadataLocation) + + override def inject( + op: Operator, + commonBytes: Array[Byte], + partitionBytes: Array[Byte]): Operator = { + val scan = op.getIcebergScan + + // Cache the parsed common data to avoid deserializing on every partition + val cacheKey = ByteBuffer.wrap(commonBytes) + val common = commonCache.computeIfAbsent( + cacheKey, + _ => OperatorOuterClass.IcebergScanCommon.parseFrom(commonBytes)) + + val tasksOnly = OperatorOuterClass.IcebergScan.parseFrom(partitionBytes) + + val scanBuilder = scan.toBuilder + scanBuilder.setCommon(common) + scanBuilder.addAllFileScanTasks(tasksOnly.getFileScanTasksList) + + op.toBuilder.setIcebergScan(scanBuilder).build() + } +} + /** * A Comet physical operator */ @@ -105,6 +224,15 @@ abstract class CometExec extends CometPlan { } } } + + /** Collects all ScalarSubquery expressions from a plan tree. 
*/ + protected def collectSubqueries(sparkPlan: SparkPlan): Seq[ScalarSubquery] = { + val childSubqueries = sparkPlan.children.flatMap(collectSubqueries) + val planSubqueries = sparkPlan.expressions.flatMap { + _.collect { case sub: ScalarSubquery => sub } + } + childSubqueries ++ planSubqueries + } } object CometExec { @@ -290,32 +418,8 @@ abstract class CometNativeExec extends CometExec { case None => (None, Seq.empty) } - def createCometExecIter( - inputs: Seq[Iterator[ColumnarBatch]], - numParts: Int, - partitionIndex: Int): CometExecIterator = { - val it = new CometExecIterator( - CometExec.newIterId, - inputs, - output.length, - serializedPlanCopy, - nativeMetrics, - numParts, - partitionIndex, - broadcastedHadoopConfForEncryption, - encryptedFilePaths) - - setSubqueries(it.id, this) - - Option(TaskContext.get()).foreach { context => - context.addTaskCompletionListener[Unit] { _ => - it.close() - cleanSubqueries(it.id, this) - } - } - - it - } + // Find planning data within this stage (stops at shuffle boundaries). + val (commonByKey, perPartitionByKey) = findAllPlanData(this) // Collect the input ColumnarBatches from the child operators and create a CometExecIterator // to execute the native plan. @@ -395,12 +499,20 @@ abstract class CometNativeExec extends CometExec { throw new CometRuntimeException(s"No input for CometNativeExec:\n $this") } - if (inputs.nonEmpty) { - ZippedPartitionsRDD(sparkContext, inputs.toSeq)(createCometExecIter) - } else { - val partitionNum = firstNonBroadcastPlanNumPartitions - CometExecRDD(sparkContext, partitionNum)(createCometExecIter) - } + // Unified RDD creation - CometExecRDD handles all cases + val subqueries = collectSubqueries(this) + CometExecRDD( + sparkContext, + inputs.toSeq, + commonByKey, + perPartitionByKey, + serializedPlanCopy, + firstNonBroadcastPlanNumPartitions, + output.length, + nativeMetrics, + subqueries, + broadcastedHadoopConfForEncryption, + encryptedFilePaths) } } @@ -440,6 +552,49 @@ abstract class CometNativeExec extends CometExec { } } + /** + * Find all plan nodes with per-partition planning data in the plan tree. Returns two maps keyed + * by a unique identifier: one for common data (shared across partitions) and one for + * per-partition data. + * + * Currently supports Iceberg scans (keyed by metadata_location). Additional scan types can be + * added by extending this method. + * + * Stops at stage boundaries (shuffle exchanges, etc.) because partition indices are only valid + * within the same stage. + * + * @return + * (commonByKey, perPartitionByKey) - common data is shared, per-partition varies + */ + private def findAllPlanData( + plan: SparkPlan): (Map[String, Array[Byte]], Map[String, Array[Array[Byte]]]) = { + plan match { + // Found an Iceberg scan with planning data + case iceberg: CometIcebergNativeScanExec + if iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty => + ( + Map(iceberg.metadataLocation -> iceberg.commonData), + Map(iceberg.metadataLocation -> iceberg.perPartitionData)) + + // Broadcast stages are boundaries - don't collect per-partition data from inside them. + // After DPP filtering, broadcast scans may have different partition counts than the + // probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions. 
+ case _: BroadcastQueryStageExec | _: CometBroadcastExchangeExec => + (Map.empty, Map.empty) + + // Stage boundaries - stop searching (partition indices won't align after these) + case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | + _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | + _: ReusedExchangeExec | _: CometSparkToColumnarExec => + (Map.empty, Map.empty) + + // Continue searching through other operators, combining results from all children + case _ => + val results = plan.children.map(findAllPlanData) + (results.flatMap(_._1).toMap, results.flatMap(_._2).toMap) + } + } + /** * Converts this native Comet operator and its children into a native block which can be * executed as a whole (i.e., in a single JNI call) from the native side. diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSubqueryBroadcast.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSubqueryBroadcast.scala new file mode 100644 index 0000000000..1ff0935041 --- /dev/null +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSubqueryBroadcast.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.execution.SubqueryAdaptiveBroadcastExec + +trait ShimSubqueryBroadcast { + + /** + * Gets the build key indices from SubqueryAdaptiveBroadcastExec. Spark 3.x has `index: Int`, + * Spark 4.x has `indices: Seq[Int]`. + */ + def getSubqueryBroadcastIndices(sab: SubqueryAdaptiveBroadcastExec): Seq[Int] = { + Seq(sab.index) + } +} diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSubqueryBroadcast.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSubqueryBroadcast.scala new file mode 100644 index 0000000000..1ff0935041 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSubqueryBroadcast.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.shims + +import org.apache.spark.sql.execution.SubqueryAdaptiveBroadcastExec + +trait ShimSubqueryBroadcast { + + /** + * Gets the build key indices from SubqueryAdaptiveBroadcastExec. Spark 3.x has `index: Int`, + * Spark 4.x has `indices: Seq[Int]`. + */ + def getSubqueryBroadcastIndices(sab: SubqueryAdaptiveBroadcastExec): Seq[Int] = { + Seq(sab.index) + } +} diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSubqueryBroadcast.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSubqueryBroadcast.scala new file mode 100644 index 0000000000..417dfd46b7 --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSubqueryBroadcast.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.execution.SubqueryAdaptiveBroadcastExec + +trait ShimSubqueryBroadcast { + + /** + * Gets the build key indices from SubqueryAdaptiveBroadcastExec. Spark 3.x has `index: Int`, + * Spark 4.x has `indices: Seq[Int]`. 
+ */ + def getSubqueryBroadcastIndices(sab: SubqueryAdaptiveBroadcastExec): Seq[Int] = { + sab.indices + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index f3c8a8b2a6..30521dbad7 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -2295,6 +2295,84 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + test("runtime filtering - multiple DPP filters on two partition columns") { + assume(icebergAvailable, "Iceberg not available") + withTempIcebergDir { warehouseDir => + val dimDir = new File(warehouseDir, "dim_parquet") + withSQLConf( + "spark.sql.catalog.runtime_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.runtime_cat.type" -> "hadoop", + "spark.sql.catalog.runtime_cat.warehouse" -> warehouseDir.getAbsolutePath, + "spark.sql.autoBroadcastJoinThreshold" -> "1KB", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by TWO columns: (data, bucket(8, id)) + // This mimics Iceberg's testMultipleRuntimeFilters + spark.sql(""" + CREATE TABLE runtime_cat.db.multi_dpp_fact ( + id BIGINT, + data STRING, + date DATE, + ts TIMESTAMP + ) USING iceberg + PARTITIONED BY (data, bucket(8, id)) + """) + + // Insert data - 99 rows with varying data and id values + val df = spark + .range(1, 100) + .selectExpr( + "id", + "CAST(DATE_ADD(DATE '1970-01-01', CAST(id % 4 AS INT)) AS STRING) as data", + "DATE_ADD(DATE '1970-01-01', CAST(id % 4 AS INT)) as date", + "CAST(DATE_ADD(DATE '1970-01-01', CAST(id % 4 AS INT)) AS TIMESTAMP) as ts") + df.coalesce(1) + .write + .format("iceberg") + .option("fanout-enabled", "true") + .mode("append") + .saveAsTable("runtime_cat.db.multi_dpp_fact") + + // Create dimension table with specific id=1, data='1970-01-02' + spark + .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02"), "1970-01-02"))) + .toDF("id", "date", "data") + .write + .parquet(dimDir.getAbsolutePath) + spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("dim") + + // Join on BOTH partition columns - this creates TWO DPP filters + val query = + """SELECT /*+ BROADCAST(d) */ f.* + |FROM runtime_cat.db.multi_dpp_fact f + |JOIN dim d ON f.id = d.id AND f.data = d.data + |WHERE d.date = DATE '1970-01-02'""".stripMargin + + // Verify plan has 2 dynamic pruning expressions + val df2 = spark.sql(query) + val planStr = df2.queryExecution.executedPlan.toString + // Count "dynamicpruningexpression(" to avoid matching "dynamicpruning#N" references + val dppCount = "dynamicpruningexpression\\(".r.findAllIn(planStr).length + assert(dppCount == 2, s"Expected 2 DPP expressions but found $dppCount in:\n$planStr") + + // Verify native Iceberg scan is used and DPP actually pruned partitions + val (_, cometPlan) = checkSparkAnswer(query) + val icebergScans = collectIcebergNativeScans(cometPlan) + assert( + icebergScans.nonEmpty, + s"Expected CometIcebergNativeScanExec but found none. 
Plan:\n$cometPlan") + // With 4 data values x 8 buckets = up to 32 partitions total + // DPP on (data='1970-01-02', bucket(id=1)) should prune to 1 + val numPartitions = icebergScans.head.numPartitions + assert(numPartitions == 1, s"Expected DPP to prune to 1 partition but got $numPartitions") + + spark.sql("DROP TABLE runtime_cat.db.multi_dpp_fact") + } + } + } + test("runtime filtering - join with dynamic partition pruning") { assume(icebergAvailable, "Iceberg not available") withTempIcebergDir { warehouseDir => @@ -2303,11 +2381,14 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { "spark.sql.catalog.runtime_cat" -> "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.runtime_cat.type" -> "hadoop", "spark.sql.catalog.runtime_cat.warehouse" -> warehouseDir.getAbsolutePath, + // Prevent fact table from being broadcast (force dimension to be broadcast) + "spark.sql.autoBroadcastJoinThreshold" -> "1KB", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { - // Create partitioned Iceberg table (fact table) + // Create partitioned Iceberg table (fact table) with 3 partitions + // Add enough data to prevent broadcast spark.sql(""" CREATE TABLE runtime_cat.db.fact_table ( id BIGINT, @@ -2323,7 +2404,11 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { (1, 'a', DATE '1970-01-01'), (2, 'b', DATE '1970-01-02'), (3, 'c', DATE '1970-01-02'), - (4, 'd', DATE '1970-01-03') + (4, 'd', DATE '1970-01-03'), + (5, 'e', DATE '1970-01-01'), + (6, 'f', DATE '1970-01-02'), + (7, 'g', DATE '1970-01-03'), + (8, 'h', DATE '1970-01-01') """) // Create dimension table (Parquet) in temp directory @@ -2335,8 +2420,9 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("dim") // This join should trigger dynamic partition pruning + // Use BROADCAST hint to force dimension table to be broadcast val query = - """SELECT f.* FROM runtime_cat.db.fact_table f + """SELECT /*+ BROADCAST(d) */ f.* FROM runtime_cat.db.fact_table f |JOIN dim d ON f.date = d.date AND d.id = 1 |ORDER BY f.id""".stripMargin @@ -2348,13 +2434,17 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { planStr.contains("dynamicpruning"), s"Expected dynamic pruning in plan but got:\n$planStr") - // Check results match Spark - // Note: AQE re-plans after subquery executes, converting dynamicpruningexpression(...) - // to dynamicpruningexpression(true), which allows native Iceberg scan to proceed. - // This is correct behavior - no actual subquery to wait for after AQE re-planning. - // However, the rest of the still contains non-native operators because CometExecRule - // doesn't run again. - checkSparkAnswer(df) + // Should now use native Iceberg scan with DPP + checkIcebergNativeScan(query) + + // Verify DPP actually pruned partitions (should only scan 1 of 3 partitions) + val (_, cometPlan) = checkSparkAnswer(query) + val icebergScans = collectIcebergNativeScans(cometPlan) + assert( + icebergScans.nonEmpty, + s"Expected CometIcebergNativeScanExec but found none. Plan:\n$cometPlan") + val numPartitions = icebergScans.head.numPartitions + assert(numPartitions == 1, s"Expected DPP to prune to 1 partition but got $numPartitions") spark.sql("DROP TABLE runtime_cat.db.fact_table") }
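The plan-string check in the multi-DPP test above escapes the parenthesis because every dynamicpruningexpression(...) also contains the bare substring dynamicpruning, as do the #N attribute references; a hypothetical illustration of the difference:

    object DppCountSketch {
      def main(args: Array[String]): Unit = {
        // A fragment resembling an executed-plan string: one real DPP filter plus a #N reference.
        val planStr = "PartitionFilters: [dynamicpruningexpression(data#2 IN dynamicpruning#8)]"
        println("dynamicpruningexpression\\(".r.findAllIn(planStr).length) // 1: the actual filter
        println("dynamicpruning".r.findAllIn(planStr).length)              // 2: would over-count
      }
    }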