diff --git a/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala
new file mode 100644
index 0000000000..3dd1a2ae2f
--- /dev/null
+++ b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider}
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.CometArrowAllocator
+
+/**
+ * Import-only C Data Interface bridge for Comet's shaded Arrow side.
+ *
+ * The caller (in the spark module) provides an export callback that fills pre-allocated
+ * ArrowArray/ArrowSchema structs at given memory addresses. This object allocates those structs
+ * using the shaded allocator and imports the resulting vectors as CometVectors.
+ *
+ * This design eliminates the need for reflection to cross the shading boundary: the spark module
+ * calls unshaded Arrow directly, and the common module calls shaded Arrow directly. The two sides
+ * communicate through Long memory addresses only.
+ */
+object CDataUtil {
+
+  /**
+   * Imports a columnar batch from the C Data Interface using a child of the global
+   * [[CometArrowAllocator]]. This is the preferred entry point from the spark module since it
+   * avoids passing a shaded allocator type across the shading boundary.
+   *
+   * On success the child allocator is left open, since it was handed to the import as the
+   * allocator for the returned vectors. On failure no batch is returned, so the child allocator
+   * is closed here before the error is rethrown.
+   */
+  def importBatch(
+      numCols: Int,
+      numRows: Int,
+      exportFn: (Int, Long, Long) => Unit): ColumnarBatch = {
+    val allocator =
+      CometArrowAllocator.newChildAllocator("CDataUtil-import", 0, Long.MaxValue)
+    try {
+      importBatch(numCols, numRows, allocator, exportFn)
+    } catch {
+      case NonFatal(e) =>
+        // Closing an allocator can itself fail; keep the import failure as the primary error.
+        try {
+          allocator.close()
+        } catch {
+          case NonFatal(closeError) => e.addSuppressed(closeError)
+        }
+        throw e
+    }
+  }
+
+  /**
+   * Imports a columnar batch from the C Data Interface.
+   *
+   * Allocates shaded ArrowArray/ArrowSchema structs for each column, invokes the provided export
+   * function to fill them (using unshaded Arrow on the caller side), then imports the vectors
+   * into CometVectors.
+   *
+   * If any column fails, the columns imported before it are closed and the failure is rethrown,
+   * so a caller that falls back to another path does not leak the partial import.
+   *
+   * @param numCols
+   *   number of columns to import
+   * @param numRows
+   *   row count for the resulting ColumnarBatch
+   * @param allocator
+   *   shaded BufferAllocator for struct and vector allocation
+   * @param exportFn
+   *   callback (colIndex, arrayAddr, schemaAddr) => Unit that exports the unshaded vector into
+   *   the struct memory at the given addresses
+   * @return
+   *   a ColumnarBatch with CometVector columns
+   */
+  def importBatch(
+      numCols: Int,
+      numRows: Int,
+      allocator: BufferAllocator,
+      exportFn: (Int, Long, Long) => Unit): ColumnarBatch = {
+    val imported = mutable.ArrayBuffer.empty[CometVector]
+    var handedOver = false
+    try {
+      (0 until numCols).foreach { idx =>
+        imported += importColumn(idx, allocator, exportFn)
+      }
+      val batch = new ColumnarBatch(imported.toArray, numRows)
+      handedOver = true
+      batch
+    } finally {
+      // Until the batch exists, this method owns every vector imported so far.
+      if (!handedOver) {
+        imported.foreach(_.close())
+      }
+    }
+  }
+
+  /**
+   * Imports the single column at `idx`.
+   *
+   * Allocates one ArrowArray/ArrowSchema pair, has `exportFn` fill it, and wraps the imported
+   * vector as a CometVector. If no CometVector is returned, both structs are closed here,
+   * whatever was thrown; on success this method does not close them.
+   */
+  private def importColumn(
+      idx: Int,
+      allocator: BufferAllocator,
+      exportFn: (Int, Long, Long) => Unit): CometVector = {
+    val arrowArray = ArrowArray.allocateNew(allocator)
+    var imported = false
+    try {
+      val arrowSchema = ArrowSchema.allocateNew(allocator)
+      try {
+        exportFn(idx, arrowArray.memoryAddress(), arrowSchema.memoryAddress())
+        val importer = new ArrowImporter(allocator)
+        val dictionaryProvider = new CDataDictionaryProvider()
+        val vector = importer.importVector(arrowArray, arrowSchema, dictionaryProvider)
+        val cometVector = CometVector.getVector(vector, true, dictionaryProvider)
+        imported = true
+        cometVector
+      } finally {
+        if (!imported) {
+          arrowSchema.close()
+        }
+      }
+    } finally {
+      if (!imported) {
+        arrowArray.close()
+      }
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
index 6d52078181..0b0731c8f4 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch}
 
 import org.apache.comet.CometArrowAllocator
-import org.apache.comet.vector.NativeUtil
+import org.apache.comet.vector.{CDataUtil, NativeUtil}
 
 object CometArrowConverters extends Logging {
   // This is similar how Spark converts internal row to Arrow format except that it is transforming
@@ -145,7 +145,8 @@ object CometArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Int,
       timeZoneId: String,
-      context: TaskContext)
+      context: TaskContext,
+      zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None)
       extends ArrowBatchIterBase(schema, timeZoneId, context)
       with AutoCloseable {
 
@@ -159,6 +160,21 @@ object CometArrowConverters extends Logging {
     override protected def nextBatch(): ColumnarBatch = {
       val rowsInBatch = colBatch.numRows()
       if (rowsProduced < rowsInBatch) {
+        // On the first call, try zero-copy if an export function was provided
+        if (rowsProduced == 0 && zeroCopyExportFn.isDefined) {
+          try {
+            val zeroCopy = CDataUtil.importBatch(
+              colBatch.numCols(),
+              rowsInBatch,
+              allocator,
+              zeroCopyExportFn.get)
+            rowsProduced = rowsInBatch
+            return zeroCopy
+          } catch {
+            case e: Exception =>
+              logWarning("Zero-copy C Data import failed, falling back to copy", e)
+          }
+        }
         // the arrow writer shall be reset before writing the next batch
         arrowWriter.reset()
         val rowsToProduce =
@@ -190,7 +206,14 @@ object CometArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Int,
       timeZoneId: String,
-      context: TaskContext): Iterator[ColumnarBatch] = {
-    new ColumnBatchToArrowBatchIter(colBatch, schema, maxRecordsPerBatch, timeZoneId, context)
+      context: TaskContext,
+      zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None): Iterator[ColumnarBatch] = {
+    new ColumnBatchToArrowBatchIter(
+      colBatch,
+      schema,
+      maxRecordsPerBatch,
+      timeZoneId,
+      context,
+      zeroCopyExportFn)
   }
 }
diff --git a/pom.xml b/pom.xml
index 4bd0c9dc23..ba8e3161ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1102,6 +1102,7 @@ under the License.
 dev/release/requirements.txt
 native/proto/src/generated/**
 benchmarks/tpc/queries/**
+ pixi.lock
@@ -1168,6 +1169,20 @@ under the License.
 com.google.thirdparty.publicsuffix.PublicSuffixType
+
+ org.apache.arrow
+ arrow-c-data
+
+
+ org.apache.arrow.c.jni.JniWrapper
+ org.apache.arrow.c.jni.PrivateData
+ org.apache.arrow.c.jni.CDataJniException
+ org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData
+
+
 true
 true
diff --git a/spark/pom.xml b/spark/pom.xml
index 1b207288c9..4292546a83 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -129,8 +129,10 @@ under the License.
 to provide InMemoryKMS class that is shaded below, to make Spark test happy. -->
+ depends on arrow-vector. However, arrow-c-data is used by ArrowCDataExport for zero-copy
+ conversion (provided scope since it may not be available in all runtime environments).
+ arrow-memory-unsafe is still needed for tests (Maven shading in common happens in
+ 'package' phase which is after 'test'). -->
 org.apache.arrow
 arrow-memory-unsafe
@@ -139,7 +141,7 @@ under the License.
 org.apache.arrow
 arrow-c-data
- test
+ provided
 org.apache.hadoop
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala b/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala
new file mode 100644
index 0000000000..1b78e97651
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+/**
+ * Creates export functions for zero-copy transfer of unshaded Arrow vectors through the C Data
+ * Interface.
+ *
+ * This lives in the spark module where unshaded Arrow types are directly available, eliminating
+ * the need for reflection. The export function writes into pre-allocated C Data struct addresses
+ * provided by the common module's shaded side.
+ *
+ * At runtime, arrow-c-data may not be on the classpath (Spark does not bundle it). The
+ * [[cDataAvailable]] check ensures graceful degradation to the copy-based path.
+ */
+object ArrowCDataExport {
+
+  /**
+   * Whether unshaded arrow-c-data classes are available on the runtime classpath.
+   *
+   * A missing class is reported as `ClassNotFoundException`; a class that is present but
+   * cannot be linked or initialized is reported as a `LinkageError`. Both mean the zero-copy
+   * path is unusable, so both yield `false` instead of failing the caller.
+   */
+  private lazy val cDataAvailable: Boolean = {
+    try {
+      Class.forName("org.apache.arrow.c.Data") // scalastyle:ignore classforname
+      true
+    } catch {
+      case _: ClassNotFoundException | _: LinkageError => false
+    }
+  }
+
+  /**
+   * Returns an export function if the batch is entirely backed by [[ArrowColumnVector]] and the
+   * arrow-c-data library is available at runtime. Returns [[None]] otherwise.
+   *
+   * The returned function has signature `(colIndex, arrayAddr, schemaAddr) => Unit`. When called,
+   * it exports the unshaded Arrow vector at the given column index into the C Data structs at the
+   * provided memory addresses.
+   */
+  def makeExportFn(batch: ColumnarBatch): Option[(Int, Long, Long) => Unit] = {
+    val numCols = batch.numCols()
+    // cDataAvailable comes first so CDataExporter is only reached once the classes are known
+    // to be loadable; an empty batch has nothing to export.
+    val zeroCopyPossible = cDataAvailable && numCols > 0 &&
+      (0 until numCols).forall(i => batch.column(i).isInstanceOf[ArrowColumnVector])
+
+    if (zeroCopyPossible) Some(CDataExporter.exportFn(batch)) else None
+  }
+
+  /**
+   * Isolated object that references arrow-c-data classes. The JVM will not load this object (and
+   * therefore will not attempt to resolve [[org.apache.arrow.c.Data]] etc.) until it is first
+   * accessed, which only happens after [[cDataAvailable]] confirms the classes exist.
+   */
+  private object CDataExporter {
+    def exportFn(batch: ColumnarBatch): (Int, Long, Long) => Unit = {
+      (colIdx: Int, arrayAddr: Long, schemaAddr: Long) =>
+        {
+          val arrowCol = batch.column(colIdx).asInstanceOf[ArrowColumnVector]
+          val fv =
+            arrowCol.getValueVector.asInstanceOf[org.apache.arrow.vector.FieldVector]
+          org.apache.arrow.c.Data.exportVector(
+            fv.getAllocator,
+            fv,
+            null,
+            org.apache.arrow.c.ArrowArray.wrap(arrayAddr),
+            org.apache.arrow.c.ArrowSchema.wrap(schemaAddr))
+        }
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index a8a61e7a71..0df05375cf 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -104,15 +104,17 @@ case class CometSparkToColumnarExec(child: SparkPlan)
         child
           .executeColumnar()
           .mapPartitionsInternal { sparkBatches =>
+            val context = TaskContext.get()
             val arrowBatches =
               sparkBatches.flatMap { sparkBatch =>
-                val context = TaskContext.get()
+                val exportFn = ArrowCDataExport.makeExportFn(sparkBatch)
                 CometArrowConverters.columnarBatchToArrowBatchIter(
                   sparkBatch,
                   schema,
                   maxRecordsPerBatch,
                   timeZoneId,
-                  context)
+                  context,
+                  exportFn)
               }
             createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
           }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala
new file mode 100644
index 0000000000..44bbd68ea7
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.{IntVector, VarCharVector}
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.comet.ArrowCDataExport
+import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+import org.apache.comet.vector.{CDataUtil, CometVector}
+
+class CometArrowConvertersSuite extends CometTestBase {
+
+  test("zero-copy import via ArrowCDataExport and CDataUtil.importBatch") {
+    val srcAllocator = new RootAllocator(Long.MaxValue)
+    try {
+      val intVector = new IntVector("intCol", srcAllocator)
+      intVector.allocateNew(3)
+      intVector.set(0, 10)
+      intVector.set(1, 20)
+      intVector.setNull(2)
+      intVector.setValueCount(3)
+
+      val varcharVector = new VarCharVector("strCol", srcAllocator)
+      varcharVector.allocateNew()
+      varcharVector.setSafe(0, "hello".getBytes)
+      varcharVector.setSafe(1, "world".getBytes)
+      varcharVector.setNull(2)
+      varcharVector.setValueCount(3)
+
+      val arrowCol0 = new ArrowColumnVector(intVector)
+      val arrowCol1 = new ArrowColumnVector(varcharVector)
+      val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3)
+
+      val exportFn = ArrowCDataExport.makeExportFn(inputBatch)
+      assert(exportFn.isDefined, "Should detect ArrowColumnVector and return Some")
+
+      // Use the no-allocator overload which uses the shaded CometArrowAllocator internally,
+      // avoiding the need to pass a shaded allocator type across the shading boundary.
+      val outputBatch = CDataUtil.importBatch(2, 3, exportFn.get)
+      assert(outputBatch.numRows() == 3)
+      assert(outputBatch.numCols() == 2)
+      assert(outputBatch.column(0).isInstanceOf[CometVector])
+      assert(outputBatch.column(1).isInstanceOf[CometVector])
+
+      assert(outputBatch.column(0).getInt(0) == 10)
+      assert(outputBatch.column(0).getInt(1) == 20)
+      assert(outputBatch.column(0).isNullAt(2))
+      assert(outputBatch.column(1).getUTF8String(0).toString == "hello")
+      assert(outputBatch.column(1).getUTF8String(1).toString == "world")
+      assert(outputBatch.column(1).isNullAt(2))
+
+      // Close the imported batch first (triggers C Data release callback),
+      // then close the source batch. The child allocator under CometArrowAllocator
+      // is not explicitly closed here, matching the production lifecycle in
+      // ColumnBatchToArrowBatchIter.
+      outputBatch.close()
+      inputBatch.close()
+    } finally {
+      srcAllocator.close()
+    }
+  }
+
+  test("ArrowCDataExport returns None for non-Arrow batches") {
+    val sparkCol = new OnHeapColumnVector(10, IntegerType)
+    val batch = new ColumnarBatch(Array(sparkCol), 10)
+
+    try {
+      val result = ArrowCDataExport.makeExportFn(batch)
+      assert(result.isEmpty, "Should return None for non-ArrowColumnVector batches")
+    } finally {
+      batch.close()
+    }
+  }
+
+  test("columnarBatchToArrowBatchIter works for ArrowColumnVector input") {
+    val srcAllocator = new RootAllocator(Long.MaxValue)
+    try {
+      val intVector = new IntVector("intCol", srcAllocator)
+      intVector.allocateNew(3)
+      intVector.set(0, 10)
+      intVector.set(1, 20)
+      intVector.setNull(2)
+      intVector.setValueCount(3)
+
+      val varcharVector = new VarCharVector("strCol", srcAllocator)
+      varcharVector.allocateNew()
+      varcharVector.setSafe(0, "hello".getBytes)
+      varcharVector.setSafe(1, "world".getBytes)
+      varcharVector.setNull(2)
+      varcharVector.setValueCount(3)
+
+      val arrowCol0 = new ArrowColumnVector(intVector)
+      val arrowCol1 = new ArrowColumnVector(varcharVector)
+      val batch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3)
+      val schema =
+        StructType(Seq(StructField("intCol", IntegerType), StructField("strCol", StringType)))
+
+      val exportFn = ArrowCDataExport.makeExportFn(batch)
+      val iter = CometArrowConverters.columnarBatchToArrowBatchIter(
+        batch,
+        schema,
+        maxRecordsPerBatch = 0,
+        "UTC",
+        context = null,
+        exportFn)
+
+      assert(iter.hasNext)
+      val outputBatch = iter.next()
+      assert(outputBatch.numRows() == 3)
+      assert(outputBatch.numCols() == 2)
+
+      assert(outputBatch.column(0).getInt(0) == 10)
+      assert(outputBatch.column(0).getInt(1) == 20)
+      assert(outputBatch.column(0).isNullAt(2))
+      assert(outputBatch.column(1).getUTF8String(0).toString == "hello")
+      assert(outputBatch.column(1).getUTF8String(1).toString == "world")
+      assert(outputBatch.column(1).isNullAt(2))
+
+      assert(!iter.hasNext)
+      batch.close()
+    } finally {
+      srcAllocator.close()
+    }
+  }
+
+  test("columnarBatchToArrowBatchIter falls back for non-Arrow batches") {
+    val sparkCol = new OnHeapColumnVector(10, IntegerType)
+    for (i <- 0 until 10) {
+      sparkCol.putInt(i, i * 100)
+    }
+    val batch = new ColumnarBatch(Array(sparkCol), 10)
+
+    val iterSchema = StructType(Seq(StructField("col", IntegerType)))
+    val iter = CometArrowConverters.columnarBatchToArrowBatchIter(
+      batch,
+      iterSchema,
+      maxRecordsPerBatch = 0,
+      "UTC",
+      context = null)
+
+    assert(iter.hasNext)
+    val outputBatch = iter.next()
+    assert(outputBatch.numRows() == 10)
+    assert(outputBatch.column(0).getInt(0) == 0)
+    assert(outputBatch.column(0).getInt(9) == 900)
+
+    assert(!iter.hasNext)
+    batch.close()
+  }
+}