Skip to content

Commit 432c277

Browse files
andygrove and claude committed
fix: fall back scan when plan uses input_file_name expressions
CometScanExec does not populate InputFileBlockHolder (the thread-local that Spark's FileScanRDD sets), so input_file_name(), input_file_block_start(), and input_file_block_length() return empty or default values when Comet replaces the scan. Detect these expressions in the plan and fall back to Spark's FileSourceScanExec. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 188cd86 commit 432c277

1 file changed

Lines changed: 18 additions & 1 deletion

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
2828
import org.apache.hadoop.conf.Configuration
2929
import org.apache.spark.internal.Logging
3030
import org.apache.spark.sql.SparkSession
31-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
31+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression}
3232
import org.apache.spark.sql.catalyst.rules.Rule
3333
import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
3434
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -90,6 +90,17 @@ case class CometScanRule(session: SparkSession)
9090
})
9191
}
9292

93+
// Detect whether any node in the plan references the thread-local
// InputFileBlockHolder via input_file_name(), input_file_block_start(),
// or input_file_block_length(). Spark's FileScanRDD populates that
// thread-local; Comet scans do not, so these expressions would yield
// empty/default values if Comet replaced the scan.
lazy val usesInputFileBlock: Boolean = plan.exists { node =>
  node.expressions.exists(_.exists {
    case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
    case _ => false
  })
}
103+
93104
def isIcebergMetadataTable(scanExec: BatchScanExec): Boolean = {
94105
// List of Iceberg metadata tables:
95106
// https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables
@@ -117,6 +128,12 @@ case class CometScanRule(session: SparkSession)
117128
case scan if hasMetadataCol(scan) =>
118129
withInfo(scan, "Metadata column is not supported")
119130

131+
case scan if usesInputFileBlock =>
132+
withInfo(
133+
scan,
134+
"Comet scans are not compatible with input_file_name, " +
135+
"input_file_block_start, or input_file_block_length")
136+
120137
// data source V1
121138
case scanExec: FileSourceScanExec =>
122139
transformV1Scan(scanExec)

0 commit comments

Comments
 (0)