From e2960893595b422ec99f7e706b81e80566e4ed58 Mon Sep 17 00:00:00 2001 From: Moe Date: Mon, 22 Jun 2026 16:46:33 -0700 Subject: [PATCH 1/6] Kept null-aware anti-join NULLs in the pushed dynamic filter. The hash-join dynamic filter pushed `key IN build_keys` down to the probe scan for null-aware anti joins too. That drops the probe-side NULL, but `NOT IN` three-valued logic needs it to collapse the result to zero rows, so the join silently returned rows. OR `probe_key IS NULL` into the pushed predicate. Non-NULL probe rows still get filtered; only the NULL additionally survives. --- .../physical-plan/src/joins/hash_join/exec.rs | 1 + .../src/joins/hash_join/shared_bounds.rs | 36 +++++++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b1d387ea74557..a11b89175f3eb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1363,6 +1363,7 @@ impl ExecutionPlan for HashJoinExec { filter, on_right, repartition_random_state, + self.null_aware, )) }))) }) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 0af4015ff7239..2658a7020cd52 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -37,7 +37,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ - BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit, + BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, IsNullExpr, lit, }; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; @@ -255,6 +255,9 @@ pub(crate) struct 
SharedBuildAccumulator { repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, + /// Null-aware anti join (`NOT IN`). A probe-side NULL must reach the join so its + /// three-valued logic can collapse the result, so the pushed filter keeps NULL rows. + null_aware: bool, } /// Strategy for filter pushdown (decided at collection time) @@ -358,6 +361,7 @@ impl SharedBuildAccumulator { dynamic_filter: Arc, on_right: Vec, repartition_random_state: SeededRandomState, + null_aware: bool, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -404,6 +408,7 @@ impl SharedBuildAccumulator { on_right, repartition_random_state, probe_schema: right_child.schema(), + null_aware, } } @@ -579,7 +584,8 @@ impl SharedBuildAccumulator { if let Some(filter_expr) = combine_membership_and_bounds(membership_expr, bounds_expr) { - self.dynamic_filter.update(filter_expr)?; + self.dynamic_filter + .update(self.null_aware_filter(filter_expr))?; } } PartitionStatus::Pending => { @@ -685,12 +691,35 @@ impl SharedBuildAccumulator { )?) as Arc }; - self.dynamic_filter.update(filter_expr)?; + self.dynamic_filter + .update(self.null_aware_filter(filter_expr))?; } } Ok(()) } + + /// Wraps a pushdown filter so a null-aware anti join keeps its probe-side NULL rows. + /// + /// The build-side predicate drops probe rows whose key is NULL, but `NOT IN` three-valued + /// logic needs that NULL to reach the join. OR-ing `probe_key IS NULL` preserves the dynamic + /// filter's selectivity for non-NULL rows while letting the NULL through. + fn null_aware_filter( + &self, + filter_expr: Arc, + ) -> Arc { + if !self.null_aware { + return filter_expr; + } + // A null-aware anti join is validated to a single probe key. 
+ let probe_key_is_null: Arc = + Arc::new(IsNullExpr::new(Arc::clone(&self.on_right[0]))); + Arc::new(BinaryExpr::new( + filter_expr, + Operator::Or, + probe_key_is_null, + )) + } } impl fmt::Debug for SharedBuildAccumulator { @@ -722,6 +751,7 @@ pub(super) fn make_partitioned_accumulator_for_test( on_right: vec![], repartition_random_state: SeededRandomState::with_seed(1), probe_schema, + null_aware: false, } } From f30d9bcae5a6f22f738b4da74519353243f48fde Mon Sep 17 00:00:00 2001 From: Moe Date: Mon, 22 Jun 2026 17:37:03 -0700 Subject: [PATCH 2/6] Added a null-aware anti-join dynamic-filter regression test. Exercises the pushdown path the existing in-memory tests miss: parquet with row-level filtering, so the pushed dynamic filter actually drops rows. Without the fix `id NOT IN (SELECT eid ...)` returns 1 and 3 instead of zero rows. --- .../test_files/null_aware_anti_join.slt | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt index b18f3b3ae7a99..3f04f8b865623 100644 --- a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -451,3 +451,68 @@ DROP TABLE customers_test; statement ok DROP TABLE all_null_banned; + +############# +## Test: dynamic filter pushdown must not drop the inner NULL. +## With join dynamic filter pushdown on, the build-side filter pushed to the probe scan would drop +## the inner NULL, but NOT IN three-valued logic needs it to collapse the result to zero rows. The +## in-memory VALUES scans above never apply the pushed filter, so this case needs a parquet scan. +############# + +statement ok +set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +# Row-level parquet filtering, so the pushed filter actually drops matching rows instead of only +# pruning row groups. 
Without this the single row group is read whole and the NULL never gets dropped. +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE TABLE asa_outer(id INT) AS VALUES (1), (2), (3); + +statement ok +CREATE TABLE asa_inner(eid INT) AS VALUES (2), (NULL); + +query I +COPY asa_outer TO 'test_files/scratch/null_aware_anti_join/asa_outer.parquet' STORED AS PARQUET; +---- +3 + +query I +COPY asa_inner TO 'test_files/scratch/null_aware_anti_join/asa_inner.parquet' STORED AS PARQUET; +---- +2 + +statement ok +CREATE EXTERNAL TABLE asa_outer_parquet(id INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/null_aware_anti_join/asa_outer.parquet'; + +statement ok +CREATE EXTERNAL TABLE asa_inner_parquet(eid INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/null_aware_anti_join/asa_inner.parquet'; + +# Expected: zero rows. Before the fix the pushed dynamic filter dropped the inner NULL, so the join +# wrongly returned id = 1 and id = 3. +query I +SELECT id FROM asa_outer_parquet WHERE id NOT IN (SELECT eid FROM asa_inner_parquet) ORDER BY id; +---- + +statement ok +DROP TABLE asa_outer; + +statement ok +DROP TABLE asa_inner; + +statement ok +DROP TABLE asa_outer_parquet; + +statement ok +DROP TABLE asa_inner_parquet; + +statement ok +set datafusion.execution.parquet.pushdown_filters = false; + +statement ok +set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; From d899f8b9b576613a492a079aaf26aa0b205ffa76 Mon Sep 17 00:00:00 2001 From: Moe Date: Wed, 24 Jun 2026 13:09:18 -0700 Subject: [PATCH 3/6] Moved the probe null check ahead of the dynamic filter. It's cheap, so it short-circuits NULL rows before the costlier filter. The `debug_assert` pins the single-key invariant the `on_right[0]` indexing relies on. 
--- .../src/joins/hash_join/shared_bounds.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 2658a7020cd52..c8ef965186702 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -711,13 +711,18 @@ impl SharedBuildAccumulator { if !self.null_aware { return filter_expr; } - // A null-aware anti join is validated to a single probe key. + debug_assert_eq!( + self.on_right.len(), + 1, + "null_aware anti join must have exactly one probe key" + ); let probe_key_is_null: Arc = Arc::new(IsNullExpr::new(Arc::clone(&self.on_right[0]))); + // Cheap null check first short-circuits before the costlier dynamic filter. Arc::new(BinaryExpr::new( - filter_expr, - Operator::Or, probe_key_is_null, + Operator::Or, + filter_expr, )) } } From 26cda37bf35d9c1e7535f86a093c256da4be76df Mon Sep 17 00:00:00 2001 From: Moe Date: Wed, 24 Jun 2026 13:09:18 -0700 Subject: [PATCH 4/6] Reset the SLT knobs and reworded the probe-NULL comments. RESET restores the defaults at the end instead of re-setting explicit values. A probe can hold several NULLs, so the comments read as plural. --- .../sqllogictest/test_files/null_aware_anti_join.slt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt index 3f04f8b865623..1d12fc33c9a29 100644 --- a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -453,9 +453,9 @@ statement ok DROP TABLE all_null_banned; ############# -## Test: dynamic filter pushdown must not drop the inner NULL. +## Test: dynamic filter pushdown must not drop inner (probe-side) NULLs. 
## With join dynamic filter pushdown on, the build-side filter pushed to the probe scan would drop -## the inner NULL, but NOT IN three-valued logic needs it to collapse the result to zero rows. The +## inner NULLs, but NOT IN three-valued logic needs them to collapse the result to zero rows. The ## in-memory VALUES scans above never apply the pushed filter, so this case needs a parquet scan. ############# @@ -493,7 +493,7 @@ CREATE EXTERNAL TABLE asa_inner_parquet(eid INT) STORED AS PARQUET LOCATION 'test_files/scratch/null_aware_anti_join/asa_inner.parquet'; -# Expected: zero rows. Before the fix the pushed dynamic filter dropped the inner NULL, so the join +# Expected: zero rows. Before the fix the pushed dynamic filter dropped inner NULLs, so the join # wrongly returned id = 1 and id = 3. query I SELECT id FROM asa_outer_parquet WHERE id NOT IN (SELECT eid FROM asa_inner_parquet) ORDER BY id; @@ -512,7 +512,7 @@ statement ok DROP TABLE asa_inner_parquet; statement ok -set datafusion.execution.parquet.pushdown_filters = false; +RESET datafusion.execution.parquet.pushdown_filters; statement ok -set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; +RESET datafusion.optimizer.enable_join_dynamic_filter_pushdown; From ca5f12262c0dbd57f18fecde1b4b4820a5b8cfab Mon Sep 17 00:00:00 2001 From: Moe Date: Mon, 22 Jun 2026 20:07:08 -0700 Subject: [PATCH 5/6] Re-enabled null-equal join dynamic filters via an IS NULL predicate. Dynamic filter pushdown was disabled for null-equal joins because the build-side predicate prunes a probe-side NULL that can null-match a build-side NULL. Push the filter with `OR key IS NULL` over the nullable probe keys instead, the way #23104 does for null-aware anti joins. A NOT NULL key never widens the filter, so an all-NOT-NULL join keeps full selectivity.
--- .../physical-plan/src/joins/hash_join/exec.rs | 15 +-- .../src/joins/hash_join/shared_bounds.rs | 117 ++++++++++++++---- .../test_files/push_down_filter_parquet.slt | 39 +++++- 3 files changed, 134 insertions(+), 37 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a11b89175f3eb..fc1d3b8887a23 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -862,14 +862,6 @@ impl HashJoinExec { return false; } - // Bounds and membership filters derived from the build side do not - // account for null-equal matching: a probe-side NULL key evaluates - // such predicates to NULL and would be pruned, even though it can - // match a build-side NULL when nulls compare equal. - if self.null_equality == NullEquality::NullEqualsNull { - return false; - } - // `preserve_file_partitions` can report Hash partitioning for Hive-style // file groups, but those partitions are not actually hash-distributed. // Partitioned dynamic filters rely on hash routing, so disable them in @@ -1363,6 +1355,7 @@ impl ExecutionPlan for HashJoinExec { filter, on_right, repartition_random_state, + self.null_equality, self.null_aware, )) }))) @@ -6629,7 +6622,7 @@ mod tests { } #[test] - fn test_dynamic_filter_pushdown_rejects_null_equal_join() -> Result<()> { + fn test_dynamic_filter_pushdown_allowed_for_null_equal_join() -> Result<()> { let (_, _, on) = build_schema_and_on()?; let left = build_table(("a1", &vec![1]), ("b1", &vec![1]), ("c1", &vec![1])); let right = build_table(("a2", &vec![1]), ("b1", &vec![1]), ("c2", &vec![1])); @@ -6652,7 +6645,9 @@ mod tests { false, )?; - assert!(!join.allow_join_dynamic_filter_pushdown(session_config.options())); + // Null-equal joins keep dynamic filter pushdown: the pushed predicate carries an + // `IS NULL` disjunct so a probe-side NULL still reaches the join. 
+ assert!(join.allow_join_dynamic_filter_pushdown(session_config.options())); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index c8ef965186702..5bf22f2c7f31c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -33,7 +33,9 @@ use crate::joins::hash_join::partitioned_hash_eval::{ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; +use datafusion_common::{ + DataFusionError, NullEquality, Result, ScalarValue, SharedResult, +}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ @@ -255,6 +257,9 @@ pub(crate) struct SharedBuildAccumulator { repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, + /// Null equality of the join. Under `NullEqualsNull` a probe-side NULL can match a + /// build-side NULL, so the pushed filter must keep NULL rows here too. + null_equality: NullEquality, /// Null-aware anti join (`NOT IN`). A probe-side NULL must reach the join so its /// three-valued logic can collapse the result, so the pushed filter keeps NULL rows. null_aware: bool, @@ -354,6 +359,7 @@ impl SharedBuildAccumulator { /// We cannot build a partial filter from some partitions - it would incorrectly eliminate /// valid join results. We must wait until we have complete information from ALL /// relevant partitions before updating the dynamic filter. 
+ #[expect(clippy::too_many_arguments)] pub(crate) fn new_from_partition_mode( partition_mode: PartitionMode, left_child: &dyn ExecutionPlan, @@ -361,6 +367,7 @@ impl SharedBuildAccumulator { dynamic_filter: Arc, on_right: Vec, repartition_random_state: SeededRandomState, + null_equality: NullEquality, null_aware: bool, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches @@ -408,6 +415,7 @@ impl SharedBuildAccumulator { on_right, repartition_random_state, probe_schema: right_child.schema(), + null_equality, null_aware, } } @@ -585,7 +593,7 @@ impl SharedBuildAccumulator { combine_membership_and_bounds(membership_expr, bounds_expr) { self.dynamic_filter - .update(self.null_aware_filter(filter_expr))?; + .update(self.preserve_probe_nulls(filter_expr))?; } } PartitionStatus::Pending => { @@ -692,38 +700,48 @@ impl SharedBuildAccumulator { }; self.dynamic_filter - .update(self.null_aware_filter(filter_expr))?; + .update(self.preserve_probe_nulls(filter_expr))?; } } Ok(()) } - /// Wraps a pushdown filter so a null-aware anti join keeps its probe-side NULL rows. + /// Keeps probe rows with a NULL key when the join semantics need them. /// - /// The build-side predicate drops probe rows whose key is NULL, but `NOT IN` three-valued - /// logic needs that NULL to reach the join. OR-ing `probe_key IS NULL` preserves the dynamic - /// filter's selectivity for non-NULL rows while letting the NULL through. - fn null_aware_filter( + /// The build-side predicate drops probe rows whose key is NULL. A null-aware anti join + /// (`NOT IN`) needs that NULL to reach the join so three-valued logic can collapse the + /// result, and a null-equal join needs it to match a build-side NULL. OR-ing `key IS NULL` + /// keeps those rows while preserving the filter's selectivity for the rest; the join refines + /// whatever the widened filter lets through. 
+ fn preserve_probe_nulls( &self, filter_expr: Arc, ) -> Arc { - if !self.null_aware { + if self.null_equality != NullEquality::NullEqualsNull && !self.null_aware { return filter_expr; } - debug_assert_eq!( - self.on_right.len(), - 1, - "null_aware anti join must have exactly one probe key" - ); - let probe_key_is_null: Arc = - Arc::new(IsNullExpr::new(Arc::clone(&self.on_right[0]))); + // Only a key that can actually be NULL needs the disjunct; a NOT NULL key never widens. + // Null-aware joins are single-key; null-equal joins can be multi-key, so OR every nullable + // key. If every key is NOT NULL the filter is left untouched, at full selectivity. + let any_key_is_null = self + .on_right + .iter() + .filter(|key| key.nullable(&self.probe_schema).unwrap_or(true)) + .map(|key| { + Arc::new(IsNullExpr::new(Arc::clone(key))) as Arc + }) + .reduce(|acc, is_null| { + Arc::new(BinaryExpr::new(acc, Operator::Or, is_null)) + as Arc + }); // Cheap null check first short-circuits before the costlier dynamic filter. 
- Arc::new(BinaryExpr::new( - probe_key_is_null, - Operator::Or, - filter_expr, - )) + match any_key_is_null { + Some(any_key_is_null) => { + Arc::new(BinaryExpr::new(any_key_is_null, Operator::Or, filter_expr)) + } + None => filter_expr, + } } } @@ -756,6 +774,7 @@ pub(super) fn make_partitioned_accumulator_for_test( on_right: vec![], repartition_random_state: SeededRandomState::with_seed(1), probe_schema, + null_equality: NullEquality::NullEqualsNothing, null_aware: false, } } @@ -776,6 +795,7 @@ pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) -> usi #[cfg(test)] mod tests { use super::*; + use datafusion_physical_expr::expressions::Column; fn partitioned_state(acc: &SharedBuildAccumulator) -> (Vec, usize) { let guard = acc.inner.lock(); @@ -845,4 +865,59 @@ mod tests { assert!(matches!(partitions[0], PartitionStatus::CanceledUnknown)); assert_eq!(completed, 1); } + + fn null_equal_accumulator( + probe_schema: Arc, + on_right: Vec, + ) -> SharedBuildAccumulator { + SharedBuildAccumulator { + inner: Mutex::new(AccumulatorState { + data: AccumulatedBuildData::Partitioned { + partitions: vec![PartitionStatus::Pending; 1], + completed_partitions: 0, + }, + completion: CompletionState::Pending, + }), + completion_notify: Notify::new(), + dynamic_filter: Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))), + on_right, + repartition_random_state: SeededRandomState::with_seed(1), + probe_schema, + null_equality: NullEquality::NullEqualsNull, + null_aware: false, + } + } + + #[test] + fn preserve_probe_nulls_only_widens_nullable_keys() { + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("k_nullable", DataType::Int32, true), + Field::new("k_not_null", DataType::Int32, false), + ])); + let on_right: Vec = vec![ + Arc::new(Column::new("k_nullable", 0)), + Arc::new(Column::new("k_not_null", 1)), + ]; + let acc = null_equal_accumulator(probe_schema, on_right); + + // Only the nullable key earns an IS NULL disjunct; the NOT NULL key 
is left out. + let widened = acc.preserve_probe_nulls(lit(true)); + assert_eq!(format!("{widened}").matches("IS NULL").count(), 1); + } + + #[test] + fn preserve_probe_nulls_leaves_all_not_null_keys_untouched() { + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let on_right: Vec = + vec![Arc::new(Column::new("a", 0)), Arc::new(Column::new("b", 1))]; + let acc = null_equal_accumulator(probe_schema, on_right); + + // Every key is NOT NULL, so there is nothing to OR in and the filter is returned as-is. + let filter = lit(true); + let result = acc.preserve_probe_nulls(Arc::clone(&filter)); + assert_eq!(format!("{result}"), format!("{filter}")); + } } diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index 6cb92025ca36a..11645ad70e3cc 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -1024,10 +1024,10 @@ drop table int_probe; ######## -# Dynamic filters must not be created for null-equal joins (IS NOT DISTINCT -# FROM, INTERSECT): min/max bounds and membership filters derived from the -# build side evaluate to NULL for probe-side NULL keys and would prune rows -# that can null-match a build-side NULL. +# Null-equal joins (IS NOT DISTINCT FROM, INTERSECT) keep dynamic filter pushdown. +# Min/max bounds and membership filters derived from the build side evaluate to NULL +# for a probe-side NULL key, so the pushed predicate carries an `IS NULL` disjunct that +# lets the probe NULL reach the join and null-match a build-side NULL. 
######## statement ok @@ -1049,14 +1049,14 @@ SELECT nej_build.id, nej_probe.id FROM nej_build JOIN nej_probe ON nej_build.id 11 11 NULL NULL -# No DynamicFilter predicate may appear on the probe side of a null-equal join +# The probe side now carries a DynamicFilter for a null-equal join (widened with IS NULL at runtime) query TT EXPLAIN SELECT nej_build.id, nej_probe.id FROM nej_build JOIN nej_probe ON nej_build.id IS NOT DISTINCT FROM nej_probe.id ---- physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], NullsEqual: true 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_build.parquet]]}, projection=[id], file_type=parquet -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_probe.parquet]]}, projection=[id], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nej_probe.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible statement ok drop table nej_build; @@ -1065,6 +1065,33 @@ statement ok drop table nej_probe; +# Multi-key null-equal join: the IS NULL disjunct covers every nullable key, so a probe row with a +# NULL in either key still reaches the join and null-matches the build side. 
+statement ok +COPY (SELECT * FROM (VALUES (1, 10), (2, NULL), (NULL, 30)) v(a, b)) TO 'test_files/scratch/push_down_filter_parquet/mnej_probe.parquet' STORED AS PARQUET; + +statement ok +COPY (SELECT * FROM (VALUES (1, 10), (2, NULL)) v(a, b)) TO 'test_files/scratch/push_down_filter_parquet/mnej_build.parquet' STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE mnej_probe STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_parquet/mnej_probe.parquet'; + +statement ok +CREATE EXTERNAL TABLE mnej_build STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_parquet/mnej_build.parquet'; + +query IIII rowsort +SELECT mnej_build.a, mnej_build.b, mnej_probe.a, mnej_probe.b FROM mnej_build JOIN mnej_probe ON (mnej_build.a IS NOT DISTINCT FROM mnej_probe.a) AND (mnej_build.b IS NOT DISTINCT FROM mnej_probe.b) +---- +1 10 1 10 +2 NULL 2 NULL + +statement ok +drop table mnej_build; + +statement ok +drop table mnej_probe; + + # Config reset statement ok RESET datafusion.explain.physical_plan_only; From 9620b97843fc6f2d6befa53a92de071ed8323a93 Mon Sep 17 00:00:00 2001 From: Moe Date: Wed, 24 Jun 2026 13:44:54 -0700 Subject: [PATCH 6/6] Documented the conservative nullability fallback. The `unwrap_or(true)` widening on an unresolved nullability check wasn't obvious. An extra NULL row is safe; dropping a needed one isn't. --- datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 5bf22f2c7f31c..6c0e95175e46d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -727,6 +727,7 @@ impl SharedBuildAccumulator { let any_key_is_null = self .on_right .iter() + // Widen on unresolved nullability: an extra NULL row is safe, a dropped one isn't. 
.filter(|key| key.nullable(&self.probe_schema).unwrap_or(true)) .map(|key| { Arc::new(IsNullExpr::new(Arc::clone(key))) as Arc