diff --git a/lib/kernel/KernelArrowImport.ts b/lib/kernel/KernelArrowImport.ts new file mode 100644 index 00000000..00240feb --- /dev/null +++ b/lib/kernel/KernelArrowImport.ts @@ -0,0 +1,211 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Copy-into-cage Arrow import: reconstruct an `apache-arrow` `RecordBatch` + * from the buffer tree the kernel hands across the napi boundary + * (`databricks-sql-kernel/napi/src/zerocopy.rs`), WITHOUT re-decoding an + * Arrow IPC stream. + * + * The kernel ships each `RecordBatch` column as a node tree of in-cage + * (V8-allocated) `ArrayBuffer`s — one `memcpy` per Arrow buffer. We rebuild + * `arrow.Data` for each node via `makeData`, driven by the result schema's + * per-field `DataType` (decoded once, elsewhere). Because the reconstructed + * batch goes through the exact same `apache-arrow` `Data`/`Vector` machinery + * that `RecordBatchReader` produces on the IPC path, the downstream + * `ArrowResultConverter` is byte-for-byte unaffected — only the *route* the + * batch took to become a `RecordBatch` changed. + * + * Buffer→slot mapping mirrors Arrow's columnar layout: variable-binary + * (`Utf8`/`Binary`) carries `[offsets, values]`; list/map carry + * `[offsets]` + a child; struct/fixed-size-list carry only children; + * everything else (bool, ints, floats, decimal, date/time/timestamp, + * duration, fixed-size-binary) is a single value buffer. + * + * The historical name `importZeroCopyBatch` is retained because the kernel's + * copycage descriptor is byte-for-byte identical to the borrowed (zero-copy) + * descriptor — the importer is agnostic to whether the backing `ArrayBuffer`s + * were borrowed or cage-allocated. + */ + +import { makeData, Data, RecordBatch, Struct, Schema, DataType, TypeMap } from 'apache-arrow'; +import HiveDriverError from '../errors/HiveDriverError'; + +/** One `ArrayData` node as exported by `zerocopy.rs::build_node`. */ +export interface KernelArrayNode { + length: number; + offset: number; + nullCount: number; + /** Bit-packed validity buffer; absent when the node has no null buffer. */ + validity?: ArrayBuffer; + /** The type's data buffers, in canonical Arrow order. */ + buffers: ArrayBuffer[]; + /** Child nodes (struct fields, list/map values). */ + children: KernelArrayNode[]; +} + +/** A whole batch: one `KernelArrayNode` per top-level column. */ +export interface KernelZeroCopyBatch { + numRows: number; + columns: KernelArrayNode[]; +} + +/** + * Can this Arrow type be reconstructed by the buffer-handoff importer? + * + * This is an *allowlist*: only the layouts `buildData` knows how to wire are + * accepted, so any type the importer doesn't positively recognise (today + * Dictionary and Union — whose dictionary side-vector / union type-id + + * offset side-buffers the single `{ buffers, children }` tree cannot carry — + * and any future / 64-bit-offset "Large" variant the pinned apache-arrow + * version may add) defaults to *unsupported* and routes the whole result + * through IPC. The kernel does not emit Dictionary/Union/Large types for + * Databricks result sets today; the allowlist guarantees that if one ever + * slips through it degrades to a correct (IPC) decode rather than a silent + * mis-decode. Recurses into struct / list / map / fixed-size-list children. + */ +export function isZeroCopySupported(type: DataType): boolean { + // Composite types: supported iff every child is supported. + if ( + DataType.isStruct(type) || + DataType.isList(type) || + DataType.isMap(type) || + DataType.isFixedSizeList(type) + ) { + return type.children.every((f) => isZeroCopySupported(f.type)); + } + // Leaf types `buildData` handles via a single value buffer (or + // offsets+values for variable-binary). Anything not on this list — notably + // Dictionary, Union/DenseUnion/SparseUnion, and any 64-bit-offset Large + // variant — is treated as unsupported and forced onto the IPC path. + return ( + DataType.isNull(type) || + DataType.isBool(type) || + DataType.isInt(type) || + DataType.isFloat(type) || + DataType.isDecimal(type) || + DataType.isDate(type) || + DataType.isTime(type) || + DataType.isTimestamp(type) || + DataType.isInterval(type) || + DataType.isUtf8(type) || + DataType.isBinary(type) || + DataType.isFixedSizeBinary(type) + ); +} + +/** + * Does every field of `schema` reconstruct under the buffer-handoff path? + * The provider gates the whole result on this — a single unsupported column + * forces IPC for the entire fetch. + */ +export function isSchemaZeroCopySupported(schema: Schema): boolean { + return schema.fields.every((f) => isZeroCopySupported(f.type)); +} + +/** + * Build `arrow.Data` for one node against its `DataType`. + * + * `makeData` coerces each raw `ArrayBuffer` to the type's backing + * TypedArray (e.g. `Float64Array`, `Int32Array`) by reinterpreting the + * same bytes — no copy — so this stays type-agnostic for the common + * fixed-width case and only special-cases the structural layouts. + */ +function buildData(type: DataType, node: KernelArrayNode): Data { + // The kernel compacts every batch to offset 0 before the handoff + // (`zerocopy::compact_to_offset_zero`), because `makeData` always builds at + // offset 0 — a non-zero offset here would mean the kernel contract was + // violated, so assert defensively rather than silently read wrong rows. + if (node.offset !== 0) { + throw new HiveDriverError( + `kernel buffer-handoff import: unexpected non-zero array offset ${node.offset} — ` + + `the kernel must compact to offset 0 (zerocopy::compact_to_offset_zero) before handoff`, + ); + } + + const common = { + length: node.length, + nullCount: node.nullCount, + nullBitmap: node.validity ? new Uint8Array(node.validity) : undefined, + }; + + // Defence-in-depth: the schema-level `isSchemaZeroCopySupported` gate should + // already have routed any dictionary/union result through IPC, so reaching + // here means a contract violation — reject loudly rather than mis-decode. + if (DataType.isDictionary(type) || DataType.isUnion(type)) { + throw new HiveDriverError( + `kernel buffer-handoff import: unsupported Arrow type ${type} (dictionary/union) — ` + + `should have fallen back to IPC`, + ); + } + + if (DataType.isNull(type)) { + return makeData({ type, length: node.length }); + } + + if (DataType.isStruct(type)) { + const children = type.children.map((f, i) => buildData(f.type, node.children[i])); + return makeData({ ...common, type, children }); + } + + if (DataType.isList(type) || DataType.isMap(type)) { + // List: [offsets] + value child. Map: [offsets] + entries-struct child. + const child = buildData(type.children[0].type, node.children[0]); + return makeData({ + ...common, + type: type as any, + valueOffsets: new Int32Array(node.buffers[0]), + child, + }); + } + + if (DataType.isFixedSizeList(type)) { + const child = buildData(type.children[0].type, node.children[0]); + return makeData({ ...common, type, child }); + } + + if (DataType.isUtf8(type) || DataType.isBinary(type)) { + return makeData({ + ...common, + type: type as any, + valueOffsets: new Int32Array(node.buffers[0]), + data: new Uint8Array(node.buffers[1]), + }); + } + + // Single value-buffer fixed-width types: bool, int, float, decimal, + // date, time, timestamp, duration, interval, fixed-size-binary. + return makeData({ ...common, type: type as any, data: new Uint8Array(node.buffers[0]) }); +} + +/** + * Reconstruct a `RecordBatch` from the kernel's buffer tree, using `schema` + * (decoded once via `decodeIpcSchema`) for the per-column `DataType`s. + */ +export function importZeroCopyBatch(schema: Schema, batch: KernelZeroCopyBatch): RecordBatch { + if (batch.columns.length !== schema.fields.length) { + throw new HiveDriverError( + `kernel buffer-handoff import: column count ${batch.columns.length} ` + + `does not match schema field count ${schema.fields.length}`, + ); + } + const columns = schema.fields.map((field, i) => buildData(field.type, batch.columns[i])); + const structData = makeData({ + type: new Struct(schema.fields), + length: batch.numRows, + nullCount: 0, + children: columns, + }); + return new RecordBatch(schema, structData); +} diff --git a/lib/kernel/KernelOperationBackend.ts b/lib/kernel/KernelOperationBackend.ts index 01fd1a00..de700e33 100644 --- a/lib/kernel/KernelOperationBackend.ts +++ b/lib/kernel/KernelOperationBackend.ts @@ -37,6 +37,7 @@ */ import { v4 as uuidv4 } from 'uuid'; +import { Schema, TypeMap } from 'apache-arrow'; import { TTableSchema } from '../../thrift/TCLIService_types'; import IOperationBackend, { IOperationBackendWaitOptions } from '../contracts/IOperationBackend'; import { OperationStatus, OperationState } from '../contracts/OperationStatus'; @@ -48,8 +49,9 @@ import HiveDriverError from '../errors/HiveDriverError'; import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; import ArrowResultConverter from '../result/ArrowResultConverter'; import ResultSlicer from '../result/ResultSlicer'; -import KernelResultsProvider from './KernelResultsProvider'; +import KernelResultsProvider, { KernelBufferHandoffOptions, KernelFetchMode } from './KernelResultsProvider'; import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './KernelArrowIpc'; +import { isSchemaZeroCopySupported } from './KernelArrowImport'; import { decodeNapiKernelError } from './KernelErrorMapping'; import { KernelStatement, @@ -83,7 +85,8 @@ export type KernelOperationStatement = KernelStatementHandle & Partial; +type KernelFetchHandle = Pick & + Partial>; /** * The rich operation-status surface the kernel exposes on a terminal sync @@ -115,6 +118,27 @@ interface KernelRichStatusFields { /** Poll cadence for the async `status()` loop — matches the Thrift backend's 100ms. */ const STATUS_POLL_INTERVAL_MS = 100; +/** + * Resolve the kernel fetch mode from the environment and, for the copycage + * mode, package it with the decoded Arrow `schema` the buffer-handoff import + * needs. + * + * `KERNEL_FETCH_MODE` ∈ {ipc, copycage} (default ipc). Returns `undefined` + * for the IPC path (the provider then re-encodes IPC). + * + * Copycage is gated on `isSchemaZeroCopySupported`: if the result schema + * carries any type the buffer-handoff importer cannot reconstruct + * (dictionary / union / 64-bit-offset Large variant), the whole result falls + * back to IPC so it still decodes correctly rather than risk a mis-decode. + */ +function resolveFetchMode(schema: Schema): KernelBufferHandoffOptions | undefined { + const mode = (process.env.KERNEL_FETCH_MODE ?? '').toLowerCase() as KernelFetchMode | ''; + if (mode === 'copycage' && isSchemaZeroCopySupported(schema)) { + return { mode, schema }; + } + return undefined; +} + function delay(ms: number): Promise { return new Promise((resolve) => { setTimeout(resolve, ms); @@ -208,6 +232,11 @@ export default class KernelOperationBackend implements IOperationBackend { private metadata?: ResultMetadata; + // The result's decoded Arrow schema, captured in `getResultMetadata`. The + // copycage buffer-handoff path needs it to drive `makeData` reconstruction + // (`KernelArrowImport`); undefined until metadata has been fetched. + private arrowSchema?: Schema; + private metadataPromise?: Promise; // Memoised fetch handle: on the async path it is `awaitResult()`'s result @@ -349,6 +378,10 @@ export default class KernelOperationBackend implements IOperationBackend { // Promise) — no `await` needed. const arrowSchemaIpc = handle.schema(); const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes); + // Cache the decoded Arrow schema for the copycage buffer-handoff path + // (`getResultSlicer` → `resolveFetchMode`), which rebuilds `RecordBatch`es + // from the kernel's in-cage buffers via `makeData`. + this.arrowSchema = arrowSchema; // `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for // back-compat with the public `IOperation.getSchema()` surface. const thriftSchema: TTableSchema = arrowSchemaToThriftSchema(arrowSchema); @@ -779,9 +812,17 @@ export default class KernelOperationBackend implements IOperationBackend { } const metadata = await this.getResultMetadata(); const handle = await this.getFetchHandle(); - // KernelResultsProvider consumes only `fetchNextBatch`; both the async result - // handle and the blocking statement satisfy that surface. - this.resultsProvider = new KernelResultsProvider(handle as unknown as KernelStatement); + // KernelResultsProvider consumes `fetchNextBatch` (IPC) plus an optional + // `fetchNextBatchCopycage`; both the async result handle and the blocking + // statement satisfy that surface. + // + // Copycage fetch (in-cage Arrow buffer copies, no IPC re-encode) is opt-in + // via `KERNEL_FETCH_MODE=copycage` (default ipc). It needs the decoded + // Arrow schema to rebuild `RecordBatch`es and is gated on the schema being + // reconstructible (`resolveFetchMode` → `isSchemaZeroCopySupported`); the + // provider further gates on the binding exposing `fetchNextBatchCopycage`. + const bufferHandoff = this.arrowSchema !== undefined ? resolveFetchMode(this.arrowSchema) : undefined; + this.resultsProvider = new KernelResultsProvider(handle as unknown as KernelStatement, bufferHandoff); // DECIMAL/BIGINT precision preservation is opt-in via the // `preserveBigNumericPrecision` connection option (default off). The kernel // always delivers native Arrow Decimal128 / Int64, so when enabled the diff --git a/lib/kernel/KernelResultsProvider.ts b/lib/kernel/KernelResultsProvider.ts index b84fdca4..6f7ef1d0 100644 --- a/lib/kernel/KernelResultsProvider.ts +++ b/lib/kernel/KernelResultsProvider.ts @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { Schema, TypeMap } from 'apache-arrow'; import IResultsProvider, { ResultsProviderFetchNextOptions } from '../result/IResultsProvider'; import { ArrowBatch } from '../result/utils'; import { countRowsInIpc, patchIpcBytes } from './KernelArrowIpc'; +import { importZeroCopyBatch, KernelZeroCopyBatch } from './KernelArrowImport'; /** * The minimal slice of the napi-binding `Statement` class that we @@ -24,6 +26,31 @@ import { countRowsInIpc, patchIpcBytes } from './KernelArrowIpc'; */ export interface KernelFetchHandle { fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>; + // Copy-into-cage fetch (Flavour A): returns the batch's Arrow buffers as + // V8-allocated (in-cage) copies of the Arrow bytes — cage-safe and + // finalizer-free, one memcpy per buffer. Optional so stubs / older bindings + // without the method still satisfy the interface (the provider then falls + // back to the IPC path). + fetchNextBatchCopycage?(): Promise; +} + +/** + * Which native fetch path the provider drives. `ipc` re-encodes each batch + * to Arrow IPC bytes (default, oldest path). `copycage` hands V8-allocated + * in-cage copies of the kernel's Arrow buffers (Flavour A) — cage-safe, one + * memcpy/buffer, no IPC serialize+decode. + */ +export type KernelFetchMode = 'ipc' | 'copycage'; + +/** + * Enables the copycage buffer-handoff fetch path. Carries the decoded Arrow + * `schema` needed to drive `makeData` reconstruction. Only takes effect when + * the binding also exposes `fetchNextBatchCopycage`; otherwise the provider + * falls back to the IPC path. + */ +export interface KernelBufferHandoffOptions { + mode: Exclude; + schema: Schema; } /** @@ -57,8 +84,20 @@ export default class KernelResultsProvider implements IResultsProvider { @@ -93,6 +132,24 @@ export default class KernelResultsProvider implements IResultsProvider 0) { + const recordBatch = importZeroCopyBatch(this.bufferHandoff.schema, desc); + this.prefetched = { batches: [], recordBatches: [recordBatch], rowCount: desc.numRows }; + } + // eslint-disable-next-line no-continue + continue; + } // eslint-disable-next-line no-await-in-loop const next = await this.statement.fetchNextBatch(); if (next === null) { diff --git a/lib/result/ArrowResultConverter.ts b/lib/result/ArrowResultConverter.ts index 739f691f..ce940c99 100644 --- a/lib/result/ArrowResultConverter.ts +++ b/lib/result/ArrowResultConverter.ts @@ -300,7 +300,14 @@ export default class ArrowResultConverter implements IResultsProvider } const arrowBatch = await this.source.fetchNext(options); // eslint-disable-line no-await-in-loop - if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) { + if (arrowBatch.recordBatches !== undefined && arrowBatch.rowCount > 0) { + // Kernel copycage path: batches are already decoded `RecordBatch` + // objects (rebuilt from the kernel's in-cage Arrow buffers in + // `KernelArrowImport`), so there are no IPC bytes to parse — + // iterate them directly. + this.recordBatchReader = arrowBatch.recordBatches[Symbol.iterator](); + this.remainingRows = arrowBatch.rowCount; + } else if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) { const reader = RecordBatchReader.from(arrowBatch.batches); this.recordBatchReader = reader[Symbol.iterator](); this.remainingRows = arrowBatch.rowCount; diff --git a/lib/result/utils.ts b/lib/result/utils.ts index 4bbdf418..08c7f07b 100644 --- a/lib/result/utils.ts +++ b/lib/result/utils.ts @@ -15,6 +15,8 @@ import { Binary as ArrowBinary, DateUnit, RecordBatchWriter, + RecordBatch, + TypeMap, } from 'apache-arrow'; import { TTableSchema, TColumnDesc, TPrimitiveTypeEntry, TTypeId, TColumn } from '../../thrift/TCLIService_types'; import HiveDriverError from '../errors/HiveDriverError'; @@ -22,6 +24,13 @@ import HiveDriverError from '../errors/HiveDriverError'; export interface ArrowBatch { batches: Array; rowCount: number; + // Pre-decoded RecordBatches, supplied by the kernel copycage path + // (`KernelResultsProvider` + `KernelArrowImport`) instead of `batches`. + // When present, `ArrowResultConverter` consumes these directly and + // skips `RecordBatchReader.from(batches)` — there are no IPC bytes to + // decode. The Thrift/CloudFetch paths leave this undefined and continue + // to supply `batches` (Arrow IPC buffers) as before. + recordBatches?: Array>; } export function getSchemaColumns(schema?: TTableSchema): Array { diff --git a/native/kernel/index.d.ts b/native/kernel/index.d.ts index 0b042121..d329f560 100644 --- a/native/kernel/index.d.ts +++ b/native/kernel/index.d.ts @@ -669,6 +669,21 @@ export declare class AsyncResultHandle { * `Statement.fetchNextBatch()` payload for the same query. */ fetchNextBatch(): Promise + /** + * Copy-into-cage variant of `fetchNextBatch` (Flavour A): hands the + * batch's Arrow buffers to JS as in-cage (V8-allocated) `ArrayBuffer`s + * with the bytes `memcpy`'d in, instead of serialising to Arrow IPC + * bytes. Mirrors `Statement::fetch_next_batch_copycage` exactly — this + * is the handle the *async query path* (`awaitResult()`) exposes, so + * the driver's normal `executeStatement` flow can use copycage rather + * than silently falling back to IPC. See `crate::zerocopy::CopyCageBatch`. + * + * Any offset-sliced array is compacted to offset 0 first + * (`zerocopy::compact_to_offset_zero`); the JS importer's `makeData` + * cannot apply a logical element offset. Returns `null` at end of + * stream, exactly like `fetchNextBatch`. + */ + fetchNextBatchCopycage(): Promise<{ numRows: number, columns: any[] } | null> /** * Result schema as an Arrow IPC payload (schema header only, * no record-batch message). Available before any batches have @@ -1007,6 +1022,30 @@ export declare class Statement { * succeed or fail consistently. */ fetchNextBatch(): Promise + /** + * Copy-into-cage variant of `fetchNextBatch` (Flavour A): hands the + * batch's Arrow buffers to JS as in-cage (V8-allocated) `ArrayBuffer`s + * with the bytes `memcpy`'d in, instead of serialising to Arrow IPC + * bytes (`fetchNextBatch`). See `crate::zerocopy::CopyCageBatch`. + * + * Cage-safe (works under the V8 sandbox / Electron) and GC-clean — V8 + * owns the bytes, so no Arc/finalizer is needed. Costs one plain copy + * per buffer but still avoids the IPC path's serialize + decode passes. + * + * Any offset-sliced array is compacted to offset 0 first + * (`zerocopy::compact_to_offset_zero`), because the JS importer's + * `makeData` cannot apply a logical element offset. + * + * Returns `null` at end of stream, exactly like `fetchNextBatch`. Same + * error/auto-close/finished semantics — only the result encoding + * differs, so the two are interchangeable per-fetch. + * + * `ts_return_type` pins a structural shape in the generated d.ts: + * `CopyCageBatch` is a manual `ToNapiValue` type (not a `#[napi(object)]` + * struct), so napi-rs has no interface to emit for it. The driver + * re-types the value precisely via `KernelArrowImport.ts`. + */ + fetchNextBatchCopycage(): Promise<{ numRows: number, columns: any[] } | null> /** * Result schema as an Arrow IPC payload (schema header only, no * record-batch message). Available before any batches have been diff --git a/tests/unit/kernel/KernelArrowImport.test.ts b/tests/unit/kernel/KernelArrowImport.test.ts new file mode 100644 index 00000000..f82711ed --- /dev/null +++ b/tests/unit/kernel/KernelArrowImport.test.ts @@ -0,0 +1,229 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { expect } from 'chai'; +import { + tableFromArrays, + vectorFromArray, + makeData, + Data, + Table, + RecordBatch, + RecordBatchReader, + RecordBatchStreamWriter, + Schema, + Field, + Struct, + List, + Int32, + Int64, + Float64, + Utf8, + Bool, + Dictionary, + TypeMap, + DataType, +} from 'apache-arrow'; +import { + importZeroCopyBatch, + isZeroCopySupported, + isSchemaZeroCopySupported, + KernelArrayNode, + KernelZeroCopyBatch, +} from '../../../lib/kernel/KernelArrowImport'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; + +// Hermetic coverage for the kernel copycage import layer. There is no +// warehouse here, so we synthesise the kernel's `zerocopy.rs::build_node` +// descriptor in pure JS by walking an apache-arrow `Data` tree's buffers — +// the exact buffer→slot mapping the native side emits — then feed it through +// `importZeroCopyBatch` and assert the result is byte-identical (via IPC +// round-trip) to the source. This pins the layout contract so any +// apache-arrow / arrow-rs columnar-layout drift fails loudly here rather than +// silently mis-decoding live results. + +/** + * Build a `KernelArrayNode` from an apache-arrow `Data`, mirroring the kernel's + * `build_node`: copy each Arrow buffer into a fresh `ArrayBuffer` (the in-cage + * memcpy the native side does), expose validity + the type's data buffers in + * canonical order, and recurse into children. Offset is always 0 (the kernel + * compacts before handoff). + */ +function nodeFromData(data: Data): KernelArrayNode { + const toAB = (ta: { buffer: ArrayBufferLike; byteOffset: number; byteLength: number } | undefined): ArrayBuffer => { + if (!ta) return new ArrayBuffer(0); + // Copy out the exact byte window (the kernel ships a fresh in-cage copy). + return ta.buffer.slice(ta.byteOffset, ta.byteOffset + ta.byteLength) as ArrayBuffer; + }; + + const validity = + data.nullCount > 0 && data.values && (data as any).nullBitmap && (data as any).nullBitmap.byteLength > 0 + ? toAB((data as any).nullBitmap) + : undefined; + + // Canonical Arrow buffer order per type. apache-arrow stores these as + // `valueOffsets` (offsets) + `values` (data) for variable-binary, and + // `values` for fixed-width. We mirror `ArrayData::buffers()` order. + const buffers: ArrayBuffer[] = []; + const type = data.type; + if (DataType.isUtf8(type) || DataType.isBinary(type)) { + buffers.push(toAB((data as any).valueOffsets)); + buffers.push(toAB((data as any).values)); + } else if (DataType.isList(type) || DataType.isMap(type)) { + buffers.push(toAB((data as any).valueOffsets)); + } else if (DataType.isStruct(type) || DataType.isFixedSizeList(type) || DataType.isNull(type)) { + // No data buffers (children carry everything; null has none). + } else { + buffers.push(toAB((data as any).values)); + } + + return { + length: data.length, + offset: 0, + nullCount: data.nullCount, + validity, + buffers, + children: data.children ? data.children.map((c) => nodeFromData(c)) : [], + }; +} + +/** Build the kernel batch descriptor from an apache-arrow RecordBatch. */ +function descriptorFromRecordBatch(rb: RecordBatch): KernelZeroCopyBatch { + return { + numRows: rb.numRows, + columns: rb.schema.fields.map((_, i) => nodeFromData(rb.data.children[i])), + }; +} + +/** Canonicalise a Table's rows to a comparable JSON string (bigint-safe). */ +function serializeTable(t: Table): string { + return JSON.stringify(t.toArray().map((r: any) => (r.toJSON ? r.toJSON() : r)), (_k, v) => + typeof v === 'bigint' ? `BI:${v}` : v, + ); +} + +/** IPC round-trip: write the batch to an Arrow IPC stream and decode it back. */ +function ipcRoundTrip(rb: RecordBatch): Table { + const writer = new RecordBatchStreamWriter(); + writer.write(rb); + writer.finish(); + const bytes = Buffer.from(writer.toUint8Array(true)); + const reader = RecordBatchReader.from([bytes]); + return new Table(reader); +} + +describe('KernelArrowImport — isZeroCopySupported predicate', () => { + it('accepts the fixed-width / variable-binary leaf families', () => { + expect(isZeroCopySupported(new Int32())).to.equal(true); + expect(isZeroCopySupported(new Int64())).to.equal(true); + expect(isZeroCopySupported(new Float64())).to.equal(true); + expect(isZeroCopySupported(new Utf8())).to.equal(true); + expect(isZeroCopySupported(new Bool())).to.equal(true); + }); + + it('rejects dictionary types (side-vector not in the buffer tree)', () => { + const dict = new Dictionary(new Utf8(), new Int32()); + expect(isZeroCopySupported(dict as unknown as DataType)).to.equal(false); + }); + + it('recurses: a struct/list of a supported type is supported', () => { + const okStruct = new Struct([new Field('a', new Int32()), new Field('b', new Utf8())]); + expect(isZeroCopySupported(okStruct)).to.equal(true); + const okList = new List(new Field('item', new Int32())); + expect(isZeroCopySupported(okList)).to.equal(true); + }); + + it('recurses: a struct/list containing a dictionary is unsupported', () => { + const dict = new Dictionary(new Utf8(), new Int32()); + const badStruct = new Struct([new Field('a', new Int32()), new Field('d', dict as unknown as DataType)]); + expect(isZeroCopySupported(badStruct)).to.equal(false); + const badList = new List(new Field('item', dict as unknown as DataType)); + expect(isZeroCopySupported(badList)).to.equal(false); + }); + + it('gates a whole schema: one unsupported column forces IPC', () => { + const dict = new Dictionary(new Utf8(), new Int32()); + const goodSchema = new Schema([new Field('a', new Int32()), new Field('s', new Utf8())]); + const badSchema = new Schema([new Field('a', new Int32()), new Field('d', dict as unknown as DataType)]); + expect(isSchemaZeroCopySupported(goodSchema)).to.equal(true); + expect(isSchemaZeroCopySupported(badSchema)).to.equal(false); + }); +}); + +describe('KernelArrowImport — importZeroCopyBatch layout-compat (pinned)', () => { + it('rebuilds a fixed-width + string batch byte-identical to IPC', () => { + // Build the string column as a PLAIN Utf8 vector (not dictionary): the + // kernel always ships plain Utf8, whereas `tableFromArrays` would + // auto-dictionary-encode a JS string array. + const iVec = vectorFromArray([1, 2, 3, 4], new Int32()); + const dVec = vectorFromArray([1.5, 2.5, Number.NaN, -0], new Float64()); + const sVec = vectorFromArray(['a', '', '日本語🚀', 'x'.repeat(100)], new Utf8()); + const schema = new Schema([ + new Field('i', new Int32(), true), + new Field('d', new Float64(), true), + new Field('s', new Utf8(), true), + ]); + const rb = new RecordBatch( + schema, + makeData({ type: new Struct(schema.fields), length: 4, children: [iVec.data[0], dVec.data[0], sVec.data[0]] }), + ); + const desc = descriptorFromRecordBatch(rb); + const rebuilt = new Table(importZeroCopyBatch(schema, desc)); + expect(serializeTable(rebuilt)).to.equal(serializeTable(ipcRoundTrip(rb))); + }); + + it('rebuilds a batch with nulls byte-identical to IPC', () => { + const v = vectorFromArray([10, null, 30, null, 50], new Int32()); + const schema = new Schema([new Field('n', new Int32(), true)]); + const rb = new RecordBatch(schema, makeData({ type: new Struct(schema.fields), length: 5, children: [v.data[0]] })); + const desc = descriptorFromRecordBatch(rb); + const rebuilt = new Table(importZeroCopyBatch(schema, desc)); + expect(serializeTable(rebuilt)).to.equal(serializeTable(ipcRoundTrip(rb))); + }); + + it('rebuilds nested struct> byte-identical to IPC', () => { + const structVec = vectorFromArray( + [ + { k: 1, v: [1, 2] }, + { k: 2, v: [3] }, + { k: 3, v: [] }, + ], + new Struct([new Field('k', new Int32()), new Field('v', new List(new Field('item', new Int32())))]), + ); + const schema = new Schema([new Field('st', structVec.type, true)]); + const rb = new RecordBatch( + schema, + makeData({ type: new Struct(schema.fields), length: 3, children: [structVec.data[0]] }), + ); + const desc = descriptorFromRecordBatch(rb); + const rebuilt = new Table(importZeroCopyBatch(schema, desc)); + expect(serializeTable(rebuilt)).to.equal(serializeTable(ipcRoundTrip(rb))); + }); + + it('rejects a non-zero offset descriptor (kernel must compact first)', () => { + const table = tableFromArrays({ i: Int32Array.from([1, 2, 3]) }); + const rb = table.batches[0]; + const desc = descriptorFromRecordBatch(rb); + desc.columns[0].offset = 1; // simulate a sliced array slipping through + expect(() => importZeroCopyBatch(rb.schema, desc)).to.throw(HiveDriverError, /non-zero array offset/); + }); + + it('rejects a column-count mismatch', () => { + const table = tableFromArrays({ i: Int32Array.from([1, 2, 3]) }); + const rb = table.batches[0]; + const desc = descriptorFromRecordBatch(rb); + desc.columns = []; // schema has 1 field, descriptor has 0 + expect(() => importZeroCopyBatch(rb.schema, desc)).to.throw(HiveDriverError, /column count/); + }); +});