From 2d3d46e7d741eaf592cbc802fa02c61391550cd3 Mon Sep 17 00:00:00 2001 From: Andrew Stein Date: Mon, 23 Feb 2026 21:27:54 -0500 Subject: [PATCH 1/2] Implement alt formats for Virtual Server Signed-off-by: Andrew Stein # Conflicts: # rust/perspective-client/src/rust/virtual_server/data.rs # Conflicts: # rust/perspective-client/src/rust/virtual_server/server.rs --- Cargo.lock | 386 +++++++++++++ examples/esbuild-duckdb-virtual/src/index.ts | 2 + rust/perspective-client/Cargo.toml | 4 + .../src/rust/virtual_server/data.rs | 528 ++++++++++++++---- .../src/rust/virtual_server/mod.rs | 2 +- .../src/rust/virtual_server/server.rs | 74 ++- 6 files changed, 866 insertions(+), 130 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e672845f60..04c387e108 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,20 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.3.4", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -76,6 +90,196 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "arrow" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7b3141e0ec5145a22d8694ea8b6d6f69305971c4fa1c1a13ef0195aef2d678b" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num-traits", +] + +[[package]] +name = "arrow-array" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8955af33b25f3b175ee10af580577280b4bd01f7e823d94c7cdef7cf8c9aef" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.16.1", + "num-complex", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-buffer" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c697ddca96183182f35b3a18e50b9110b11e916d7b7799cbfd4d34662f2c56c2" +dependencies = [ + "bytes", + "half", + "num-bigint", + "num-traits", +] + +[[package]] +name = "arrow-cast" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "646bbb821e86fd57189c10b4fcdaa941deaf4181924917b0daa92735baa6ada5" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", + "atoi", + "base64", + "chrono", + "half", + "lexical-core", + "num-traits", + "ryu", +] + +[[package]] +name = "arrow-csv" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" +dependencies = [ + "arrow-array", + "arrow-cast", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fdd994a9d28e6365aa78e15da3f3950c0fdcea6b963a12fa1c391afb637b304" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-ipc" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abf7df950701ab528bf7c0cf7eeadc0445d03ef5d6ffc151eaae6b38a58feff1" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers", +] + +[[package]] +name = "arrow-ord" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d8f1870e03d4cbed632959498bcc84083b5a24bded52905ae1695bd29da45b" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c872d36b7bf2a6a6a2b40de9156265f0242910791db366a2c17476ba8330d68" + +[[package]] +name = "arrow-select" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68bf3e3efbd1278f770d67e5dc410257300b161b93baedb3aae836144edcaf4b" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num-traits", +] + +[[package]] +name = "arrow-string" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85e968097061b3c0e9fe3079cf2e703e487890700546b5b0647f60fca1b5a8d8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num-traits", + "regex", + "regex-syntax", +] + [[package]] name = "async-lock" version = "2.8.0" @@ -96,6 +300,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -395,6 +608,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -435,6 +668,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.7" @@ -445,6 +684,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "cxx" version = "1.0.190" @@ -668,6 +928,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "25.12.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3" +dependencies = [ + "bitflags", + "rustc_version", +] + [[package]] name = "float-cmp" version = "0.9.0" @@ -1054,6 +1324,18 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", + "zerocopy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1387,6 +1669,63 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.178" @@ -1608,6 +1947,34 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1724,6 +2091,7 @@ dependencies = [ name = "perspective-client" version = "4.3.0" dependencies = [ + "arrow", "async-lock", "futures", "getrandom 0.3.4", @@ -2348,6 +2716,15 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver 1.0.27", +] + [[package]] name = "rustix" version = "1.1.2" @@ -2808,6 +3185,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tokio" version = "1.48.0" diff --git a/examples/esbuild-duckdb-virtual/src/index.ts b/examples/esbuild-duckdb-virtual/src/index.ts index baa7646209..7db1aefa56 100644 --- a/examples/esbuild-duckdb-virtual/src/index.ts +++ b/examples/esbuild-duckdb-virtual/src/index.ts @@ -31,7 +31,9 @@ import * as duckdb from "@duckdb/duckdb-wasm"; import SUPERSTORE_ARROW from "superstore-arrow/superstore.lz4.arrow"; await Promise.all([ + // @ts-ignore perspective.init_server(fetch(SERVER_WASM)), + // @ts-ignore perspective_viewer.init_client(fetch(CLIENT_WASM)), ]); diff --git a/rust/perspective-client/Cargo.toml b/rust/perspective-client/Cargo.toml index a9e1bb9f09..f469146d71 100644 --- a/rust/perspective-client/Cargo.toml +++ b/rust/perspective-client/Cargo.toml @@ -65,6 +65,10 @@ prost-build = { version = "0.12.3" } protobuf-src = { version = "2.1.1", optional = true } [dependencies] +arrow = { version = "57.3.0", default-features = false, features = [ + "csv", + "ipc", +] } async-lock = { version = "2.5.0" } futures = { version = "0.3.28" } indexmap = { version = "2.2.6", features = ["serde"] } diff --git a/rust/perspective-client/src/rust/virtual_server/data.rs b/rust/perspective-client/src/rust/virtual_server/data.rs index 770ef88f60..2cbd76aef3 100644 --- a/rust/perspective-client/src/rust/virtual_server/data.rs +++ b/rust/perspective-client/src/rust/virtual_server/data.rs @@ -11,27 +11,29 @@ // ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ use std::error::Error; -use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use arrow::array::{ + Array, ArrayRef, BooleanArray, BooleanBuilder, Float64Array, Float64Builder, Int32Array, + Int32Builder, RecordBatch, StringArray, StringBuilder, TimestampMillisecondArray, + TimestampMillisecondBuilder, +}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use arrow::ipc::reader::StreamReader; +use arrow::ipc::writer::StreamWriter; use indexmap::IndexMap; use serde::Serialize; use crate::config::{Scalar, ViewConfig}; -/// A column of data returned from a virtual server query. -/// -/// Each variant represents a different column type, containing a vector -/// of optional values. `None` values represent null/missing data. -#[derive(Debug, Serialize)] -#[serde(untagged)] -pub enum VirtualDataColumn { - Boolean(Vec>), - String(Vec>), - Float(Vec>), - Integer(Vec>), - Datetime(Vec>), - IntegerIndex(Vec>>), - RowPath(Vec>), +/// An Arrow column builder, used during the population phase of +/// [`VirtualDataSlice`]. +pub enum ColumnBuilder { + Boolean(BooleanBuilder), + String(StringBuilder), + Float(Float64Builder), + Integer(Int32Builder), + Datetime(TimestampMillisecondBuilder), } /// A single cell value in a row-oriented data representation. @@ -46,147 +48,432 @@ pub enum VirtualDataCell { Float(Option), Integer(Option), Datetime(Option), - IntegerIndex(Option>), RowPath(Vec), } -impl VirtualDataColumn { - /// Returns `true` if the column contains no elements. - pub fn is_empty(&self) -> bool { - match self { - VirtualDataColumn::Boolean(v) => v.is_empty(), - VirtualDataColumn::String(v) => v.is_empty(), - VirtualDataColumn::Float(v) => v.is_empty(), - VirtualDataColumn::Integer(v) => v.is_empty(), - VirtualDataColumn::Datetime(v) => v.is_empty(), - VirtualDataColumn::IntegerIndex(v) => v.is_empty(), - VirtualDataColumn::RowPath(v) => v.is_empty(), - } - } - - /// Returns the number of elements in the column. - pub fn len(&self) -> usize { - match self { - VirtualDataColumn::Boolean(v) => v.len(), - VirtualDataColumn::String(v) => v.len(), - VirtualDataColumn::Float(v) => v.len(), - VirtualDataColumn::Integer(v) => v.len(), - VirtualDataColumn::Datetime(v) => v.len(), - VirtualDataColumn::IntegerIndex(v) => v.len(), - VirtualDataColumn::RowPath(v) => v.len(), - } - } -} - -/// Trait for types that can be written to a [`VirtualDataColumn`] which +/// Trait for types that can be written to a [`ColumnBuilder`] which /// enforces sequential construction. /// /// This trait enables type-safe insertion of values into virtual data columns, /// ensuring that values are written to columns of the correct type. pub trait SetVirtualDataColumn { - /// Writes this value (sequentially) to the given column. + /// Writes this value (sequentially) to the given column builder. /// /// Returns an error if the column type does not match the value type. - fn write_to(self, col: &mut VirtualDataColumn) -> Result<(), &'static str>; + fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str>; - /// Creates a new empty column of the appropriate type for this value. - fn new_column() -> VirtualDataColumn; + /// Creates a new empty column builder of the appropriate type for this + /// value. + fn new_builder() -> ColumnBuilder; /// Converts this value to a [`Scalar`] representation. fn to_scalar(self) -> Scalar; } -macro_rules! template_psp { - ($t:ty, $u:ident, $v:ident, $w:ty) => { - impl SetVirtualDataColumn for Option<$t> { - fn write_to(self, col: &mut VirtualDataColumn) -> Result<(), &'static str> { - if let VirtualDataColumn::$u(x) = col { - x.push(self); - Ok(()) - } else { - Err("Bad type") - } +impl SetVirtualDataColumn for Option { + fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> { + if let ColumnBuilder::String(builder) = col { + match self { + Some(s) => builder.append_value(&s), + None => builder.append_null(), } + Ok(()) + } else { + Err("Bad type") + } + } + + fn new_builder() -> ColumnBuilder { + ColumnBuilder::String(StringBuilder::new()) + } + + fn to_scalar(self) -> Scalar { + if let Some(x) = self { + Scalar::String(x) + } else { + Scalar::Null + } + } +} - fn new_column() -> VirtualDataColumn { - VirtualDataColumn::$u(vec![]) +impl SetVirtualDataColumn for Option { + fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> { + if let ColumnBuilder::Float(builder) = col { + match self { + Some(v) => builder.append_value(v), + None => builder.append_null(), } + Ok(()) + } else { + Err("Bad type") + } + } - fn to_scalar(self) -> Scalar { - if let Some(x) = self { - Scalar::$v(x as $w) - } else { - Scalar::Null - } + fn new_builder() -> ColumnBuilder { + ColumnBuilder::Float(Float64Builder::new()) + } + + fn to_scalar(self) -> Scalar { + if let Some(x) = self { + Scalar::Float(x) + } else { + Scalar::Null + } + } +} + +impl SetVirtualDataColumn for Option { + fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> { + if let ColumnBuilder::Integer(builder) = col { + match self { + Some(v) => builder.append_value(v), + None => builder.append_null(), } + Ok(()) + } else { + Err("Bad type") + } + } + + fn new_builder() -> ColumnBuilder { + ColumnBuilder::Integer(Int32Builder::new()) + } + + fn to_scalar(self) -> Scalar { + if let Some(x) = self { + Scalar::Float(x as f64) + } else { + Scalar::Null } - }; + } } -template_psp!(String, String, String, String); -template_psp!(f64, Float, Float, f64); -template_psp!(i32, Integer, Float, f64); -template_psp!(i64, Datetime, Float, f64); -template_psp!(bool, Boolean, Bool, bool); +impl SetVirtualDataColumn for Option { + fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> { + if let ColumnBuilder::Datetime(builder) = col { + match self { + Some(v) => builder.append_value(v), + None => builder.append_null(), + } + Ok(()) + } else { + Err("Bad type") + } + } -/// A columnar data slice returned from a virtual server view query. -/// -/// This struct represents a rectangular slice of data from a view. It can be -/// serialized to JSON in either column-oriented or row-oriented format. -#[derive(Debug, Serialize)] -#[serde(transparent)] -pub struct VirtualDataSlice( - #[serde(skip)] ViewConfig, - IndexMap, -); + fn new_builder() -> ColumnBuilder { + ColumnBuilder::Datetime(TimestampMillisecondBuilder::new()) + } -impl Deref for VirtualDataSlice { - type Target = IndexMap; + fn to_scalar(self) -> Scalar { + if let Some(x) = self { + Scalar::Float(x as f64) + } else { + Scalar::Null + } + } +} + +impl SetVirtualDataColumn for Option { + fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> { + if let ColumnBuilder::Boolean(builder) = col { + match self { + Some(v) => builder.append_value(v), + None => builder.append_null(), + } + Ok(()) + } else { + Err("Bad type") + } + } - fn deref(&self) -> &Self::Target { - &self.1 + fn new_builder() -> ColumnBuilder { + ColumnBuilder::Boolean(BooleanBuilder::new()) + } + + fn to_scalar(self) -> Scalar { + if let Some(x) = self { + Scalar::Bool(x) + } else { + Scalar::Null + } } } -impl DerefMut for VirtualDataSlice { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.1 +/// A columnar data slice returned from a virtual server view query. +/// +/// This struct represents a rectangular slice of data from a view, stored +/// internally as Arrow builders during population and frozen into a +/// `RecordBatch` on first consumption. +#[derive(Debug)] +pub struct VirtualDataSlice { + config: ViewConfig, + builders: IndexMap, + row_path: Option>>, + frozen: Option, +} + +impl std::fmt::Debug for ColumnBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ColumnBuilder::Boolean(_) => write!(f, "ColumnBuilder::Boolean(..)"), + ColumnBuilder::String(_) => write!(f, "ColumnBuilder::String(..)"), + ColumnBuilder::Float(_) => write!(f, "ColumnBuilder::Float(..)"), + ColumnBuilder::Integer(_) => write!(f, "ColumnBuilder::Integer(..)"), + ColumnBuilder::Datetime(_) => write!(f, "ColumnBuilder::Datetime(..)"), + } } } impl VirtualDataSlice { pub fn new(config: ViewConfig) -> Self { - VirtualDataSlice(config, IndexMap::default()) + VirtualDataSlice { + config, + builders: IndexMap::default(), + row_path: None, + frozen: None, + } + } + + /// Loads data from Arrow IPC streaming format bytes, replacing any + /// existing builder state. + /// + /// This is an alternative to populating the slice cell-by-cell via + /// [`set_col`]. The IPC stream should contain a single `RecordBatch` + /// (only the first batch is used if multiple are present). + pub fn from_arrow_ipc(&mut self, ipc: &[u8]) -> Result<(), Box> { + let cursor = std::io::Cursor::new(ipc); + let mut reader = StreamReader::try_new(cursor, None)?; + let batch = reader + .next() + .ok_or("Arrow IPC stream contained no record batches")??; + + self.frozen = Some(batch); + Ok(()) + } + + /// Freezes the builders into a `RecordBatch`. Idempotent — subsequent + /// calls return the cached batch. + pub(crate) fn freeze(&mut self) -> &RecordBatch { + if self.frozen.is_none() { + let mut fields = Vec::new(); + let mut arrays: Vec = Vec::new(); + + for (name, builder) in &mut self.builders { + let (field, array): (Field, ArrayRef) = match builder { + ColumnBuilder::Boolean(b) => ( + Field::new(name, DataType::Boolean, true), + Arc::new(b.finish()), + ), + ColumnBuilder::String(b) => { + (Field::new(name, DataType::Utf8, true), Arc::new(b.finish())) + }, + ColumnBuilder::Float(b) => ( + Field::new(name, DataType::Float64, true), + Arc::new(b.finish()), + ), + ColumnBuilder::Integer(b) => ( + Field::new(name, DataType::Int32, true), + Arc::new(b.finish()), + ), + ColumnBuilder::Datetime(b) => ( + Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true), + Arc::new(b.finish()), + ), + }; + fields.push(field); + arrays.push(array); + } + + let schema = Arc::new(Schema::new(fields)); + self.frozen = Some( + RecordBatch::try_new(schema, arrays) + .expect("RecordBatch construction should not fail for well-formed builders"), + ); + } + self.frozen.as_ref().unwrap() + } + + /// Serializes the data to Arrow IPC streaming format. + pub(crate) fn to_arrow_ipc(&mut self) -> Result, Box> { + let batch = self.freeze().clone(); + let schema = batch.schema(); + let mut buf = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buf, &schema)?; + writer.write(&batch)?; + writer.finish()?; + } + Ok(buf) } - pub(super) fn to_rows(&self) -> Vec> { - let num_rows = self.values().next().map(|x| x.len()).unwrap_or(0); + /// Converts the columnar data to a row-oriented representation for JSON + /// serialization. + pub(crate) fn to_rows(&mut self) -> Vec> { + let batch = self.freeze().clone(); + let num_rows = batch.num_rows(); + let schema = batch.schema(); + (0..num_rows) .map(|row_idx| { - self.iter() - .map(|(col_name, col_data)| { - let row_value = match col_data { - VirtualDataColumn::Boolean(v) => VirtualDataCell::Boolean(v[row_idx]), - VirtualDataColumn::String(v) => { - VirtualDataCell::String(v[row_idx].clone()) + let mut row = IndexMap::new(); + + // Add RowPath column first if present + if let Some(ref rp) = self.row_path + && row_idx < rp.len() + { + row.insert( + "__ROW_PATH__".to_string(), + VirtualDataCell::RowPath(rp[row_idx].clone()), + ); + } + + // Add Arrow columns + for (col_idx, field) in schema.fields().iter().enumerate() { + let col = batch.column(col_idx); + let cell = if col.is_null(row_idx) { + match field.data_type() { + DataType::Boolean => VirtualDataCell::Boolean(None), + DataType::Utf8 => VirtualDataCell::String(None), + DataType::Float64 => VirtualDataCell::Float(None), + DataType::Int32 => VirtualDataCell::Integer(None), + DataType::Timestamp(TimeUnit::Millisecond, _) => { + VirtualDataCell::Datetime(None) + }, + _ => continue, + } + } else { + match field.data_type() { + DataType::Boolean => { + let arr = col.as_any().downcast_ref::().unwrap(); + VirtualDataCell::Boolean(Some(arr.value(row_idx))) + }, + DataType::Utf8 => { + let arr = col.as_any().downcast_ref::().unwrap(); + VirtualDataCell::String(Some(arr.value(row_idx).to_string())) }, - VirtualDataColumn::Float(v) => VirtualDataCell::Float(v[row_idx]), - VirtualDataColumn::Integer(v) => VirtualDataCell::Integer(v[row_idx]), - VirtualDataColumn::Datetime(v) => VirtualDataCell::Datetime(v[row_idx]), - VirtualDataColumn::IntegerIndex(v) => { - VirtualDataCell::IntegerIndex(v[row_idx].clone()) + DataType::Float64 => { + let arr = col.as_any().downcast_ref::().unwrap(); + VirtualDataCell::Float(Some(arr.value(row_idx))) }, - VirtualDataColumn::RowPath(v) => { - VirtualDataCell::RowPath(v[row_idx].clone()) + DataType::Int32 => { + let arr = col.as_any().downcast_ref::().unwrap(); + VirtualDataCell::Integer(Some(arr.value(row_idx))) }, - }; - (col_name.clone(), row_value) - }) - .collect() + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let arr = col + .as_any() + .downcast_ref::() + .unwrap(); + VirtualDataCell::Datetime(Some(arr.value(row_idx))) + }, + _ => continue, + } + }; + row.insert(field.name().clone(), cell); + } + + row }) .collect() } + /// Serializes the data to a column-oriented JSON string. + pub(crate) fn to_columns_json(&mut self) -> Result> { + let batch = self.freeze().clone(); + let schema = batch.schema(); + let mut map = serde_json::Map::new(); + + // Add RowPath if present + if let Some(ref rp) = self.row_path { + map.insert("__ROW_PATH__".to_string(), serde_json::to_value(rp)?); + } + + for (col_idx, field) in schema.fields().iter().enumerate() { + let col = batch.column(col_idx); + let num_rows = col.len(); + let values: serde_json::Value = match field.data_type() { + DataType::Boolean => { + let arr = col.as_any().downcast_ref::().unwrap(); + serde_json::to_value( + (0..num_rows) + .map(|i| { + if arr.is_null(i) { + None + } else { + Some(arr.value(i)) + } + }) + .collect::>(), + )? + }, + DataType::Utf8 => { + let arr = col.as_any().downcast_ref::().unwrap(); + serde_json::to_value( + (0..num_rows) + .map(|i| { + if arr.is_null(i) { + None + } else { + Some(arr.value(i)) + } + }) + .collect::>(), + )? + }, + DataType::Float64 => { + let arr = col.as_any().downcast_ref::().unwrap(); + serde_json::to_value( + (0..num_rows) + .map(|i| { + if arr.is_null(i) { + None + } else { + Some(arr.value(i)) + } + }) + .collect::>(), + )? + }, + DataType::Int32 => { + let arr = col.as_any().downcast_ref::().unwrap(); + serde_json::to_value( + (0..num_rows) + .map(|i| { + if arr.is_null(i) { + None + } else { + Some(arr.value(i)) + } + }) + .collect::>(), + )? + }, + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let arr = col + .as_any() + .downcast_ref::() + .unwrap(); + serde_json::to_value( + (0..num_rows) + .map(|i| { + if arr.is_null(i) { + None + } else { + Some(arr.value(i)) + } + }) + .collect::>(), + )? + }, + _ => continue, + }; + map.insert(field.name().clone(), values); + } + + Ok(serde_json::to_string(&map)?) + } + /// Sets a value in a column at the specified row index. /// /// If `group_by_index` is `Some`, the value is added to the `__ROW_PATH__` @@ -203,19 +490,11 @@ impl VirtualDataSlice { ) -> Result<(), Box> { if name.starts_with("__ROW_PATH_") { let group_by_index: u32 = name[11..name.len() - 2].parse()?; - let max_grouping_id = 2_i32.pow((self.0.group_by.len() as u32) - group_by_index) - 1; - if grouping_id.map(|x| x as i32).unwrap_or(0) < max_grouping_id { - if !self.contains_key("__ROW_PATH__") { - self.insert( - "__ROW_PATH__".to_owned(), - VirtualDataColumn::RowPath(vec![]), - ); - } - - let Some(VirtualDataColumn::RowPath(col)) = self.get_mut("__ROW_PATH__") else { - return Err("__ROW_PATH__ column has unexpected type".into()); - }; + let max_grouping_id = + 2_i32.pow((self.config.group_by.len() as u32) - group_by_index) - 1; + if grouping_id.map(|x| x as i32).unwrap_or(i32::MAX) < max_grouping_id { + let col = self.row_path.get_or_insert_with(Vec::new); if let Some(row) = col.get_mut(index) { let scalar = value.to_scalar(); row.push(scalar); @@ -231,11 +510,12 @@ impl VirtualDataSlice { Ok(()) } else { - if !self.contains_key(name) { - self.insert(name.to_owned(), T::new_column()); + if !self.builders.contains_key(name) { + self.builders.insert(name.to_owned(), T::new_builder()); } let col = self + .builders .get_mut(name) .ok_or_else(|| format!("Column '{}' not found after insertion", name))?; diff --git a/rust/perspective-client/src/rust/virtual_server/mod.rs b/rust/perspective-client/src/rust/virtual_server/mod.rs index 167856ed4d..ad70a46b18 100644 --- a/rust/perspective-client/src/rust/virtual_server/mod.rs +++ b/rust/perspective-client/src/rust/virtual_server/mod.rs @@ -22,7 +22,7 @@ mod generic_sql_model; mod handler; mod server; -pub use data::{SetVirtualDataColumn, VirtualDataCell, VirtualDataColumn, VirtualDataSlice}; +pub use data::{SetVirtualDataColumn, VirtualDataCell, VirtualDataSlice}; pub use error::{ResultExt, VirtualServerError}; pub use features::{AggSpec, Features}; pub use generic_sql_model::{ diff --git a/rust/perspective-client/src/rust/virtual_server/server.rs b/rust/perspective-client/src/rust/virtual_server/server.rs index ea4ae41978..fedd648a8b 100644 --- a/rust/perspective-client/src/rust/virtual_server/server.rs +++ b/rust/perspective-client/src/rust/virtual_server/server.rs @@ -27,7 +27,8 @@ use crate::proto::{ TableSchemaResp, TableSizeResp, TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp, ViewDimensionsResp, ViewExpressionSchemaResp, ViewGetConfigResp, ViewGetMinMaxResp, ViewOnDeleteResp, ViewOnUpdateResp, ViewRemoveDeleteResp, ViewRemoveOnUpdateResp, - ViewSchemaResp, ViewToColumnsStringResp, ViewToRowsStringResp, + ViewSchemaResp, ViewToArrowResp, ViewToColumnsStringResp, ViewToCsvResp, + ViewToNdjsonStringResp, ViewToRowsStringResp, }; macro_rules! respond { @@ -297,11 +298,73 @@ impl VirtualServer { .collect() }) }, + ViewToArrowReq(view_to_arrow_req) => { + let viewport = view_to_arrow_req.viewport.unwrap(); + let schema = self.get_cached_view_schema(&msg.entity_id, false).await?; + let config = self.view_configs.get(&msg.entity_id).unwrap(); + let mut cols = self + .handler + .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) + .await?; + + let arrow = cols + .to_arrow_ipc() + .map_err(|e| VirtualServerError::Other(e.to_string()))?; + + respond!(msg, ViewToArrowResp { arrow }) + }, + ViewToCsvReq(view_to_csv_req) => { + let viewport = view_to_csv_req.viewport.unwrap(); + let schema = self.get_cached_view_schema(&msg.entity_id, false).await?; + let config = self.view_configs.get(&msg.entity_id).unwrap(); + let mut cols = self + .handler + .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) + .await?; + + let rows = cols.to_rows(); + let mut csv = String::new(); + if let Some(first_row) = rows.first() { + let headers: Vec<&str> = first_row.keys().map(|k| k.as_str()).collect(); + csv.push_str(&headers.join(",")); + csv.push('\n'); + } + + for row in &rows { + let values: Vec = row + .values() + .map(|cell| serde_json::to_string(cell).unwrap_or_default()) + .collect(); + csv.push_str(&values.join(",")); + csv.push('\n'); + } + + respond!(msg, ViewToCsvResp { csv }) + }, + ViewToNdjsonStringReq(view_to_ndjson_req) => { + let viewport = view_to_ndjson_req.viewport.unwrap(); + let schema = self.get_cached_view_schema(&msg.entity_id, false).await?; + let config = self.view_configs.get(&msg.entity_id).unwrap(); + let mut cols = self + .handler + .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) + .await?; + + let rows = cols.to_rows(); + let ndjson_string = rows + .iter() + .map(serde_json::to_string) + .collect::, _>>() + .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))? + .join("\n"); + + respond!(msg, ViewToNdjsonStringResp { ndjson_string }) + }, ViewToRowsStringReq(view_to_rows_string_req) => { let viewport = view_to_rows_string_req.viewport.unwrap(); let schema = self.get_cached_view_schema(&msg.entity_id, false).await?; let config = self.view_configs.get(&msg.entity_id).unwrap(); - let cols = self + let mut cols = self .handler .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) .await?; @@ -316,13 +379,14 @@ impl VirtualServer { let viewport = view_to_columns_string_req.viewport.unwrap(); let schema = self.get_cached_view_schema(&msg.entity_id, false).await?; let config = self.view_configs.get(&msg.entity_id).unwrap(); - let cols = self + let mut cols = self .handler .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) .await?; - let json_string = serde_json::to_string(&cols) - .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?; + let json_string = cols + .to_columns_json() + .map_err(|e| VirtualServerError::Other(e.to_string()))?; respond!(msg, ViewToColumnsStringResp { json_string }) }, From ee9792c5c75f46566b42a7b5f0c904bef14cc3d8 Mon Sep 17 00:00:00 2001 From: Andrew Stein Date: Mon, 23 Feb 2026 21:28:07 -0500 Subject: [PATCH 2/2] Use Arrow IPC in Virtual Server API Signed-off-by: Andrew Stein --- Cargo.lock | 205 +--------- pnpm-lock.yaml | 45 +-- pnpm-workspace.yaml | 4 +- rust/perspective-client/Cargo.toml | 9 +- .../src/rust/virtual_server/data.rs | 360 ++++++++++++++++-- .../src/rust/virtual_server/server.rs | 10 +- .../perspective-js/src/rust/virtual_server.rs | 14 +- .../src/ts/virtual_servers/clickhouse.ts | 15 +- .../src/ts/virtual_servers/duckdb.ts | 68 +--- .../perspective/virtual_servers/clickhouse.py | 15 +- .../perspective/virtual_servers/duckdb.py | 11 +- .../perspective/virtual_servers/polars.py | 7 +- 12 files changed, 394 insertions(+), 369 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04c387e108..87deef98fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,40 +90,6 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" -[[package]] -name = "arrow" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" -dependencies = [ - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-csv", - "arrow-data", - "arrow-ipc", - "arrow-ord", - "arrow-row", - "arrow-schema", - "arrow-select", - "arrow-string", -] - -[[package]] -name = "arrow-arith" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b3141e0ec5145a22d8694ea8b6d6f69305971c4fa1c1a13ef0195aef2d678b" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "chrono", - "num-traits", -] - [[package]] name = "arrow-array" version = "57.3.0" @@ -154,42 +120,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "arrow-cast" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "646bbb821e86fd57189c10b4fcdaa941deaf4181924917b0daa92735baa6ada5" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-ord", - "arrow-schema", - "arrow-select", - "atoi", - "base64", - "chrono", - "half", - "lexical-core", - "num-traits", - "ryu", -] - -[[package]] -name = "arrow-csv" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" -dependencies = [ - "arrow-array", - "arrow-cast", - "arrow-schema", - "chrono", - "csv", - "csv-core", - "regex", -] - [[package]] name = "arrow-data" version = "57.3.0" @@ -217,32 +147,6 @@ dependencies = [ "flatbuffers", ] -[[package]] -name = "arrow-ord" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d8f1870e03d4cbed632959498bcc84083b5a24bded52905ae1695bd29da45b" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", -] - -[[package]] -name = "arrow-row" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "half", -] - [[package]] name = "arrow-schema" version = "57.3.0" @@ -263,23 +167,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "arrow-string" -version = "57.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e968097061b3c0e9fe3079cf2e703e487890700546b5b0647f60fca1b5a8d8" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", - "memchr", - "num-traits", - "regex", - "regex-syntax", -] - [[package]] name = "async-lock" version = "2.8.0" @@ -300,15 +187,6 @@ dependencies = [ "syn 2.0.111", ] -[[package]] -name = "atoi" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" -dependencies = [ - "num-traits", -] - [[package]] name = "atomic-waker" version = "1.1.2" @@ -684,27 +562,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "csv" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" -dependencies = [ - "csv-core", - "itoa", - "ryu", - "serde_core", -] - -[[package]] -name = "csv-core" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" -dependencies = [ - "memchr", -] - [[package]] name = "cxx" version = "1.0.190" @@ -1669,63 +1526,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" -[[package]] -name = "lexical-core" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" -dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", -] - -[[package]] -name = "lexical-parse-float" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" -dependencies = [ - "lexical-parse-integer", - "lexical-util", -] - -[[package]] -name = "lexical-parse-integer" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" -dependencies = [ - "lexical-util", -] - -[[package]] -name = "lexical-util" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" - -[[package]] -name = "lexical-write-float" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" -dependencies = [ - "lexical-util", - "lexical-write-integer", -] - -[[package]] -name = "lexical-write-integer" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" -dependencies = [ - "lexical-util", -] - [[package]] name = "libc" version = "0.2.178" @@ -2091,7 +1891,9 @@ dependencies = [ name = "perspective-client" version = "4.3.0" dependencies = [ - "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", "async-lock", "futures", "getrandom 0.3.4", @@ -2853,6 +2655,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ + "indexmap 2.12.1", "itoa", "memchr", "ryu", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a1f72d589f..37473fb53c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16,8 +16,8 @@ catalogs: specifier: 6.2.0 version: 6.2.0 '@duckdb/duckdb-wasm': - specifier: ^1.30.0 - version: 1.32.0 + specifier: 1.33.1-dev18.0 + version: 1.33.1-dev18.0 '@fontsource/roboto-mono': specifier: 4.5.10 version: 4.5.10 @@ -61,8 +61,8 @@ catalogs: specifier: ^2.7.54 version: 2.8.8 apache-arrow: - specifier: 18.1.0 - version: 18.1.0 + specifier: 17.0.0 + version: 17.0.0 arraybuffer-loader: specifier: '>=1.0.8 <2' version: 1.0.8 @@ -405,7 +405,7 @@ importers: dependencies: '@duckdb/duckdb-wasm': specifier: 'catalog:' - version: 1.32.0 + version: 1.33.1-dev18.0 '@perspective-dev/client': specifier: workspace:^ version: link:../../rust/perspective-js @@ -1038,7 +1038,7 @@ importers: version: 1.17.0 '@duckdb/duckdb-wasm': specifier: 'catalog:' - version: 1.32.0 + version: 1.33.1-dev18.0 '@perspective-dev/esbuild-plugin': specifier: 'workspace:' version: link:../../tools/esbuild-plugin @@ -1065,7 +1065,7 @@ importers: version: 8.18.1 apache-arrow: specifier: 'catalog:' - version: 18.1.0 + version: 17.0.0 lodash: specifier: 'catalog:' version: 4.17.21 @@ -2607,8 +2607,8 @@ packages: resolution: {integrity: sha512-lBSBiRruFurFKXr5Hbsl2thmGweAPmddhF3jb99U4EMDA5L+e5Y1rAkOS07Nvrup7HUMBDrCV45meaxZnt28nQ==} engines: {node: '>=20.0'} - '@duckdb/duckdb-wasm@1.32.0': - resolution: {integrity: sha512-IewXTNYEjsZCPE9weUWgtjGxUlMRo7qhX0GF6tq/KjK8bnY+RAl4cyUdYUfcdzbyb4b9ZxPC+FOsCcxgaKFWMg==} + '@duckdb/duckdb-wasm@1.33.1-dev18.0': + resolution: {integrity: sha512-BVF+CYmOsrMRY8rnJa0USmC5PujPs2eZovQrSEeHs85J/MZ9mSblhzDih9BPu4MgwUxK9dYbYxKkBIJ4jv3EKw==} '@esbuild/aix-ppc64@0.25.11': resolution: {integrity: sha512-Xt1dOL13m8u0WE8iplx9Ibbm+hFAO0GsU2P34UNoDGvZYkY8ifSiy6Zuc1lYxfG7svWE2fzqCUmFp5HCn51gJg==} @@ -4172,10 +4172,6 @@ packages: resolution: {integrity: sha512-X0p7auzdnGuhYMVKYINdQssS4EcKec9TCXyez/qtJt32DrIMGbzqiaMiQ0X6fQlQpw8Fl0Qygcv4dfRAr5Gu9Q==} hasBin: true - apache-arrow@18.1.0: - resolution: {integrity: sha512-v/ShMp57iBnBp4lDgV8Jx3d3Q5/Hac25FWmQ98eMahUiHPXcvwIMKJD0hBIgclm/FCG+LwPkAKtkRO1O/W0YGg==} - hasBin: true - arg@5.0.2: resolution: {integrity: sha512-PYjyFOLKQ9y57JvQ6QLo8dAgNqswh8M1RMJYdQduT6xbWSgK36P/Z/v+p888pM69jMMfS8Xd8F6I1kQ/I9HUGg==} @@ -7745,6 +7741,10 @@ packages: resolution: {integrity: sha512-YWWTjgABSKcvs/nWBi9PycY/JiPJqOD4JA6o9Sej2AtvSGarXxKC3OQSk4pAarbdQlKAh5D4FCQkJNkW+GAn3w==} engines: {node: '>=0.6'} + qs@6.15.0: + resolution: {integrity: sha512-mAZTtNCeetKMH+pSjrb76NAM8V9a05I9aBZOHztWy/UqcJdQYNsf59vrRKWnojAT9Y+GbIvoTBC++CPHqpDBhQ==} + engines: {node: '>=0.6'} + querystring@0.2.0: resolution: {integrity: sha512-X/xY82scca2tau62i9mDyU9K+I+djTMUsvwf7xnUX5GLvVzgJybOJf4Y6o9Zx3oJK/LSXg5tTZBjwzqVPaPO2g==} engines: {node: '>=0.4.x'} @@ -11260,9 +11260,10 @@ snapshots: - uglify-js - webpack-cli - '@duckdb/duckdb-wasm@1.32.0': + '@duckdb/duckdb-wasm@1.33.1-dev18.0': dependencies: apache-arrow: 17.0.0 + qs: 6.15.0 '@esbuild/aix-ppc64@0.25.11': optional: true @@ -13411,18 +13412,6 @@ snapshots: json-bignum: 0.0.3 tslib: 2.8.1 - apache-arrow@18.1.0: - dependencies: - '@swc/helpers': 0.5.17 - '@types/command-line-args': 5.2.3 - '@types/command-line-usage': 5.0.4 - '@types/node': 20.19.23 - command-line-args: 5.2.1 - command-line-usage: 7.0.3 - flatbuffers: 24.12.23 - json-bignum: 0.0.3 - tslib: 2.8.1 - arg@5.0.2: {} argparse@1.0.10: @@ -17641,6 +17630,10 @@ snapshots: dependencies: side-channel: 1.1.0 + qs@6.15.0: + dependencies: + side-channel: 1.1.0 + querystring@0.2.0: optional: true diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index d9a1b37ddf..4609d8d215 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -45,7 +45,7 @@ catalog: # Dev Dependencies "@clickhouse/client-web": "^1.12.0" - "@duckdb/duckdb-wasm": "^1.30.0" + "@duckdb/duckdb-wasm": "1.33.1-dev18.0" "@duckdb/duckdb-wasm-shell": "^1.30.0" "@fontsource/roboto-mono": "4.5.10" "@iarna/toml": "3.0.0" @@ -62,7 +62,7 @@ catalog: "@types/ws": ">=8" "@types/chroma-js": "^3.1.2" "@zip.js/zip.js": "^2.7.54" - "apache-arrow": "18.1.0" + "apache-arrow": "17.0.0" "arraybuffer-loader": ">=1.0.8 <2" "auto-changelog": "^2.5.0" "chalk": ">=5" diff --git a/rust/perspective-client/Cargo.toml b/rust/perspective-client/Cargo.toml index f469146d71..74c1db89f9 100644 --- a/rust/perspective-client/Cargo.toml +++ b/rust/perspective-client/Cargo.toml @@ -65,10 +65,9 @@ prost-build = { version = "0.12.3" } protobuf-src = { version = "2.1.1", optional = true } [dependencies] -arrow = { version = "57.3.0", default-features = false, features = [ - "csv", - "ipc", -] } +arrow-array = { version = "57.3.0", default-features = false } +arrow-ipc = { version = "57.3.0", default-features = false } +arrow-schema = { version = "57.3.0", default-features = false } async-lock = { version = "2.5.0" } futures = { version = "0.3.28" } indexmap = { version = "2.2.6", features = ["serde"] } @@ -82,7 +81,7 @@ num-traits = "0.2.19" rand = { version = ">=0.9,<1" } serde = { version = "1.0", features = ["derive"] } serde_bytes = { version = "0.11" } -serde_json = { version = "1.0.107", features = ["raw_value"] } +serde_json = { version = "1.0.107", features = ["raw_value", "preserve_order"] } talc = { version = "4.4.3", features = ["counters"], optional = true } thiserror = { version = "1.0.55" } tracing = { version = "0.1.36" } diff --git a/rust/perspective-client/src/rust/virtual_server/data.rs b/rust/perspective-client/src/rust/virtual_server/data.rs index 2cbd76aef3..5b3d33cd51 100644 --- a/rust/perspective-client/src/rust/virtual_server/data.rs +++ b/rust/perspective-client/src/rust/virtual_server/data.rs @@ -13,18 +13,21 @@ use std::error::Error; use std::sync::Arc; -use arrow::array::{ - Array, ArrayRef, BooleanArray, BooleanBuilder, Float64Array, Float64Builder, Int32Array, - Int32Builder, RecordBatch, StringArray, StringBuilder, TimestampMillisecondArray, - TimestampMillisecondBuilder, +use arrow_array::builder::{ + BooleanBuilder, Float64Builder, Int32Builder, StringBuilder, TimestampMillisecondBuilder, }; -use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; -use arrow::ipc::reader::StreamReader; -use arrow::ipc::writer::StreamWriter; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float64Array, Int32Array, + Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow_ipc::reader::{FileReader, StreamReader}; +use arrow_ipc::writer::StreamWriter; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use indexmap::IndexMap; use serde::Serialize; -use crate::config::{Scalar, ViewConfig}; +use crate::config::{GroupRollupMode, Scalar, ViewConfig}; /// An Arrow column builder, used during the population phase of /// [`VirtualDataSlice`]. @@ -225,6 +228,182 @@ impl std::fmt::Debug for ColumnBuilder { } } +/// Extracts grouping ID values from an Arrow array as `i64`. +fn cast_to_int64(array: &ArrayRef) -> Result, Box> { + let num_rows = array.len(); + let mut result = Vec::with_capacity(num_rows); + match array.data_type() { + DataType::Int32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + for i in 0..num_rows { + result.push(if arr.is_null(i) { + 0 + } else { + arr.value(i) as i64 + }); + } + }, + DataType::Int64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + for i in 0..num_rows { + result.push(if arr.is_null(i) { 0 } else { arr.value(i) }); + } + }, + DataType::Float64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + for i in 0..num_rows { + result.push(if arr.is_null(i) { + 0 + } else { + arr.value(i) as i64 + }); + } + }, + dt => return Err(format!("Cannot cast {} to Int64", dt).into()), + } + Ok(result) +} + +/// Extracts a single cell from an Arrow array as a [`Scalar`]. +fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar { + if array.is_null(row_idx) { + return Scalar::Null; + } + match array.data_type() { + DataType::Utf8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Scalar::String(arr.value(row_idx).to_string()) + }, + DataType::Float64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Scalar::Float(arr.value(row_idx)) + }, + DataType::Int32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Scalar::Float(arr.value(row_idx) as f64) + }, + DataType::Int64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Scalar::Float(arr.value(row_idx) as f64) + }, + DataType::Boolean => { + let arr = array.as_any().downcast_ref::().unwrap(); + Scalar::Bool(arr.value(row_idx)) + }, + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let arr = array + .as_any() + .downcast_ref::() + .unwrap(); + Scalar::Float(arr.value(row_idx) as f64) + }, + DataType::Date32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Scalar::Float(arr.value(row_idx) as f64 * 86_400_000.0) + }, + _ => Scalar::String(format!("{:?}", array)), + } +} + +/// Coerces an Arrow column to Perspective-compatible types, optionally +/// renaming. +/// Manually converts a timestamp array of any unit to milliseconds. +fn timestamp_to_millis(array: &ArrayRef, unit: &TimeUnit) -> ArrayRef { + let millis: TimestampMillisecondArray = match unit { + TimeUnit::Second => { + let arr = array + .as_any() + .downcast_ref::() + .unwrap(); + arr.iter().map(|v| v.map(|v| v * 1_000)).collect() + }, + TimeUnit::Microsecond => { + let arr = array + .as_any() + .downcast_ref::() + .unwrap(); + arr.iter().map(|v| v.map(|v| v / 1_000)).collect() + }, + TimeUnit::Nanosecond => { + let arr = array + .as_any() + .downcast_ref::() + .unwrap(); + arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect() + }, + TimeUnit::Millisecond => { + return array.clone(); + }, + }; + Arc::new(millis) as ArrayRef +} + +fn coerce_column( + name: &str, + field: &Field, + array: &ArrayRef, +) -> Result<(Field, ArrayRef), Box> { + match field.data_type() { + DataType::Boolean + | DataType::Utf8 + | DataType::Float64 + | DataType::Int32 + | DataType::Date32 => Ok(( + Field::new(name, field.data_type().clone(), true), + array.clone(), + )), + DataType::Timestamp(TimeUnit::Millisecond, _) => Ok(( + Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true), + array.clone(), + )), + DataType::Int64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect(); + Ok(( + Field::new(name, DataType::Float64, true), + Arc::new(result) as ArrayRef, + )) + }, + DataType::Decimal128(_, scale) => { + let scale = *scale; + let arr = array.as_any().downcast_ref::().unwrap(); + let divisor = 10_f64.powi(scale as i32); + let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64 / divisor)).collect(); + Ok(( + Field::new(name, DataType::Float64, true), + Arc::new(result) as ArrayRef, + )) + }, + DataType::Timestamp(unit, _) => { + let casted = timestamp_to_millis(array, unit); + Ok(( + Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true), + casted, + )) + }, + dt => { + tracing::warn!( + "Coercing unknown Arrow type {} to Utf8 for column '{}'", + dt, + name + ); + let num_rows = array.len(); + let mut builder = StringBuilder::new(); + for i in 0..num_rows { + if array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(format!("{:?}", array)); + } + } + Ok(( + Field::new(name, DataType::Utf8, true), + Arc::new(builder.finish()) as ArrayRef, + )) + }, + } +} + impl VirtualDataSlice { pub fn new(config: ViewConfig) -> Self { VirtualDataSlice { @@ -235,20 +414,111 @@ impl VirtualDataSlice { } } - /// Loads data from Arrow IPC streaming format bytes, replacing any - /// existing builder state. + /// Loads data from Arrow IPC file format bytes, with automatic + /// post-processing based on the view configuration. + /// + /// When `group_by` is active, extracts `__GROUPING_ID__` and + /// `__ROW_PATH_N__` columns to build `self.row_path`, then removes + /// them from the output `RecordBatch`. /// - /// This is an alternative to populating the slice cell-by-cell via - /// [`set_col`]. The IPC stream should contain a single `RecordBatch` - /// (only the first batch is used if multiple are present). + /// When `split_by` is active, renames data columns by replacing `_` + /// with `|` (the DuckDB PIVOT separator). + /// + /// Also coerces non-standard Arrow types (e.g. `Decimal128`, `Int64`) + /// to Perspective-compatible types. pub fn from_arrow_ipc(&mut self, ipc: &[u8]) -> Result<(), Box> { let cursor = std::io::Cursor::new(ipc); - let mut reader = StreamReader::try_new(cursor, None)?; - let batch = reader - .next() - .ok_or("Arrow IPC stream contained no record batches")??; + let batch = if &ipc[0..6] == "ARROW1".as_bytes() { + FileReader::try_new(cursor, None)? + .next() + .ok_or("Arrow IPC stream contained no record batches")?? + } else { + StreamReader::try_new(cursor, None)? + .next() + .ok_or("Arrow IPC stream contained no record batches")?? + }; + + let has_group_by = !self.config.group_by.is_empty(); + let has_split_by = !self.config.split_by.is_empty(); + let is_total = self.config.group_rollup_mode == GroupRollupMode::Total; + + if !has_group_by && !has_split_by && !is_total { + self.frozen = Some(batch); + return Ok(()); + } + + let num_rows = batch.num_rows(); + let schema = batch.schema(); + + // Phase A: Extract row_path from __GROUPING_ID__ and __ROW_PATH_N__ + if has_group_by { + let group_by_len = self.config.group_by.len(); + let is_flat = self.config.group_rollup_mode == GroupRollupMode::Flat; + let grouping_ids = if is_flat { + None + } else { + let grouping_id_idx = schema + .index_of("__GROUPING_ID__") + .map_err(|_| "Missing __GROUPING_ID__ column")?; + Some(cast_to_int64(batch.column(grouping_id_idx))?) + }; + + let mut row_paths: Vec> = (0..num_rows).map(|_| Vec::new()).collect(); + for gidx in 0..group_by_len { + let col_name = format!("__ROW_PATH_{}__", gidx); + let col_idx = schema + .index_of(&col_name) + .map_err(|_| format!("Missing {} column", col_name))?; + + let col = batch.column(col_idx); + + // In flat mode, all rows are leaf rows + if is_flat { + // TODO I may be dumb but I'm not exactly sure what Clippy + // wants here. This could be an `enumerate` but how is this + // better? + #[allow(clippy::needless_range_loop)] + for row_idx in 0..num_rows { + row_paths[row_idx].push(extract_scalar(col, row_idx)); + } + } else { + let gids = grouping_ids.as_ref().unwrap(); + let max_grouping_id = 2_i64.pow(group_by_len as u32 - gidx as u32) - 1; + for row_idx in 0..num_rows { + if gids[row_idx] < max_grouping_id { + row_paths[row_idx].push(extract_scalar(col, row_idx)); + } + } + } + } - self.frozen = Some(batch); + self.row_path = Some(row_paths); + } + + // Phase B: Rebuild RecordBatch without metadata columns, with + // column renames and type coercion. + let mut new_fields = Vec::new(); + let mut new_arrays: Vec = Vec::new(); + for (col_idx, field) in schema.fields().iter().enumerate() { + let name = field.name(); + if name == "__GROUPING_ID__" || name.starts_with("__ROW_PATH_") { + continue; + } + + let new_name = if has_split_by && !name.starts_with("__") { + name.replace('_', "|") + } else { + name.clone() + }; + + let (coerced_field, coerced_array) = + coerce_column(&new_name, field, batch.column(col_idx))?; + new_fields.push(coerced_field); + new_arrays.push(coerced_array); + } + + let new_schema = Arc::new(Schema::new(new_fields)); + self.frozen = Some(RecordBatch::try_new(new_schema, new_arrays)?); Ok(()) } @@ -295,7 +565,7 @@ impl VirtualDataSlice { } /// Serializes the data to Arrow IPC streaming format. - pub(crate) fn to_arrow_ipc(&mut self) -> Result, Box> { + pub(crate) fn render_to_arrow_ipc(&mut self) -> Result, Box> { let batch = self.freeze().clone(); let schema = batch.schema(); let mut buf = Vec::new(); @@ -309,7 +579,7 @@ impl VirtualDataSlice { /// Converts the columnar data to a row-oriented representation for JSON /// serialization. - pub(crate) fn to_rows(&mut self) -> Vec> { + pub(crate) fn render_to_rows(&mut self) -> Vec> { let batch = self.freeze().clone(); let num_rows = batch.num_rows(); let schema = batch.schema(); @@ -367,7 +637,16 @@ impl VirtualDataSlice { .unwrap(); VirtualDataCell::Datetime(Some(arr.value(row_idx))) }, - _ => continue, + DataType::Date32 => { + let arr = col.as_any().downcast_ref::().unwrap(); + VirtualDataCell::Datetime(Some( + arr.value(row_idx) as i64 * 86_400_000, + )) + }, + x => { + tracing::error!("Unknown Arrow IPC type {}", x); + continue; + }, } }; row.insert(field.name().clone(), cell); @@ -379,7 +658,7 @@ impl VirtualDataSlice { } /// Serializes the data to a column-oriented JSON string. - pub(crate) fn to_columns_json(&mut self) -> Result> { + pub(crate) fn render_to_columns_json(&mut self) -> Result> { let batch = self.freeze().clone(); let schema = batch.schema(); let mut map = serde_json::Map::new(); @@ -466,7 +745,24 @@ impl VirtualDataSlice { .collect::>(), )? }, - _ => continue, + DataType::Date32 => { + let arr = col.as_any().downcast_ref::().unwrap(); + serde_json::to_value( + (0..num_rows) + .map(|i| { + if arr.is_null(i) { + None + } else { + Some(arr.value(i) as i64 * 86_400_000) + } + }) + .collect::>(), + )? + }, + x => { + tracing::error!("Unknown Arrow IPC type {}", x); + continue; + }, }; map.insert(field.name().clone(), values); } @@ -488,6 +784,10 @@ impl VirtualDataSlice { index: usize, value: T, ) -> Result<(), Box> { + if name == "__GROUPING_ID__" { + return Ok(()); + } + if name.starts_with("__ROW_PATH_") { let group_by_index: u32 = name[11..name.len() - 2].parse()?; let max_grouping_id = @@ -510,14 +810,20 @@ impl VirtualDataSlice { Ok(()) } else { - if !self.builders.contains_key(name) { - self.builders.insert(name.to_owned(), T::new_builder()); + let col_name = if !self.config.split_by.is_empty() && !name.starts_with("__") { + name.replace('_', "|") + } else { + name.to_owned() + }; + + if !self.builders.contains_key(&col_name) { + self.builders.insert(col_name.clone(), T::new_builder()); } let col = self .builders - .get_mut(name) - .ok_or_else(|| format!("Column '{}' not found after insertion", name))?; + .get_mut(&col_name) + .ok_or_else(|| format!("Column '{}' not found after insertion", col_name))?; Ok(value.write_to(col)?) } diff --git a/rust/perspective-client/src/rust/virtual_server/server.rs b/rust/perspective-client/src/rust/virtual_server/server.rs index fedd648a8b..dfb01de399 100644 --- a/rust/perspective-client/src/rust/virtual_server/server.rs +++ b/rust/perspective-client/src/rust/virtual_server/server.rs @@ -308,7 +308,7 @@ impl VirtualServer { .await?; let arrow = cols - .to_arrow_ipc() + .render_to_arrow_ipc() .map_err(|e| VirtualServerError::Other(e.to_string()))?; respond!(msg, ViewToArrowResp { arrow }) @@ -322,7 +322,7 @@ impl VirtualServer { .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) .await?; - let rows = cols.to_rows(); + let rows = cols.render_to_rows(); let mut csv = String::new(); if let Some(first_row) = rows.first() { let headers: Vec<&str> = first_row.keys().map(|k| k.as_str()).collect(); @@ -350,7 +350,7 @@ impl VirtualServer { .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) .await?; - let rows = cols.to_rows(); + let rows = cols.render_to_rows(); let ndjson_string = rows .iter() .map(serde_json::to_string) @@ -369,7 +369,7 @@ impl VirtualServer { .view_get_data(msg.entity_id.as_str(), config, &schema, &viewport) .await?; - let rows = cols.to_rows(); + let rows = cols.render_to_rows(); let json_string = serde_json::to_string(&rows) .map_err(|e| VirtualServerError::InvalidJSON(std::sync::Arc::new(e)))?; @@ -385,7 +385,7 @@ impl VirtualServer { .await?; let json_string = cols - .to_columns_json() + .render_to_columns_json() .map_err(|e| VirtualServerError::Other(e.to_string()))?; respond!(msg, ViewToColumnsStringResp { json_string }) diff --git a/rust/perspective-js/src/rust/virtual_server.rs b/rust/perspective-js/src/rust/virtual_server.rs index 425010af44..b1b2fc91c8 100644 --- a/rust/perspective-js/src/rust/virtual_server.rs +++ b/rust/perspective-js/src/rust/virtual_server.rs @@ -18,7 +18,7 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use indexmap::IndexMap; -use js_sys::{Array, Date, Object, Reflect}; +use js_sys::{Array, Date, Object, Reflect, Uint8Array}; use perspective_client::proto::{ColumnType, HostedTable}; use perspective_client::virtual_server; use perspective_client::virtual_server::{Features, ResultExt, VirtualServerHandler}; @@ -587,6 +587,18 @@ impl VirtualDataSlice { ) } + #[wasm_bindgen(js_name = "fromArrowIpc")] + pub fn from_arrow_ipc(&self, ipc: Uint8Array) -> Result<(), JsValue> { + // tracing::error!("L {}", ipc.len()); + self.1 + .lock() + .unwrap() + .as_mut() + .unwrap() + .from_arrow_ipc(&ipc.to_vec()) + .map_err(|e| JsValue::from_str(&e.to_string())) + } + #[wasm_bindgen(js_name = "setCol")] pub fn set_col( &self, diff --git a/rust/perspective-js/src/ts/virtual_servers/clickhouse.ts b/rust/perspective-js/src/ts/virtual_servers/clickhouse.ts index 244da475bc..fb9c472814 100644 --- a/rust/perspective-js/src/ts/virtual_servers/clickhouse.ts +++ b/rust/perspective-js/src/ts/virtual_servers/clickhouse.ts @@ -309,8 +309,6 @@ export class ClickhouseHandler implements perspective.VirtualServerHandler { viewport: ViewWindow, dataSlice: perspective.VirtualDataSlice, ) { - const is_group_by = config.group_by?.length > 0; - const is_split_by = config.split_by?.length > 0; const query = this.sqlBuilder.viewGetData( viewId, config, @@ -323,23 +321,14 @@ export class ClickhouseHandler implements perspective.VirtualServerHandler { }); for (let cidx = 0; cidx < columns.length; cidx++) { - if (cidx === 0 && is_group_by && !is_split_by) { - // This is the grouping_id column, skip it - continue; - } - - let col = columns[cidx]; - if (is_split_by && !col.startsWith("__ROW_PATH_")) { - col = col.replaceAll("_", "|"); - } - + const col = columns[cidx]; const dtype = duckdbTypeToPsp(dtypes[cidx]) as ColumnType; const isDecimal = dtypes[cidx].startsWith("Decimal"); for (let ridx = 0; ridx < rows.length; ridx++) { const row = rows[ridx]; const grouping_id = row["__GROUPING_ID__"]; - let value = row[columns[cidx]]; + let value = row[col]; if (isDecimal) { value = convertDecimalToNumber(value, dtypes[cidx]); } diff --git a/rust/perspective-js/src/ts/virtual_servers/duckdb.ts b/rust/perspective-js/src/ts/virtual_servers/duckdb.ts index b7c13e9b30..436c64f726 100644 --- a/rust/perspective-js/src/ts/virtual_servers/duckdb.ts +++ b/rust/perspective-js/src/ts/virtual_servers/duckdb.ts @@ -119,25 +119,6 @@ function duckdbTypeToPsp(name: string): ColumnType { return "string"; } -function convertDecimalToNumber(value: any, dtypeString: string) { - if (!(value instanceof Uint32Array || value instanceof Int32Array)) { - return value; - } - - let bigIntValue = BigInt(0); - for (let i = 0; i < value.length; i++) { - bigIntValue |= BigInt(value[i]) << BigInt(i * 32); - } - - const scaleMatch = dtypeString.match(/Decimal\[\d+e(\d+)\]/); - if (scaleMatch) { - const scale = parseInt(scaleMatch[1]); - return Number(bigIntValue) / Math.pow(10, scale); - } else { - return Number(bigIntValue); - } -} - async function runQuery( db: duckdb.AsyncDuckDBConnection, query: string, @@ -311,10 +292,6 @@ export class DuckDBHandler implements perspective.VirtualServerHandler { viewport: ViewWindow, dataSlice: perspective.VirtualDataSlice, ) { - const is_group_by = config.group_by?.length > 0; - const is_split_by = config.split_by?.length > 0; - const is_flat = config.group_rollup_mode === "flat"; - const has_grouping_id = is_group_by && !is_flat; const query = this.sqlBuilder.viewGetData( viewId, config, @@ -322,47 +299,10 @@ export class DuckDBHandler implements perspective.VirtualServerHandler { schema, ); - const { rows, columns, dtypes } = await runQuery(this.db, query, { - columns: true, - }); - - for (let cidx = 0; cidx < columns.length; cidx++) { - if (cidx === 0 && has_grouping_id) { - // This is the grouping_id column, skip it - continue; - } - - let col = columns[cidx]; - if (is_split_by && !col.startsWith("__")) { - col = col.replaceAll("_", "|"); - } - - const dtype = duckdbTypeToPsp(dtypes[cidx]) as ColumnType; - const isDecimal = dtypes[cidx].startsWith("Decimal"); - for (let ridx = 0; ridx < rows.length; ridx++) { - const rowArray = rows[ridx].toArray(); - const grouping_id = has_grouping_id - ? Number(rowArray[0]) - : undefined; - let value = rowArray[cidx]; - if (isDecimal) { - value = convertDecimalToNumber(value, dtypes[cidx]); - } - - if (typeof value === "bigint") { - value = Number(value); - } - - if (typeof value !== "string" && dtype === "string") { - try { - value = JSON.stringify(value); - } catch (e) { - value = `${value}`; - } - } + const ipc = await this.db.useUnsafe((bindings, conn) => + bindings.runQuery(conn, query), + ); - dataSlice.setCol(dtype, col, ridx, value, grouping_id); - } - } + dataSlice.fromArrowIpc(ipc); } } diff --git a/rust/perspective-python/perspective/virtual_servers/clickhouse.py b/rust/perspective-python/perspective/virtual_servers/clickhouse.py index 7d1cb43f27..67d00a1897 100644 --- a/rust/perspective-python/perspective/virtual_servers/clickhouse.py +++ b/rust/perspective-python/perspective/virtual_servers/clickhouse.py @@ -171,25 +171,12 @@ def view_get_min_max(self, view_name, column_name, config): def view_get_data(self, view_name, config, schema, viewport, data): group_by = config["group_by"] - split_by = config["split_by"] query = self.sql_builder.view_get_data(view_name, config, viewport, schema) results, columns, dtypes = run_query(self.db, query, columns=True) for cidx, col in enumerate(columns): - if cidx == 0 and len(group_by) > 0 and len(split_by) == 0: - continue - - if len(split_by) > 0 and not col.startswith("__ROW_PATH_"): - col = col.replace("_", "|") - - # print( - # dtypes[cidx], type(dtypes[cidx]), dir(dtypes[cidx]), dtypes[cidx].name - # ) - dtype = clickhouse_type_to_psp(str(dtypes[cidx])) for ridx, row in enumerate(results): - grouping_id = ( - row[0] if len(group_by) > 0 and len(split_by) == 0 else None - ) + grouping_id = row[0] if len(group_by) > 0 else None value = row[cidx] if dtype == "string" and not isinstance(value, str): diff --git a/rust/perspective-python/perspective/virtual_servers/duckdb.py b/rust/perspective-python/perspective/virtual_servers/duckdb.py index 283926ac56..dedf6cfa1c 100644 --- a/rust/perspective-python/perspective/virtual_servers/duckdb.py +++ b/rust/perspective-python/perspective/virtual_servers/duckdb.py @@ -180,21 +180,12 @@ def view_get_min_max(self, view_name, column_name, config): def view_get_data(self, view_name, config, schema, viewport, data): group_by = config["group_by"] - split_by = config["split_by"] - is_group_by = len(group_by) > 0 - is_split_by = len(split_by) > 0 query = self.sql_builder.view_get_data(view_name, config, viewport, schema) results, columns, dtypes = run_query(self.db, query, columns=True) for cidx, col in enumerate(columns): - if cidx == 0 and is_group_by: - continue - - if is_split_by and not col.startswith("__"): - col = col.replace("_", "|") - dtype = duckdb_type_to_psp(str(dtypes[cidx])) for ridx, row in enumerate(results): - grouping_id = row[0] if is_group_by else None + grouping_id = row[0] if len(group_by) > 0 else None data.set_col(dtype, col, ridx, row[cidx], grouping_id) diff --git a/rust/perspective-python/perspective/virtual_servers/polars.py b/rust/perspective-python/perspective/virtual_servers/polars.py index 553c3e239d..6d60ed9669 100644 --- a/rust/perspective-python/perspective/virtual_servers/polars.py +++ b/rust/perspective-python/perspective/virtual_servers/polars.py @@ -278,7 +278,12 @@ def view_get_data(self, view_name, config, schema, viewport, data): push_col = col.replace("_", "|") for ridx, value in enumerate(values): - grouping_id = grouping_ids[ridx] if grouping_ids else None + if grouping_ids: + grouping_id = grouping_ids[ridx] + elif has_group_by: + grouping_id = 0 + else: + grouping_id = None if value is not None and isinstance(value, float) and value != value: value = None