diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 895f2fdd4ff3a..2cc9360b153d2 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let mut stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let ratio = self.upper_bound - self.lower_bound; // Scale statistics by sampling ratio (inexact due to randomness) @@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec { .map(|n| (n as f64 * ratio) as usize) .to_inexact(); - Ok(stats) + Ok(Arc::new(stats)) } } diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index f51d0a1e36536..75956a93a4804 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -183,12 +183,12 @@ impl ExecutionPlan for CustomExecutionPlan { Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 })) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap(); - Ok(Statistics { + Ok(Arc::new(Statistics { num_rows: Precision::Exact(batch.num_rows()), total_byte_size: Precision::Absent, column_statistics: self @@ -207,7 +207,7 @@ impl ExecutionPlan for CustomExecutionPlan { ..Default::default() }) .collect(), - }) + })) } } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 03513ec730de7..da5ec87df18dd 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -181,11 +181,11 @@ impl ExecutionPlan for StatisticsValidation { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - Ok(Statistics::new_unknown(&self.schema)) + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { - Ok(self.stats.clone()) + Ok(Arc::new(self.stats.clone())) } } } @@ -238,7 +238,7 @@ async fn sql_basic() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); // the statistics should be those of the source - assert_eq!(stats, physical_plan.partition_statistics(None)?); + assert_eq!(stats, *physical_plan.partition_statistics(None)?); Ok(()) } @@ -278,7 +278,7 @@ async fn sql_limit() -> Result<()> { .collect(), total_byte_size: Precision::Absent }, - physical_plan.partition_statistics(None)? + *physical_plan.partition_statistics(None)? ); let df = ctx @@ -287,7 +287,7 @@ async fn sql_limit() -> Result<()> { .unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is larger than the original number of lines, statistics remain unchanged - assert_eq!(stats, physical_plan.partition_statistics(None)?); + assert_eq!(stats, *physical_plan.partition_statistics(None)?); Ok(()) } @@ -307,7 +307,7 @@ async fn sql_window() -> Result<()> { let result = physical_plan.partition_statistics(None)?; assert_eq!(stats.num_rows, result.num_rows); - let col_stats = result.column_statistics; + let col_stats = &result.column_statistics; assert_eq!(2, col_stats.len()); assert_eq!(stats.column_statistics[1], col_stats[0]); diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index ef0bbfc7f4221..e1ec7a27a542e 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -1176,12 +1176,12 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { - Ok(if partition.is_some() { + fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(if partition.is_some() { Statistics::new_unknown(&self.schema) } else { self.stats.clone() - }) + })) } } diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index fa021ed3dcce3..b4f4fb2ad9393 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -255,8 +255,8 @@ mod test { ); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -288,8 +288,8 @@ mod test { create_partition_statistics(2, 8, 1, 2, None); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -322,7 +322,7 @@ mod test { Some((DATE_2025_03_01, DATE_2025_03_04)), ); assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); // Check the statistics_by_partition with real results let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)]; validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?; @@ -353,8 +353,8 @@ mod test { .map(|idx| sort_exec.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -402,7 +402,7 @@ mod test { }, ], }; - assert_eq!(full_statistics, expected_full_statistic); + assert_eq!(*full_statistics, expected_full_statistic); let statistics = (0..filter.output_partitioning().partition_count()) .map(|idx| filter.partition_statistics(Some(idx))) @@ -431,8 +431,8 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_partition_statistic); - assert_eq!(statistics[1], expected_partition_statistic); + assert_eq!(*statistics[0], expected_partition_statistic); + assert_eq!(*statistics[1], expected_partition_statistic); Ok(()) } @@ -463,13 +463,13 @@ mod test { Some((DATE_2025_03_03, DATE_2025_03_04)), ); // Verify first partition (from first scan) - assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[0], expected_statistic_partition_1); // Verify second partition (from first scan) - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Verify third partition (from second scan - same as first partition) - assert_eq!(statistics[2], expected_statistic_partition_1); + assert_eq!(*statistics[2], expected_statistic_partition_1); // Verify fourth partition (from second scan - same as second partition) - assert_eq!(statistics[3], expected_statistic_partition_2); + assert_eq!(*statistics[3], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -518,8 +518,8 @@ mod test { ColumnStatistics::new_unknown(), ], }; - assert_eq!(stats[0], expected_stats); - assert_eq!(stats[1], expected_stats); + assert_eq!(*stats[0], expected_stats); + assert_eq!(*stats[1], expected_stats); // Verify the execution results let partitions = execute_stream_partitioned( @@ -625,8 +625,8 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -670,7 +670,7 @@ mod test { ); expected_full_statistics.num_rows = Precision::Inexact(4); expected_full_statistics.total_byte_size = Precision::Absent; - assert_eq!(full_statistics, expected_full_statistics); + assert_eq!(*full_statistics, expected_full_statistics); // Test partition_statistics(Some(idx)) - returns partition-specific statistics // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] @@ -699,8 +699,8 @@ mod test { .map(|idx| nested_loop_join.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -729,7 +729,7 @@ mod test { .map(|idx| coalesce_partitions.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); // Check the statistics_by_partition with real results let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)]; @@ -746,20 +746,20 @@ mod test { .map(|idx| local_limit.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - let mut expected_0 = statistics[0].clone(); + let mut expected_0 = Statistics::clone(&statistics[0]); expected_0.column_statistics = expected_0 .column_statistics .into_iter() .map(|c| c.to_inexact()) .collect(); - let mut expected_1 = statistics[1].clone(); + let mut expected_1 = Statistics::clone(&statistics[1]); expected_1.column_statistics = expected_1 .column_statistics .into_iter() .map(|c| c.to_inexact()) .collect(); - assert_eq!(statistics[0], expected_0); - assert_eq!(statistics[1], expected_1); + assert_eq!(*statistics[0], expected_0); + assert_eq!(*statistics[1], expected_1); Ok(()) } @@ -781,7 +781,7 @@ mod test { 4, Some((DATE_2025_03_01, DATE_2025_03_02)), ); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); Ok(()) } @@ -849,7 +849,7 @@ mod test { ], }; - assert_eq!(&p0_statistics, &expected_p0_statistics); + assert_eq!(*p0_statistics, expected_p0_statistics); let expected_p1_statistics = Statistics { num_rows: Precision::Inexact(2), @@ -869,7 +869,7 @@ mod test { }; let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?; - assert_eq!(&p1_statistics, &expected_p1_statistics); + assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( aggregate_exec_partial.clone(), @@ -891,10 +891,10 @@ mod test { )?); let p0_statistics = agg_final.partition_statistics(Some(0))?; - assert_eq!(&p0_statistics, &expected_p0_statistics); + assert_eq!(*p0_statistics, expected_p0_statistics); let p1_statistics = agg_final.partition_statistics(Some(1))?; - assert_eq!(&p1_statistics, &expected_p1_statistics); + assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( agg_final.clone(), @@ -935,8 +935,8 @@ mod test { ], }; - assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?); - assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?); + assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(0))?); + assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(1))?); validate_statistics_with_data( agg_partial.clone(), vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty], @@ -962,8 +962,8 @@ mod test { agg_partial.schema(), )?); - assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(0))?); - assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(1))?); + assert_eq!(empty_stat, *agg_final.partition_statistics(Some(0))?); + assert_eq!(empty_stat, *agg_final.partition_statistics(Some(1))?); validate_statistics_with_data( agg_final, @@ -999,7 +999,7 @@ mod test { column_statistics: vec![ColumnStatistics::new_unknown()], }; - assert_eq!(&expect_stat, &agg_final.partition_statistics(Some(0))?); + assert_eq!(expect_stat, *agg_final.partition_statistics(Some(0))?); // Verify that the aggregate final result has exactly one partition with one row let mut partitions = execute_stream_partitioned( @@ -1033,13 +1033,13 @@ mod test { &schema, None, ); - assert_eq!(actual, expected); + assert_eq!(*actual, expected); all_batches.push(batches); } let actual = plan.partition_statistics(None)?; let expected = compute_record_batch_statistics(&all_batches, &schema, None); - assert_eq!(actual, expected); + assert_eq!(*actual, expected); Ok(()) } @@ -1070,7 +1070,7 @@ mod test { // All partitions should have the same statistics for stat in statistics.iter() { - assert_eq!(stat, &expected_stats); + assert_eq!(**stat, expected_stats); } // Verify that the result has exactly 3 partitions @@ -1135,7 +1135,7 @@ mod test { )?); let result = repartition.partition_statistics(Some(0))?; - assert_eq!(result, Statistics::new_unknown(&scan_schema)); + assert_eq!(*result, Statistics::new_unknown(&scan_schema)); // Verify that the result has exactly 0 partitions let partitions = execute_stream_partitioned( @@ -1174,8 +1174,8 @@ mod test { ColumnStatistics::new_unknown(), ], }; - assert_eq!(stats[0], expected_stats); - assert_eq!(stats[1], expected_stats); + assert_eq!(*stats[0], expected_stats); + assert_eq!(*stats[1], expected_stats); // Verify the repartition execution results let partitions = @@ -1282,8 +1282,8 @@ mod test { ], }; - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Verify the statistics match actual execution results let expected_stats = vec![ diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index f8c91ba272a9f..8b0dabbd78846 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -942,8 +942,8 @@ impl ExecutionPlan for TestScan { internal_err!("TestScan is for testing optimizer only, not for execution") } - fn partition_statistics(&self, _partition: Option) -> Result { - Ok(Statistics::new_unknown(&self.schema)) + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } // This is the key method - implement sort pushdown diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index a9061849795c7..c23f53b8db6b3 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -461,7 +461,10 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; + let stat_cols = physical_plan + .partition_statistics(None)? + .column_statistics + .clone(); assert_eq!(stat_cols.len(), 4); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(3)); @@ -485,7 +488,10 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; + let stat_cols = physical_plan + .partition_statistics(None)? + .column_statistics + .clone(); assert_eq!(stat_cols.len(), 2); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(1)); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c3e5cabce7bc2..821e39f14c297 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -777,7 +777,7 @@ impl DataSource for FileScanConfig { SchedulingType::Cooperative } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { // Get statistics for a specific partition // Note: FileGroup statistics include partition columns (computed from partition_values) @@ -787,22 +787,28 @@ impl DataSource for FileScanConfig { // Project the statistics based on the projection let output_schema = self.projected_schema()?; return if let Some(projection) = self.file_source.projection() { - projection.project_statistics(stat.clone(), &output_schema) + Ok(Arc::new( + projection.project_statistics(stat.clone(), &output_schema)?, + )) } else { - Ok(stat.clone()) + Ok(Arc::new(stat.clone())) }; } // If no statistics available for this partition, return unknown - Ok(Statistics::new_unknown(self.projected_schema()?.as_ref())) + Ok(Arc::new(Statistics::new_unknown( + self.projected_schema()?.as_ref(), + ))) } else { // Return aggregate statistics across all partitions let statistics = self.statistics(); let projection = self.file_source.projection(); let output_schema = self.projected_schema()?; if let Some(projection) = &projection { - projection.project_statistics(statistics.clone(), &output_schema) + Ok(Arc::new( + projection.project_statistics(statistics.clone(), &output_schema)?, + )) } else { - Ok(statistics) + Ok(Arc::new(statistics)) } } } diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..fda44120cd80a 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -196,26 +196,26 @@ impl DataSource for MemorySourceConfig { SchedulingType::Cooperative } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { // Compute statistics for a specific partition if let Some(batches) = self.partitions.get(partition) { - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( from_ref(batches), &self.schema, self.projection.clone(), - )) + ))) } else { // Invalid partition index - Ok(Statistics::new_unknown(&self.projected_schema)) + Ok(Arc::new(Statistics::new_unknown(&self.projected_schema))) } } else { // Compute statistics across all partitions - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &self.partitions, &self.schema, self.projection.clone(), - )) + ))) } } @@ -953,7 +953,7 @@ mod tests { let values = MemorySourceConfig::try_new_as_values(schema, data)?; assert_eq!( - values.partition_statistics(None)?, + *values.partition_statistics(None)?, Statistics { num_rows: Precision::Exact(rows), total_byte_size: Precision::Exact(8), // not important diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 05028ed0f4683..e520cb60d8ebc 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -156,7 +156,7 @@ pub trait DataSource: Send + Sync + Debug { /// Returns statistics for a specific partition, or aggregate statistics /// across all partitions if `partition` is `None`. - fn partition_statistics(&self, partition: Option) -> Result; + fn partition_statistics(&self, partition: Option) -> Result>; /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; @@ -318,7 +318,7 @@ impl ExecutionPlan for DataSourceExec { Some(self.data_source.metrics().clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.data_source.partition_statistics(partition) } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 75721951f8d81..98e6fb7d69c65 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -244,7 +244,7 @@ impl ExecutionPlan for OutputRequirementExec { unreachable!(); } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 85999938510b6..00e063ae1c5ed 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1416,9 +1416,9 @@ impl ExecutionPlan for AggregateExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let child_statistics = self.input().partition_statistics(partition)?; - self.statistics_inner(&child_statistics) + Ok(Arc::new(self.statistics_inner(&child_statistics)?)) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -2500,16 +2500,19 @@ mod tests { Ok(Box::pin(stream)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics( + &self, + partition: Option, + ) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(self.schema().as_ref())); + return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } let (_, batches) = some_data(); - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &[batches], &self.schema(), None, - )) + ))) } } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index a59d062929974..ad9a7af5c898d 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -237,7 +237,7 @@ impl ExecutionPlan for BufferExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 663b0b51ea592..e8106ef6aab1d 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -219,10 +219,9 @@ impl ExecutionPlan for CoalesceBatchesExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 39906d3680a4b..458c4f9806e56 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -236,10 +236,9 @@ impl ExecutionPlan for CoalescePartitionsExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { - self.input - .partition_statistics(None)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, _partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index 5f0040b3ddce4..81ab92232e927 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -303,7 +303,7 @@ impl ExecutionPlan for CooperativeExec { Ok(make_cooperative(child_stream)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 44148f2d0e885..a7d5240294dd0 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1176,14 +1176,17 @@ mod tests { todo!() } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics( + &self, + partition: Option, + ) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(self.schema().as_ref())); + return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } match self { Self::Panic => panic!("expected panic"), Self::Error => Err(internal_datafusion_err!("expected error")), - Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())), + Self::Ok => Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))), } } } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index e4d4da4e88fc5..23fd28462349f 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -156,7 +156,7 @@ impl ExecutionPlan for EmptyExec { )?)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { assert_or_internal_err!( partition < self.partitions, @@ -183,7 +183,7 @@ impl ExecutionPlan for EmptyExec { }); } - Ok(stats) + Ok(Arc::new(stats)) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 681a1345d8332..522fb2b7a3d24 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -477,7 +477,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// If statistics are not available, should return [`Statistics::new_unknown`] /// (the default), not an error. /// If `partition` is `None`, it returns statistics for the entire plan. - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(idx) = partition { // Validate partition index let partition_count = self.properties().partitioning.partition_count(); @@ -488,7 +488,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { partition_count ); } - Ok(Statistics::new_unknown(&self.schema())) + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } /// Returns `true` if a limit can be safely pushed down through this @@ -1557,7 +1557,10 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } @@ -1620,7 +1623,10 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ecea4e6ebe9f7..4e633f62082b6 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -388,7 +388,7 @@ impl FilterExec { let schema = input.schema(); let stats = Self::statistics_helper( &schema, - input.partition_statistics(None)?, + Arc::unwrap_or_clone(input.partition_statistics(None)?), predicate, default_selectivity, )?; @@ -555,15 +555,16 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. - fn partition_statistics(&self, partition: Option) -> Result { - let input_stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stats = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let stats = Self::statistics_helper( &self.input.schema(), input_stats, self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(Arc::new(stats.project(self.projection.as_ref()))) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -1336,7 +1337,7 @@ mod tests { ]; let _ = exp_col_stats .into_iter() - .zip(statistics.column_statistics) + .zip(statistics.column_statistics.clone()) .map(|(expected, actual)| { if let Some(val) = actual.min_value.get_value() { if val.data_type().is_floating() { @@ -1407,7 +1408,7 @@ mod tests { )), )); // Since filter predicate passes all entries, statistics after filter shouldn't change. - let expected = input.partition_statistics(None)?.column_statistics; + let expected = input.partition_statistics(None)?.column_statistics.clone(); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); let statistics = filter.partition_statistics(None)?; @@ -1590,7 +1591,7 @@ mod tests { }], }; - assert_eq!(filter_statistics, expected_filter_statistics); + assert_eq!(*filter_statistics, expected_filter_statistics); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 342cb7e70a78b..71cc7a17ac6e2 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -374,12 +374,13 @@ impl ExecutionPlan for CrossJoinExec { } } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // Get the all partitions statistics of the left - let left_stats = self.left.partition_statistics(None)?; - let right_stats = self.right.partition_statistics(partition)?; + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = + Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); - Ok(stats_cartesian_product(left_stats, right_stats)) + Ok(Arc::new(stats_cartesian_product(left_stats, right_stats))) } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index eda7e93effa2c..59d09aa3c3032 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1417,16 +1417,18 @@ impl ExecutionPlan for HashJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(None)?); let stats = estimate_join_statistics( - self.left.partition_statistics(None)?, - self.right.partition_statistics(None)?, + left_stats, + right_stats, &self.on, &self.join_type, &self.join_schema, @@ -1434,7 +1436,7 @@ impl ExecutionPlan for HashJoinExec { // Project statistics if there is a projection let stats = stats.project(self.projection.as_ref()); // Apply fetch limit to statistics - stats.with_fetch(self.fetch, 0, 1) + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } /// Tries to push `projection` down through `hash_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 4fb7dabf673d4..88aae7c8258b2 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -634,7 +634,7 @@ impl ExecutionPlan for NestedLoopJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // NestedLoopJoinExec is designed for joins without equijoin keys in the // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join // predicates are stored in `self.filter`, but `estimate_join_statistics` @@ -648,11 +648,11 @@ impl ExecutionPlan for NestedLoopJoinExec { // so we always request overall stats with `None`. Right side can have // multiple partitions, so we forward the partition parameter to get // partition-specific statistics when requested. - let left_stats = self.left.partition_statistics(None)?; - let right_stats = match partition { + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(match partition { Some(partition) => self.right.partition_statistics(Some(partition))?, None => self.right.partition_statistics(None)?, - }; + }); let stats = estimate_join_statistics( left_stats, @@ -662,7 +662,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(Arc::new(stats.project(self.projection.as_ref()))) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index b34e811f91920..23e7578f3ad96 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -534,7 +534,7 @@ impl ExecutionPlan for SortMergeJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // SortMergeJoinExec uses symmetric hash partitioning where both left and right // inputs are hash-partitioned on the join keys. This means partition `i` of the // left input is joined with partition `i` of the right input. @@ -546,13 +546,16 @@ impl ExecutionPlan for SortMergeJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - estimate_join_statistics( - self.left.partition_statistics(partition)?, - self.right.partition_statistics(partition)?, + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(partition)?); + let right_stats = + Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + Ok(Arc::new(estimate_join_statistics( + left_stats, + right_stats, &self.on, &self.join_type, &self.schema, - ) + )?)) } /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index a78e5c067ff1a..2e33c508f06d4 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -224,10 +224,9 @@ impl ExecutionPlan for GlobalLimitExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, self.skip, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?)) } fn fetch(&self) -> Option { @@ -392,10 +391,9 @@ impl ExecutionPlan for LocalLimitExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(Some(self.fetch), 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?)) } fn fetch(&self) -> Option { diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 5dbd7b3032549..18f711bea634a 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -169,7 +169,7 @@ impl ExecutionPlan for PlaceholderRowExec { Ok(Box::pin(cooperative(ms))) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let batches = self .data() .expect("Create single row placeholder RecordBatch should not fail"); @@ -180,11 +180,11 @@ impl ExecutionPlan for PlaceholderRowExec { None => vec![batches; self.partitions], }; - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &batches, &self.schema, None, - )) + ))) } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index db3a71fc70ae5..ac17aa30d6a5a 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -354,12 +354,15 @@ impl ExecutionPlan for ProjectionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stats = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let output_schema = self.schema(); - self.projector - .projection() - .project_statistics(input_stats, &output_schema) + Ok(Arc::new( + self.projector + .projection() + .project_statistics(input_stats, &output_schema)?, + )) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index da4329e2cc2ae..5ff51b9ff12dc 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1086,11 +1086,11 @@ impl ExecutionPlan for RepartitionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { let partition_count = self.partitioning().partition_count(); if partition_count == 0 { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } assert_or_internal_err!( @@ -1100,7 +1100,7 @@ impl ExecutionPlan for RepartitionExec { partition_count ); - let mut stats = self.input.partition_statistics(None)?; + let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?); // Distribute statistics across partitions stats.num_rows = stats @@ -1121,7 +1121,7 @@ impl ExecutionPlan for RepartitionExec { .map(|_| ColumnStatistics::new_unknown()) .collect(); - Ok(stats) + Ok(Arc::new(stats)) } else { self.input.partition_statistics(None) } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 0dbb75f2ef47a..ac59b8f70f422 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -338,7 +338,7 @@ impl ExecutionPlan for PartialSortExec { Some(self.metrics_set.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index b3ea548d53750..3e0c582710d90 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1325,16 +1325,14 @@ impl ExecutionPlan for SortExec { Some(self.metrics_set.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - if !self.preserve_partitioning() { - return self - .input - .partition_statistics(None)? - .with_fetch(self.fetch, 0, 1); - } - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let p = if !self.preserve_partitioning() { + None + } else { + partition + }; + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(p)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 763b72a660486..f53949bb493ef 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -385,7 +385,7 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { self.input.partition_statistics(None) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 0e7b900eb6fcc..d96f4d6d2a227 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -169,11 +169,11 @@ impl ExecutionPlan for TestMemoryExec { unimplemented!() } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - Ok(Statistics::new_unknown(&self.schema)) + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { - self.statistics_inner() + Ok(Arc::new(self.statistics_inner()?)) } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index d628fb819f85c..418d27b7a4683 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -254,9 +254,9 @@ impl ExecutionPlan for MockExec { } // Panics if one of the batches is an error - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema)); + return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } let data: Result> = self .data @@ -269,11 +269,11 @@ impl ExecutionPlan for MockExec { let data = data?; - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &[data], &self.schema, None, - )) + ))) } } @@ -483,15 +483,15 @@ impl ExecutionPlan for BarrierExec { Ok(builder.build()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema)); + return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &self.data, &self.schema, None, - )) + ))) } } @@ -671,12 +671,12 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { - Ok(if partition.is_some() { + fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(if partition.is_some() { Statistics::new_unknown(&self.schema) } else { self.stats.clone() - }) + })) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 9fc02e730d022..fbfbf343669a8 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -323,7 +323,7 @@ impl ExecutionPlan for UnionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition_idx) = partition { // For a specific partition, find which input it belongs to let mut remaining_idx = partition_idx; @@ -336,19 +336,25 @@ impl ExecutionPlan for UnionExec { remaining_idx -= input_partition_count; } // If we get here, the partition index is out of bounds - Ok(Statistics::new_unknown(&self.schema())) + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } else { // Collect statistics from all inputs let stats = self .inputs .iter() - .map(|input_exec| input_exec.partition_statistics(None)) + .map(|input_exec| { + input_exec + .partition_statistics(None) + .map(Arc::unwrap_or_clone) + }) .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(Arc::new( + stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), + )) } } @@ -649,17 +655,22 @@ impl ExecutionPlan for InterleaveExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let stats = self .inputs .iter() - .map(|stat| stat.partition_statistics(partition)) + .map(|stat| { + stat.partition_statistics(partition) + .map(Arc::unwrap_or_clone) + }) .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(Arc::new( + stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), + )) } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index a31268b9c6856..2d2ff83502034 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -380,9 +380,10 @@ impl ExecutionPlan for BoundedWindowAggExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stat = self.input.partition_statistics(partition)?; - self.statistics_helper(input_stat) + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stat = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(self.statistics_helper(input_stat)?)) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 0a146d51d62da..38b418f7beed0 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -284,8 +284,9 @@ impl ExecutionPlan for WindowAggExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stat = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stat = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let win_cols = self.window_expr.len(); let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... @@ -295,11 +296,11 @@ impl ExecutionPlan for WindowAggExec { for _ in 0..win_cols { column_statistics.push(ColumnStatistics::new_unknown()) } - Ok(Statistics { + Ok(Arc::new(Statistics { num_rows: input_stat.num_rows, column_statistics, total_byte_size: Precision::Absent, - }) + })) } } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 4c7f77e0ff984..3bb58a8296f9b 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -231,8 +231,8 @@ impl ExecutionPlan for WorkTableExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { - Ok(Statistics::new_unknown(&self.schema())) + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } /// Injects run-time state into this `WorkTableExec`.