Introduce morsel-driven Parquet scan #20481
Conversation
This PR implements morsel-driven execution for Parquet files in DataFusion, enabling row-group-level work sharing across partitions to mitigate data skew.

Key changes:
- Introduced `WorkQueue` in `datafusion/datasource/src/file_stream.rs` as a shared pool of work.
- Added a `morselize` method to the `FileOpener` trait to allow dynamic splitting of files into morsels.
- Implemented `morselize` for `ParquetOpener` to split files into individual row groups.
- Cached `ParquetMetaData` in `ParquetMorsel` extensions to avoid redundant I/O.
- Modified `FileStream` to support work stealing from the shared queue.
- Implemented a `Weak` pointer pattern for `WorkQueue` in `FileScanConfig` to support plan re-executability.
- Added `MorselizingGuard` to ensure shared-state consistency on cancellation.
- Added an `allow_morsel_driven` configuration option (enabled by default for Parquet).
- Implemented row-group pruning during the morselization phase for better efficiency.

Tests:
- Added `parquet_morsel_driven_execution` to verify work distribution and re-executability.
- Added `parquet_morsel_driven_enabled_by_default` to verify the default configuration.

Co-authored-by: Dandandan <163737+Dandandan@users.noreply.github.com>
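The `morselize` trait extension described above can be sketched as a method with a default no-op body that specific openers override. This is a simplified, hypothetical sketch (`ScanFile`, `Opener`, `CsvLikeOpener`, and `ParquetLikeOpener` are stand-in names), not the actual DataFusion `FileOpener` signatures, which deal in `PartitionedFile` and async I/O:

```rust
// Sketch of the `morselize` trait-extension idea: a default no-op so
// existing openers are unaffected, overridden by a Parquet-style opener
// to split a file into per-row-group morsels. Types are illustrative
// stand-ins, not the real DataFusion definitions.

#[derive(Clone, Debug, PartialEq)]
struct ScanFile {
    path: String,
    row_group: Option<usize>, // None = whole file, Some(i) = one row group
}

trait Opener {
    /// Default: no splitting, the file itself is the only unit of work.
    fn morselize(&self, file: ScanFile) -> Vec<ScanFile> {
        vec![file]
    }
}

struct CsvLikeOpener;
impl Opener for CsvLikeOpener {} // keeps the default

struct ParquetLikeOpener {
    row_groups_per_file: usize, // stand-in for consulting ParquetMetaData
}

impl Opener for ParquetLikeOpener {
    fn morselize(&self, file: ScanFile) -> Vec<ScanFile> {
        (0..self.row_groups_per_file)
            .map(|i| ScanFile { path: file.path.clone(), row_group: Some(i) })
            .collect()
    }
}

fn main() {
    let f = ScanFile { path: "a.parquet".into(), row_group: None };
    assert_eq!(CsvLikeOpener.morselize(f.clone()).len(), 1);
    assert_eq!(ParquetLikeOpener { row_groups_per_file: 3 }.morselize(f).len(), 3);
    println!("ok");
}
```

The default implementation is what keeps non-Parquet sources unchanged: only openers that can cheaply enumerate sub-units opt in.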
run benchmarks
Split the single WorkQueue into two internal queues: one for whole files awaiting morselization and one for already-morselized leaf morsels (row groups). Workers drain the morsel queue first, so freshly produced row groups are consumed before the next file is opened. This keeps I/O sequential within each file without needing push-to-front tricks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
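A minimal single-threaded sketch of this two-queue idea (names and types are illustrative; the real `WorkQueue` is shared across partition streams and synchronized):

```rust
use std::collections::VecDeque;

// Sketch: two internal queues. Workers drain already-morselized row groups
// first; only when that queue is empty is the next whole file opened and
// morselized. This keeps I/O sequential within each file without
// push-to-front tricks.
struct WorkQueue {
    files: VecDeque<&'static str>,            // whole files awaiting morselization
    morsels: VecDeque<(&'static str, usize)>, // (file, row_group) leaf morsels
}

impl WorkQueue {
    fn pull(&mut self, row_groups_per_file: usize) -> Option<(&'static str, usize)> {
        // 1. Prefer freshly produced morsels.
        if let Some(m) = self.morsels.pop_front() {
            return Some(m);
        }
        // 2. Otherwise open the next file and morselize it.
        let file = self.files.pop_front()?;
        for rg in 0..row_groups_per_file {
            self.morsels.push_back((file, rg));
        }
        self.morsels.pop_front()
    }
}

fn main() {
    let mut q = WorkQueue {
        files: VecDeque::from(["a.parquet", "b.parquet"]),
        morsels: VecDeque::new(),
    };
    let mut order = Vec::new();
    while let Some(m) = q.pull(2) {
        order.push(m);
    }
    // All of a.parquet's row groups are consumed before b.parquet is opened.
    assert_eq!(order, vec![("a.parquet", 0), ("a.parquet", 1),
                           ("b.parquet", 0), ("b.parquet", 1)]);
    println!("ok");
}
```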
run benchmarks
```diff
 physical_plan
 01)FilterExec: column1@0 != 42
-02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..135], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:135..270], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:270..405], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:405..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]
+02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet], [], [], []]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]
```
We probably want to improve the display of DataSourceExec to only display the file names (instead of file groups).
🤖: Benchmark completed
Stop the time_opening timer before transitioning back to Idle after pushing morsels to the queue. Without this, re-entering Idle would call start() on an already-running timer, triggering an assertion failure. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
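The fix amounts to enforcing a stop-before-restart invariant. A toy sketch of why the ordering matters, assuming (as DataFusion's metric timers do) that `start()` asserts the timer is not already running; the `Timer` type here is a hypothetical stand-in:

```rust
// Sketch of the invariant behind the fix: a timer that panics if start()
// is called while it is already running. Transitioning back to Idle must
// stop the timer first, otherwise the next start() hits the assertion.
struct Timer {
    running: bool,
}

impl Timer {
    fn start(&mut self) {
        assert!(!self.running, "timer already running");
        self.running = true;
    }
    fn stop(&mut self) {
        self.running = false;
    }
}

fn main() {
    let mut time_opening = Timer { running: false };

    // Opening a file: timer runs while morselizing.
    time_opening.start();
    // ... morselize, push morsels to the queue ...

    // The fix: stop BEFORE transitioning back to Idle,
    time_opening.stop();
    // so re-entering the opening path can start() again safely.
    time_opening.start();
    time_opening.stop();
    println!("ok");
}
```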
🤖: Benchmark completed
Combines morsel-driven Parquet scan (per-row-group work units) with adaptive filter pushdown (selectivity-based filter placement). Key merge decisions:
- `morselize()` uses the combined predicate for coarse pruning (unchanged)
- `open()` uses `predicate_conjuncts` with a selectivity tracker per morsel
- Proto field renumbered: `filter_pushdown_min_bytes_per_sec` 35 -> 42 to avoid collision with `allow_morsel_driven` (field 35)
- SQL test outputs take the morsel-driven plan format

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
I tried merging this PR and #20363 together and am seeing some pretty amazing results: adriangb#2 (comment) This makes sense logically: if we can optimize filter placements / layouts for every morsel instead of every file, that makes the adaptive system much more responsive.
I poked around a bit and came up with this: https://gist.github.com/adriangb/ae5c0ebc56d0e72a955cdbfe5dd7e23f TLDR: try to make a single mpmc pipeline of open -> morselize -> read morsel <- partitions pull
Cool. I think there might be some "problems" with this approach.
I think this is probably retrieved from the code, but not really true in practice (metadata involves decoding / summarizing / ... as well). I think there might be some benefits from the current approach in this PR:
I am coming to the conclusion that the difference between:
is mostly due to randomness / luck in how much selectivity comes from particular files. I think we could look in the future at how to optimize file order / row-group order by picking the most "selective" files early as morsels, so dynamic filters become selective early with the least I/O for subsequent scans.
Sort at three levels so smaller / more-selective work is processed first, letting dynamic filters tighten sooner and prune more data:
- Planning: sort files by `statistics.num_rows` before distribution
- `WorkQueue::new`: re-sort after round-robin interleaving
- `push_morsels`: keep the morsel queue globally sorted by estimated row count (set on each morsel in `morselize()`)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
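The queue-level ordering can be maintained with sorted insertion. A sketch assuming each morsel carries an estimated row count (the `push_morsels` function here is illustrative, not the actual implementation):

```rust
use std::collections::VecDeque;

// Sketch of keeping the morsel queue globally sorted by estimated row
// count, so smaller (cheaper, likely more selective) work is pulled first.
// `partition_point` finds the insertion index preserving ascending order.
fn push_morsels(queue: &mut VecDeque<u64>, new: &[u64]) {
    for &rows in new {
        let idx = queue.partition_point(|&r| r <= rows);
        queue.insert(idx, rows);
    }
}

fn main() {
    // Queue already holds morsels with 100 and 500 estimated rows.
    let mut queue = VecDeque::from([100, 500]);
    // A freshly morselized file contributes three more row groups.
    push_morsels(&mut queue, &[50, 300, 1000]);
    assert_eq!(Vec::from(queue), vec![50, 100, 300, 500, 1000]);
    println!("ok");
}
```

Sorted insertion is O(n) per morsel because of the `VecDeque::insert` shift; for the queue sizes involved (row groups per scan) that is likely acceptable, but a heap would be the alternative if it ever shows up in profiles.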
run benchmarks
I'll try some small experiments but mostly focus on
🤖: Benchmark completed
…ters" This reverts commit afd2936.
run benchmarks
run benchmarks

run benchmark tpch tpcds
🤖: Benchmark completed

🤖: Benchmark completed
Nice 🚀🚀🚀🚀

🤖: Benchmark completed

🤖: Benchmark completed
Which issue does this PR close?
TODO:
- `execute`: stream is dropped / not shared between two concurrent instances (major design issue)

Rationale for this change
Current parallelization of the Parquet scan is bounded by the thread that has the most data / is the slowest to execute. In the case of data skew (driven by larger partitions, less selective filters during pruning / filter pushdown, or variable object store latency), effective parallelism is significantly limited.
We can change the strategy to morsel-driven parallelism, as described in https://db.in.tum.de/~leis/papers/morsels.pdf.
Doing so is faster for many queries when there is some amount of skew (as in ClickBench) and there are enough row filters to spread out the work.
For clickbench_partitioned / clickbench_pushdown it appears up to ~2x as fast for some queries on a 10-core machine.
There seem to be almost no regressions; a few may be due to a different file scanning order(?), and thus different statistics available for pruning, causing some minor variation.
Morsel-Driven Execution Architecture (partly claude-generated)
This branch implements a morsel-driven execution model for Parquet scans, based on the concept
from the Morsel-Driven Parallelism paper (Leis et al.). The core idea: instead of statically
assigning files to partitions, all work is pooled in a shared queue that all partition streams pull
from dynamically.
The Problem It Solves
In the traditional model, partition 0 might get a 1 GB file while partition 1 gets nothing -- partition 1 idles while 0 is busy. We already try to statically spread work across n partitions / threads based on statistics, which works very well for perfectly distributed scans on SSDs (e.g. TPC-H running locally), but it doesn't work well when there is data skew caused by any of the following:
Morsel-driven execution prevents this by sharing work dynamically.
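The effect can be illustrated with a shared queue that idle workers keep pulling from. This is a toy sketch using std threads and a `Mutex`-guarded `VecDeque` (the real implementation integrates the shared queue into the partition streams with async notification):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// Toy sketch of dynamic work sharing: instead of statically assigning
// morsels to workers up front, every worker pulls from one shared queue,
// so a worker that finishes early keeps helping instead of idling.
fn main() {
    // 8 morsels with skewed "costs" (e.g. row counts); a static 2-way
    // assignment could leave one worker with most of the work.
    let queue: Arc<Mutex<VecDeque<u64>>> =
        Arc::new(Mutex::new((1..=8).collect()));

    let handles: Vec<_> = (0..2)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut processed = 0u64;
                loop {
                    let morsel = queue.lock().unwrap().pop_front();
                    match morsel {
                        Some(cost) => processed += cost, // "scan" the morsel
                        None => return processed,        // queue drained
                    }
                }
            })
        })
        .collect();

    let total: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total, 36); // 1+2+...+8: every morsel processed exactly once
    println!("ok");
}
```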
Key Types
`ParquetMorsel` -- `datafusion/datasource-parquet/src/opener.rs:129`
A morsel = one row group of a Parquet file. Stored as an extension on `PartitionedFile`.

`WorkQueue` -- `datafusion/datasource/src/file_stream.rs:410`
The shared, thread-safe queue. Each partition stream calls `pull()`, which returns:
- `Work(file)` -- here's a file/morsel to process
- `Wait` -- queue is empty but workers are still morselizing (wait for notification)
- `Done` -- all work consumed

`MorselState` -- `datafusion/datasource/src/source.rs:240`
Tracks the shared queue lifecycle. A new queue is created once per execution cycle when all partition streams have opened.
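The pull contract can be sketched as a three-way result plus an in-flight morselizer count. This is a simplified, synchronous stand-in (hypothetical types; the real queue uses atomics and async notification rather than this loop):

```rust
use std::collections::VecDeque;

// Sketch of the pull() contract: Wait is returned only while some worker
// is still morselizing, so a drained queue plus zero in-flight
// morselizers means Done.
#[derive(Debug, PartialEq)]
enum WorkStatus {
    Work(&'static str), // a file/morsel to process
    Wait,               // empty, but morselizers may still produce work
    Done,               // empty and no morselizers left
}

struct WorkQueue {
    items: VecDeque<&'static str>,
    morselizing_count: usize,
}

impl WorkQueue {
    fn pull(&mut self) -> WorkStatus {
        match self.items.pop_front() {
            Some(item) => WorkStatus::Work(item),
            None if self.morselizing_count > 0 => WorkStatus::Wait,
            None => WorkStatus::Done,
        }
    }
    // Stand-in for what a MorselizingGuard's Drop impl would do:
    // decrement the count even if the worker is cancelled mid-morselize.
    fn finish_morselizing(&mut self) {
        self.morselizing_count -= 1;
    }
}

fn main() {
    let mut q = WorkQueue {
        items: VecDeque::from(["rg0"]),
        morselizing_count: 1,
    };
    assert_eq!(q.pull(), WorkStatus::Work("rg0"));
    assert_eq!(q.pull(), WorkStatus::Wait); // a worker is still morselizing
    q.finish_morselizing();                 // guard dropped
    assert_eq!(q.pull(), WorkStatus::Done);
    println!("ok");
}
```

Tying the decrement to an RAII guard rather than an explicit call is what makes the `Wait`/`Done` decision robust to cancellation: a stream dropped mid-morselize still releases its count.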
`MorselizingGuard` -- `datafusion/datasource/src/file_stream.rs:49`
RAII wrapper that atomically decrements `morselizing_count` when a worker finishes -- enabling `WorkStatus::Wait` vs `Done` decisions.

`FileOpener` Trait Extension -- `datafusion/datasource/src/file_stream.rs:498`
A new `morselize()` method is added to `FileOpener`. The default implementation is a no-op (returns the file as-is). `ParquetOpener` overrides it to split files by row group.

`ParquetOpener::morselize()` at `opener.rs:232`:
- shares the cached `ParquetMetaData` `Arc` across all resulting morsels
- emits one `PartitionedFile` per surviving row group, each carrying a `ParquetMorsel` extension

`FileStream` State Machine -- `datafusion/datasource/src/file_stream.rs:141`
The morsel-driven path adds two new states (`Morselizing` and `Waiting`):

Configuration
`datafusion.execution.parquet.allow_morsel_driven` -- `datafusion/common/src/config.rs:748`
Default: true. Can be disabled per-session.
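Assuming DataFusion's standard session-level `SET` syntax for configuration options (a usage sketch, not taken from the PR's tests):

```sql
SET datafusion.execution.parquet.allow_morsel_driven = false;
```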
`FileScanConfig::morsel_driven` -- `datafusion/datasource/src/file_scan_config.rs:211`
Automatically disabled when:
- `partitioned_by_file_group = true` (breaks hash-partitioning guarantees)
- `preserve_order = true` (breaks SortPreservingMerge guarantees)

Benchmark results
Summary: both clickbench, clickbench_partitioned
Acknowledgements
I heavily used AI (Jules / Claude) for this PR, but reviewed the code myself