// Patch summary (reviewer commentary placed before the first `diff --git` header; not part of any hunk).
//
// Purpose, as stated by the comments this patch adds: keep join dynamic filter pushdown enabled for
// null-equal hash joins and null-aware anti joins, and widen the pushed filter with `key IS NULL` so
// probe rows with a NULL join key are not dropped before they reach the join.
//
// datafusion/physical-plan/src/joins/hash_join/exec.rs
//   - Removes the `return false` guard taken when `self.null_equality == NullEquality::NullEqualsNull`.
//   - Passes `self.null_equality` and `self.null_aware` as two extra trailing arguments at one call site
//     (presumably `SharedBuildAccumulator::new_from_partition_mode`; the callee name is outside the hunk).
//   - Renames the unit test to `test_dynamic_filter_pushdown_allowed_for_null_equal_join` and flips its
//     assertion: `allow_join_dynamic_filter_pushdown` is now expected to return true.
//
// datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
//   - Adds `null_equality` and `null_aware` fields to `SharedBuildAccumulator`, plus matching parameters on
//     `new_from_partition_mode` (now annotated `#[expect(clippy::too_many_arguments)]`).
//   - Wraps the expression handed to `dynamic_filter.update(..)` in `preserve_probe_nulls` at both call sites.
//   - `preserve_probe_nulls` returns the filter unchanged unless the join is `NullEqualsNull` or null-aware.
//     Otherwise it ORs together `key IS NULL` for every `on_right` key whose `nullable(&probe_schema)` is
//     true (or cannot be resolved, via `unwrap_or(true)`) and places that disjunction in front of the
//     filter. When no key qualifies, the filter is returned unchanged.
//   - Extends `make_partitioned_accumulator_for_test` with the new fields and adds two unit tests that check
//     the rendered expression: one `IS NULL` for a nullable + NOT NULL key pair, none when all keys are
//     NOT NULL.
//
// datafusion/sqllogictest/test_files/null_aware_anti_join.slt
//   - Adds a parquet-backed `NOT IN` case with `enable_join_dynamic_filter_pushdown` and
//     `parquet.pushdown_filters` set to true; the expected result is zero rows.
//
// datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
//   - Updates the null-equal join EXPLAIN expectation so the probe scan shows
//     `predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible`.
//   - Adds a two-key `IS NOT DISTINCT FROM` join over parquet files expecting rows (1,10,1,10) and
//     (2,NULL,2,NULL).
//
// NOTE(review): the hunks below appear flattened (line breaks inside hunks are collapsed to spaces) and
// generic type parameters appear to be missing (e.g. `Arc,`, `Vec,`). This text likely cannot be applied
// with `git apply` as-is -- confirm against the original patch before use.
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b1d387ea7455..fc1d3b8887a2 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -862,14 +862,6 @@ impl HashJoinExec { return false; } - // Bounds and membership filters derived from the build side do not - // account for null-equal matching: a probe-side NULL key evaluates - // such predicates to NULL and would be pruned, even though it can - // match a build-side NULL when nulls compare equal. - if self.null_equality == NullEquality::NullEqualsNull { - return false; - } - // `preserve_file_partitions` can report Hash partitioning for Hive-style // file groups, but those partitions are not actually hash-distributed. // Partitioned dynamic filters rely on hash routing, so disable them in @@ -1363,6 +1355,8 @@ impl ExecutionPlan for HashJoinExec { filter, on_right, repartition_random_state, + self.null_equality, + self.null_aware, )) }))) }) @@ -6628,7 +6622,7 @@ mod tests { } #[test] - fn test_dynamic_filter_pushdown_rejects_null_equal_join() -> Result<()> { + fn test_dynamic_filter_pushdown_allowed_for_null_equal_join() -> Result<()> { let (_, _, on) = build_schema_and_on()?; let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1])); let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1])); @@ -6651,7 +6645,9 @@ mod tests { false, )?; - assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options())); + // Null-equal joins keep dynamic filter pushdown: the pushed predicate carries an + // `IS NULL` disjunct so a probe-side NULL still reaches the join. 
+ assert!(join.allow_join_dynamic_filter_pushdown(session_config.options())); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 0af4015ff723..6c0e95175e46 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -33,11 +33,13 @@ use crate::joins::hash_join::partitioned_hash_eval::{ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; +use datafusion_common::{ + DataFusionError, NullEquality, Result, ScalarValue, SharedResult, +}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ - BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit, + BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, IsNullExpr, lit, }; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; @@ -255,6 +257,12 @@ pub(crate) struct SharedBuildAccumulator { repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, + /// Null equality of the join. Under `NullEqualsNull` a probe-side NULL can match a + /// build-side NULL, so the pushed filter must keep NULL rows here too. + null_equality: NullEquality, + /// Null-aware anti join (`NOT IN`). A probe-side NULL must reach the join so its + /// three-valued logic can collapse the result, so the pushed filter keeps NULL rows. + null_aware: bool, } /// Strategy for filter pushdown (decided at collection time) @@ -351,6 +359,7 @@ impl SharedBuildAccumulator { /// We cannot build a partial filter from some partitions - it would incorrectly eliminate /// valid join results. 
We must wait until we have complete information from ALL /// relevant partitions before updating the dynamic filter. + #[expect(clippy::too_many_arguments)] pub(crate) fn new_from_partition_mode( partition_mode: PartitionMode, left_child: &dyn ExecutionPlan, @@ -358,6 +367,8 @@ impl SharedBuildAccumulator { dynamic_filter: Arc, on_right: Vec, repartition_random_state: SeededRandomState, + null_equality: NullEquality, + null_aware: bool, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -404,6 +415,8 @@ impl SharedBuildAccumulator { on_right, repartition_random_state, probe_schema: right_child.schema(), + null_equality, + null_aware, } } @@ -579,7 +592,8 @@ impl SharedBuildAccumulator { if let Some(filter_expr) = combine_membership_and_bounds(membership_expr, bounds_expr) { - self.dynamic_filter.update(filter_expr)?; + self.dynamic_filter + .update(self.preserve_probe_nulls(filter_expr))?; } } PartitionStatus::Pending => { @@ -685,12 +699,51 @@ impl SharedBuildAccumulator { )?) as Arc }; - self.dynamic_filter.update(filter_expr)?; + self.dynamic_filter + .update(self.preserve_probe_nulls(filter_expr))?; } } Ok(()) } + + /// Keeps probe rows with a NULL key when the join semantics need them. + /// + /// The build-side predicate drops probe rows whose key is NULL. A null-aware anti join + /// (`NOT IN`) needs that NULL to reach the join so three-valued logic can collapse the + /// result, and a null-equal join needs it to match a build-side NULL. OR-ing `key IS NULL` + /// keeps those rows while preserving the filter's selectivity for the rest; the join refines + /// whatever the widened filter lets through. + fn preserve_probe_nulls( + &self, + filter_expr: Arc, + ) -> Arc { + if self.null_equality != NullEquality::NullEqualsNull && !self.null_aware { + return filter_expr; + } + // Only a key that can actually be NULL needs the disjunct; a NOT NULL key never widens. 
+ // Null-aware joins are single-key; null-equal joins can be multi-key, so OR every nullable + // key. If every key is NOT NULL the filter is left untouched, at full selectivity. + let any_key_is_null = self + .on_right + .iter() + // Widen on unresolved nullability: an extra NULL row is safe, a dropped one isn't. + .filter(|key| key.nullable(&self.probe_schema).unwrap_or(true)) + .map(|key| { + Arc::new(IsNullExpr::new(Arc::clone(key))) as Arc + }) + .reduce(|acc, is_null| { + Arc::new(BinaryExpr::new(acc, Operator::Or, is_null)) + as Arc + }); + // Cheap null check first short-circuits before the costlier dynamic filter. + match any_key_is_null { + Some(any_key_is_null) => { + Arc::new(BinaryExpr::new(any_key_is_null, Operator::Or, filter_expr)) + } + None => filter_expr, + } + } } impl fmt::Debug for SharedBuildAccumulator { @@ -722,6 +775,8 @@ pub(super) fn make_partitioned_accumulator_for_test( on_right: vec![], repartition_random_state: SeededRandomState::with_seed(1), probe_schema, + null_equality: NullEquality::NullEqualsNothing, + null_aware: false, } } @@ -741,6 +796,7 @@ pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) -> usi #[cfg(test)] mod tests { use super::*; + use datafusion_physical_expr::expressions::Column; fn partitioned_state(acc: &SharedBuildAccumulator) -> (Vec, usize) { let guard = acc.inner.lock(); @@ -810,4 +866,59 @@ mod tests { assert!(matches!(partitions[0], PartitionStatus::CanceledUnknown)); assert_eq!(completed, 1); } + + fn null_equal_accumulator( + probe_schema: Arc, + on_right: Vec, + ) -> SharedBuildAccumulator { + SharedBuildAccumulator { + inner: Mutex::new(AccumulatorState { + data: AccumulatedBuildData::Partitioned { + partitions: vec![PartitionStatus::Pending; 1], + completed_partitions: 0, + }, + completion: CompletionState::Pending, + }), + completion_notify: Notify::new(), + dynamic_filter: Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))), + on_right, + repartition_random_state: 
SeededRandomState::with_seed(1), + probe_schema, + null_equality: NullEquality::NullEqualsNull, + null_aware: false, + } + } + + #[test] + fn preserve_probe_nulls_only_widens_nullable_keys() { + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("k_nullable", DataType::Int32, true), + Field::new("k_not_null", DataType::Int32, false), + ])); + let on_right: Vec = vec![ + Arc::new(Column::new("k_nullable", 0)), + Arc::new(Column::new("k_not_null", 1)), + ]; + let acc = null_equal_accumulator(probe_schema, on_right); + + // Only the nullable key earns an IS NULL disjunct; the NOT NULL key is left out. + let widened = acc.preserve_probe_nulls(lit(true)); + assert_eq!(format!("{widened}").matches("IS NULL").count(), 1); + } + + #[test] + fn preserve_probe_nulls_leaves_all_not_null_keys_untouched() { + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let on_right: Vec = + vec![Arc::new(Column::new("a", 0)), Arc::new(Column::new("b", 1))]; + let acc = null_equal_accumulator(probe_schema, on_right); + + // Every key is NOT NULL, so there is nothing to OR in and the filter is returned as-is. + let filter = lit(true); + let result = acc.preserve_probe_nulls(Arc::clone(&filter)); + assert_eq!(format!("{result}"), format!("{filter}")); + } } diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt index b18f3b3ae7a9..1d12fc33c9a2 100644 --- a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -451,3 +451,68 @@ DROP TABLE customers_test; statement ok DROP TABLE all_null_banned; + +############# +## Test: dynamic filter pushdown must not drop inner (probe-side) NULLs. 
+## With join dynamic filter pushdown on, the build-side filter pushed to the probe scan would drop +## inner NULLs, but NOT IN three-valued logic needs them to collapse the result to zero rows. The +## in-memory VALUES scans above never apply the pushed filter, so this case needs a parquet scan. +############# + +statement ok +set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +# Row-level parquet filtering, so the pushed filter actually drops matching rows instead of only +# pruning row groups. Without this the single row group is read whole and the NULL never gets dropped. +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE TABLE asa_outer(id INT) AS VALUES (1), (2), (3); + +statement ok +CREATE TABLE asa_inner(eid INT) AS VALUES (2), (NULL); + +query I +COPY asa_outer TO 'test_files/scratch/null_aware_anti_join/asa_outer.parquet' STORED AS PARQUET; +---- +3 + +query I +COPY asa_inner TO 'test_files/scratch/null_aware_anti_join/asa_inner.parquet' STORED AS PARQUET; +---- +2 + +statement ok +CREATE EXTERNAL TABLE asa_outer_parquet(id INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/null_aware_anti_join/asa_outer.parquet'; + +statement ok +CREATE EXTERNAL TABLE asa_inner_parquet(eid INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/null_aware_anti_join/asa_inner.parquet'; + +# Expected: zero rows. Before the fix the pushed dynamic filter dropped inner NULLs, so the join +# wrongly returned id = 1 and id = 3. 
+query I +SELECT id FROM asa_outer_parquet WHERE id NOT IN (SELECT eid FROM asa_inner_parquet) ORDER BY id; +---- + +statement ok +DROP TABLE asa_outer; + +statement ok +DROP TABLE asa_inner; + +statement ok +DROP TABLE asa_outer_parquet; + +statement ok +DROP TABLE asa_inner_parquet; + +statement ok +RESET datafusion.execution.parquet.pushdown_filters; + +statement ok +RESET datafusion.optimizer.enable_join_dynamic_filter_pushdown; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index 6cb92025ca36..11645ad70e3c 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -1024,10 +1024,10 @@ drop table int_probe; ######## -# Dynamic filters must not be created for null-equal joins (IS NOT DISTINCT -# FROM, INTERSECT): min/max bounds and membership filters derived from the -# build side evaluate to NULL for probe-side NULL keys and would prune rows -# that can null-match a build-side NULL. +# Null-equal joins (IS NOT DISTINCT FROM, INTERSECT) keep dynamic filter pushdown. +# Min/max bounds and membership filters derived from the build side evaluate to NULL +# for a probe-side NULL key, so the pushed predicate carries an `IS NULL` disjunct that +# lets the probe NULL reach the join and null-match a build-side NULL. 
######## statement ok @@ -1049,14 +1049,14 @@ SELECT nej_build.id, nej_probe.id FROM nej_build JOIN nej_probe ON nej_build.id 11 11 NULL NULL -# No DynamicFilter predicate may appear on the probe side of a null-equal join +# The probe side now carries a DynamicFilter for a null-equal join (widened with IS NULL at runtime) query TT EXPLAIN SELECT nej_build.id, nej_probe.id FROM nej_build JOIN nej_probe ON nej_build.id IS NOT DISTINCT FROM nej_probe.id ---- physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], NullsEqual: true 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_build.parquet]]}, projection=[id], file_type=parquet -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_probe.parquet]]}, projection=[id], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_probe.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible statement ok drop table nej_build; @@ -1065,6 +1065,33 @@ statement ok drop table nej_probe; +# Multi-key null-equal join: the IS NULL disjunct covers every nullable key, so a probe row with a +# NULL in either key still reaches the join and null-matches the build side. 
+statement ok +COPY (SELECT * FROM (VALUES (1, 10), (2, NULL), (NULL, 30)) v(a, b)) TO 'test_files/scratch/push_down_filter_parquet/mnej_probe.parquet' STORED AS PARQUET; + +statement ok +COPY (SELECT * FROM (VALUES (1, 10), (2, NULL)) v(a, b)) TO 'test_files/scratch/push_down_filter_parquet/mnej_build.parquet' STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE mnej_probe STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_parquet/mnej_probe.parquet'; + +statement ok +CREATE EXTERNAL TABLE mnej_build STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_parquet/mnej_build.parquet'; + +query IIII rowsort +SELECT mnej_build.a, mnej_build.b, mnej_probe.a, mnej_probe.b FROM mnej_build JOIN mnej_probe ON (mnej_build.a IS NOT DISTINCT FROM mnej_probe.a) AND (mnej_build.b IS NOT DISTINCT FROM mnej_probe.b) +---- +1 10 1 10 +2 NULL 2 NULL + +statement ok +drop table mnej_build; + +statement ok +drop table mnej_probe; + + # Config reset statement ok RESET datafusion.explain.physical_plan_only;