-
Notifications
You must be signed in to change notification settings - Fork 2k
Introduce morsel-driven Parquet scan #20481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
34963ed
a67f9ac
d0da5da
32eec3c
5dc895c
cc73788
d517b5d
de1606d
950f6db
7f57317
fd6d7fd
37126bf
2d3c33e
98f0ea9
a389b02
4065448
415315d
13b4977
a30c3f8
8b32ca8
876c296
d2df36b
869b7d3
67ea9ab
e845675
6885981
3384b8f
211d4fc
2db61f1
c859d6a
24b95fb
80fa1ec
1dcd401
04b08a6
9799b96
de29e40
aa27a43
9a4aa84
692bff6
9a9cf0b
f79fe63
976d8dc
f937f98
1bc9375
e0e8520
25b044b
b872660
eb7dfa3
4ed7af3
b440313
a11409c
e95c2d7
b71a2a7
c383e6f
ccd21c8
763fff2
2e2b68b
6ee74d6
040eacc
06e254b
c769c2d
74b7fce
7238c1c
f7614b7
28e64c9
8eda140
afd2936
e31bf97
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,8 +49,11 @@ mod tests { | |
| use datafusion_common::config::TableParquetOptions; | ||
| use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; | ||
| use datafusion_common::{Result, ScalarValue, assert_contains}; | ||
| use datafusion_common_runtime::SpawnedTask; | ||
| use datafusion_datasource::file_format::FileFormat; | ||
| use datafusion_datasource::file_scan_config::FileScanConfigBuilder; | ||
| use datafusion_datasource::file_scan_config::{ | ||
| FileScanConfig, FileScanConfigBuilder, | ||
| }; | ||
| use datafusion_datasource::source::DataSourceExec; | ||
|
|
||
| use datafusion_datasource::file::FileSource; | ||
|
|
@@ -76,7 +79,7 @@ mod tests { | |
| use insta::assert_snapshot; | ||
| use object_store::local::LocalFileSystem; | ||
| use object_store::path::Path; | ||
| use object_store::{ObjectMeta, ObjectStore}; | ||
| use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; | ||
| use parquet::arrow::ArrowWriter; | ||
| use parquet::file::properties::WriterProperties; | ||
| use tempfile::TempDir; | ||
|
|
@@ -2459,4 +2462,152 @@ mod tests { | |
| assert_eq!(calls.len(), 2); | ||
| assert_eq!(calls, vec![Some(123), Some(456)]); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn parquet_morsel_driven_execution() -> Result<()> { | ||
| let store = | ||
| Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>; | ||
| let store_url = ObjectStoreUrl::parse("memory://test").unwrap(); | ||
|
|
||
| let ctx = SessionContext::new(); | ||
| ctx.register_object_store(store_url.as_ref(), store.clone()); | ||
|
|
||
| // Create a Parquet file with 100 row groups, each with 10 rows | ||
| let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); | ||
|
|
||
| let mut out = Vec::new(); | ||
| let props = WriterProperties::builder() | ||
| .set_max_row_group_row_count(Some(10)) | ||
| .build(); | ||
| { | ||
| let mut writer = | ||
| ArrowWriter::try_new(&mut out, Arc::clone(&schema), Some(props))?; | ||
| // Write many batches to ensure they are not coalesced and we can verify work distribution | ||
| for i in 0..100 { | ||
| let batch = RecordBatch::try_new( | ||
| Arc::clone(&schema), | ||
| vec![Arc::new(Int32Array::from(vec![i; 10]))], | ||
| )?; | ||
| writer.write(&batch)?; | ||
| } | ||
| writer.close()?; | ||
| } | ||
|
|
||
| let path = Path::from("skewed.parquet"); | ||
| store.put(&path, out.into()).await?; | ||
| let meta = store.head(&path).await?; | ||
|
|
||
| // Set up DataSourceExec with 2 partitions, but the file is only in partition 0 (skewed) | ||
| let source = Arc::new(ParquetSource::new(schema)); | ||
| let config = FileScanConfigBuilder::new(store_url, source) | ||
| .with_file_group(FileGroup::new(vec![PartitionedFile::new_from_meta(meta)])) | ||
| .with_file_group(FileGroup::new(vec![])) // Partition 1 is empty | ||
| .with_morsel_driven(true) | ||
| .build(); | ||
|
|
||
| let exec = DataSourceExec::from_data_source(config); | ||
|
|
||
| // Execute both partitions concurrently | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FWIW I think you need to do something like `#[tokio::test(flavor = "multi_thread", worker_threads = 2)]` to make it truly multi-threaded (not sure if that is important for this test). |
||
| let task_ctx = ctx.task_ctx(); | ||
| let stream0 = exec.execute(0, Arc::clone(&task_ctx))?; | ||
| let stream1 = exec.execute(1, Arc::clone(&task_ctx))?; | ||
|
|
||
| let handle0 = SpawnedTask::spawn(async move { | ||
| let mut count = 0; | ||
| let mut s = stream0; | ||
| while let Some(batch) = s.next().await { | ||
| count += batch.unwrap().num_rows(); | ||
| tokio::task::yield_now().await; | ||
| } | ||
| count | ||
| }); | ||
|
|
||
| let handle1 = SpawnedTask::spawn(async move { | ||
| let mut count = 0; | ||
| let mut s = stream1; | ||
| while let Some(batch) = s.next().await { | ||
| count += batch.unwrap().num_rows(); | ||
| tokio::task::yield_now().await; | ||
| } | ||
| count | ||
| }); | ||
|
|
||
| let count0 = handle0.await.unwrap(); | ||
| let count1 = handle1.await.unwrap(); | ||
|
|
||
| // Total rows should be 1000 | ||
| assert_eq!(count0 + count1, 1000); | ||
|
|
||
| // Since it's morsel-driven, both partitions should have done some work | ||
| // because the work from partition 0 (the single file) was split into | ||
| // individual row groups and shared via the shared queue. | ||
| assert!(count0 > 0, "Partition 0 should have produced rows"); | ||
| assert!(count1 > 0, "Partition 1 should have produced rows"); | ||
|
|
||
| // Test re-executability: executing the same plan again should work | ||
| let stream0 = exec.execute(0, Arc::clone(&task_ctx))?; | ||
| let stream1 = exec.execute(1, Arc::clone(&task_ctx))?; | ||
|
|
||
| let mut count = 0; | ||
| let mut s0 = stream0; | ||
| while let Some(batch) = s0.next().await { | ||
| count += batch.unwrap().num_rows(); | ||
| } | ||
| let mut s1 = stream1; | ||
| while let Some(batch) = s1.next().await { | ||
| count += batch.unwrap().num_rows(); | ||
| } | ||
| assert_eq!( | ||
| count, 1000, | ||
| "Second execution should also produce 1000 rows" | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn parquet_morsel_driven_enabled_by_default() -> Result<()> { | ||
| let tmp_dir = TempDir::new()?; | ||
| let path = tmp_dir.path().join("test.parquet"); | ||
| let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); | ||
| let batch = RecordBatch::try_new( | ||
| Arc::clone(&schema), | ||
| vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], | ||
| )?; | ||
|
|
||
| let file = File::create(&path)?; | ||
| let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None)?; | ||
| writer.write(&batch)?; | ||
| writer.close()?; | ||
|
|
||
| let ctx = SessionContext::new(); | ||
| ctx.register_parquet("t", path.to_str().unwrap(), ParquetReadOptions::default()) | ||
| .await?; | ||
|
|
||
| let df = ctx.sql("SELECT * FROM t").await?; | ||
| let plan = df.create_physical_plan().await?; | ||
|
|
||
| // Plan should be a ProjectionExec over a DataSourceExec | ||
| let ds_exec = if let Some(ds) = plan.as_any().downcast_ref::<DataSourceExec>() { | ||
| ds | ||
| } else { | ||
| plan.children()[0] | ||
| .as_any() | ||
| .downcast_ref::<DataSourceExec>() | ||
| .expect("Expected DataSourceExec") | ||
| }; | ||
|
|
||
| let config = ds_exec | ||
| .data_source() | ||
| .as_any() | ||
| .downcast_ref::<FileScanConfig>() | ||
| .expect("Expected FileScanConfig"); | ||
|
|
||
| assert!( | ||
| config.morsel_driven, | ||
| "morsel_driven should be enabled by default for Parquet" | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -227,8 +227,59 @@ impl RunQueryResult { | |
| format!("{}", pretty_format_batches(&self.result).unwrap()) | ||
| } | ||
|
|
||
| /// Extract ORDER BY column names from the query. | ||
| /// The query format is always: | ||
| /// `SELECT * FROM test_table ORDER BY <col> <dir> <nulls>, ... LIMIT <n>` | ||
| fn sort_columns(&self) -> Vec<String> { | ||
| let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER BY".len(); | ||
| let limit_start = self.query.rfind(" LIMIT").unwrap(); | ||
| self.query[order_by_start..limit_start] | ||
| .trim() | ||
| .split(',') | ||
| .map(|part| part.split_whitespace().next().unwrap().to_string()) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Project `batches` to only include the named columns. | ||
| fn project_columns(batches: &[RecordBatch], cols: &[String]) -> Vec<RecordBatch> { | ||
| batches | ||
| .iter() | ||
| .map(|b| { | ||
| let indices: Vec<usize> = cols | ||
| .iter() | ||
| .filter_map(|c| b.schema().index_of(c).ok()) | ||
| .collect(); | ||
| b.project(&indices).unwrap() | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| fn is_ok(&self) -> bool { | ||
| self.expected_formatted() == self.result_formatted() | ||
| if self.expected_formatted() == self.result_formatted() { | ||
| return true; | ||
| } | ||
| // If the full results differ, compare only the ORDER BY column values. | ||
| // | ||
| // For queries with ORDER BY <col> LIMIT k, multiple rows may tie on the | ||
| // sort key (e.g. two rows with id=27 for ORDER BY id DESC LIMIT 1). | ||
| // SQL permits returning any of the tied rows, so with vs without dynamic | ||
| // filter pushdown may legitimately return different tied rows. | ||
| // | ||
| // The dynamic filter must not change the *sort-key values* of the top-k | ||
| // result. We verify correctness by: | ||
| // 1. Checking the row counts match (wrong count is always a bug). | ||
| // 2. Projecting both results down to only the ORDER BY columns and | ||
| // comparing those (tied rows may differ, but the sort-key values must not). | ||
| let expected_rows: usize = self.expected.iter().map(|b| b.num_rows()).sum(); | ||
| let result_rows: usize = self.result.iter().map(|b| b.num_rows()).sum(); | ||
| if expected_rows != result_rows { | ||
| return false; | ||
| } | ||
| let sort_cols = self.sort_columns(); | ||
| let expected_keys = Self::project_columns(&self.expected, &sort_cols); | ||
| let result_keys = Self::project_columns(&self.result, &sort_cols); | ||
| format!("{}", pretty_format_batches(&expected_keys).unwrap()) | ||
| == format!("{}", pretty_format_batches(&result_keys).unwrap()) | ||
| } | ||
|
Comment on lines 257 to 283
|
||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would help me grok/understand this test better if the mechanics were refactored out:
`fn input_file() -> Vec<u8>` or something