diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 7abb39e1a7130..f4a53d49f7cdd 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -63,7 +63,6 @@ async fn search_accounts( ) -> Result<()> { // create local execution context let ctx = SessionContext::new(); - // create logical plan composed of a single TableScan let logical_plan = LogicalPlanBuilder::scan_with_filters( "accounts", diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d71af206c78d5..c057ca15e1f6e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -743,6 +743,10 @@ config_namespace! { /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true + /// (reading) If true, the parquet reader will share work between partitions + /// using morsel-driven execution. This can help mitigate data skew. + pub allow_morsel_driven: bool, default = true + /// (reading) The maximum predicate cache size, in bytes. 
When /// `pushdown_filters` is enabled, sets the maximum memory used to cache /// the results of predicate evaluation between filter evaluation and diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f6608d16c1022..d00c22adc6559 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -208,6 +208,7 @@ impl ParquetOptions { binary_as_string: _, // not used for writer props coerce_int96: _, // not used for writer props skip_arrow_metadata: _, + allow_morsel_driven: _, max_predicate_cache_size: _, } = self; @@ -460,6 +461,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + allow_morsel_driven: defaults.allow_morsel_driven, } } @@ -573,6 +575,7 @@ mod tests { schema_force_view_types: global_options_defaults.schema_force_view_types, binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, + allow_morsel_driven: global_options_defaults.allow_morsel_driven, coerce_int96: None, }, column_specific_options, diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 4c6d915d5bcaa..9835574ab5827 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -49,8 +49,11 @@ mod tests { use datafusion_common::config::TableParquetOptions; use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; use datafusion_common::{Result, ScalarValue, assert_contains}; + use datafusion_common_runtime::SpawnedTask; use datafusion_datasource::file_format::FileFormat; - use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_datasource::file_scan_config::{ + FileScanConfig, 
FileScanConfigBuilder, + }; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::file::FileSource; diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 7f994daeaa58c..beb414fc22375 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -227,8 +227,58 @@ impl RunQueryResult { format!("{}", pretty_format_batches(&self.result).unwrap()) } + /// Extract ORDER BY column names from the query. + /// The query format is always: + /// `SELECT * FROM test_table ORDER BY , ... LIMIT ` + fn sort_columns(&self) -> Vec { + let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER BY".len(); + let limit_start = self.query.rfind(" LIMIT").unwrap(); + self.query[order_by_start..limit_start] + .trim() + .split(',') + .map(|part| part.split_whitespace().next().unwrap().to_string()) + .collect() + } + + /// Project `batches` to only include the named columns. + fn project_columns(batches: &[RecordBatch], cols: &[String]) -> Vec { + batches + .iter() + .map(|b| { + let schema = b.schema(); + let indices: Vec = cols + .iter() + .filter_map(|c| schema.index_of(c).ok()) + .collect(); + let columns: Vec<_> = + indices.iter().map(|&i| Arc::clone(b.column(i))).collect(); + let fields: Vec<_> = + indices.iter().map(|&i| schema.field(i).clone()).collect(); + let new_schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(new_schema, columns).unwrap() + }) + .collect() + } + fn is_ok(&self) -> bool { - self.expected_formatted() == self.result_formatted() + if self.expected_formatted() == self.result_formatted() { + return true; + } + // If the full results differ, compare only the ORDER BY column values. + // + // For queries with ORDER BY LIMIT k, multiple rows may tie on the + // sort key (e.g. two rows with id=27 for ORDER BY id DESC LIMIT 1). 
+ // SQL permits returning any of the tied rows, so with vs without dynamic + // filter pushdown may legitimately return different tied rows. + // + // The dynamic filter must not change the *sort-key values* of the top-k + // result. We verify correctness by projecting both results down to only + // the ORDER BY columns and comparing those. + let sort_cols = self.sort_columns(); + let expected_keys = Self::project_columns(&self.expected, &sort_cols); + let result_keys = Self::project_columns(&self.result, &sort_cols); + format!("{}", pretty_format_batches(&expected_keys).unwrap()) + == format!("{}", pretty_format_batches(&result_keys).unwrap()) } } diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 445ae7e97f228..35e2ec6cde7bc 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -382,7 +382,11 @@ async fn prune_disabled() { .await; println!("{}", output.description()); - // This should not prune any + // Row group stats pruning is disabled, so 0 row groups are pruned by statistics. + // Bloom filter runs next and matches all 4 row groups (bloom filters don't help + // for range/inequality predicates like `nanos < threshold`). Page index pruning + // runs afterwards and can produce row-level selections, but those don't affect + // the bloom filter matched count. The query result is still correct. 
assert_eq!(output.predicate_evaluation_errors(), Some(0)); assert_eq!(output.row_groups_matched(), Some(4)); assert_eq!(output.row_groups_pruned(), Some(0)); diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index fa021ed3dcce3..9601095bb0d9a 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -78,6 +78,11 @@ mod test { target_partition: Option, ) -> Arc { let mut session_config = SessionConfig::new().with_collect_statistics(true); + session_config + .options_mut() + .execution + .parquet + .allow_morsel_driven = false; if let Some(partition) = target_partition { session_config = session_config.with_target_partitions(partition); } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 6d1758abeb47b..e1b2ca3bcf8f0 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -27,12 +27,12 @@ use std::{fmt, vec}; use arrow::array::RecordBatch; use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit}; -use datafusion_datasource::TableSchema; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::write::{ ObjectWriterBuilder, SharedBuffer, get_writer_schema, }; +use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_datasource::file_format::{FileFormat, FileFormatFactory}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; @@ -49,12 +49,14 @@ use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; +use datafusion_datasource::file_groups::FileGroup; use 
datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use datafusion_physical_plan::merge_partitions::MergePartitionsExec; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; @@ -522,8 +524,10 @@ impl FileFormat for ParquetFormat { let store = state .runtime_env() .object_store(conf.object_store_url.clone())?; - let cached_parquet_read_factory = - Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); + let cached_parquet_read_factory = Arc::new(CachedParquetFileReaderFactory::new( + Arc::clone(&store), + metadata_cache, + )); source = source.with_parquet_file_reader_factory(cached_parquet_read_factory); if let Some(metadata_size_hint) = metadata_size_hint { @@ -532,10 +536,85 @@ impl FileFormat for ParquetFormat { source = self.set_source_encryption_factory(source, state)?; - let conf = FileScanConfigBuilder::from(conf) - .with_source(Arc::new(source)) - .build(); - Ok(DataSourceExec::from_data_source(conf)) + let use_merge_partitions = self.options.global.allow_morsel_driven; + + if use_merge_partitions { + // Morsel-driven execution: create one partition per file for the + // plan tree, then use a ParquetMorselizer that lazily expands + // files into row-group-level work items for fine-grained + // work-stealing via MergePartitionsExec. + let target_partitions = state.config_options().execution.target_partitions; + + // Collect all files before conf is consumed. + let all_files: Vec = conf + .file_groups + .iter() + .flat_map(|group| group.iter().cloned()) + .collect(); + + // Get the reader factory (for metadata reads in the morselizer). 
+ // It was set above as CachedParquetFileReaderFactory. + let reader_factory: Arc = source + .parquet_file_reader_factory() + .cloned() + .expect("reader factory was set above"); + + // Set batch size on source before creating the opener (normally + // done by FileScanConfig::open_file_reader, but we create the + // opener directly here for the morselizer). + source.batch_size = Some( + conf.batch_size + .unwrap_or(state.config_options().execution.batch_size), + ); + + // Create the opener for row-group morsel execution. + let opener = source.create_file_opener(Arc::clone(&store), &conf, 0)?; + + let output_schema = conf.projected_schema()?; + let metrics = source.metrics().clone(); + + // Build the DataSourceExec (one partition per file, for plan tree). + let one_per_file: Vec = conf + .file_groups + .iter() + .flat_map(|group| { + group.iter().map(|file| FileGroup::new(vec![file.clone()])) + }) + .collect(); + + let conf = FileScanConfigBuilder::from(conf) + .with_source(Arc::new(source)) + .with_file_groups(one_per_file) + .build(); + let data_source = DataSourceExec::from_data_source(conf); + let input_partitions = data_source + .properties() + .output_partitioning() + .partition_count(); + + if input_partitions > target_partitions { + let morselizer = Arc::new(crate::morselizer::ParquetMorselizer::new( + all_files, + opener, + reader_factory, + metrics, + metadata_size_hint, + output_schema, + )); + Ok(Arc::new(MergePartitionsExec::new_with_morselizer( + data_source, + target_partitions, + morselizer, + ))) + } else { + Ok(data_source) + } + } else { + let conf = FileScanConfigBuilder::from(conf) + .with_source(Arc::new(source)) + .build(); + Ok(DataSourceExec::from_data_source(conf)) + } } async fn create_writer_physical_plan( diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 0e137a706fad7..d1e46196103ae 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ 
-24,6 +24,7 @@ pub mod access_plan; pub mod file_format; pub mod metadata; mod metrics; +pub(crate) mod morselizer; mod opener; mod page_filter; mod reader; diff --git a/datafusion/datasource-parquet/src/morselizer.rs b/datafusion/datasource-parquet/src/morselizer.rs new file mode 100644 index 0000000000000..3593a24974f3e --- /dev/null +++ b/datafusion/datasource-parquet/src/morselizer.rs @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParquetMorselizer`] — row-group-level morselizer for parquet files. + +use std::fmt; +use std::sync::Arc; + +use crate::{ParquetAccessPlan, ParquetFileReaderFactory}; + +use arrow::datatypes::SchemaRef; +use datafusion_common::Result; +use datafusion_datasource::PartitionedFile; +use datafusion_datasource::file_stream::FileOpener; +use datafusion_execution::TaskContext; +use datafusion_physical_plan::merge_partitions::{MorselResult, Morselizer, WorkItem}; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + +use futures::StreamExt; +use futures::future::BoxFuture; +use parquet::arrow::async_reader::AsyncFileReader; + +/// Work item types for the parquet morselizer. +enum ParquetWorkItem { + /// A file that hasn't been morselized yet. 
Metadata will be read lazily + /// when this item is executed, and the file will be split into row groups. + File(PartitionedFile), + /// A single row group within a file. The `PartitionedFile` carries a + /// `ParquetAccessPlan` extension that selects exactly one row group. + RowGroup(PartitionedFile), +} + +/// A [`Morselizer`] that expands parquet files into row-group-level work items. +/// +/// When a file work item is executed: +/// 1. Metadata is read lazily (first access populates the metadata cache) +/// 2. The file is split into one work item per row group +/// 3. The first row group is executed immediately +/// 4. Remaining row groups are pushed to the shared queue for other streams +/// +/// When a row-group work item is executed, it is opened directly via the +/// opener (metadata comes from the cache). +pub(crate) struct ParquetMorselizer { + /// Files to scan + files: Vec, + /// Opener for creating streams for individual files/row groups + opener: Arc, + /// Factory for creating readers (used for metadata reads) + reader_factory: Arc, + /// Metrics for the reader factory + metrics: ExecutionPlanMetricsSet, + /// Metadata size hint + metadata_size_hint: Option, + /// Output schema + schema: SchemaRef, +} + +impl fmt::Debug for ParquetMorselizer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetMorselizer") + .field("num_files", &self.files.len()) + .finish() + } +} + +impl ParquetMorselizer { + pub(crate) fn new( + files: Vec, + opener: Arc, + reader_factory: Arc, + metrics: ExecutionPlanMetricsSet, + metadata_size_hint: Option, + schema: SchemaRef, + ) -> Self { + Self { + files, + opener, + reader_factory, + metrics, + metadata_size_hint, + schema, + } + } +} + +impl Morselizer for ParquetMorselizer { + fn initial_items(&self) -> Vec { + self.files + .iter() + .map(|f| WorkItem::new(ParquetWorkItem::File(f.clone()))) + .collect() + } + + fn execute_item( + &self, + item: WorkItem, + _context: Arc, + ) -> Result>> 
{ + let work_item: ParquetWorkItem = item + .downcast() + .expect("ParquetMorselizer work item should be ParquetWorkItem"); + + match work_item { + ParquetWorkItem::File(file) => self.execute_file_item(file), + ParquetWorkItem::RowGroup(file) => self.execute_row_group_item(file), + } + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl ParquetMorselizer { + /// Execute a file-level work item: read metadata, split into row groups. + fn execute_file_item( + &self, + file: PartitionedFile, + ) -> Result>> { + // Create a reader to fetch metadata + let mut reader: Box = + self.reader_factory.create_reader( + 0, // partition index doesn't matter for metadata reads + file.clone(), + self.metadata_size_hint, + &self.metrics, + )?; + let opener = Arc::clone(&self.opener); + + Ok(Box::pin(async move { + // Read metadata (this populates the cache for subsequent reads) + let metadata = reader + .get_metadata(None) + .await + .map_err(|e| datafusion_common::DataFusionError::External(Box::new(e)))?; + let num_row_groups = metadata.num_row_groups(); + + if num_row_groups == 0 { + // No row groups — return an empty stream + let empty: Vec> = vec![]; + return Ok(MorselResult { + stream: futures::stream::iter(empty).boxed(), + additional_items: vec![], + }); + } + + // Create a work item for each row group except the first + let mut additional_items = + Vec::with_capacity(num_row_groups.saturating_sub(1)); + for rg_idx in 1..num_row_groups { + let mut access_plan = ParquetAccessPlan::new_none(num_row_groups); + access_plan.scan(rg_idx); + let mut rg_file = file.clone(); + rg_file.extensions = Some(Arc::new(access_plan)); + additional_items.push(WorkItem::new(ParquetWorkItem::RowGroup(rg_file))); + } + + // Execute the first row group + let mut first_file = file; + let mut access_plan = ParquetAccessPlan::new_none(num_row_groups); + access_plan.scan(0); + first_file.extensions = Some(Arc::new(access_plan)); + + let future = opener.open(first_file)?; + 
let stream = future.await?; + + Ok(MorselResult { + stream, + additional_items, + }) + })) + } + + /// Execute a row-group-level work item: open the specific row group. + fn execute_row_group_item( + &self, + file: PartitionedFile, + ) -> Result>> { + let future = self.opener.open(file)?; + + Ok(Box::pin(async move { + let stream = future.await?; + Ok(MorselResult { + stream, + additional_items: vec![], + }) + })) + } +} diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..5fd3f21832ea0 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -40,7 +40,7 @@ use datafusion_common::stats::Precision; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, }; -use datafusion_datasource::{PartitionedFile, TableSchema}; +use datafusion_datasource::{FileRange, PartitionedFile, TableSchema}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ @@ -64,10 +64,12 @@ use parquet::arrow::arrow_reader::{ }; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::ParquetMetaData; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData}; +use parquet::schema::types::SchemaDescriptor; /// Implements [`FileOpener`] for a parquet file -pub(super) struct ParquetOpener { +pub(crate) struct ParquetOpener { /// Execution partition index pub(crate) partition_index: usize, /// Projection to apply on top of the table schema (i.e. can reference partition columns). 
@@ -130,6 +132,13 @@ pub(crate) struct PreparedAccessPlan { pub(crate) row_selection: Option, } +struct RowGroupStatisticsPruningContext<'a> { + physical_file_schema: &'a SchemaRef, + parquet_schema: &'a SchemaDescriptor, + predicate: &'a PruningPredicate, + file_metrics: &'a ParquetFileMetrics, +} + impl PreparedAccessPlan { /// Create a new prepared access plan from a ParquetAccessPlan pub(crate) fn from_access_plan( @@ -146,10 +155,7 @@ impl PreparedAccessPlan { } /// Reverse the access plan for reverse scanning - pub(crate) fn reverse( - mut self, - file_metadata: &parquet::file::metadata::ParquetMetaData, - ) -> Result { + pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { // Get the row group indexes before reversing let row_groups_to_scan = self.row_group_indexes.clone(); @@ -180,6 +186,39 @@ impl PreparedAccessPlan { } } +impl ParquetOpener { + fn build_row_group_access_filter( + file_name: &str, + extensions: Option>, + row_group_count: usize, + row_group_metadata: &[RowGroupMetaData], + file_range: Option<&FileRange>, + stats_pruning: Option>, + ) -> Result { + let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan( + file_name, + extensions, + row_group_count, + )?); + + if let Some(range) = file_range { + row_groups.prune_by_range(row_group_metadata, range); + } + + if let Some(stats_pruning) = stats_pruning { + row_groups.prune_by_statistics( + stats_pruning.physical_file_schema.as_ref(), + stats_pruning.parquet_schema, + row_group_metadata, + stats_pruning.predicate, + stats_pruning.file_metrics, + ); + } + + Ok(row_groups) + } +} + impl FileOpener for ParquetOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { // ----------------------------------- @@ -358,7 +397,7 @@ impl FileOpener for ParquetOpener { // Begin by loading the metadata from the underlying reader (note // the returned metadata may actually include page indexes as some // readers may return page indexes even when not requested 
-- for - // example when they are cached) + // example when they are cached). let mut reader_metadata = ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone()) .await?; @@ -495,26 +534,25 @@ impl FileOpener for ParquetOpener { let file_metadata = Arc::clone(builder.metadata()); let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let rg_metadata = file_metadata.row_groups(); - // track which row groups to actually read - let access_plan = - create_initial_plan(&file_name, extensions, rg_metadata.len())?; - let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); - // if there is a range restricting what parts of the file to read - if let Some(range) = file_range.as_ref() { - row_groups.prune_by_range(rg_metadata, range); - } + let mut row_groups = Self::build_row_group_access_filter( + &file_name, + extensions, + rg_metadata.len(), + rg_metadata, + file_range.as_ref(), + predicate + .filter(|_| enable_row_group_stats_pruning) + .map(|predicate| RowGroupStatisticsPruningContext { + physical_file_schema: &physical_file_schema, + parquet_schema: builder.parquet_schema(), + predicate, + file_metrics: &file_metrics, + }), + )?; // If there is a predicate that can be evaluated against the metadata if let Some(predicate) = predicate.as_ref() { - if enable_row_group_stats_pruning { - row_groups.prune_by_statistics( - &physical_file_schema, - builder.parquet_schema(), - rg_metadata, - predicate, - &file_metrics, - ); - } else { + if !enable_row_group_stats_pruning { // Update metrics: statistics unavailable, so all row groups are // matched (not pruned) file_metrics @@ -522,6 +560,12 @@ impl FileOpener for ParquetOpener { .add_matched(row_groups.remaining_row_group_count()); } + // Prune by limit before bloom filter: no point reading bloom filter data + // for row groups that will be skipped by the limit anyway. 
+ if let (Some(limit), false) = (limit, preserve_order) { + row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); + } + if enable_bloom_filter && !row_groups.is_empty() { row_groups .prune_by_bloom_filters( @@ -549,11 +593,6 @@ impl FileOpener for ParquetOpener { .add_matched(n_remaining_row_groups); } - // Prune by limit if limit is set and limit order is not sensitive - if let (Some(limit), false) = (limit, preserve_order) { - row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); - } - // -------------------------------------------------------- // Step: prune pages from the kept row groups // @@ -1035,7 +1074,7 @@ mod test { use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; - use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricValue}; use futures::{Stream, StreamExt}; use object_store::{ObjectStore, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; @@ -1365,6 +1404,19 @@ mod test { )) } + fn get_pruning_metric( + metrics: &ExecutionPlanMetricsSet, + metric_name: &str, + ) -> (usize, usize) { + match metrics.clone_inner().sum_by_name(metric_name) { + Some(MetricValue::PruningMetrics { + pruning_metrics, .. 
+ }) => (pruning_metrics.pruned(), pruning_metrics.matched()), + Some(_) => panic!("Metric '{metric_name}' is not a pruning metric"), + None => panic!("Metric '{metric_name}' not found"), + } + } + #[tokio::test] async fn test_prune_on_statistics() { let store = Arc::new(InMemory::new()) as Arc; diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index c8090382094ef..f216855471482 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -76,9 +76,10 @@ impl FileStream { let projected_schema = config.projected_schema()?; let file_group = config.file_groups[partition].clone(); + let file_iter = file_group.into_inner().into_iter().collect(); Ok(Self { - file_iter: file_group.into_inner().into_iter().collect(), + file_iter, projected_schema, remain: config.limit, file_opener, @@ -214,7 +215,9 @@ impl FileStream { } } } - None => return Poll::Ready(None), + None => { + return Poll::Ready(None); + } }, OnError::Fail => { self.state = FileStreamState::Error; @@ -243,7 +246,9 @@ impl FileStream { } } } - None => return Poll::Ready(None), + None => { + return Poll::Ready(None); + } } } } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 05028ed0f4683..cda7ab470d4b7 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -125,6 +125,7 @@ pub trait DataSource: Send + Sync + Debug { partition: usize, context: Arc, ) -> Result; + fn as_any(&self) -> &dyn Any; /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; diff --git a/datafusion/physical-expr/src/simplifier/mod.rs b/datafusion/physical-expr/src/simplifier/mod.rs index 3f3f8573449eb..38663f1b06609 100644 --- a/datafusion/physical-expr/src/simplifier/mod.rs +++ b/datafusion/physical-expr/src/simplifier/mod.rs @@ -61,7 +61,10 @@ impl<'a> PhysicalExprSimplifier<'a> { count += 1; let 
result = current_expr.transform(|node| { #[cfg(debug_assertions)] - let original_type = node.data_type(schema).unwrap(); + // Use `ok()` to skip the assertion when data_type fails (e.g., for + // DynamicFilterPhysicalExpr whose inner expression may reference columns + // outside the provided schema when the filter has been updated concurrently). + let original_type = node.data_type(schema).ok(); // Apply NOT expression simplification first, then unwrap cast optimization, // then constant expression evaluation @@ -73,11 +76,14 @@ impl<'a> PhysicalExprSimplifier<'a> { })?; #[cfg(debug_assertions)] - assert_eq!( - rewritten.data.data_type(schema).unwrap(), - original_type, - "Simplified expression should have the same data type as the original" - ); + if let Some(original_type) = original_type + && let Ok(rewritten_type) = rewritten.data.data_type(schema) { + assert_eq!( + rewritten_type, + original_type, + "Simplified expression should have the same data type as the original" + ); + } Ok(rewritten) })?; diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs index c4ce74fd3a573..70c83cee65b74 100644 --- a/datafusion/physical-expr/src/utils/guarantee.rs +++ b/datafusion/physical-expr/src/utils/guarantee.rs @@ -389,6 +389,9 @@ impl<'a> ColOpLit<'a> { /// 2. `literal col` /// 3. operator is `=` or `!=` /// + /// Also handles `CastColumnExpr(col) literal` patterns where the + /// column is wrapped in a cast (e.g., from schema adaptation). 
+ /// /// Returns None otherwise fn try_new(expr: &'a Arc) -> Option { let binary_expr = expr @@ -405,9 +408,9 @@ impl<'a> ColOpLit<'a> { Operator::NotEq => Guarantee::NotIn, _ => return None, }; - // col literal + // col literal (also handles CastColumnExpr(col) literal) if let (Some(col), Some(lit)) = ( - left.downcast_ref::(), + extract_column(binary_expr.left()), right.downcast_ref::(), ) { Some(Self { @@ -416,10 +419,10 @@ impl<'a> ColOpLit<'a> { lit, }) } - // literal col + // literal col (also handles literal CastColumnExpr(col)) else if let (Some(lit), Some(col)) = ( left.downcast_ref::(), - right.downcast_ref::(), + extract_column(binary_expr.right()), ) { Some(Self { col, @@ -432,6 +435,25 @@ impl<'a> ColOpLit<'a> { } } +/// Extracts a [`Column`](crate::expressions::Column) reference from a physical +/// expression, looking through [`CastColumnExpr`](crate::expressions::CastColumnExpr) +/// wrappers. +fn extract_column(expr: &Arc) -> Option<&crate::expressions::Column> { + if let Some(col) = expr.as_any().downcast_ref::() { + return Some(col); + } + if let Some(cast) = expr + .as_any() + .downcast_ref::() + { + return cast + .expr() + .as_any() + .downcast_ref::(); + } + None +} + /// Represents a single `col [not]in literal` expression struct ColInList<'a> { col: &'a crate::expressions::Column, diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index d23a699f715de..c7bb5481b7d6b 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1306,8 +1306,15 @@ pub fn ensure_distribution( // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`) // then no repartitioning will have occurred. As the default implementation returns None, it is only // specific physical plan nodes, such as certain datasources, which are repartitioned. 
+ // Only repartition file scans when it would not reduce the + // child's partition count. When the child already has more + // partitions than target (e.g. under MergePartitionsExec), + // collapsing them would be counterproductive. + let would_reduce_partitions = + child.plan.output_partitioning().partition_count() > target_partitions; if repartition_file_scans && roundrobin_beneficial_stats + && !would_reduce_partitions && let Some(new_child) = child.plan.repartitioned(target_partitions, config)? { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6467d7a2e389d..c38d4649b5809 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -79,6 +79,7 @@ pub mod filter_pushdown; pub mod joins; pub mod limit; pub mod memory; +pub mod merge_partitions; pub mod metrics; pub mod placeholder_row; pub mod projection; diff --git a/datafusion/physical-plan/src/merge_partitions.rs b/datafusion/physical-plan/src/merge_partitions.rs new file mode 100644 index 0000000000000..a1dcca2780ffb --- /dev/null +++ b/datafusion/physical-plan/src/merge_partitions.rs @@ -0,0 +1,625 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
MergePartitionsExec maps N input work items to M output partitions using +//! a shared work queue for work-stealing. Each output stream pops the next +//! work item, executes it (which may produce additional items), yields all +//! batches, then pops the next. Supports lazy morselization: file-level items +//! can be expanded into row-group-level items at execution time. + +use std::any::Any; +use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use super::{ + DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, + Statistics, +}; +use crate::execution_plan::{CardinalityEffect, EvaluationType}; +use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; +use crate::sort_pushdown::SortOrderPushdownResult; +use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_execution::{RecordBatchStream, TaskContext}; +use datafusion_physical_expr::PhysicalExpr; + +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use futures::{Stream, StreamExt, ready}; + +// ───────────────────────────────────────────────────────────────────────────── +// Morselizer trait and types +// ───────────────────────────────────────────────────────────────────────────── + +/// An opaque unit of work that a [`Morselizer`] can execute. +/// +/// Work items are type-erased so that [`MergePartitionsExec`] stays generic. +/// The [`Morselizer`] implementation knows how to interpret them. +pub struct WorkItem { + data: Box, +} + +impl WorkItem { + /// Create a new work item wrapping any `Send + Sync + 'static` value. 
+ pub fn new(value: T) -> Self { + Self { + data: Box::new(value), + } + } + + /// Downcast to a concrete type reference. + pub fn downcast_ref(&self) -> Option<&T> { + self.data.downcast_ref() + } + + /// Consume and downcast to a concrete type. + pub fn downcast(self) -> Result> { + match self.data.downcast::() { + Ok(val) => Ok(*val), + Err(boxed) => Err(boxed), + } + } +} + +impl fmt::Debug for WorkItem { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkItem").finish_non_exhaustive() + } +} + +/// The result of executing a [`WorkItem`] via a [`Morselizer`]. +pub struct MorselResult { + /// A stream of record batches for this morsel. + pub stream: BoxStream<'static, Result>, + /// Additional work items discovered during execution (e.g., row groups + /// discovered after reading file metadata). These are pushed into the + /// shared queue for other output streams to steal. + pub additional_items: Vec, +} + +/// Provides work items and executes them for [`MergePartitionsExec`]. +/// +/// This trait enables lazy morselization: a [`Morselizer`] can start with +/// coarse-grained items (e.g., files) and expand them into fine-grained +/// items (e.g., row groups) at execution time, after reading metadata. +pub trait Morselizer: Send + Sync + fmt::Debug { + /// Return the initial set of work items (called once per execute). + fn initial_items(&self) -> Vec; + + /// Execute a work item. Returns a future that resolves to a stream + /// of record batches plus any additional work items discovered. + /// + /// For example, executing a file item might read parquet metadata, + /// return a stream for the first row group, and return additional + /// items for the remaining row groups. + fn execute_item( + &self, + item: WorkItem, + context: Arc, + ) -> Result>>; + + /// The output schema. 
+ fn schema(&self) -> SchemaRef; +} + +// ───────────────────────────────────────────────────────────────────────────── +// PartitionMorselizer — default impl wrapping an ExecutionPlan +// ───────────────────────────────────────────────────────────────────────────── + +/// Default [`Morselizer`] that provides one work item per input partition. +/// +/// This gives the same behavior as the original atomic-counter approach: +/// each output stream claims whole input partitions. +#[derive(Debug)] +struct PartitionMorselizer { + input: Arc, +} + +impl Morselizer for PartitionMorselizer { + fn initial_items(&self) -> Vec { + let count = self.input.output_partitioning().partition_count(); + (0..count).map(|i| WorkItem::new(i as usize)).collect() + } + + fn execute_item( + &self, + item: WorkItem, + context: Arc, + ) -> Result>> { + let partition_idx: usize = *item + .downcast_ref() + .expect("PartitionMorselizer work item should be usize"); + let stream = self.input.execute(partition_idx, context)?; + Ok(Box::pin(async move { + Ok(MorselResult { + stream: stream.boxed(), + additional_items: vec![], + }) + })) + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// MergePartitionsExec +// ───────────────────────────────────────────────────────────────────────────── + +/// Maps N input work items to M output partitions using work-stealing. +/// +/// Each output stream pops the next work item from a shared queue, executes +/// it via the [`Morselizer`], yields all batches, then pops the next item. +/// Executing an item may produce additional items (e.g., row groups discovered +/// after reading file metadata), which are pushed back into the queue for +/// other streams to steal. +/// +/// This provides natural load balancing without channels or background tasks. 
+#[derive(Debug)] +pub struct MergePartitionsExec { + /// The input plan (used for plan-tree operations: children, with_new_children, etc.) + input: Arc, + /// Number of output partitions + output_partitions: usize, + /// Provides and executes work items + morselizer: Arc, + /// Shared work queue populated during execute() + queue: Arc>>, + /// Whether the queue has been initialized + queue_initialized: Arc>, + metrics: ExecutionPlanMetricsSet, + cache: Arc, +} + +impl Clone for MergePartitionsExec { + fn clone(&self) -> Self { + Self { + input: Arc::clone(&self.input), + output_partitions: self.output_partitions, + morselizer: Arc::clone(&self.morselizer), + queue: Arc::new(Mutex::new(VecDeque::new())), + queue_initialized: Arc::new(Mutex::new(false)), + metrics: ExecutionPlanMetricsSet::new(), + cache: Arc::clone(&self.cache), + } + } +} + +impl MergePartitionsExec { + /// Create a new MergePartitionsExec that maps the input's partitions + /// to `output_partitions` output partitions using partition-level + /// work stealing (one work item per input partition). + pub fn new(input: Arc, output_partitions: usize) -> Self { + let morselizer = Arc::new(PartitionMorselizer { + input: Arc::clone(&input), + }); + Self::new_with_morselizer(input, output_partitions, morselizer) + } + + /// Create a new MergePartitionsExec with a custom [`Morselizer`]. + /// + /// The morselizer controls the granularity of work stealing. For example, + /// a parquet morselizer can expand file-level items into row-group-level + /// items for finer-grained load balancing. 
+ pub fn new_with_morselizer( + input: Arc, + output_partitions: usize, + morselizer: Arc, + ) -> Self { + let cache = Self::compute_properties(&input, output_partitions); + MergePartitionsExec { + input, + output_partitions, + morselizer, + queue: Arc::new(Mutex::new(VecDeque::new())), + queue_initialized: Arc::new(Mutex::new(false)), + metrics: ExecutionPlanMetricsSet::new(), + cache: Arc::new(cache), + } + } + + /// Input execution plan + pub fn input(&self) -> &Arc { + &self.input + } + + /// Number of output partitions + pub fn output_partitions(&self) -> usize { + self.output_partitions + } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let morselizer = Arc::new(PartitionMorselizer { + input: Arc::clone(&children[0]), + }); + Self { + input: children.swap_remove(0), + morselizer, + queue: Arc::new(Mutex::new(VecDeque::new())), + queue_initialized: Arc::new(Mutex::new(false)), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } + + fn compute_properties( + input: &Arc, + output_partitions: usize, + ) -> PlanProperties { + let mut eq_properties = input.equivalence_properties().clone(); + eq_properties.clear_orderings(); + eq_properties.clear_per_partition_constants(); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(output_partitions), + input.pipeline_behavior(), + input.boundedness(), + ) + .with_evaluation_type(EvaluationType::Lazy) + .with_scheduling_type(input.properties().scheduling_type) + } +} + +impl DisplayAs for MergePartitionsExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + let input_partitions = self.input.output_partitioning().partition_count(); + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "MergePartitionsExec: partitions={input_partitions}→{}", + self.output_partitions + ) + } + DisplayFormatType::TreeRender => { + write!( + f, + "partitions: {input_partitions}→{}", 
+ self.output_partitions + ) + } + } + } +} + +impl ExecutionPlan for MergePartitionsExec { + fn name(&self) -> &'static str { + "MergePartitionsExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + check_if_same_properties!(self, children); + Ok(Arc::new(MergePartitionsExec::new( + children.swap_remove(0), + self.output_partitions, + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + // Initialize the shared queue exactly once (first call to execute). + { + let mut initialized = self.queue_initialized.lock().unwrap(); + if !*initialized { + let items = self.morselizer.initial_items(); + let mut queue = self.queue.lock().unwrap(); + for item in items { + queue.push_back(item); + } + *initialized = true; + } + } + + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + + Ok(Box::pin(MergeStream { + morselizer: Arc::clone(&self.morselizer), + context, + queue: Arc::clone(&self.queue), + state: MergeStreamState::Idle, + schema: self.schema(), + baseline_metrics, + })) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn partition_statistics(&self, _partition: Option) -> Result { + Ok(Statistics::new_unknown(&self.schema())) + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::Equal + } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + let result = self.input.try_pushdown_sort(order)?; + + let has_multiple_partitions = + 
self.input.output_partitioning().partition_count() > 1; + + result + .try_map(|new_input| { + Ok( + Arc::new(MergePartitionsExec::new(new_input, self.output_partitions)) + as Arc, + ) + }) + .map(|r| { + if has_multiple_partitions { + r.into_inexact() + } else { + r + } + }) + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// MergeStream — the output stream +// ───────────────────────────────────────────────────────────────────────────── + +enum MergeStreamState { + /// Ready to pop the next work item from the queue. + Idle, + /// Waiting for a morsel future to complete (e.g., reading metadata). + Opening { + future: BoxFuture<'static, Result>, + }, + /// Yielding batches from a morsel's stream. + Scanning { + stream: BoxStream<'static, Result>, + }, + /// Terminal state. + Done, +} + +struct MergeStream { + morselizer: Arc, + context: Arc, + queue: Arc>>, + state: MergeStreamState, + schema: SchemaRef, + baseline_metrics: BaselineMetrics, +} + +impl Stream for MergeStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + + loop { + match &mut this.state { + MergeStreamState::Idle => { + // Pop next work item from the shared queue + let item = this.queue.lock().unwrap().pop_front(); + match item { + Some(item) => { + match this + .morselizer + .execute_item(item, Arc::clone(&this.context)) + { + Ok(future) => { + this.state = MergeStreamState::Opening { future }; + } + Err(e) => { + this.state = MergeStreamState::Done; + return Poll::Ready(Some(Err(e))); + } + } + } + None => { + this.state = MergeStreamState::Done; + return Poll::Ready(None); + } + } + } + + MergeStreamState::Opening { future } => { + match ready!(future.as_mut().poll(cx)) { + Ok(result) => { + // Push additional items to the shared queue + if !result.additional_items.is_empty() { + let mut queue = this.queue.lock().unwrap(); + for item in result.additional_items { + 
queue.push_back(item); + } + } + this.state = MergeStreamState::Scanning { + stream: result.stream, + }; + } + Err(e) => { + this.state = MergeStreamState::Done; + return Poll::Ready(Some(Err(e))); + } + } + } + + MergeStreamState::Scanning { stream } => { + match ready!(stream.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + this.baseline_metrics.record_output(batch.num_rows()); + return Poll::Ready(Some(Ok(batch))); + } + Some(Err(e)) => { + this.state = MergeStreamState::Done; + return Poll::Ready(Some(Err(e))); + } + None => { + // Current morsel exhausted, get next + this.state = MergeStreamState::Idle; + } + } + } + + MergeStreamState::Done => { + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for MergeStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common; + use crate::test; + + #[tokio::test] + async fn test_merge_10_to_3() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + + let num_input_partitions = 10; + let num_output_partitions = 3; + let input = test::scan_partitioned(num_input_partitions); + + assert_eq!( + input.output_partitioning().partition_count(), + num_input_partitions + ); + + let merge = MergePartitionsExec::new(input, num_output_partitions); + + assert_eq!( + merge.properties().output_partitioning().partition_count(), + num_output_partitions + ); + + // Collect all output partitions + let mut total_rows = 0; + for partition in 0..num_output_partitions { + let stream = merge.execute(partition, Arc::clone(&task_ctx))?; + let batches = common::collect(stream).await?; + let rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + total_rows += rows; + } + + // 10 partitions x 100 rows each = 1000 total rows + assert_eq!(total_rows, 1000); + + Ok(()) + } + + #[tokio::test] + async fn test_merge_output_ge_input() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + + let num_input_partitions = 
3; + let num_output_partitions = 5; + let input = test::scan_partitioned(num_input_partitions); + + let merge = MergePartitionsExec::new(input, num_output_partitions); + + assert_eq!( + merge.properties().output_partitioning().partition_count(), + num_output_partitions + ); + + let mut total_rows = 0; + for partition in 0..num_output_partitions { + let stream = merge.execute(partition, Arc::clone(&task_ctx))?; + let batches = common::collect(stream).await?; + let rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + total_rows += rows; + } + + // 3 partitions x 100 rows each = 300 total rows + assert_eq!(total_rows, 300); + + Ok(()) + } + + #[tokio::test] + async fn test_schema_preserved() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + + let input = test::scan_partitioned(4); + let expected_schema = input.schema(); + + let merge = MergePartitionsExec::new(input, 2); + + // Verify the operator schema matches input + assert_eq!(merge.schema(), expected_schema); + + // Verify stream schema matches + let stream = merge.execute(0, task_ctx)?; + assert_eq!(stream.schema(), expected_schema); + + Ok(()) + } +} diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..da7491c6115c8 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -549,6 +549,7 @@ message ParquetOptions { bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false bool skip_arrow_metadata = 30; // default = false + bool allow_morsel_driven = 35; // default = true oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..387aeda657e1f 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ 
b/datafusion/proto-common/src/from_proto/mod.rs @@ -1090,6 +1090,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + allow_morsel_driven: value.allow_morsel_driven, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..e5c6a509a5f4c 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5683,6 +5683,9 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { len += 1; } + if self.allow_morsel_driven { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5788,6 +5791,9 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; } + if self.allow_morsel_driven { + struct_ser.serialize_field("allowMorselDriven", &self.allow_morsel_driven)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5936,6 +5942,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "binaryAsString", "skip_arrow_metadata", "skipArrowMetadata", + "allow_morsel_driven", + "allowMorselDriven", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5985,6 +5993,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { SchemaForceViewTypes, BinaryAsString, SkipArrowMetadata, + AllowMorselDriven, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -6038,6 +6047,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => 
Ok(GeneratedField::BinaryAsString), "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), + "allowMorselDriven" | "allow_morsel_driven" => Ok(GeneratedField::AllowMorselDriven), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -6089,6 +6099,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; let mut skip_arrow_metadata__ = None; + let mut allow_morsel_driven__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -6216,6 +6227,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } skip_arrow_metadata__ = Some(map_.next_value()?); } + GeneratedField::AllowMorselDriven => { + if allow_morsel_driven__.is_some() { + return Err(serde::de::Error::duplicate_field("allowMorselDriven")); + } + allow_morsel_driven__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -6332,6 +6349,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), + allow_morsel_driven: allow_morsel_driven__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs 
b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..739bd28188fa7 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -830,6 +830,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = true + #[prost(bool, tag = "35")] + pub allow_morsel_driven: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..7afe1d57c8940 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + allow_morsel_driven: value.allow_morsel_driven, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..739bd28188fa7 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -830,6 +830,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = true + #[prost(bool, tag = "35")] + pub allow_morsel_driven: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..573dc533cdf57 100644 --- 
a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -426,6 +426,7 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + allow_morsel_driven: global_options.global.allow_morsel_driven, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,6 +526,7 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + allow_morsel_driven: proto.allow_morsel_driven, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b61ceecb24fc0..35aee28b949dd 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -228,6 +228,7 @@ datafusion.execution.max_spill_file_size_bytes 134217728 datafusion.execution.meta_fetch_concurrency 32 datafusion.execution.minimum_parallel_output_files 4 datafusion.execution.objectstore_writer_buffer_size 10485760 +datafusion.execution.parquet.allow_morsel_driven true datafusion.execution.parquet.allow_single_file_parallelism true datafusion.execution.parquet.binary_as_string false datafusion.execution.parquet.bloom_filter_fpp NULL @@ -366,6 +367,7 @@ datafusion.execution.max_spill_file_size_bytes 134217728 Maximum size in bytes f datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. 
Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. datafusion.execution.objectstore_writer_buffer_size 10485760 Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. +datafusion.execution.parquet.allow_morsel_driven true (reading) If true, the parquet reader will share work between partitions using morsel-driven execution. This can help mitigate data skew. datafusion.execution.parquet.allow_single_file_parallelism true (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. datafusion.execution.parquet.binary_as_string false (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter false positive probability. 
If NULL, uses default parquet writer setting diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt index 72672b707d4f5..f6c54f26d25fe 100644 --- a/datafusion/sqllogictest/test_files/limit_pruning.slt +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary; query TT explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; ---- -Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, 
bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] # limit_pruned_row_groups=0 total → 0 matched # because of order by, scan needs to preserve sort, so limit pruning is disabled @@ -71,8 +71,8 @@ query TT explain analyze select * from tracking_data where species > 'M' AND s >= 50 order by species limit 3; ---- Plan with Metrics -01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2
AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] statement ok drop table tracking_data; diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index be713b963b451..63a9e2395ced9 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -145,7 +145,9 @@ logical_plan 02)--TableScan: test_table projection=[int_col, string_col] physical_plan 01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet +02)--SortExec: expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----MergePartitionsExec: partitions=3→2 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1
ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet # Perform queries using MIN and MAX diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt index fd3a40ca17079..1c2b2a0243d62 100644 --- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt @@ -120,7 +120,9 @@ logical_plan 02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col] physical_plan 01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet +02)--SortExec: expr=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], preserve_partitioning=[true] +03)----MergePartitionsExec: partitions=3→2 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, 
bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet # Another planning test, but project on a column with unsupported statistics # We should be able to ignore this and look at only the relevant statistics @@ -137,7 +139,10 @@ logical_plan physical_plan 01)ProjectionExec: expr=[string_col@0 as string_col] 02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], file_type=parquet +03)----SortExec: expr=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], preserve_partitioning=[true] +04)------ProjectionExec: expr=[string_col@1 as string_col, int_col@0 as int_col, bigint_col@2 as bigint_col, nulls_first_col@3 as nulls_first_col, nulls_last_col@4 as nulls_last_col] +05)--------MergePartitionsExec: partitions=3→2 +06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], file_type=parquet # Clean up & recreate but sort on descending column statement ok @@ -171,7 +176,9 @@ logical_plan 02)--TableScan: test_table projection=[descending_col, bigint_col] physical_plan 01)SortPreservingMergeExec: [descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet +02)--SortExec: expr=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST], preserve_partitioning=[true] +03)----MergePartitionsExec: partitions=3→2 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet # Clean up & re-create with partition columns in sort order statement ok @@ -207,7 +214,9 @@ 
logical_plan 02)--TableScan: test_table projection=[int_col, bigint_col, partition_col] physical_plan 01)SortPreservingMergeExec: [partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet +02)--SortExec: expr=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], preserve_partitioning=[true] +03)----MergePartitionsExec: partitions=3→2 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet # Clean up & re-create with overlapping column in sort order # This will test the ability to sort files with overlapping statistics @@ -240,7 +249,9 @@ logical_plan 02)--TableScan: test_table projection=[int_col, bigint_col, overlapping_col] physical_plan 01)SortPreservingMergeExec: [overlapping_col@2 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, overlapping_col], output_ordering=[overlapping_col@2 ASC NULLS LAST], file_type=parquet +02)--SortExec: expr=[overlapping_col@2 ASC NULLS LAST], preserve_partitioning=[true] +03)----MergePartitionsExec: partitions=3→2 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, overlapping_col], output_ordering=[overlapping_col@2 ASC NULLS LAST], file_type=parquet # Clean up & re-create with constant column in sort order # This will require a sort because the # of required file groups (3) @@ -274,4 +285,6 @@ logical_plan 02)--TableScan: test_table projection=[constant_col] physical_plan 01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], output_ordering=[constant_col@0 ASC NULLS LAST], file_type=parquet +02)--SortExec: 
expr=[constant_col@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----MergePartitionsExec: partitions=3→2 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], output_ordering=[constant_col@0 ASC NULLS LAST], file_type=parquet diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 297094fab16e7..f13c4335a72c7 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -551,8 +551,11 @@ logical_plan 03)----TableScan: high_cardinality_table projection=[value, f_dkey] physical_plan 01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), sum(high_cardinality_table.value)@2 as sum(high_cardinality_table.value)] -02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] +03)----RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3 +04)------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] +05)--------MergePartitionsExec: partitions=5→3 +06)----------DataSourceExec: file_groups={5 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet # Verify results with optimization match results without optimization query TIR rowsort @@ -596,7 +599,8 @@ physical_plan 02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] 03)----RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3 04)------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] -05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet, 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +05)--------MergePartitionsExec: partitions=5→3 +06)----------DataSourceExec: file_groups={5 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet]]}, projection=[value, f_dkey], file_type=parquet query TIR rowsort SELECT f_dkey, count(*), sum(value) diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt b/datafusion/sqllogictest/test_files/projection_pushdown.slt index dbb77b33c21b7..8f160537a7d56 100644 --- a/datafusion/sqllogictest/test_files/projection_pushdown.slt +++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt @@ -684,7 +684,9 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] 02)--SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, get_field(s@1, value) as multi_struct.s[value]], file_type=parquet +03)----ProjectionExec: expr=[id@0 as id, get_field(s@1, value) as multi_struct.s[value]] +04)------MergePartitionsExec: partitions=5→4 +05)--------DataSourceExec: file_groups={5 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, s], file_type=parquet # Verify correctness query II @@ -710,7 +712,9 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3 02)--SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, get_field(s@1, value) as multi_struct.s[value]], file_type=parquet, predicate=DynamicFilter [ empty ] +03)----ProjectionExec: expr=[id@0 as id, get_field(s@1, value) as multi_struct.s[value]] +04)------MergePartitionsExec: partitions=5→4 +05)--------DataSourceExec: file_groups={5 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, s], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify correctness query II @@ -734,7 +738,9 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3 02)--SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, get_field(s@1, value) + 1 as multi_struct.s[value] + Int64(1)], file_type=parquet, predicate=DynamicFilter [ empty ] +03)----ProjectionExec: expr=[id@0 as id, get_field(s@1, value) + 1 as multi_struct.s[value] + 
Int64(1)] +04)------MergePartitionsExec: partitions=5→4 +05)--------DataSourceExec: file_groups={5 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, s], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify correctness query II @@ -762,8 +768,9 @@ physical_plan 02)--SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[id@1 as id, __datafusion_extracted_1@0 as multi_struct.s[value]] 04)------FilterExec: id@1 > 2 -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 -06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[] +05)--------ProjectionExec: expr=[get_field(s@1, value) as __datafusion_extracted_1, id@0 as id] +06)----------MergePartitionsExec: partitions=5→4 +07)------------DataSourceExec: file_groups={5 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[] # Verify correctness query II @@ -788,9 +795,11 @@ logical_plan physical_plan 01)ProjectionExec: expr=[__datafusion_extracted_1@0 as multi_struct.s[label], sum(__datafusion_extracted_2)@1 as sum(multi_struct.s[value])] 02)--AggregateExec: mode=FinalPartitioned, gby=[__datafusion_extracted_1@0 as __datafusion_extracted_1], aggr=[sum(__datafusion_extracted_2)] -03)----RepartitionExec: partitioning=Hash([__datafusion_extracted_1@0], 4), input_partitions=3 +03)----RepartitionExec: partitioning=Hash([__datafusion_extracted_1@0], 4), input_partitions=4 04)------AggregateExec: mode=Partial, gby=[__datafusion_extracted_1@0 as __datafusion_extracted_1], aggr=[sum(__datafusion_extracted_2)] -05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[get_field(s@1, label) as __datafusion_extracted_1, get_field(s@1, value) as 
__datafusion_extracted_2], file_type=parquet +05)--------ProjectionExec: expr=[get_field(s@0, label) as __datafusion_extracted_1, get_field(s@0, value) as __datafusion_extracted_2] +06)----------MergePartitionsExec: partitions=5→4 +07)------------DataSourceExec: file_groups={5 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[s], file_type=parquet # Verify correctness query TI diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt index ca4a30fa96c35..601b597204816 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt @@ -151,7 +151,8 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], 
pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] +04)------MergePartitionsExec: partitions=4→2 +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] query I select max(id) from agg_dyn_test where id > 1; @@ -166,7 +167,8 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ] +04)------MergePartitionsExec: partitions=4→2 +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet], 
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ] # Expect dynamic filter available inside data source query TT @@ -176,7 +178,8 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] +04)------MergePartitionsExec: partitions=4→2 +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet], 
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] # Dynamic filter should not be available for grouping sets query TT @@ -188,7 +191,8 @@ physical_plan 02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)] 03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2 04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)] -05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] +05)--------MergePartitionsExec: partitions=4→2 +06)----------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, 
pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] statement ok drop table agg_dyn_test; diff --git a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt index e2c9fa4237939..1f2842be0d2e9 100644 --- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt +++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt @@ -374,16 +374,16 @@ physical_plan 05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 3), input_partitions=3 06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_value)] 07)------------ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value] -08)--------------AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@2 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) -09)----------------SortExec: expr=[f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST], preserve_partitioning=[true] -10)------------------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3 -11)--------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano 
{ months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) -12)----------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value] -13)------------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4] -14)--------------------------CoalescePartitionsExec -15)----------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] -16)------------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -17)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +08)--------------AggregateExec: 
mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@2 as env], aggr=[max(j.value)] +09)----------------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3 +10)------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)] +11)--------------------ProjectionExec: expr=[f_dkey@2 as f_dkey, env@3 as env, timestamp@0 as timestamp, value@1 as value] +12)----------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(f_dkey@2, d_dkey@1)], projection=[timestamp@0, value@1, f_dkey@2, env@3] +13)------------------------CoalescePartitionsExec +14)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +15)------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] +16)--------------------------MergePartitionsExec: partitions=4→3 +17)----------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log AND DynamicFilter [ empty ], pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] # Verify results without subset satisfaction query TPR rowsort @@ -473,13 +473,16 @@ physical_plan 05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 3), input_partitions=3 06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_value)] 07)------------ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value] -08)--------------AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) -09)----------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value] -10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4] -11)--------------------CoalescePartitionsExec -12)----------------------FilterExec: 
service@1 = log, projection=[env@0, d_dkey@2] -13)------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -14)--------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +08)--------------AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@2 as env], aggr=[max(j.value)] +09)----------------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3 
+10)------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)] +11)--------------------ProjectionExec: expr=[f_dkey@2 as f_dkey, env@3 as env, timestamp@0 as timestamp, value@1 as value] +12)----------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(f_dkey@2, d_dkey@1)], projection=[timestamp@0, value@1, f_dkey@2, env@3] +13)------------------------CoalescePartitionsExec +14)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +15)------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] +16)--------------------------MergePartitionsExec: partitions=4→3 +17)----------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, 
predicate=service@1 = log AND DynamicFilter [ empty ], pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] # Verify results match with subset satisfaction query TPR rowsort diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 99f26b66d458b..84a7df09d7fb2 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1109,7 +1109,8 @@ logical_plan 02)--TableScan: reversed_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet +02)--MergePartitionsExec: partitions=3→1 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 4.2: Results must be correct query II @@ -1160,7 +1161,8 @@ logical_plan 02)--TableScan: overlap_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/overlap/file_x.parquet, 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/overlap/file_y.parquet]]}, projection=[id, value], file_type=parquet +02)--MergePartitionsExec: partitions=2→1 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/overlap/file_x.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/overlap/file_y.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 5.2: Results must be correct query II @@ -1193,7 +1195,8 @@ logical_plan 02)--TableScan: reversed_with_order_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet +02)--MergePartitionsExec: partitions=3→1 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 6.2: Results must be correct query II @@ -1253,7 +1256,10 @@ EXPLAIN SELECT * FROM correct_parquet ORDER BY id ASC; logical_plan 01)Sort: correct_parquet.id ASC NULLS LAST 02)--TableScan: correct_parquet projection=[id, value] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/a_low.parquet, 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/c_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--MergePartitionsExec: partitions=3→1 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/a_low.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/b_mid.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/c_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 7.2: Results must be correct query II @@ -1279,7 +1285,8 @@ logical_plan 02)--TableScan: correct_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/c_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/a_low.parquet]]}, projection=[id, value], file_type=parquet, reverse_row_groups=true +02)--MergePartitionsExec: partitions=3→1 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/a_low.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/b_mid.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/c_high.parquet]]}, projection=[id, value], file_type=parquet, reverse_row_groups=true query II SELECT * FROM correct_parquet ORDER BY id DESC; @@ -1332,7 +1339,8 @@ logical_plan 02)--TableScan: 
desc_reversed_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet]]}, projection=[id, value], file_type=parquet +02)--MergePartitionsExec: partitions=2→1 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet]]}, projection=[id, value], output_ordering=[id@0 DESC], file_type=parquet # Test 8.2: Results must be correct query II @@ -1387,7 +1395,8 @@ logical_plan 02)--TableScan: multi_col_parquet projection=[category, id, value] physical_plan 01)SortExec: expr=[category@0 ASC NULLS LAST, id@1 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col/a_first.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col/b_second.parquet]]}, projection=[category, id, value], file_type=parquet +02)--MergePartitionsExec: partitions=2→1 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col/a_first.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col/b_second.parquet]]}, projection=[category, id, value], output_ordering=[category@0 ASC NULLS LAST, id@1 ASC NULLS LAST], file_type=parquet # Test 9.2: Results must be correct query TII @@ -1436,7 +1445,10 @@ EXPLAIN SELECT * FROM multi_col_clean_parquet ORDER BY category ASC, id ASC; logical_plan 01)Sort: multi_col_clean_parquet.category ASC NULLS LAST, multi_col_clean_parquet.id 
ASC NULLS LAST 02)--TableScan: multi_col_clean_parquet projection=[category, id, value] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col_clean/x_first.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col_clean/y_second.parquet]]}, projection=[category, id, value], output_ordering=[category@0 ASC NULLS LAST, id@1 ASC NULLS LAST], file_type=parquet +physical_plan +01)SortExec: expr=[category@0 ASC NULLS LAST, id@1 ASC NULLS LAST], preserve_partitioning=[false] +02)--MergePartitionsExec: partitions=2→1 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col_clean/x_first.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/multi_col_clean/y_second.parquet]]}, projection=[category, id, value], output_ordering=[category@0 ASC NULLS LAST, id@1 ASC NULLS LAST], file_type=parquet # Test 9.3b: Results must be correct query TII @@ -1464,7 +1476,10 @@ EXPLAIN SELECT * FROM correct_with_order_parquet ORDER BY id ASC; logical_plan 01)Sort: correct_with_order_parquet.id ASC NULLS LAST 02)--TableScan: correct_with_order_parquet projection=[id, value] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/a_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/c_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--MergePartitionsExec: partitions=3→1 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/a_low.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/b_mid.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/correct/c_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 10.2: Results must be correct query II diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index e48f0a7c92276..8a8c6983444b2 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -91,6 +91,7 @@ The following configuration settings are available: | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | +| datafusion.execution.parquet.allow_morsel_driven | true | (reading) If true, the parquet reader will share work between partitions using morsel-driven execution. This can help mitigate data skew. | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. 
Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in rows |