diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
index 7abb39e1a7130..f4a53d49f7cdd 100644
--- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
@@ -63,7 +63,6 @@ async fn search_accounts(
) -> Result<()> {
// create local execution context
let ctx = SessionContext::new();
-
// create logical plan composed of a single TableScan
let logical_plan = LogicalPlanBuilder::scan_with_filters(
"accounts",
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index d71af206c78d5..c057ca15e1f6e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -743,6 +743,10 @@ config_namespace! {
/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true
+ /// (reading) If true, the parquet reader will share work between partitions
+ /// using morsel-driven execution. This can help mitigate data skew.
+ pub allow_morsel_driven: bool, default = true
+
/// (reading) The maximum predicate cache size, in bytes. When
/// `pushdown_filters` is enabled, sets the maximum memory used to cache
/// the results of predicate evaluation between filter evaluation and
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index f6608d16c1022..d00c22adc6559 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -208,6 +208,7 @@ impl ParquetOptions {
binary_as_string: _, // not used for writer props
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
+ allow_morsel_driven: _,
max_predicate_cache_size: _,
} = self;
@@ -460,6 +461,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
+ allow_morsel_driven: defaults.allow_morsel_driven,
}
}
@@ -573,6 +575,7 @@ mod tests {
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
+ allow_morsel_driven: global_options_defaults.allow_morsel_driven,
coerce_int96: None,
},
column_specific_options,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 4c6d915d5bcaa..9835574ab5827 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -49,8 +49,11 @@ mod tests {
use datafusion_common::config::TableParquetOptions;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{Result, ScalarValue, assert_contains};
+ use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file_format::FileFormat;
- use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+ use datafusion_datasource::file_scan_config::{
+ FileScanConfig, FileScanConfigBuilder,
+ };
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::file::FileSource;
diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
index 7f994daeaa58c..beb414fc22375 100644
--- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
+++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
@@ -227,8 +227,58 @@ impl RunQueryResult {
format!("{}", pretty_format_batches(&self.result).unwrap())
}
+ /// Extract ORDER BY column names from the query.
+ /// The query format is always:
+ /// `SELECT * FROM test_table ORDER BY <col> [ASC|DESC], ... LIMIT <n>`
+ fn sort_columns(&self) -> Vec {
+ let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER BY".len();
+ let limit_start = self.query.rfind(" LIMIT").unwrap();
+ self.query[order_by_start..limit_start]
+ .trim()
+ .split(',')
+ .map(|part| part.split_whitespace().next().unwrap().to_string())
+ .collect()
+ }
+
+ /// Project `batches` to only include the named columns.
+ fn project_columns(batches: &[RecordBatch], cols: &[String]) -> Vec {
+ batches
+ .iter()
+ .map(|b| {
+ let schema = b.schema();
+ let indices: Vec = cols
+ .iter()
+ .filter_map(|c| schema.index_of(c).ok())
+ .collect();
+ let columns: Vec<_> =
+ indices.iter().map(|&i| Arc::clone(b.column(i))).collect();
+ let fields: Vec<_> =
+ indices.iter().map(|&i| schema.field(i).clone()).collect();
+ let new_schema = Arc::new(Schema::new(fields));
+ RecordBatch::try_new(new_schema, columns).unwrap()
+ })
+ .collect()
+ }
+
fn is_ok(&self) -> bool {
- self.expected_formatted() == self.result_formatted()
+ if self.expected_formatted() == self.result_formatted() {
+ return true;
+ }
+ // If the full results differ, compare only the ORDER BY column values.
+ //
+ // For queries with ORDER BY <col> LIMIT k, multiple rows may tie on the
+ // sort key (e.g. two rows with id=27 for ORDER BY id DESC LIMIT 1).
+ // SQL permits returning any of the tied rows, so with vs without dynamic
+ // filter pushdown may legitimately return different tied rows.
+ //
+ // The dynamic filter must not change the *sort-key values* of the top-k
+ // result. We verify correctness by projecting both results down to only
+ // the ORDER BY columns and comparing those.
+ let sort_cols = self.sort_columns();
+ let expected_keys = Self::project_columns(&self.expected, &sort_cols);
+ let result_keys = Self::project_columns(&self.result, &sort_cols);
+ format!("{}", pretty_format_batches(&expected_keys).unwrap())
+ == format!("{}", pretty_format_batches(&result_keys).unwrap())
}
}
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index 445ae7e97f228..35e2ec6cde7bc 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -382,7 +382,11 @@ async fn prune_disabled() {
.await;
println!("{}", output.description());
- // This should not prune any
+ // Row group stats pruning is disabled, so 0 row groups are pruned by statistics.
+ // Bloom filter runs next and matches all 4 row groups (bloom filters don't help
+ // for range/inequality predicates like `nanos < threshold`). Page index pruning
+ // runs afterwards and can produce row-level selections, but those don't affect
+ // the bloom filter matched count. The query result is still correct.
assert_eq!(output.predicate_evaluation_errors(), Some(0));
assert_eq!(output.row_groups_matched(), Some(4));
assert_eq!(output.row_groups_pruned(), Some(0));
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index fa021ed3dcce3..9601095bb0d9a 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -78,6 +78,11 @@ mod test {
target_partition: Option,
) -> Arc {
let mut session_config = SessionConfig::new().with_collect_statistics(true);
+ session_config
+ .options_mut()
+ .execution
+ .parquet
+ .allow_morsel_driven = false;
if let Some(partition) = target_partition {
session_config = session_config.with_target_partitions(partition);
}
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 6d1758abeb47b..e1b2ca3bcf8f0 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -27,12 +27,12 @@ use std::{fmt, vec};
use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
-use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
ObjectWriterBuilder, SharedBuffer, get_writer_schema,
};
+use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
@@ -49,12 +49,14 @@ use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use datafusion_physical_plan::merge_partitions::MergePartitionsExec;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;
@@ -522,8 +524,10 @@ impl FileFormat for ParquetFormat {
let store = state
.runtime_env()
.object_store(conf.object_store_url.clone())?;
- let cached_parquet_read_factory =
- Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
+ let cached_parquet_read_factory = Arc::new(CachedParquetFileReaderFactory::new(
+ Arc::clone(&store),
+ metadata_cache,
+ ));
source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);
if let Some(metadata_size_hint) = metadata_size_hint {
@@ -532,10 +536,85 @@ impl FileFormat for ParquetFormat {
source = self.set_source_encryption_factory(source, state)?;
- let conf = FileScanConfigBuilder::from(conf)
- .with_source(Arc::new(source))
- .build();
- Ok(DataSourceExec::from_data_source(conf))
+ let use_merge_partitions = self.options.global.allow_morsel_driven;
+
+ if use_merge_partitions {
+ // Morsel-driven execution: create one partition per file for the
+ // plan tree, then use a ParquetMorselizer that lazily expands
+ // files into row-group-level work items for fine-grained
+ // work-stealing via MergePartitionsExec.
+ let target_partitions = state.config_options().execution.target_partitions;
+
+ // Collect all files before conf is consumed.
+ let all_files: Vec = conf
+ .file_groups
+ .iter()
+ .flat_map(|group| group.iter().cloned())
+ .collect();
+
+ // Get the reader factory (for metadata reads in the morselizer).
+ // It was set above as CachedParquetFileReaderFactory.
+ let reader_factory: Arc = source
+ .parquet_file_reader_factory()
+ .cloned()
+ .expect("reader factory was set above");
+
+ // Set batch size on source before creating the opener (normally
+ // done by FileScanConfig::open_file_reader, but we create the
+ // opener directly here for the morselizer).
+ source.batch_size = Some(
+ conf.batch_size
+ .unwrap_or(state.config_options().execution.batch_size),
+ );
+
+ // Create the opener for row-group morsel execution.
+ let opener = source.create_file_opener(Arc::clone(&store), &conf, 0)?;
+
+ let output_schema = conf.projected_schema()?;
+ let metrics = source.metrics().clone();
+
+ // Build the DataSourceExec (one partition per file, for plan tree).
+ let one_per_file: Vec = conf
+ .file_groups
+ .iter()
+ .flat_map(|group| {
+ group.iter().map(|file| FileGroup::new(vec![file.clone()]))
+ })
+ .collect();
+
+ let conf = FileScanConfigBuilder::from(conf)
+ .with_source(Arc::new(source))
+ .with_file_groups(one_per_file)
+ .build();
+ let data_source = DataSourceExec::from_data_source(conf);
+ let input_partitions = data_source
+ .properties()
+ .output_partitioning()
+ .partition_count();
+
+ if input_partitions > target_partitions {
+ let morselizer = Arc::new(crate::morselizer::ParquetMorselizer::new(
+ all_files,
+ opener,
+ reader_factory,
+ metrics,
+ metadata_size_hint,
+ output_schema,
+ ));
+ Ok(Arc::new(MergePartitionsExec::new_with_morselizer(
+ data_source,
+ target_partitions,
+ morselizer,
+ )))
+ } else {
+ Ok(data_source)
+ }
+ } else {
+ let conf = FileScanConfigBuilder::from(conf)
+ .with_source(Arc::new(source))
+ .build();
+ Ok(DataSourceExec::from_data_source(conf))
+ }
}
async fn create_writer_physical_plan(
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 0e137a706fad7..d1e46196103ae 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -24,6 +24,7 @@ pub mod access_plan;
pub mod file_format;
pub mod metadata;
mod metrics;
+pub(crate) mod morselizer;
mod opener;
mod page_filter;
mod reader;
diff --git a/datafusion/datasource-parquet/src/morselizer.rs b/datafusion/datasource-parquet/src/morselizer.rs
new file mode 100644
index 0000000000000..3593a24974f3e
--- /dev/null
+++ b/datafusion/datasource-parquet/src/morselizer.rs
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetMorselizer`] — row-group-level morselizer for parquet files.
+
+use std::fmt;
+use std::sync::Arc;
+
+use crate::{ParquetAccessPlan, ParquetFileReaderFactory};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_datasource::PartitionedFile;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::merge_partitions::{MorselResult, Morselizer, WorkItem};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+
+use futures::StreamExt;
+use futures::future::BoxFuture;
+use parquet::arrow::async_reader::AsyncFileReader;
+
+/// Work item types for the parquet morselizer.
+enum ParquetWorkItem {
+ /// A file that hasn't been morselized yet. Metadata will be read lazily
+ /// when this item is executed, and the file will be split into row groups.
+ File(PartitionedFile),
+ /// A single row group within a file. The `PartitionedFile` carries a
+ /// `ParquetAccessPlan` extension that selects exactly one row group.
+ RowGroup(PartitionedFile),
+}
+
+/// A [`Morselizer`] that expands parquet files into row-group-level work items.
+///
+/// When a file work item is executed:
+/// 1. Metadata is read lazily (first access populates the metadata cache)
+/// 2. The file is split into one work item per row group
+/// 3. The first row group is executed immediately
+/// 4. Remaining row groups are pushed to the shared queue for other streams
+///
+/// When a row-group work item is executed, it is opened directly via the
+/// opener (metadata comes from the cache).
+pub(crate) struct ParquetMorselizer {
+ /// Files to scan
+ files: Vec,
+ /// Opener for creating streams for individual files/row groups
+ opener: Arc,
+ /// Factory for creating readers (used for metadata reads)
+ reader_factory: Arc,
+ /// Metrics for the reader factory
+ metrics: ExecutionPlanMetricsSet,
+ /// Metadata size hint
+ metadata_size_hint: Option,
+ /// Output schema
+ schema: SchemaRef,
+}
+
+impl fmt::Debug for ParquetMorselizer {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetMorselizer")
+ .field("num_files", &self.files.len())
+ .finish()
+ }
+}
+
+impl ParquetMorselizer {
+ pub(crate) fn new(
+ files: Vec,
+ opener: Arc,
+ reader_factory: Arc,
+ metrics: ExecutionPlanMetricsSet,
+ metadata_size_hint: Option,
+ schema: SchemaRef,
+ ) -> Self {
+ Self {
+ files,
+ opener,
+ reader_factory,
+ metrics,
+ metadata_size_hint,
+ schema,
+ }
+ }
+}
+
+impl Morselizer for ParquetMorselizer {
+ fn initial_items(&self) -> Vec {
+ self.files
+ .iter()
+ .map(|f| WorkItem::new(ParquetWorkItem::File(f.clone())))
+ .collect()
+ }
+
+ fn execute_item(
+ &self,
+ item: WorkItem,
+ _context: Arc,
+ ) -> Result>> {
+ let work_item: ParquetWorkItem = item
+ .downcast()
+ .expect("ParquetMorselizer work item should be ParquetWorkItem");
+
+ match work_item {
+ ParquetWorkItem::File(file) => self.execute_file_item(file),
+ ParquetWorkItem::RowGroup(file) => self.execute_row_group_item(file),
+ }
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
+
+impl ParquetMorselizer {
+ /// Execute a file-level work item: read metadata, split into row groups.
+ fn execute_file_item(
+ &self,
+ file: PartitionedFile,
+ ) -> Result>> {
+ // Create a reader to fetch metadata
+ let mut reader: Box =
+ self.reader_factory.create_reader(
+ 0, // partition index doesn't matter for metadata reads
+ file.clone(),
+ self.metadata_size_hint,
+ &self.metrics,
+ )?;
+ let opener = Arc::clone(&self.opener);
+
+ Ok(Box::pin(async move {
+ // Read metadata (this populates the cache for subsequent reads)
+ let metadata = reader
+ .get_metadata(None)
+ .await
+ .map_err(|e| datafusion_common::DataFusionError::External(Box::new(e)))?;
+ let num_row_groups = metadata.num_row_groups();
+
+ if num_row_groups == 0 {
+ // No row groups — return an empty stream
+ let empty: Vec> = vec![];
+ return Ok(MorselResult {
+ stream: futures::stream::iter(empty).boxed(),
+ additional_items: vec![],
+ });
+ }
+
+ // Create a work item for each row group except the first
+ let mut additional_items =
+ Vec::with_capacity(num_row_groups.saturating_sub(1));
+ for rg_idx in 1..num_row_groups {
+ let mut access_plan = ParquetAccessPlan::new_none(num_row_groups);
+ access_plan.scan(rg_idx);
+ let mut rg_file = file.clone();
+ rg_file.extensions = Some(Arc::new(access_plan));
+ additional_items.push(WorkItem::new(ParquetWorkItem::RowGroup(rg_file)));
+ }
+
+ // Execute the first row group
+ let mut first_file = file;
+ let mut access_plan = ParquetAccessPlan::new_none(num_row_groups);
+ access_plan.scan(0);
+ first_file.extensions = Some(Arc::new(access_plan));
+
+ let future = opener.open(first_file)?;
+ let stream = future.await?;
+
+ Ok(MorselResult {
+ stream,
+ additional_items,
+ })
+ }))
+ }
+
+ /// Execute a row-group-level work item: open the specific row group.
+ fn execute_row_group_item(
+ &self,
+ file: PartitionedFile,
+ ) -> Result>> {
+ let future = self.opener.open(file)?;
+
+ Ok(Box::pin(async move {
+ let stream = future.await?;
+ Ok(MorselResult {
+ stream,
+ additional_items: vec![],
+ })
+ }))
+ }
+}
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index f87a30265a17b..5fd3f21832ea0 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -40,7 +40,7 @@ use datafusion_common::stats::Precision;
use datafusion_common::{
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
};
-use datafusion_datasource::{PartitionedFile, TableSchema};
+use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::{
@@ -64,10 +64,12 @@ use parquet::arrow::arrow_reader::{
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
+use parquet::schema::types::SchemaDescriptor;
/// Implements [`FileOpener`] for a parquet file
-pub(super) struct ParquetOpener {
+pub(crate) struct ParquetOpener {
/// Execution partition index
pub(crate) partition_index: usize,
/// Projection to apply on top of the table schema (i.e. can reference partition columns).
@@ -130,6 +132,13 @@ pub(crate) struct PreparedAccessPlan {
pub(crate) row_selection: Option,
}
+struct RowGroupStatisticsPruningContext<'a> {
+ physical_file_schema: &'a SchemaRef,
+ parquet_schema: &'a SchemaDescriptor,
+ predicate: &'a PruningPredicate,
+ file_metrics: &'a ParquetFileMetrics,
+}
+
impl PreparedAccessPlan {
/// Create a new prepared access plan from a ParquetAccessPlan
pub(crate) fn from_access_plan(
@@ -146,10 +155,7 @@ impl PreparedAccessPlan {
}
/// Reverse the access plan for reverse scanning
- pub(crate) fn reverse(
- mut self,
- file_metadata: &parquet::file::metadata::ParquetMetaData,
- ) -> Result {
+ pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result {
// Get the row group indexes before reversing
let row_groups_to_scan = self.row_group_indexes.clone();
@@ -180,6 +186,39 @@ impl PreparedAccessPlan {
}
}
+impl ParquetOpener {
+ fn build_row_group_access_filter(
+ file_name: &str,
+ extensions: Option>,
+ row_group_count: usize,
+ row_group_metadata: &[RowGroupMetaData],
+ file_range: Option<&FileRange>,
+ stats_pruning: Option>,
+ ) -> Result {
+ let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+ file_name,
+ extensions,
+ row_group_count,
+ )?);
+
+ if let Some(range) = file_range {
+ row_groups.prune_by_range(row_group_metadata, range);
+ }
+
+ if let Some(stats_pruning) = stats_pruning {
+ row_groups.prune_by_statistics(
+ stats_pruning.physical_file_schema.as_ref(),
+ stats_pruning.parquet_schema,
+ row_group_metadata,
+ stats_pruning.predicate,
+ stats_pruning.file_metrics,
+ );
+ }
+
+ Ok(row_groups)
+ }
+}
+
impl FileOpener for ParquetOpener {
fn open(&self, partitioned_file: PartitionedFile) -> Result {
// -----------------------------------
@@ -358,7 +397,7 @@ impl FileOpener for ParquetOpener {
// Begin by loading the metadata from the underlying reader (note
// the returned metadata may actually include page indexes as some
// readers may return page indexes even when not requested -- for
- // example when they are cached)
+ // example when they are cached).
let mut reader_metadata =
ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
.await?;
@@ -495,26 +534,25 @@ impl FileOpener for ParquetOpener {
let file_metadata = Arc::clone(builder.metadata());
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
- // track which row groups to actually read
- let access_plan =
- create_initial_plan(&file_name, extensions, rg_metadata.len())?;
- let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
- // if there is a range restricting what parts of the file to read
- if let Some(range) = file_range.as_ref() {
- row_groups.prune_by_range(rg_metadata, range);
- }
+ let mut row_groups = Self::build_row_group_access_filter(
+ &file_name,
+ extensions,
+ rg_metadata.len(),
+ rg_metadata,
+ file_range.as_ref(),
+ predicate
+ .filter(|_| enable_row_group_stats_pruning)
+ .map(|predicate| RowGroupStatisticsPruningContext {
+ physical_file_schema: &physical_file_schema,
+ parquet_schema: builder.parquet_schema(),
+ predicate,
+ file_metrics: &file_metrics,
+ }),
+ )?;
// If there is a predicate that can be evaluated against the metadata
if let Some(predicate) = predicate.as_ref() {
- if enable_row_group_stats_pruning {
- row_groups.prune_by_statistics(
- &physical_file_schema,
- builder.parquet_schema(),
- rg_metadata,
- predicate,
- &file_metrics,
- );
- } else {
+ if !enable_row_group_stats_pruning {
// Update metrics: statistics unavailable, so all row groups are
// matched (not pruned)
file_metrics
@@ -522,6 +560,12 @@ impl FileOpener for ParquetOpener {
.add_matched(row_groups.remaining_row_group_count());
}
+ // Prune by limit before bloom filter: no point reading bloom filter data
+ // for row groups that will be skipped by the limit anyway.
+ if let (Some(limit), false) = (limit, preserve_order) {
+ row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
+ }
+
if enable_bloom_filter && !row_groups.is_empty() {
row_groups
.prune_by_bloom_filters(
@@ -549,11 +593,6 @@ impl FileOpener for ParquetOpener {
.add_matched(n_remaining_row_groups);
}
- // Prune by limit if limit is set and limit order is not sensitive
- if let (Some(limit), false) = (limit, preserve_order) {
- row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
- }
-
// --------------------------------------------------------
// Step: prune pages from the kept row groups
//
@@ -1035,7 +1074,7 @@ mod test {
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, replace_columns_with_literals,
};
- use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricValue};
use futures::{Stream, StreamExt};
use object_store::{ObjectStore, memory::InMemory, path::Path};
use parquet::arrow::ArrowWriter;
@@ -1365,6 +1404,19 @@ mod test {
))
}
+ fn get_pruning_metric(
+ metrics: &ExecutionPlanMetricsSet,
+ metric_name: &str,
+ ) -> (usize, usize) {
+ match metrics.clone_inner().sum_by_name(metric_name) {
+ Some(MetricValue::PruningMetrics {
+ pruning_metrics, ..
+ }) => (pruning_metrics.pruned(), pruning_metrics.matched()),
+ Some(_) => panic!("Metric '{metric_name}' is not a pruning metric"),
+ None => panic!("Metric '{metric_name}' not found"),
+ }
+ }
+
#[tokio::test]
async fn test_prune_on_statistics() {
let store = Arc::new(InMemory::new()) as Arc;
diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs
index c8090382094ef..f216855471482 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -76,9 +76,10 @@ impl FileStream {
let projected_schema = config.projected_schema()?;
let file_group = config.file_groups[partition].clone();
+ let file_iter = file_group.into_inner().into_iter().collect();
Ok(Self {
- file_iter: file_group.into_inner().into_iter().collect(),
+ file_iter,
projected_schema,
remain: config.limit,
file_opener,
@@ -214,7 +215,9 @@ impl FileStream {
}
}
}
- None => return Poll::Ready(None),
+ None => {
+ return Poll::Ready(None);
+ }
},
OnError::Fail => {
self.state = FileStreamState::Error;
@@ -243,7 +246,9 @@ impl FileStream {
}
}
}
- None => return Poll::Ready(None),
+ None => {
+ return Poll::Ready(None);
+ }
}
}
}
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 05028ed0f4683..cda7ab470d4b7 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -125,6 +125,7 @@ pub trait DataSource: Send + Sync + Debug {
partition: usize,
context: Arc,
) -> Result;
+
fn as_any(&self) -> &dyn Any;
/// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
diff --git a/datafusion/physical-expr/src/simplifier/mod.rs b/datafusion/physical-expr/src/simplifier/mod.rs
index 3f3f8573449eb..38663f1b06609 100644
--- a/datafusion/physical-expr/src/simplifier/mod.rs
+++ b/datafusion/physical-expr/src/simplifier/mod.rs
@@ -61,7 +61,10 @@ impl<'a> PhysicalExprSimplifier<'a> {
count += 1;
let result = current_expr.transform(|node| {
#[cfg(debug_assertions)]
- let original_type = node.data_type(schema).unwrap();
+ // Use `ok()` to skip the assertion when data_type fails (e.g., for
+ // DynamicFilterPhysicalExpr whose inner expression may reference columns
+ // outside the provided schema when the filter has been updated concurrently).
+ let original_type = node.data_type(schema).ok();
// Apply NOT expression simplification first, then unwrap cast optimization,
// then constant expression evaluation
@@ -73,11 +76,14 @@ impl<'a> PhysicalExprSimplifier<'a> {
})?;
#[cfg(debug_assertions)]
- assert_eq!(
- rewritten.data.data_type(schema).unwrap(),
- original_type,
- "Simplified expression should have the same data type as the original"
- );
+ if let Some(original_type) = original_type
+ && let Ok(rewritten_type) = rewritten.data.data_type(schema) {
+ assert_eq!(
+ rewritten_type,
+ original_type,
+ "Simplified expression should have the same data type as the original"
+ );
+ }
Ok(rewritten)
})?;
diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs
index c4ce74fd3a573..70c83cee65b74 100644
--- a/datafusion/physical-expr/src/utils/guarantee.rs
+++ b/datafusion/physical-expr/src/utils/guarantee.rs
@@ -389,6 +389,9 @@ impl<'a> ColOpLit<'a> {
/// 2. `literal col`
/// 3. operator is `=` or `!=`
///
+ /// Also handles `CastColumnExpr(col) literal` patterns where the
+ /// column is wrapped in a cast (e.g., from schema adaptation).
+ ///
/// Returns None otherwise
fn try_new(expr: &'a Arc) -> Option {
let binary_expr = expr
@@ -405,9 +408,9 @@ impl<'a> ColOpLit<'a> {
Operator::NotEq => Guarantee::NotIn,
_ => return None,
};
- // col literal
+ // col literal (also handles CastColumnExpr(col) literal)
if let (Some(col), Some(lit)) = (
- left.downcast_ref::(),
+ extract_column(binary_expr.left()),
right.downcast_ref::(),
) {
Some(Self {
@@ -416,10 +419,10 @@ impl<'a> ColOpLit<'a> {
lit,
})
}
- // literal col
+ // literal col (also handles literal CastColumnExpr(col))
else if let (Some(lit), Some(col)) = (
left.downcast_ref::(),
- right.downcast_ref::(),
+ extract_column(binary_expr.right()),
) {
Some(Self {
col,
@@ -432,6 +435,25 @@ impl<'a> ColOpLit<'a> {
}
}
+/// Extracts a [`Column`](crate::expressions::Column) reference from a physical
+/// expression, looking through [`CastColumnExpr`](crate::expressions::CastColumnExpr)
+/// wrappers.
+fn extract_column(expr: &Arc) -> Option<&crate::expressions::Column> {
+ if let Some(col) = expr.as_any().downcast_ref::() {
+ return Some(col);
+ }
+ if let Some(cast) = expr
+ .as_any()
+ .downcast_ref::()
+ {
+ return cast
+ .expr()
+ .as_any()
+ .downcast_ref::();
+ }
+ None
+}
+
/// Represents a single `col [not]in literal` expression
struct ColInList<'a> {
col: &'a crate::expressions::Column,
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index d23a699f715de..c7bb5481b7d6b 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -1306,8 +1306,15 @@ pub fn ensure_distribution(
// If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`)
// then no repartitioning will have occurred. As the default implementation returns None, it is only
// specific physical plan nodes, such as certain datasources, which are repartitioned.
+ // Only repartition file scans when it would not reduce the
+ // child's partition count. When the child already has more
+ // partitions than target (e.g. under MergePartitionsExec),
+ // collapsing them would be counterproductive.
+ let would_reduce_partitions =
+ child.plan.output_partitioning().partition_count() > target_partitions;
if repartition_file_scans
&& roundrobin_beneficial_stats
+ && !would_reduce_partitions
&& let Some(new_child) =
child.plan.repartitioned(target_partitions, config)?
{
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 6467d7a2e389d..c38d4649b5809 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -79,6 +79,7 @@ pub mod filter_pushdown;
pub mod joins;
pub mod limit;
pub mod memory;
+pub mod merge_partitions;
pub mod metrics;
pub mod placeholder_row;
pub mod projection;
diff --git a/datafusion/physical-plan/src/merge_partitions.rs b/datafusion/physical-plan/src/merge_partitions.rs
new file mode 100644
index 0000000000000..a1dcca2780ffb
--- /dev/null
+++ b/datafusion/physical-plan/src/merge_partitions.rs
@@ -0,0 +1,625 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! MergePartitionsExec maps N input work items to M output partitions using
+//! a shared work queue for work-stealing. Each output stream pops the next
+//! work item, executes it (which may produce additional items), yields all
+//! batches, then pops the next. Supports lazy morselization: file-level items
+//! can be expanded into row-group-level items at execution time.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+ DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
+ Statistics,
+};
+use crate::execution_plan::{CardinalityEffect, EvaluationType};
+use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
+use crate::sort_pushdown::SortOrderPushdownResult;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::PhysicalExpr;
+
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{Stream, StreamExt, ready};
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Morselizer trait and types
+// ─────────────────────────────────────────────────────────────────────────────
+
+/// An opaque unit of work that a [`Morselizer`] can execute.
+///
+/// Work items are type-erased so that [`MergePartitionsExec`] stays generic.
+/// The [`Morselizer`] implementation knows how to interpret them.
+pub struct WorkItem {
+ data: Box<dyn Any + Send + Sync>,
+}
+
+impl WorkItem {
+ /// Create a new work item wrapping any `Send + Sync + 'static` value.
+ pub fn new<T: Send + Sync + 'static>(value: T) -> Self {
+ Self {
+ data: Box::new(value),
+ }
+ }
+
+ /// Downcast to a concrete type reference.
+ pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
+ self.data.downcast_ref()
+ }
+
+ /// Consume and downcast to a concrete type.
+ pub fn downcast<T: 'static>(self) -> Result<T, Box<dyn Any + Send + Sync>> {
+ match self.data.downcast::<T>() {
+ Ok(val) => Ok(*val),
+ Err(boxed) => Err(boxed),
+ }
+ }
+}
+
+impl fmt::Debug for WorkItem {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WorkItem").finish_non_exhaustive()
+ }
+}
+
+/// The result of executing a [`WorkItem`] via a [`Morselizer`].
+pub struct MorselResult {
+ /// A stream of record batches for this morsel.
+ pub stream: BoxStream<'static, Result<RecordBatch>>,
+ /// Additional work items discovered during execution (e.g., row groups
+ /// discovered after reading file metadata). These are pushed into the
+ /// shared queue for other output streams to steal.
+ pub additional_items: Vec<WorkItem>,
+}
+
+/// Provides work items and executes them for [`MergePartitionsExec`].
+///
+/// This trait enables lazy morselization: a [`Morselizer`] can start with
+/// coarse-grained items (e.g., files) and expand them into fine-grained
+/// items (e.g., row groups) at execution time, after reading metadata.
+pub trait Morselizer: Send + Sync + fmt::Debug {
+ /// Return the initial set of work items (called once per execute).
+ fn initial_items(&self) -> Vec<WorkItem>;
+
+ /// Execute a work item. Returns a future that resolves to a stream
+ /// of record batches plus any additional work items discovered.
+ ///
+ /// For example, executing a file item might read parquet metadata,
+ /// return a stream for the first row group, and return additional
+ /// items for the remaining row groups.
+ fn execute_item(
+ &self,
+ item: WorkItem,
+ context: Arc<TaskContext>,
+ ) -> Result<BoxFuture<'static, Result<MorselResult>>>;
+
+ /// The output schema.
+ fn schema(&self) -> SchemaRef;
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// PartitionMorselizer — default impl wrapping an ExecutionPlan
+// ─────────────────────────────────────────────────────────────────────────────
+
+/// Default [`Morselizer`] that provides one work item per input partition.
+///
+/// This gives the same behavior as the original atomic-counter approach:
+/// each output stream claims whole input partitions.
+#[derive(Debug)]
+struct PartitionMorselizer {
+ input: Arc<dyn ExecutionPlan>,
+}
+
+impl Morselizer for PartitionMorselizer {
+ fn initial_items(&self) -> Vec<WorkItem> {
+ let count = self.input.output_partitioning().partition_count();
+ (0..count).map(|i| WorkItem::new(i as usize)).collect()
+ }
+
+ fn execute_item(
+ &self,
+ item: WorkItem,
+ context: Arc<TaskContext>,
+ ) -> Result<BoxFuture<'static, Result<MorselResult>>> {
+ let partition_idx: usize = *item
+ .downcast_ref()
+ .expect("PartitionMorselizer work item should be usize");
+ let stream = self.input.execute(partition_idx, context)?;
+ Ok(Box::pin(async move {
+ Ok(MorselResult {
+ stream: stream.boxed(),
+ additional_items: vec![],
+ })
+ }))
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// MergePartitionsExec
+// ─────────────────────────────────────────────────────────────────────────────
+
+/// Maps N input work items to M output partitions using work-stealing.
+///
+/// Each output stream pops the next work item from a shared queue, executes
+/// it via the [`Morselizer`], yields all batches, then pops the next item.
+/// Executing an item may produce additional items (e.g., row groups discovered
+/// after reading file metadata), which are pushed back into the queue for
+/// other streams to steal.
+///
+/// This provides natural load balancing without channels or background tasks.
+#[derive(Debug)]
+pub struct MergePartitionsExec {
+ /// The input plan (used for plan-tree operations: children, with_new_children, etc.)
+ input: Arc<dyn ExecutionPlan>,
+ /// Number of output partitions
+ output_partitions: usize,
+ /// Provides and executes work items
+ morselizer: Arc<dyn Morselizer>,
+ /// Shared work queue populated during execute()
+ queue: Arc<Mutex<VecDeque<WorkItem>>>,
+ /// Whether the queue has been initialized
+ queue_initialized: Arc<Mutex<bool>>,
+ metrics: ExecutionPlanMetricsSet,
+ cache: Arc<PlanProperties>,
+}
+
+impl Clone for MergePartitionsExec {
+ fn clone(&self) -> Self {
+ Self {
+ input: Arc::clone(&self.input),
+ output_partitions: self.output_partitions,
+ morselizer: Arc::clone(&self.morselizer),
+ queue: Arc::new(Mutex::new(VecDeque::new())),
+ queue_initialized: Arc::new(Mutex::new(false)),
+ metrics: ExecutionPlanMetricsSet::new(),
+ cache: Arc::clone(&self.cache),
+ }
+ }
+}
+
+impl MergePartitionsExec {
+ /// Create a new MergePartitionsExec that maps the input's partitions
+ /// to `output_partitions` output partitions using partition-level
+ /// work stealing (one work item per input partition).
+ pub fn new(input: Arc<dyn ExecutionPlan>, output_partitions: usize) -> Self {
+ let morselizer = Arc::new(PartitionMorselizer {
+ input: Arc::clone(&input),
+ });
+ Self::new_with_morselizer(input, output_partitions, morselizer)
+ }
+
+ /// Create a new MergePartitionsExec with a custom [`Morselizer`].
+ ///
+ /// The morselizer controls the granularity of work stealing. For example,
+ /// a parquet morselizer can expand file-level items into row-group-level
+ /// items for finer-grained load balancing.
+ pub fn new_with_morselizer(
+ input: Arc<dyn ExecutionPlan>,
+ output_partitions: usize,
+ morselizer: Arc<dyn Morselizer>,
+ ) -> Self {
+ let cache = Self::compute_properties(&input, output_partitions);
+ MergePartitionsExec {
+ input,
+ output_partitions,
+ morselizer,
+ queue: Arc::new(Mutex::new(VecDeque::new())),
+ queue_initialized: Arc::new(Mutex::new(false)),
+ metrics: ExecutionPlanMetricsSet::new(),
+ cache: Arc::new(cache),
+ }
+ }
+
+ /// Input execution plan
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Number of output partitions
+ pub fn output_partitions(&self) -> usize {
+ self.output_partitions
+ }
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ let morselizer = Arc::new(PartitionMorselizer {
+ input: Arc::clone(&children[0]),
+ });
+ Self {
+ input: children.swap_remove(0),
+ morselizer,
+ queue: Arc::new(Mutex::new(VecDeque::new())),
+ queue_initialized: Arc::new(Mutex::new(false)),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
+
+ fn compute_properties(
+ input: &Arc<dyn ExecutionPlan>,
+ output_partitions: usize,
+ ) -> PlanProperties {
+ let mut eq_properties = input.equivalence_properties().clone();
+ eq_properties.clear_orderings();
+ eq_properties.clear_per_partition_constants();
+ PlanProperties::new(
+ eq_properties,
+ Partitioning::UnknownPartitioning(output_partitions),
+ input.pipeline_behavior(),
+ input.boundedness(),
+ )
+ .with_evaluation_type(EvaluationType::Lazy)
+ .with_scheduling_type(input.properties().scheduling_type)
+ }
+}
+
+impl DisplayAs for MergePartitionsExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+ let input_partitions = self.input.output_partitioning().partition_count();
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "MergePartitionsExec: partitions={input_partitions}→{}",
+ self.output_partitions
+ )
+ }
+ DisplayFormatType::TreeRender => {
+ write!(
+ f,
+ "partitions: {input_partitions}→{}",
+ self.output_partitions
+ )
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for MergePartitionsExec {
+ fn name(&self) -> &'static str {
+ "MergePartitionsExec"
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn properties(&self) -> &Arc<PlanProperties> {
+ &self.cache
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn with_new_children(
+ self: Arc,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
+ Ok(Arc::new(MergePartitionsExec::new(
+ children.swap_remove(0),
+ self.output_partitions,
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc,
+ ) -> Result {
+ // Initialize the shared queue exactly once (first call to execute).
+ {
+ let mut initialized = self.queue_initialized.lock().unwrap();
+ if !*initialized {
+ let items = self.morselizer.initial_items();
+ let mut queue = self.queue.lock().unwrap();
+ for item in items {
+ queue.push_back(item);
+ }
+ *initialized = true;
+ }
+ }
+
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
+ Ok(Box::pin(MergeStream {
+ morselizer: Arc::clone(&self.morselizer),
+ context,
+ queue: Arc::clone(&self.queue),
+ state: MergeStreamState::Idle,
+ schema: self.schema(),
+ baseline_metrics,
+ }))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
+ Ok(Statistics::new_unknown(&self.schema()))
+ }
+
+ fn cardinality_effect(&self) -> CardinalityEffect {
+ CardinalityEffect::Equal
+ }
+
+ fn gather_filters_for_pushdown(
+ &self,
+ _phase: FilterPushdownPhase,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ _config: &ConfigOptions,
+ ) -> Result<FilterDescription> {
+ FilterDescription::from_children(parent_filters, &self.children())
+ }
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ let result = self.input.try_pushdown_sort(order)?;
+
+ let has_multiple_partitions =
+ self.input.output_partitioning().partition_count() > 1;
+
+ result
+ .try_map(|new_input| {
+ Ok(
+ Arc::new(MergePartitionsExec::new(new_input, self.output_partitions))
+ as Arc<dyn ExecutionPlan>,
+ )
+ })
+ .map(|r| {
+ if has_multiple_partitions {
+ r.into_inexact()
+ } else {
+ r
+ }
+ })
+ }
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// MergeStream — the output stream
+// ─────────────────────────────────────────────────────────────────────────────
+
+enum MergeStreamState {
+ /// Ready to pop the next work item from the queue.
+ Idle,
+ /// Waiting for a morsel future to complete (e.g., reading metadata).
+ Opening {
+ future: BoxFuture<'static, Result<MorselResult>>,
+ },
+ /// Yielding batches from a morsel's stream.
+ Scanning {
+ stream: BoxStream<'static, Result<RecordBatch>>,
+ },
+ /// Terminal state.
+ Done,
+}
+
+struct MergeStream {
+ morselizer: Arc<dyn Morselizer>,
+ context: Arc<TaskContext>,
+ queue: Arc<Mutex<VecDeque<WorkItem>>>,
+ state: MergeStreamState,
+ schema: SchemaRef,
+ baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for MergeStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll