spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala (21 changes: 18 additions & 3 deletions)
@@ -139,9 +139,24 @@ case class CometScanRule(session: SparkSession)

private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {

if (COMET_DPP_FALLBACK_ENABLED.get() &&
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
val hasDPP = scanExec.partitionFilters.exists(isDynamicPruningFilter)
val aqeEnabled = session.sessionState.conf.adaptiveExecutionEnabled
val scanImpl = COMET_NATIVE_SCAN_IMPL.get()

// native_datafusion + DPP requires AQE. Without AQE, DPP subqueries aren't prepared
// before the scan tries to use their results, causing "has not finished" errors.
// This is a hard requirement, not controlled by COMET_DPP_FALLBACK_ENABLED.
if (scanImpl == SCAN_NATIVE_DATAFUSION && hasDPP && !aqeEnabled) {
return withInfo(
scanExec,
"native_datafusion scan with DPP requires AQE to be enabled. " +
"DPP subqueries are not properly prepared in non-AQE mode.")
}

// For other scan types, respect COMET_DPP_FALLBACK_ENABLED config
val shouldFallbackForDPP = COMET_DPP_FALLBACK_ENABLED.get() && hasDPP
if (shouldFallbackForDPP) {
return withInfo(scanExec, "Dynamic Partition Pruning is not supported for this scan type")
}

scanExec.relation match {
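As a usage note for the rule above: with native_datafusion, a DPP scan only stays native when AQE is on. Below is a minimal sketch of a session that satisfies the new gating. Only spark.sql.adaptive.enabled is a confirmed Spark key; the two Comet keys are assumed spellings inferred from the constant names COMET_NATIVE_SCAN_IMPL and COMET_DPP_FALLBACK_ENABLED and may differ from the actual CometConf definitions.

{{{
import org.apache.spark.sql.SparkSession

// Hypothetical session setup exercising the DPP + native_datafusion path.
val spark = SparkSession
  .builder()
  .appName("comet-dpp-example")
  .config("spark.sql.adaptive.enabled", "true")         // AQE: hard requirement with native_datafusion + DPP
  .config("spark.comet.scan.impl", "native_datafusion") // assumed key for COMET_NATIVE_SCAN_IMPL
  .config("spark.comet.dppFallback.enabled", "false")   // assumed key; keep DPP scans in Comet
  .getOrCreate()
}}}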
@@ -57,10 +57,9 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled")
}

// Native DataFusion doesn't support subqueries/dynamic pruning
if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning")
}
// DPP (Dynamic Partition Pruning) is now supported via lazy partition serialization
// in CometNativeScanExec. DPP subqueries are resolved at execution time before
// partition data is serialized, following the pattern from CometIcebergNativeScanExec.

if (SQLConf.get.ignoreCorruptFiles ||
scanExec.relation.options
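isDynamicPruningFilter is referenced above but not shown in this diff. Spark's planner defines essentially the same check; a sketch of its likely shape follows (an assumption based on the standard Spark pattern, not Comet's verified source):

{{{
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}

// A DPP filter is any partition filter that embeds a subquery plan:
// DynamicPruningExpression wraps an InSubqueryExec, which is a PlanExpression.
def isDynamicPruningFilter(e: Expression): Boolean =
  e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
}}}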
@@ -41,6 +41,7 @@ import com.google.common.base.Objects
import org.apache.comet.CometConf
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.shims.ShimSubqueryBroadcast

/**
* Native scan operator for DataSource V1 Parquet files using DataFusion's ParquetExec.
@@ -54,6 +55,11 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
* projections) is serialized once at planning time, while per-partition file lists are lazily
* serialized at execution time. This reduces memory when scanning tables with many partitions, as
* each executor task receives only its partition's file list rather than all files.
*
* Supports Dynamic Partition Pruning (DPP) by deferring partition serialization to execution
* time. The doPrepare() method prepares the DPP subquery plans; the lazy
* serializedPartitionData then waits for their results and serializes the DPP-filtered
* partitions from CometScanExec.getFilePartitions().
*/
case class CometNativeScanExec(
override val nativeOp: Operator,
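The split-mode contract described in the Scaladoc above boils down to a small, self-contained sketch (type and field names are illustrative, not Comet's actual ones): common metadata is serialized eagerly, per-partition file lists lazily, so DPP can shrink the partition set before any partition bytes are produced.

{{{
final case class PartitionStub(files: Seq[String])

final class SplitModeScan(
    val commonData: Array[Byte],                // planning time: serialized once
    loadPartitions: () => Seq[PartitionStub]) { // execution time: called after DPP pruning

  // Nothing below runs until first access, by which point DPP has already
  // narrowed what loadPartitions() returns.
  @transient lazy val perPartitionData: Array[Array[Byte]] =
    loadPartitions().map(_.files.mkString("\n").getBytes("UTF-8")).toArray
}
}}}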
@@ -72,7 +78,8 @@ case class CometNativeScanExec(
sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime
extends CometLeafExec
with DataSourceScanExec
with ShimStreamSourceAwareSparkPlan {
with ShimStreamSourceAwareSparkPlan
with ShimSubqueryBroadcast {

override lazy val metadata: Map[String, String] = originalPlan.metadata
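The new ShimSubqueryBroadcast mixin's source is not part of this diff. Given the SPARK-46946 note further down, a plausible shape for the shim is sketched below. Comet typically implements shims as per-Spark-version source sets, so treat this reflective version as an illustration of the contract, not the actual implementation:

{{{
import org.apache.spark.sql.execution.SparkPlan

trait ShimSubqueryBroadcast {
  // Normalizes the build-key index across Spark versions: builds with SPARK-46946
  // expose `indices: Seq[Int]`, older builds expose `index: Int`.
  def getSubqueryBroadcastIndices(plan: SparkPlan): Seq[Int] = {
    val cls = plan.getClass
    cls.getMethods.find(m => m.getName == "indices" && m.getParameterCount == 0) match {
      case Some(m) => m.invoke(plan).asInstanceOf[Seq[Int]]
      case None    => Seq(cls.getMethod("index").invoke(plan).asInstanceOf[Int])
    }
  }
}
}}}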

@@ -93,31 +100,98 @@ case class CometNativeScanExec(
override lazy val outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering

/**
* Lazy partition serialization - deferred until execution time to reduce driver memory.
* Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar().
*
* Split-mode serialization pattern:
* This follows Spark's convention of preparing subqueries in doPrepare() rather than
* doExecuteColumnar(). While the actual waiting for DPP results happens later in
* serializedPartitionData, calling prepare() here ensures subquery plans are set up before
* execution begins.
*/
override protected def doPrepare(): Unit = {
partitionFilters.foreach {
case DynamicPruningExpression(e: InSubqueryExec) =>
e.plan.prepare()
case _ =>
}
super.doPrepare()
}

/**
* Lazy partition serialization - deferred until execution time for DPP support.
*
* DPP (Dynamic Partition Pruning) Flow:
* {{{
* Planning time:
* - CometNativeScan.convert() serializes common data (schemas, filters, projections)
* - commonData embedded in nativeOp protobuf
* - File partitions NOT serialized yet
* - CometNativeScanExec created with partitionFilters containing DynamicPruningExpression
* - serializedPartitionData not evaluated (lazy)
* - No partition serialization yet
*
* Execution time:
* - doExecuteColumnar() accesses commonData and perPartitionData
* - Forces serializedPartitionData evaluation (here)
* - Each partition's file list serialized separately
* - CometExecRDD receives per-partition data and injects at runtime
* 1. Spark calls prepare() on the plan tree
* - doPrepare() calls e.plan.prepare() for each DPP filter
* - Subquery plans are set up (but not yet executed)
*
* 2. Spark calls doExecuteColumnar()
* - Accesses perPartitionData
* - Forces serializedPartitionData evaluation (here)
* - Waits for DPP values (updateResult or reflection)
* - Calls scan.getFilePartitions() with DPP-filtered partitions
* - Only matching partitions are serialized
* }}}
*
* This pattern reduces memory usage for tables with many partitions - instead of serializing
* all files for all partitions in the driver, we serialize only common metadata (once) and each
* partition's files (lazily, as tasks are scheduled).
*/
@transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
// Ensure DPP subqueries are resolved before accessing file partitions.
// This follows the pattern from CometIcebergNativeScanExec.
partitionFilters.foreach {
case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty =>
e.plan match {
case sab: SubqueryAdaptiveBroadcastExec =>
// SubqueryAdaptiveBroadcastExec.executeCollect() throws, so we call
// child.executeCollect() directly. We use the index from SAB to find the
// right buildKey, then locate that key's column in child.output.
val rows = sab.child.executeCollect()
val indices = getSubqueryBroadcastIndices(sab)

// SPARK-46946 changed index: Int to indices: Seq[Int] as a preparatory refactor
// for future features (Null Safe Equality DPP, multiple equality predicates).
// Currently indices always has one element.
assert(
indices.length == 1,
s"Multi-index DPP not supported: indices=$indices. See SPARK-46946.")
val buildKeyIndex = indices.head
val buildKey = sab.buildKeys(buildKeyIndex)

// Find column index in child.output by matching buildKey's exprId
val colIndex = buildKey match {
case attr: Attribute =>
sab.child.output.indexWhere(_.exprId == attr.exprId)
// DPP may cast partition column to match join key type
case Cast(attr: Attribute, _, _, _) =>
sab.child.output.indexWhere(_.exprId == attr.exprId)
case _ => buildKeyIndex // fall back: treat the build-key index itself as the column index
}
if (colIndex < 0) {
throw new IllegalStateException(
s"DPP build key '$buildKey' not found in ${sab.child.output.map(_.name)}")
}

setInSubqueryResult(e, rows.map(_.get(colIndex, e.child.dataType)))
case _ =>
e.updateResult()
}
case _ =>
}

// Extract common data from nativeOp
val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray

// Get file partitions from CometScanExec (handles bucketing, etc.)
// Get file partitions from CometScanExec (handles bucketing, DPP filtering, etc.)
// CometScanExec.getFilePartitions() uses dynamicallySelectedPartitions which
// evaluates DPP filters against partition values.
val filePartitions = scan.getFilePartitions()

// Serialize each partition's files
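CometScanExec.getFilePartitions() delegates the actual DPP filtering to Spark's dynamicallySelectedPartitions; conceptually the selection reduces to the following (invented names, simplified to set membership on partition values):

{{{
// Simplified model of DPP partition selection: keep only the partitions whose
// partition-column value appears in the resolved subquery result.
final case class PartitionDir(partValue: Any, files: Seq[String])

def pruneWithDpp(all: Seq[PartitionDir], dppValues: Set[Any]): Seq[PartitionDir] =
  all.filter(p => dppValues.contains(p.partValue))
}}}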
@@ -135,6 +209,29 @@
(commonBytes, perPartitionBytes)
}

/**
* Sets InSubqueryExec's private result field via reflection.
*
* Reflection is required because:
* - SubqueryAdaptiveBroadcastExec.executeCollect() throws UnsupportedOperationException
* - InSubqueryExec has no public setter for result, only updateResult() which calls
* executeCollect()
* - We can't replace e.plan since it's a val
*/
private def setInSubqueryResult(e: InSubqueryExec, result: Array[_]): Unit = {
val fields = e.getClass.getDeclaredFields
// Field name is mangled by the Scala compiler, e.g. "org$apache$...$InSubqueryExec$$result"
val resultField = fields
.find(f => f.getName.endsWith("$result") && !f.getName.contains("Broadcast"))
.getOrElse {
throw new IllegalStateException(
s"Cannot find 'result' field in ${e.getClass.getName}. " +
"Spark version may be incompatible with Comet's DPP implementation.")
}
resultField.setAccessible(true)
resultField.set(e, result)
}

def commonData: Array[Byte] = serializedPartitionData._1
def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2
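For reference, this is the kind of query the whole path serves (hypothetical table and column names, assuming a SparkSession named spark): the selective filter on the dimension table is broadcast, resolved by the InSubqueryExec handling above, and prunes fact-table partitions before CometNativeScanExec serializes them.

{{{
spark.sql("""
  SELECT f.amount
  FROM fact f
  JOIN dim d ON f.part_col = d.key
  WHERE d.category = 'electronics'
""").collect()
}}}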
