diff --git a/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala
new file mode 100644
index 0000000000..3dd1a2ae2f
--- /dev/null
+++ b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider}
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.CometArrowAllocator
+
+/**
+ * Import-only C Data Interface bridge for Comet's shaded Arrow side.
+ *
+ * The caller (in the spark module) provides an export callback that fills pre-allocated
+ * ArrowArray/ArrowSchema structs at given memory addresses. This object allocates those structs
+ * using the shaded allocator and imports the resulting vectors as CometVectors.
+ *
+ * This design eliminates the need for reflection to cross the shading boundary: the spark module
+ * calls unshaded Arrow directly, and the common module calls shaded Arrow directly. The two sides
+ * communicate through Long memory addresses only.
+ */
+object CDataUtil {
+
+  /**
+   * Imports a columnar batch from the C Data Interface using a child of the global
+   * [[CometArrowAllocator]]. This is the preferred entry point from the spark module since it
+   * avoids passing a shaded allocator type across the shading boundary.
+   */
+  def importBatch(
+      numCols: Int,
+      numRows: Int,
+      exportFn: (Int, Long, Long) => Unit): ColumnarBatch = {
+    val allocator =
+      CometArrowAllocator.newChildAllocator("CDataUtil-import", 0, Long.MaxValue)
+    importBatch(numCols, numRows, allocator, exportFn)
+  }
+
+  /**
+   * Imports a columnar batch from the C Data Interface.
+   *
+   * Allocates shaded ArrowArray/ArrowSchema structs for each column, invokes the provided export
+   * function to fill them (using unshaded Arrow on the caller side), then imports the vectors
+   * into CometVectors. If importing any column fails, the vectors already imported for the
+   * preceding columns are closed before the exception is rethrown, so no Arrow memory is
+   * leaked on a partial failure.
+   *
+   * @param numCols
+   *   number of columns to import
+   * @param numRows
+   *   row count for the resulting ColumnarBatch
+   * @param allocator
+   *   shaded BufferAllocator for struct and vector allocation
+   * @param exportFn
+   *   callback (colIndex, arrayAddr, schemaAddr) => Unit that exports the unshaded vector into
+   *   the struct memory at the given addresses
+   * @return
+   *   a ColumnarBatch with CometVector columns
+   */
+  def importBatch(
+      numCols: Int,
+      numRows: Int,
+      allocator: BufferAllocator,
+      exportFn: (Int, Long, Long) => Unit): ColumnarBatch = {
+    val cometVectors = new ArrayBuffer[CometVector](numCols)
+    try {
+      (0 until numCols).foreach { idx =>
+        val arrowArray = ArrowArray.allocateNew(allocator)
+        val arrowSchema = ArrowSchema.allocateNew(allocator)
+        try {
+          exportFn(idx, arrowArray.memoryAddress(), arrowSchema.memoryAddress())
+          val importer = new ArrowImporter(allocator)
+          val dictionaryProvider = new CDataDictionaryProvider()
+          // On success the importer takes ownership of the structs and releases them,
+          // so they must not be closed again here.
+          val vector = importer.importVector(arrowArray, arrowSchema, dictionaryProvider)
+          cometVectors += CometVector.getVector(vector, true, dictionaryProvider)
+        } catch {
+          case e: Exception =>
+            // The structs were never handed off to a successfully imported vector;
+            // free their native memory here.
+            arrowArray.close()
+            arrowSchema.close()
+            throw e
+        }
+      }
+    } catch {
+      case e: Exception =>
+        // Close vectors imported before the failure so their Arrow buffers are released.
+        cometVectors.foreach(_.close())
+        throw e
+    }
+    new ColumnarBatch(cometVectors.toArray, numRows)
+  }
+}
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
index 6d52078181..0b0731c8f4 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch}
import org.apache.comet.CometArrowAllocator
-import org.apache.comet.vector.NativeUtil
+import org.apache.comet.vector.{CDataUtil, NativeUtil}
object CometArrowConverters extends Logging {
// This is similar how Spark converts internal row to Arrow format except that it is transforming
@@ -145,7 +145,8 @@ object CometArrowConverters extends Logging {
schema: StructType,
maxRecordsPerBatch: Int,
timeZoneId: String,
- context: TaskContext)
+ context: TaskContext,
+ zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None)
extends ArrowBatchIterBase(schema, timeZoneId, context)
with AutoCloseable {
@@ -159,6 +160,21 @@ object CometArrowConverters extends Logging {
override protected def nextBatch(): ColumnarBatch = {
val rowsInBatch = colBatch.numRows()
if (rowsProduced < rowsInBatch) {
+ // On the first call, try zero-copy if an export function was provided
+ if (rowsProduced == 0 && zeroCopyExportFn.isDefined) {
+ try {
+ val zeroCopy = CDataUtil.importBatch(
+ colBatch.numCols(),
+ rowsInBatch,
+ allocator,
+ zeroCopyExportFn.get)
+ rowsProduced = rowsInBatch
+ return zeroCopy
+ } catch {
+ case e: Exception =>
+ logWarning("Zero-copy C Data import failed, falling back to copy", e)
+ }
+ }
// the arrow writer shall be reset before writing the next batch
arrowWriter.reset()
val rowsToProduce =
@@ -190,7 +206,14 @@ object CometArrowConverters extends Logging {
schema: StructType,
maxRecordsPerBatch: Int,
timeZoneId: String,
- context: TaskContext): Iterator[ColumnarBatch] = {
- new ColumnBatchToArrowBatchIter(colBatch, schema, maxRecordsPerBatch, timeZoneId, context)
+ context: TaskContext,
+ zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None): Iterator[ColumnarBatch] = {
+ new ColumnBatchToArrowBatchIter(
+ colBatch,
+ schema,
+ maxRecordsPerBatch,
+ timeZoneId,
+ context,
+ zeroCopyExportFn)
}
}
diff --git a/pom.xml b/pom.xml
index 4bd0c9dc23..ba8e3161ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1102,6 +1102,7 @@ under the License.
dev/release/requirements.txt
native/proto/src/generated/**
benchmarks/tpc/queries/**
+ pixi.lock
@@ -1168,6 +1169,20 @@ under the License.
com.google.thirdparty.publicsuffix.PublicSuffixType
+
+ org.apache.arrow
+ arrow-c-data
+
+
+ org.apache.arrow.c.jni.JniWrapper
+ org.apache.arrow.c.jni.PrivateData
+ org.apache.arrow.c.jni.CDataJniException
+ org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData
+
+
true
true
diff --git a/spark/pom.xml b/spark/pom.xml
index 1b207288c9..4292546a83 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -129,8 +129,10 @@ under the License.
to provide InMemoryKMS class that is shaded below, to make Spark test happy. -->
+ depends on arrow-vector. However, arrow-c-data is used by ArrowCDataExport for zero-copy
+ conversion (provided scope since it may not be available in all runtime environments).
+ arrow-memory-unsafe is still needed for tests (Maven shading in common happens in
+ 'package' phase which is after 'test'). -->
org.apache.arrow
arrow-memory-unsafe
@@ -139,7 +141,7 @@ under the License.
org.apache.arrow
arrow-c-data
- test
+ provided
org.apache.hadoop
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala b/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala
new file mode 100644
index 0000000000..1b78e97651
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+/**
+ * Creates export functions for zero-copy transfer of unshaded Arrow vectors through the C Data
+ * Interface.
+ *
+ * This lives in the spark module where unshaded Arrow types are directly available, eliminating
+ * the need for reflection. The export function writes into pre-allocated C Data struct addresses
+ * provided by the common module's shaded side.
+ *
+ * At runtime, arrow-c-data may not be on the classpath (Spark does not bundle it). The
+ * [[cDataAvailable]] check ensures graceful degradation to the copy-based path.
+ */
+object ArrowCDataExport {
+
+  /** Whether unshaded arrow-c-data classes are available on the runtime classpath. */
+  private lazy val cDataAvailable: Boolean = {
+    try {
+      Class.forName("org.apache.arrow.c.Data") // scalastyle:ignore classforname
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+  }
+
+  /**
+   * Returns an export function if the batch is entirely backed by [[ArrowColumnVector]] columns
+   * that are not dictionary-encoded and the arrow-c-data library is available at runtime.
+   * Returns [[None]] otherwise.
+   *
+   * The returned function has signature `(colIndex, arrayAddr, schemaAddr) => Unit`. When called,
+   * it exports the unshaded Arrow vector at the given column index into the C Data structs at the
+   * provided memory addresses.
+   */
+  def makeExportFn(batch: ColumnarBatch): Option[(Int, Long, Long) => Unit] = {
+    if (!cDataAvailable) return None
+    if (batch.numCols() == 0) return None
+
+    var i = 0
+    while (i < batch.numCols()) {
+      batch.column(i) match {
+        case arrowCol: ArrowColumnVector =>
+          // The export below passes a null DictionaryProvider, so a dictionary-encoded
+          // vector cannot be exported zero-copy; use the copy-based fallback instead.
+          if (arrowCol.getValueVector.getField.getDictionary != null) return None
+        case _ => return None
+      }
+      i += 1
+    }
+
+    Some(CDataExporter.exportFn(batch))
+  }
+
+  /**
+   * Isolated object that references arrow-c-data classes. The JVM will not load this object (and
+   * therefore will not attempt to resolve [[org.apache.arrow.c.Data]] etc.) until it is first
+   * accessed, which only happens after [[cDataAvailable]] confirms the classes exist.
+   */
+  private object CDataExporter {
+    def exportFn(batch: ColumnarBatch): (Int, Long, Long) => Unit = {
+      (colIdx: Int, arrayAddr: Long, schemaAddr: Long) =>
+        {
+          val arrowCol = batch.column(colIdx).asInstanceOf[ArrowColumnVector]
+          val fv =
+            arrowCol.getValueVector.asInstanceOf[org.apache.arrow.vector.FieldVector]
+          // null DictionaryProvider is safe: makeExportFn rejects dictionary-encoded columns.
+          org.apache.arrow.c.Data.exportVector(
+            fv.getAllocator,
+            fv,
+            null,
+            org.apache.arrow.c.ArrowArray.wrap(arrayAddr),
+            org.apache.arrow.c.ArrowSchema.wrap(schemaAddr))
+        }
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index a8a61e7a71..0df05375cf 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -104,15 +104,17 @@ case class CometSparkToColumnarExec(child: SparkPlan)
child
.executeColumnar()
.mapPartitionsInternal { sparkBatches =>
+ val context = TaskContext.get()
val arrowBatches =
sparkBatches.flatMap { sparkBatch =>
- val context = TaskContext.get()
+ val exportFn = ArrowCDataExport.makeExportFn(sparkBatch)
CometArrowConverters.columnarBatchToArrowBatchIter(
sparkBatch,
schema,
maxRecordsPerBatch,
timeZoneId,
- context)
+ context,
+ exportFn)
}
createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
}
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala
new file mode 100644
index 0000000000..44bbd68ea7
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.{IntVector, VarCharVector}
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.comet.ArrowCDataExport
+import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+import org.apache.comet.vector.{CDataUtil, CometVector}
+
+class CometArrowConvertersSuite extends CometTestBase {
+
+  test("zero-copy import via ArrowCDataExport and CDataUtil.importBatch") {
+    val srcAllocator = new RootAllocator(Long.MaxValue)
+
+    val intVector = new IntVector("intCol", srcAllocator)
+    intVector.allocateNew(3)
+    intVector.set(0, 10)
+    intVector.set(1, 20)
+    intVector.setNull(2)
+    intVector.setValueCount(3)
+
+    val varcharVector = new VarCharVector("strCol", srcAllocator)
+    varcharVector.allocateNew()
+    varcharVector.setSafe(0, "hello".getBytes)
+    varcharVector.setSafe(1, "world".getBytes)
+    varcharVector.setNull(2)
+    varcharVector.setValueCount(3)
+
+    val arrowCol0 = new ArrowColumnVector(intVector)
+    val arrowCol1 = new ArrowColumnVector(varcharVector)
+    val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3)
+
+    val exportFn = ArrowCDataExport.makeExportFn(inputBatch)
+    assert(exportFn.isDefined, "Should detect ArrowColumnVector and return Some")
+
+    // Use the no-allocator overload which uses the shaded CometArrowAllocator internally,
+    // avoiding the need to pass a shaded allocator type across the shading boundary.
+    val outputBatch = CDataUtil.importBatch(2, 3, exportFn.get)
+    assert(outputBatch.numRows() == 3)
+    assert(outputBatch.numCols() == 2)
+    assert(outputBatch.column(0).isInstanceOf[CometVector])
+    assert(outputBatch.column(1).isInstanceOf[CometVector])
+
+    assert(outputBatch.column(0).getInt(0) == 10)
+    assert(outputBatch.column(0).getInt(1) == 20)
+    assert(outputBatch.column(0).isNullAt(2))
+    assert(outputBatch.column(1).getUTF8String(0).toString == "hello")
+    assert(outputBatch.column(1).getUTF8String(1).toString == "world")
+    assert(outputBatch.column(1).isNullAt(2))
+
+    // Close the imported batch first (triggers C Data release callback), then close
+    // the source batch and the source allocator. The child allocator under
+    // CometArrowAllocator is not explicitly closed here, matching the production
+    // lifecycle in ColumnBatchToArrowBatchIter.
+    outputBatch.close()
+    inputBatch.close()
+    srcAllocator.close()
+  }
+
+  test("ArrowCDataExport returns None for non-Arrow batches") {
+    val sparkCol = new OnHeapColumnVector(10, IntegerType)
+    val batch = new ColumnarBatch(Array(sparkCol), 10)
+
+    try {
+      val result = ArrowCDataExport.makeExportFn(batch)
+      assert(result.isEmpty, "Should return None for non-ArrowColumnVector batches")
+    } finally {
+      batch.close()
+    }
+  }
+
+  test("columnarBatchToArrowBatchIter works for ArrowColumnVector input") {
+    val srcAllocator = new RootAllocator(Long.MaxValue)
+    try {
+      val intVector = new IntVector("intCol", srcAllocator)
+      intVector.allocateNew(3)
+      intVector.set(0, 10)
+      intVector.set(1, 20)
+      intVector.setNull(2)
+      intVector.setValueCount(3)
+
+      val varcharVector = new VarCharVector("strCol", srcAllocator)
+      varcharVector.allocateNew()
+      varcharVector.setSafe(0, "hello".getBytes)
+      varcharVector.setSafe(1, "world".getBytes)
+      varcharVector.setNull(2)
+      varcharVector.setValueCount(3)
+
+      val arrowCol0 = new ArrowColumnVector(intVector)
+      val arrowCol1 = new ArrowColumnVector(varcharVector)
+      val batch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3)
+      val schema =
+        StructType(Seq(StructField("intCol", IntegerType), StructField("strCol", StringType)))
+
+      val exportFn = ArrowCDataExport.makeExportFn(batch)
+      val iter = CometArrowConverters.columnarBatchToArrowBatchIter(
+        batch,
+        schema,
+        maxRecordsPerBatch = 0,
+        "UTC",
+        context = null,
+        exportFn)
+
+      assert(iter.hasNext)
+      val outputBatch = iter.next()
+      assert(outputBatch.numRows() == 3)
+      assert(outputBatch.numCols() == 2)
+
+      assert(outputBatch.column(0).getInt(0) == 10)
+      assert(outputBatch.column(0).getInt(1) == 20)
+      assert(outputBatch.column(0).isNullAt(2))
+      assert(outputBatch.column(1).getUTF8String(0).toString == "hello")
+      assert(outputBatch.column(1).getUTF8String(1).toString == "world")
+      assert(outputBatch.column(1).isNullAt(2))
+
+      assert(!iter.hasNext)
+      batch.close()
+    } finally {
+      srcAllocator.close()
+    }
+  }
+
+  test("columnarBatchToArrowBatchIter falls back for non-Arrow batches") {
+    val sparkCol = new OnHeapColumnVector(10, IntegerType)
+    for (i <- 0 until 10) {
+      sparkCol.putInt(i, i * 100)
+    }
+    val batch = new ColumnarBatch(Array(sparkCol), 10)
+
+    try {
+      val iterSchema = StructType(Seq(StructField("col", IntegerType)))
+      val iter = CometArrowConverters.columnarBatchToArrowBatchIter(
+        batch,
+        iterSchema,
+        maxRecordsPerBatch = 0,
+        "UTC",
+        context = null)
+
+      assert(iter.hasNext)
+      val outputBatch = iter.next()
+      assert(outputBatch.numRows() == 10)
+      assert(outputBatch.column(0).getInt(0) == 0)
+      assert(outputBatch.column(0).getInt(9) == 900)
+
+      assert(!iter.hasNext)
+    } finally {
+      // Always release the batch, even if an assertion above fails.
+      batch.close()
+    }
+  }
+}