From 87869dd5275d2cd7862abed4eb8fabe9704e3474 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Wed, 15 Apr 2026 19:40:05 +0800 Subject: [PATCH 1/6] [spark] Push down variant_get into Paimon shredded Variant scan --- .../apache/paimon/spark/sql/VariantTest.scala | 101 ++++++++++ .../org/apache/paimon/spark/PaimonScan.scala | 6 +- .../PaimonSparkSessionExtensions.scala | 22 +++ .../apache/paimon/spark/read/BaseScan.scala | 22 ++- .../spark/sql/paimon/shims/SparkShim.scala | 3 + .../PaimonLambdaFunctionfunction_test.java | 1 + .../paimon/spark/sql/VariantTestBase.scala | 78 ++++++++ .../optimizer/PushDownVariantExtract.scala | 173 ++++++++++++++++++ .../spark/sql/paimon/shims/Spark4Shim.scala | 5 + 9 files changed, 408 insertions(+), 3 deletions(-) create mode 100644 paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java create mode 100644 paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala index 94e9ac683f02..b232c3f7b4fa 100644 --- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala @@ -18,7 +18,14 @@ package org.apache.paimon.spark.sql +import org.apache.paimon.spark.PaimonSparkTestBase + import org.apache.spark.SparkConf +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.GetStructField +import org.apache.spark.sql.catalyst.expressions.variant.VariantGet +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.types.{StructType, VariantType} class VariantTest extends VariantTestBase { override protected def sparkConf: SparkConf = { @@ -31,3 +38,97 @@ class VariantInferShreddingTest extends VariantTestBase { super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true") } } + +/** + * Spark 4-specific plan-shape tests for the PushDownVariantExtract optimizer rule. + * + * These tests verify the rewrite at the Catalyst expression level: VariantGet(col, Literal(path), + * targetType) → GetStructField(col, ordinal) + * + * and confirm that the variant column's output type in the scan relation changes from VariantType + * to StructType after the pushdown, which is the evidence that Parquet column pruning will kick in + * at read time. + */ +class VariantPushDownPlanTest extends PaimonSparkTestBase { + + override protected def sparkConf: SparkConf = + super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true") + + // Explicit 3-field shredding schema used across all tests in this class. + private val shreddedSchema3: String = + """{"type":"ROW","fields":[{"name":"v","type":{"type":"ROW","fields":[""" + + """{"name":"age","type":"INT"},""" + + """{"name":"city","type":"STRING"},""" + + """{"name":"score","type":"DOUBLE"}""" + + """]}}]}""" + + test("Paimon Variant: VariantGet is replaced by GetStructField in optimized plan") { + sql(s""" + |CREATE TABLE T (id INT, v VARIANT) + |TBLPROPERTIES ('parquet.variant.shreddingSchema' = '$shreddedSchema3') + |""".stripMargin) + sql(""" + |INSERT INTO T VALUES + | (1, parse_json('{"age":26,"city":"Beijing","score":9.5}')), + | (2, parse_json('{"age":27,"city":"Hangzhou","score":8.0}')) + |""".stripMargin) + + val q = + "SELECT variant_get(v, '$.age', 'int'), variant_get(v, '$.score', 'double') FROM T" + checkAnswer(sql(q), Seq(Row(26, 9.5d), Row(27, 8.0d))) + + val projectExprs = sql(q).queryExecution.optimizedPlan + .collectFirst { case p: Project => p } + .get + .projectList + + // After pushdown, no VariantGet should remain in the top-level project list. + assert( + !projectExprs.exists(_.exists(_.isInstanceOf[VariantGet])), + "VariantGet should have been replaced by GetStructField after PushDownVariantExtract") + + // GetStructField nodes must now be present in its place. + assert( + projectExprs.exists(_.exists(_.isInstanceOf[GetStructField])), + "expected GetStructField to appear in the optimized project list") + } + + test("Paimon Variant: scan output type changes from VariantType to StructType after pushdown") { + sql(s""" + |CREATE TABLE T (id INT, v VARIANT) + |TBLPROPERTIES ('parquet.variant.shreddingSchema' = '$shreddedSchema3') + |""".stripMargin) + sql("INSERT INTO T VALUES (1, parse_json('{\"age\":26,\"city\":\"Beijing\",\"score\":9.5}'))") + + // Without pushdown (direct reference): the variant column stays VariantType in the scan. + val qFull = "SELECT v FROM T" + val fullOutput = sql(qFull).queryExecution.optimizedPlan + .collectFirst { + case r: org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation => r + } + .get + .output + val variantColFull = fullOutput.find(_.name == "v").get + assert( + variantColFull.dataType == VariantType, + "without pushdown, the scan output type for 'v' should remain VariantType") + + // With pushdown (only VariantGet accesses): the variant column becomes a StructType + // whose fields correspond only to the accessed sub-columns (age + score, not city). + val qPushed = + "SELECT variant_get(v, '$.age', 'int'), variant_get(v, '$.score', 'double') FROM T" + val pushedOutput = sql(qPushed).queryExecution.optimizedPlan + .collectFirst { + case r: org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation => r + } + .get + .output + val variantColPushed = pushedOutput.find(_.name == "v").get + assert( + variantColPushed.dataType.isInstanceOf[StructType], + "after pushdown, the scan output type for 'v' should be StructType (shredded sub-schema)") + assert( + variantColPushed.dataType.asInstanceOf[StructType].length == 2, + "the projected StructType should contain exactly 2 fields (age, score), not all 3") + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala index 0fa4d73054e4..46195d8872cf 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -24,6 +24,7 @@ import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, VectorSearc import org.apache.paimon.spark.commands.BucketExpression.quote import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable} import org.apache.paimon.table.source.{DataSplit, Split} +import org.apache.paimon.types.RowType import org.apache.spark.sql.PaimonUtils.fieldReference import org.apache.spark.sql.connector.expressions._ @@ -43,11 +44,14 @@ case class PaimonScan( override val pushedTopN: Option[TopN], override val pushedVectorSearch: Option[VectorSearch], override val pushedFullTextSearch: Option[FullTextSearch] = None, - bucketedScanDisabled: Boolean = false) + bucketedScanDisabled: Boolean = false, + variantProjections: Map[String, RowType] = Map.empty) extends PaimonBaseScan(table) with SupportsReportPartitioning with SupportsReportOrdering { + override protected def variantProjectionMap: Map[String, RowType] = variantProjections + def disableBucketedScan(): PaimonScan = { copy(bucketedScanDisabled = true) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index 950b5797c7b4..ee1837827381 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -26,6 +26,8 @@ import org.apache.paimon.spark.execution.{OldCompatibleStrategy, PaimonStrategy} import org.apache.paimon.spark.execution.adaptive.DisableUnnecessaryPaimonBucketedScan import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.paimon.shims.SparkShimLoader /** Spark session extension to extends the syntax and adds the rules. */ @@ -66,6 +68,26 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // optimization rules extensions.injectOptimizerRule(_ => OptimizeMetadataOnlyDeleteFromPaimonTable) extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries) + SparkShimLoader.shim.variantExtractRule().foreach { + rule => + // PushDownVariantExtract must run AFTER V2ScanRelationPushDown converts + // DataSourceV2Relation to DataSourceV2ScanRelation in the "Early Filter and Projection + // Push-Down" batch. injectOptimizerRule places rules in the "Operator Optimization" batch + // (part of super.defaultBatches), which runs BEFORE the scan push-down. The only batch + // that runs after scan building is "User Provided Optimizers", populated via + // experimentalMethods.extraOptimizations. We register there via a side effect and return + // a no-op placeholder for the injectOptimizerRule slot. + extensions.injectOptimizerRule { + session => + if (!session.experimental.extraOptimizations.exists(_ eq rule)) { + session.experimental.extraOptimizations = + session.experimental.extraOptimizations :+ rule + } + new Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan + } + } + } // planner extensions extensions.injectPlannerStrategy(spark => PaimonStrategy(spark)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala index 3237475d69e0..66065946fd06 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala @@ -78,13 +78,31 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging { } } - private[paimon] val (readTableRowType, metadataFields) = { + /** Hook for subclasses to provide variant column projections (colName -> VariantRowType). */ + protected def variantProjectionMap: Map[String, RowType] = Map.empty + + private[paimon] lazy val (readTableRowType, metadataFields) = { requiredSchema.fields.foreach(f => checkMetadataColumn(f.name)) val (_requiredTableFields, _metadataFields) = requiredSchema.fields.partition(field => tableRowType.containsField(field.name)) val _readTableRowType = SparkTypeUtils.prunePaimonRowType(StructType(_requiredTableFields), tableRowType) - (_readTableRowType, _metadataFields) + val _finalReadType = applyVariantProjections(_readTableRowType, variantProjectionMap) + (_finalReadType, _metadataFields) + } + + private def applyVariantProjections( + rowType: RowType, + projections: Map[String, RowType]): RowType = { + if (projections.isEmpty) return rowType + val newFields = rowType.getFields.asScala.map { + field => + projections.get(field.name()) match { + case Some(variantRowType) => field.newType(variantRowType) + case None => field + } + } + rowType.copy(newFields.asJava) } private def checkMetadataColumn(fieldName: String): Unit = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index d85fd9a42790..1cb2e0c9f88c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -88,6 +88,9 @@ trait SparkShim { notMatchedBySourceActions: Seq[MergeAction], withSchemaEvolution: Boolean): MergeIntoTable + // for variant_get pushdown (Spark 4 only; returns None on Spark 3) + def variantExtractRule(): Option[Rule[LogicalPlan]] = None + // for variant def toPaimonVariant(o: Object): Variant diff --git a/paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java b/paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java new file mode 100644 index 000000000000..65f2ab9d9fa4 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java @@ -0,0 +1 @@ +public class PaimonLambdaFunctionfunction_test { public static java.lang.Long apply(Integer length, Integer width){ return (long) length * width; } } \ No newline at end of file diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala index e7c3e7a2f4ff..de5c2892250d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala @@ -982,4 +982,82 @@ abstract class VariantTestBase extends PaimonSparkTestBase { Seq(Row(2, 2)) ) } + + // ---- variant_get pushdown tests ---- + // The shreddingSchema below declares 3 sub-fields (age, city, score) for column v. + // Queries that only touch a subset of them via variant_get should cause the optimizer + // to project only the needed sub-columns into the scan (PushDownVariantExtract rule). + private val shreddedSchema3 = + """{"type":"ROW","fields":[{"name":"v","type":{"type":"ROW","fields":[""" + + """{"name":"age","type":"INT"},""" + + """{"name":"city","type":"STRING"},""" + + """{"name":"score","type":"DOUBLE"}""" + + """]}}]}""" + + test("Paimon Variant: variant_get pushdown reduces projected sub-columns for shredded variant") { + sql(s""" + |CREATE TABLE T (id INT, v VARIANT) + |TBLPROPERTIES ('parquet.variant.shreddingSchema' = '$shreddedSchema3') + |""".stripMargin) + sql(""" + |INSERT INTO T VALUES + | (1, parse_json('{"age":26,"city":"Beijing","score":9.5}')), + | (2, parse_json('{"age":27,"city":"Hangzhou","score":8.0}')) + |""".stripMargin) + + // Query only uses 2 of 3 shredded fields (age + score, city is intentionally skipped). + val q = + "SELECT variant_get(v, '$.age', 'int'), variant_get(v, '$.score', 'double') FROM T ORDER BY id" + checkAnswer(sql(q), Seq(Row(26, 9.5d), Row(27, 8.0d))) + + // Performance assertion: the scan should carry a projection with exactly 2 sub-fields, + // not all 3. Fewer projected sub-fields means fewer Parquet column reads at runtime. + val scan = getPaimonScan(q) + assert( + scan.variantProjections.contains("v"), + "PushDownVariantExtract should have populated variantProjections for column 'v'") + assert( + scan.variantProjections("v").getFieldCount == 2, + s"expected 2 projected sub-fields (age, score) out of 3 total, " + + s"got ${scan.variantProjections("v").getFieldCount}" + ) + } + + test("Paimon Variant: variant_get pushdown does not fire when variant column is read directly") { + sql(s""" + |CREATE TABLE T (id INT, v VARIANT) + |TBLPROPERTIES ('parquet.variant.shreddingSchema' = '$shreddedSchema3') + |""".stripMargin) + sql("INSERT INTO T VALUES (1, parse_json('{\"age\":26,\"city\":\"Beijing\",\"score\":9.5}'))") + + // Direct reference to the full variant column must prevent pushdown for that column. + val q = "SELECT v, variant_get(v, '$.age', 'int') FROM T" + checkAnswer( + sql(q), + sql("SELECT parse_json('{\"age\":26,\"city\":\"Beijing\",\"score\":9.5}'), 26")) + + val scan = getPaimonScan(q) + assert( + scan.variantProjections.isEmpty, + "pushdown must NOT fire when the variant column itself is projected") + } + + test("Paimon Variant: variant_get pushdown deduplicates repeated access to the same path") { + sql(s""" + |CREATE TABLE T (id INT, v VARIANT) + |TBLPROPERTIES ('parquet.variant.shreddingSchema' = '$shreddedSchema3') + |""".stripMargin) + sql("INSERT INTO T VALUES (1, parse_json('{\"age\":26,\"city\":\"Beijing\",\"score\":9.5}'))") + + // The same path accessed twice should still map to a single sub-column in the projection. + val q = + "SELECT variant_get(v, '$.age', 'int') AS a1, variant_get(v, '$.age', 'int') AS a2 FROM T" + checkAnswer(sql(q), Seq(Row(26, 26))) + + val scan = getPaimonScan(q) + assert(scan.variantProjections.contains("v")) + assert( + scan.variantProjections("v").getFieldCount == 1, + "two accesses to the same path should deduplicate to a single projected sub-column") + } } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala new file mode 100644 index 000000000000..f762ed107af2 --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.optimizer + +import org.apache.paimon.data.variant.VariantMetadataUtils +import org.apache.paimon.spark.{PaimonScan, SparkTypeUtils} + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.variant.VariantGet +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.types.VariantType + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Pushes down [[VariantGet]] extraction into Paimon's shredded Variant reading, enabling Parquet + * column pruning for shredded Variant columns. + * + * When a Variant column is stored in shredded format (typed_value sub-columns per field), and the + * query only accesses specific fields via variant_get / try_variant_get, this rule replaces the + * full Variant read with a struct-typed read that only scans the needed typed_value sub-columns. + * + * Transforms: + * {{{ + * Project [variant_get(v, "$.age", IntegerType)] + * └─ DataSourceV2ScanRelation [v: VariantType] + * }}} + * Into: + * {{{ + * Project [GetStructField(v, 0)] + * └─ DataSourceV2ScanRelation [v: StructType<0: IntegerType>] + * (PaimonScan with variantProjections = Map("v" -> VariantRowType{0:INT for $.age})) + * }}} + * + * Only applies when: + * - The underlying scan is a [[PaimonScan]] + * - All usages of the variant column in the projection are through [[VariantGet]] (no direct + * attribute reference) + * - The path argument of [[VariantGet]] is foldable (constant-evaluable at planning time) + */ +object PushDownVariantExtract extends Rule[LogicalPlan] { + + /** Deduplication key for a single VariantGet operation. */ + private case class GetKey( + path: String, + targetType: org.apache.spark.sql.types.DataType, + failOnError: Boolean, + timeZoneId: String) + + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { + case project @ Project(projectList, rel: DataSourceV2ScanRelation) + if rel.scan.isInstanceOf[PaimonScan] => + tryPushDown(project, projectList, rel).getOrElse(project) + } + + private def tryPushDown( + project: Project, + projectList: Seq[NamedExpression], + rel: DataSourceV2ScanRelation): Option[Project] = { + + val scan = rel.scan.asInstanceOf[PaimonScan] + + // Collect VariantType output attributes (by name) + val variantAttrNames: Set[String] = rel.output + .filter(_.dataType == VariantType) + .map(_.name) + .toSet + if (variantAttrNames.isEmpty) return None + + // Walk the project list to find VariantGet usages and direct references + val variantGetsByCol = mutable.Map[String, mutable.ListBuffer[VariantGet]]() + val colUsedDirectly = mutable.Set[String]() + + def collectUsages(expr: Expression): Unit = expr match { + case vg: VariantGet if vg.path.foldable => + vg.child match { + case ar: AttributeReference if variantAttrNames.contains(ar.name) => + variantGetsByCol.getOrElseUpdate(ar.name, mutable.ListBuffer.empty) += vg + case _ => + // Nested or non-column child: treat child refs as direct usages + vg.child.foreach(collectUsages) + } + case ar: AttributeReference if variantAttrNames.contains(ar.name) => + colUsedDirectly += ar.name + case other => + other.children.foreach(collectUsages) + } + + projectList.foreach(collectUsages) + + // Only push down columns that are exclusively used via VariantGet with literal paths + val pushableCols = variantGetsByCol.keys.filterNot(colUsedDirectly.contains).toSet + if (pushableCols.isEmpty) return None + + // For each pushable column, build a VariantRowType (deduplicated by GetKey) + val newVariantRowTypes = mutable.Map[String, org.apache.paimon.types.RowType]() + val getKeyToIndex = mutable.Map[(String, GetKey), Int]() + + pushableCols.foreach { + colName => + val builder = VariantMetadataUtils.VariantRowTypeBuilder.builder() + var nextIdx = 0 + + variantGetsByCol(colName).foreach { + vg => + val path = vg.path.eval().toString + val tz = vg.timeZoneId.getOrElse("UTC") + val key = GetKey(path, vg.targetType, vg.failOnError, tz) + val globalKey = (colName, key) + if (!getKeyToIndex.contains(globalKey)) { + getKeyToIndex(globalKey) = nextIdx + nextIdx += 1 + builder.field(SparkTypeUtils.toPaimonType(vg.targetType), path, vg.failOnError, tz) + } + } + newVariantRowTypes(colName) = builder.build() + } + + // Create new PaimonScan with variant projections + val newScan = scan.copy(variantProjections = newVariantRowTypes.toMap) + + // Update output attributes: variant columns become StructType (field names "0","1",...) + val newOutput = rel.output.map { + attr => + newVariantRowTypes.get(attr.name) match { + case Some(vrt) => attr.withDataType(SparkTypeUtils.fromPaimonRowType(vrt)) + case None => attr + } + } + val newRel = rel.copy(scan = newScan, output = newOutput) + val newAttrByName = newOutput.map(a => a.name -> a).toMap + + // Rewrite project list: replace VariantGet with GetStructField(newAttr, ordinal) + val newProjectList: Seq[NamedExpression] = projectList.map { + expr => + expr + .transform { + case vg: VariantGet if vg.path.foldable => + vg.child match { + case ar: AttributeReference if newVariantRowTypes.contains(ar.name) => + val path = vg.path.eval().toString + val tz = vg.timeZoneId.getOrElse("UTC") + val key = GetKey(path, vg.targetType, vg.failOnError, tz) + val idx = getKeyToIndex((ar.name, key)) + GetStructField(newAttrByName(ar.name), idx) + case _ => vg + } + } + .asInstanceOf[NamedExpression] + } + + Some(Project(newProjectList, newRel)) + } +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 24782ffdd2d0..6bb214c232a1 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.paimon.shims import org.apache.paimon.data.variant.{GenericVariant, Variant} import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules +import org.apache.paimon.spark.catalyst.optimizer.PushDownVariantExtract import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} @@ -132,4 +133,8 @@ class Spark4Shim extends SparkShim { dataType.isInstanceOf[VariantType] override def SparkVariantType(): org.apache.spark.sql.types.DataType = DataTypes.VariantType + + override def variantExtractRule() + : Option[org.apache.spark.sql.catalyst.rules.Rule[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]] = + Some(PushDownVariantExtract) } From 7556aa1f78ea32321854b442c4004e50ca828c0e Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Wed, 15 Apr 2026 19:50:36 +0800 Subject: [PATCH 2/6] [spark] Push down variant_get into Paimon shredded Variant scan --- .../paimon-spark-ut/PaimonLambdaFunctionfunction_test.java | 1 - 1 file changed, 1 deletion(-) delete mode 100644 paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java diff --git a/paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java b/paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java deleted file mode 100644 index 65f2ab9d9fa4..000000000000 --- a/paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java +++ /dev/null @@ -1 +0,0 @@ -public class PaimonLambdaFunctionfunction_test { public static java.lang.Long apply(Integer length, Integer width){ return (long) length * width; } } \ No newline at end of file From 735066c9ebb23e331a07df02b76ee09299e85149 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Wed, 15 Apr 2026 23:23:39 +0800 Subject: [PATCH 3/6] [spark] Push down variant_get into Paimon shredded Variant scan --- .../src/main/scala/org/apache/paimon/spark/PaimonScan.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala index 707ee1345953..1e7d4dd9c2bb 100644 --- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -22,6 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, VectorSearch} import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable} import org.apache.paimon.table.source.{DataSplit, Split} +import org.apache.paimon.types.RowType import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.connector.read.SupportsReportPartitioning @@ -37,7 +38,8 @@ case class PaimonScan( override val pushedTopN: Option[TopN], override val pushedVectorSearch: Option[VectorSearch], override val pushedFullTextSearch: Option[FullTextSearch] = None, - bucketedScanDisabled: Boolean = false) + bucketedScanDisabled: Boolean = false, + variantProjections: Map[String, RowType] = Map.empty) extends PaimonBaseScan(table) with SupportsReportPartitioning { From 2d19f7320de292b9448525b4111cd35ceacd589c Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Thu, 16 Apr 2026 00:45:17 +0800 Subject: [PATCH 4/6] [spark] Push down variant_get into Paimon shredded Variant scan --- .../spark/catalyst/optimizer/PushDownVariantExtract.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala index f762ed107af2..278cef215cd9 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala @@ -56,6 +56,10 @@ import scala.collection.mutable * - All usages of the variant column in the projection are through [[VariantGet]] (no direct * attribute reference) * - The path argument of [[VariantGet]] is foldable (constant-evaluable at planning time) + * + * Note: the rule does not check whether the column is physically shredded. If it is not, the + * format layer falls back to extracting fields from the binary blob, so correctness is preserved + * — just without the IO benefit. */ object PushDownVariantExtract extends Rule[LogicalPlan] { From fad2defc873c200176a51cd44b8ddc27fcf09031 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:56:40 +0800 Subject: [PATCH 5/6] [spark] Push down variant_get into Paimon shredded Variant scan --- .../spark/catalyst/optimizer/PushDownVariantExtract.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala index 278cef215cd9..10c6b0280b57 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala @@ -57,9 +57,9 @@ import scala.collection.mutable * attribute reference) * - The path argument of [[VariantGet]] is foldable (constant-evaluable at planning time) * - * Note: the rule does not check whether the column is physically shredded. If it is not, the - * format layer falls back to extracting fields from the binary blob, so correctness is preserved - * — just without the IO benefit. + * Note: the rule does not check whether the column is physically shredded. If it is not, the format + * layer falls back to extracting fields from the binary blob, so correctness is preserved — just + * without the IO benefit. */ object PushDownVariantExtract extends Rule[LogicalPlan] { From 3c47c869149b281f22da047084008bd77bccfeda Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:04:01 +0800 Subject: [PATCH 6/6] [spark] Push down variant_get into Paimon shredded Variant scan --- .../spark/catalyst/optimizer/PushDownVariantExtract.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala index 10c6b0280b57..d2c5c80baf6a 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/PushDownVariantExtract.scala @@ -57,7 +57,7 @@ import scala.collection.mutable * attribute reference) * - The path argument of [[VariantGet]] is foldable (constant-evaluable at planning time) * - * Note: the rule does not check whether the column is physically shredded. If it is not, the format + * Note: the rule does not check whether the column is physically shredded. If it is not, the Paimon * layer falls back to extracting fields from the binary blob, so correctness is preserved — just * without the IO benefit. */