Commits (68)
34963ed Implement morsel-driven execution for ParquetExec (google-labs-jules[bot], Feb 22, 2026)
a67f9ac Proto (Dandandan, Feb 22, 2026)
d0da5da Proto (Dandandan, Feb 22, 2026)
32eec3c Fmt (Dandandan, Feb 22, 2026)
5dc895c Merge remote-tracking branch 'upstream/main' into parquet-morsel-driv… (Dandandan, Feb 22, 2026)
cc73788 Proto (Dandandan, Feb 22, 2026)
d517b5d Fix (Dandandan, Feb 22, 2026)
de1606d Fix (Dandandan, Feb 22, 2026)
950f6db Clippy (Dandandan, Feb 22, 2026)
7f57317 Refactor (Dandandan, Feb 23, 2026)
fd6d7fd WIP (Dandandan, Feb 23, 2026)
37126bf WIP (Dandandan, Feb 23, 2026)
2d3c33e WIP (Dandandan, Feb 23, 2026)
98f0ea9 WIP (Dandandan, Feb 23, 2026)
a389b02 WIP (Dandandan, Feb 23, 2026)
4065448 Update (Dandandan, Feb 23, 2026)
415315d Update (Dandandan, Feb 23, 2026)
13b4977 Config (Dandandan, Feb 23, 2026)
a30c3f8 Test (Dandandan, Feb 23, 2026)
8b32ca8 Refactor (Dandandan, Feb 23, 2026)
876c296 Update test (Dandandan, Feb 23, 2026)
d2df36b Update test (Dandandan, Feb 23, 2026)
869b7d3 Autofix (Dandandan, Feb 23, 2026)
67ea9ab Prune files (Dandandan, Feb 23, 2026)
e845675 Update test (Dandandan, Feb 23, 2026)
6885981 Update test (Dandandan, Feb 23, 2026)
3384b8f Update morsel_driven (Dandandan, Feb 23, 2026)
211d4fc Update morsel_driven (Dandandan, Feb 23, 2026)
2db61f1 fmt (Dandandan, Feb 23, 2026)
c859d6a move pruning (Dandandan, Feb 23, 2026)
24b95fb Revert "move pruning" (Dandandan, Feb 24, 2026)
80fa1ec Reapply "move pruning" (Dandandan, Feb 24, 2026)
1dcd401 Autofix (Dandandan, Feb 24, 2026)
04b08a6 Autofix (Dandandan, Feb 24, 2026)
9799b96 Autofix (Dandandan, Feb 24, 2026)
de29e40 Autofix (Dandandan, Feb 24, 2026)
aa27a43 CLippy (Dandandan, Feb 24, 2026)
9a4aa84 Undo submodule (Dandandan, Feb 24, 2026)
692bff6 Also change open to be consistent (Dandandan, Feb 24, 2026)
9a9cf0b Move page index back to morselize (Dandandan, Feb 24, 2026)
f79fe63 Move page index back to morselize (Dandandan, Feb 25, 2026)
976d8dc Add back lost optimizations (Dandandan, Feb 25, 2026)
f937f98 Tweak (Dandandan, Feb 25, 2026)
1bc9375 Merge (Dandandan, Feb 25, 2026)
e0e8520 Autofix (Dandandan, Feb 25, 2026)
25b044b Fmt (Dandandan, Feb 25, 2026)
b872660 Merge (Dandandan, Feb 27, 2026)
eb7dfa3 Use builder API (Dandandan, Feb 27, 2026)
4ed7af3 WIP (Dandandan, Mar 1, 2026)
b440313 WIP (Dandandan, Mar 1, 2026)
a11409c Simplify morsel-driven execution code (Dandandan, Mar 1, 2026)
e95c2d7 Move morsel queue ownership to DataSourceExecStream (Dandandan, Mar 1, 2026)
b71a2a7 Use Arc::strong_count instead of Weak for morsel queue lifecycle (Dandandan, Mar 1, 2026)
c383e6f Fix morsel queue reset using partition counter (Dandandan, Mar 1, 2026)
ccd21c8 Add FileStream::with_shared_queue builder method (Dandandan, Mar 1, 2026)
763fff2 Use TaskContext::query_id to detect morsel queue execution cycles (Dandandan, Mar 1, 2026)
2e2b68b Remove query_id (Dandandan, Mar 1, 2026)
6ee74d6 Change tests (Dandandan, Mar 1, 2026)
040eacc Remove allow_morsel_driven=false workarounds from tests and examples (Dandandan, Mar 1, 2026)
06e254b Plan whole files instead of byte-range splits for morsel-driven execu… (Dandandan, Mar 1, 2026)
c769c2d Improve I/O locality by pushing morsels to front of work queue (Dandandan, Mar 1, 2026)
74b7fce Merge remote-tracking branch 'upstream/main' into parquet-morsel-driv… (Dandandan, Mar 1, 2026)
7238c1c Simplify morsel handling: push all morsels to front of queue (Dandandan, Mar 1, 2026)
f7614b7 Use separate queues for files and morsels to improve I/O locality (Dandandan, Mar 1, 2026)
28e64c9 Fix (Dandandan, Mar 1, 2026)
8eda140 Fix time_opening metric not stopped after morselizing (Dandandan, Mar 1, 2026)
afd2936 Sort files and morsels by estimated row count for dynamic filters (Dandandan, Mar 2, 2026)
e31bf97 Revert "Sort files and morsels by estimated row count for dynamic fil… (Dandandan, Mar 2, 2026)
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -743,6 +743,10 @@ config_namespace! {
/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true

/// (reading) If true, the parquet reader will share work between partitions
/// using morsel-driven execution. This can help mitigate data skew.
pub allow_morsel_driven: bool, default = true

/// (reading) The maximum predicate cache size, in bytes. When
/// `pushdown_filters` is enabled, sets the maximum memory used to cache
/// the results of predicate evaluation between filter evaluation and
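The new reader option can also be toggled per session. Assuming it follows the `datafusion.execution.parquet.*` key naming used for the other `ParquetOptions` fields shown in this hunk (an assumption, not confirmed by the diff), a session-level override would be a plain `SET`:

```sql
-- Hypothetical key name, derived from the usual
-- datafusion.execution.parquet.* prefix for ParquetOptions fields.
SET datafusion.execution.parquet.allow_morsel_driven = false;
```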
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -208,6 +208,7 @@ impl ParquetOptions {
binary_as_string: _, // not used for writer props
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
allow_morsel_driven: _,
max_predicate_cache_size: _,
} = self;

@@ -460,6 +461,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
allow_morsel_driven: defaults.allow_morsel_driven,
}
}

@@ -575,6 +577,7 @@
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
allow_morsel_driven: global_options_defaults.allow_morsel_driven,
coerce_int96: None,
},
column_specific_options,
155 changes: 153 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -49,8 +49,11 @@ mod tests {
use datafusion_common::config::TableParquetOptions;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{Result, ScalarValue, assert_contains};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::file_scan_config::{
FileScanConfig, FileScanConfigBuilder,
};
use datafusion_datasource::source::DataSourceExec;

use datafusion_datasource::file::FileSource;
@@ -76,7 +79,7 @@
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
@@ -2459,4 +2462,152 @@
assert_eq!(calls.len(), 2);
assert_eq!(calls, vec![Some(123), Some(456)]);
}

#[tokio::test]
async fn parquet_morsel_driven_execution() -> Result<()> {
let store =
Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>;
let store_url = ObjectStoreUrl::parse("memory://test").unwrap();

let ctx = SessionContext::new();
ctx.register_object_store(store_url.as_ref(), store.clone());

// Create a Parquet file with 100 row groups, each with 10 rows
Review comment (Contributor): It would help me grok/understand this test better if the mechanics were refactored out:
1. the setup into a separate function (e.g. fn input_file() -> Vec<u8> or something), and
2. the code that drives a partition to completion into its own function.

let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let mut out = Vec::new();
let props = WriterProperties::builder()
.set_max_row_group_row_count(Some(10))
.build();
{
let mut writer =
ArrowWriter::try_new(&mut out, Arc::clone(&schema), Some(props))?;
// Write many batches to ensure they are not coalesced and we can verify work distribution
for i in 0..100 {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int32Array::from(vec![i; 10]))],
)?;
writer.write(&batch)?;
}
writer.close()?;
}

let path = Path::from("skewed.parquet");
store.put(&path, out.into()).await?;
let meta = store.head(&path).await?;

// Set up DataSourceExec with 2 partitions, but the file is only in partition 0 (skewed)
let source = Arc::new(ParquetSource::new(schema));
let config = FileScanConfigBuilder::new(store_url, source)
.with_file_group(FileGroup::new(vec![PartitionedFile::new_from_meta(meta)]))
.with_file_group(FileGroup::new(vec![])) // Partition 1 is empty
.with_morsel_driven(true)
.build();

let exec = DataSourceExec::from_data_source(config);

// Execute both partitions concurrently
Review comment (Contributor): FWIW I think tokio::test only uses a single thread. You need to do something like
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
to make it truly multi-threaded (not sure if that is important for this test).

let task_ctx = ctx.task_ctx();
let stream0 = exec.execute(0, Arc::clone(&task_ctx))?;
let stream1 = exec.execute(1, Arc::clone(&task_ctx))?;

let handle0 = SpawnedTask::spawn(async move {
let mut count = 0;
let mut s = stream0;
while let Some(batch) = s.next().await {
count += batch.unwrap().num_rows();
tokio::task::yield_now().await;
}
count
});

let handle1 = SpawnedTask::spawn(async move {
let mut count = 0;
let mut s = stream1;
while let Some(batch) = s.next().await {
count += batch.unwrap().num_rows();
tokio::task::yield_now().await;
}
count
});

let count0 = handle0.await.unwrap();
let count1 = handle1.await.unwrap();

// Total rows should be 1000
assert_eq!(count0 + count1, 1000);

// Since it's morsel-driven, both partitions should have done some work
// because the work from partition 0 (the single file) was split into
// individual row groups and shared via the shared queue.
assert!(count0 > 0, "Partition 0 should have produced rows");
assert!(count1 > 0, "Partition 1 should have produced rows");

// Test re-executability: executing the same plan again should work
let stream0 = exec.execute(0, Arc::clone(&task_ctx))?;
let stream1 = exec.execute(1, Arc::clone(&task_ctx))?;

let mut count = 0;
let mut s0 = stream0;
while let Some(batch) = s0.next().await {
count += batch.unwrap().num_rows();
}
let mut s1 = stream1;
while let Some(batch) = s1.next().await {
count += batch.unwrap().num_rows();
}
assert_eq!(
count, 1000,
"Second execution should also produce 1000 rows"
);

Ok(())
}

#[tokio::test]
async fn parquet_morsel_driven_enabled_by_default() -> Result<()> {
let tmp_dir = TempDir::new()?;
let path = tmp_dir.path().join("test.parquet");
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;

let file = File::create(&path)?;
let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None)?;
writer.write(&batch)?;
writer.close()?;

let ctx = SessionContext::new();
ctx.register_parquet("t", path.to_str().unwrap(), ParquetReadOptions::default())
.await?;

let df = ctx.sql("SELECT * FROM t").await?;
let plan = df.create_physical_plan().await?;

// Plan should be a ProjectionExec over a DataSourceExec
let ds_exec = if let Some(ds) = plan.as_any().downcast_ref::<DataSourceExec>() {
ds
} else {
plan.children()[0]
.as_any()
.downcast_ref::<DataSourceExec>()
.expect("Expected DataSourceExec")
};

let config = ds_exec
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.expect("Expected FileScanConfig");

assert!(
config.morsel_driven,
"morsel_driven should be enabled by default for Parquet"
);

Ok(())
}
}
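The behavior this test exercises can be sketched with std-only primitives: all row groups ("morsels") go on one shared queue, and every partition pulls from it until the queue drains, so a skewed file assignment no longer idles a partition. This is an illustrative model under our own names (`run_morsel_sim` is not part of the PR; the real implementation lives in `FileStream`/`DataSourceExecStream`):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Simulate morsel-driven work sharing: `groups` row groups of
/// `rows_per_group` rows sit on one shared queue, and `workers`
/// "partitions" pull from it until it is empty. Returns the number
/// of rows each worker processed.
fn run_morsel_sim(workers: usize, groups: usize, rows_per_group: usize) -> Vec<usize> {
    let queue: Arc<Mutex<VecDeque<usize>>> =
        Arc::new(Mutex::new((0..groups).map(|_| rows_per_group).collect()));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut rows = 0;
                loop {
                    // Pop one morsel; the lock guard is dropped right away,
                    // so other workers can interleave between pops.
                    let morsel = queue.lock().unwrap().pop_front();
                    match morsel {
                        Some(n) => rows += n,
                        None => break,
                    }
                }
                rows
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    // Mirrors the test setup: 100 row groups x 10 rows, 2 partitions.
    let counts = run_morsel_sim(2, 100, 10);
    // Every row is read exactly once, however the work was split.
    assert_eq!(counts.iter().sum::<usize>(), 1000);
    println!("per-worker rows: {counts:?}");
}
```

How the 1000 rows split between the two workers depends on scheduling, which is exactly why the test only asserts that each partition did *some* work rather than an exact split.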
53 changes: 52 additions & 1 deletion datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
@@ -227,8 +227,59 @@ impl RunQueryResult {
format!("{}", pretty_format_batches(&self.result).unwrap())
}

/// Extract ORDER BY column names from the query.
/// The query format is always:
/// `SELECT * FROM test_table ORDER BY <col> <dir> <nulls>, ... LIMIT <n>`
fn sort_columns(&self) -> Vec<String> {
let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER BY".len();
let limit_start = self.query.rfind(" LIMIT").unwrap();
self.query[order_by_start..limit_start]
.trim()
.split(',')
.map(|part| part.split_whitespace().next().unwrap().to_string())
.collect()
}

/// Project `batches` to only include the named columns.
fn project_columns(batches: &[RecordBatch], cols: &[String]) -> Vec<RecordBatch> {
batches
.iter()
.map(|b| {
let indices: Vec<usize> = cols
.iter()
.filter_map(|c| b.schema().index_of(c).ok())
.collect();
b.project(&indices).unwrap()
})
.collect()
}

fn is_ok(&self) -> bool {
self.expected_formatted() == self.result_formatted()
if self.expected_formatted() == self.result_formatted() {
return true;
}
// If the full results differ, compare only the ORDER BY column values.
//
// For queries with ORDER BY <col> LIMIT k, multiple rows may tie on the
// sort key (e.g. two rows with id=27 for ORDER BY id DESC LIMIT 1).
// SQL permits returning any of the tied rows, so with vs without dynamic
// filter pushdown may legitimately return different tied rows.
//
// The dynamic filter must not change the *sort-key values* of the top-k
// result. We verify correctness by:
// 1. Checking the row counts match (wrong count is always a bug).
// 2. Projecting both results down to only the ORDER BY columns and
// comparing those (tied rows may differ, but the sort-key values must not).
let expected_rows: usize = self.expected.iter().map(|b| b.num_rows()).sum();
let result_rows: usize = self.result.iter().map(|b| b.num_rows()).sum();
if expected_rows != result_rows {
return false;
}
let sort_cols = self.sort_columns();
let expected_keys = Self::project_columns(&self.expected, &sort_cols);
let result_keys = Self::project_columns(&self.result, &sort_cols);
format!("{}", pretty_format_batches(&expected_keys).unwrap())
== format!("{}", pretty_format_batches(&result_keys).unwrap())
}
Review comment (Copilot AI, Feb 24, 2026, on lines 257 to 283): The new comparison logic for TOP-K queries with ties is well-designed and handles the case where morsel-driven execution can return different tied rows. The implementation correctly projects down to only the ORDER BY columns and compares those values. However, consider adding a comment in the code explaining that this is specifically needed to handle non-deterministic row selection in the presence of ties, which can vary with morsel-driven execution. This will help future maintainers understand the special handling.
}

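The tie-tolerant check in `is_ok` boils down to: equal row counts, plus equal sequences of sort-key values, with payloads of tied rows free to differ. A standalone sketch of that idea over plain tuples (our own simplification, not the test's actual `RecordBatch` code):

```rust
/// Rows are (sort_key, payload). Two top-k results are considered
/// equivalent when they have the same length and the same sequence of
/// sort-key values, even if tied rows carry different payloads.
fn topk_equivalent(expected: &[(i64, &str)], result: &[(i64, &str)]) -> bool {
    expected.len() == result.len()
        && expected.iter().zip(result).all(|(a, b)| a.0 == b.0)
}

fn main() {
    // ORDER BY key DESC LIMIT 2 over data with a tie on key=27:
    // SQL permits returning either tied row, so payloads may differ.
    let expected = [(27, "alice"), (27, "bob")];
    let with_pushdown = [(27, "bob"), (27, "alice")];
    assert!(topk_equivalent(&expected, &with_pushdown));

    // A different sort-key value, by contrast, is a real bug.
    let wrong = [(27, "alice"), (26, "bob")];
    assert!(!topk_equivalent(&expected, &wrong));
}
```

The real test compares pretty-printed projections rather than tuples, but the invariant being enforced is the same: dynamic filter pushdown may reorder ties, never change the top-k sort-key values.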
6 changes: 5 additions & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
@@ -382,7 +382,11 @@ async fn prune_disabled() {
.await;
println!("{}", output.description());

// This should not prune any
// Row group stats pruning is disabled, so 0 row groups are pruned by statistics.
// Bloom filter runs next and matches all 4 row groups (bloom filters don't help
// for range/inequality predicates like `nanos < threshold`). Page index pruning
// runs afterwards and can produce row-level selections, but those don't affect
// the bloom filter matched count. The query result is still correct.
assert_eq!(output.predicate_evaluation_errors(), Some(0));
assert_eq!(output.row_groups_matched(), Some(4));
assert_eq!(output.row_groups_pruned(), Some(0));
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -536,6 +536,7 @@ impl FileFormat for ParquetFormat {

let conf = FileScanConfigBuilder::from(conf)
.with_source(Arc::new(source))
.with_morsel_driven(self.options.global.allow_morsel_driven)
.build();
Ok(DataSourceExec::from_data_source(conf))
}