diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs index 7b56e75ea9ebd..03894a8b02305 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -57,6 +57,7 @@ struct HashJoinQuery { prob_hit: f64, build_size: &'static str, probe_size: &'static str, + isolate_partitioned_join: bool, } /// Inline SQL queries for Hash Join benchmarks @@ -69,6 +70,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M", + isolate_partitioned_join: false, }, // Q2: Very Small Build Side (Sparse, range < 1024) // Build Side: nation (25 rows, range 961) | Probe Side: customer (1.5M rows) @@ -85,6 +87,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M", + isolate_partitioned_join: false, }, // Q3: 100% Density, 100% Hit rate HashJoinQuery { @@ -93,6 +96,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q4: 100% Density, 10% Hit rate HashJoinQuery { @@ -108,6 +112,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q5: 75% Density, 100% Hit rate HashJoinQuery { @@ -123,6 +128,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q6: 75% Density, 10% Hit rate HashJoinQuery { @@ -142,6 +148,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q7: 50% Density, 100% Hit rate HashJoinQuery { @@ -157,6 +164,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q8: 50% Density, 10% Hit rate HashJoinQuery { @@ -176,6 +184,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, 
// Q9: 20% Density, 100% Hit rate HashJoinQuery { @@ -191,6 +200,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q10: 20% Density, 10% Hit rate HashJoinQuery { @@ -210,6 +220,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q11: 10% Density, 100% Hit rate HashJoinQuery { @@ -225,6 +236,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q12: 10% Density, 10% Hit rate HashJoinQuery { @@ -244,6 +256,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q13: 1% Density, 100% Hit rate HashJoinQuery { @@ -259,6 +272,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q14: 1% Density, 10% Hit rate HashJoinQuery { @@ -278,6 +292,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M", + isolate_partitioned_join: false, }, // Q15: 20% Density, 10% Hit rate, 20% Duplicates in Build Side HashJoinQuery { @@ -300,6 +315,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K_(20%_dups)", probe_size: "60M", + isolate_partitioned_join: false, }, // RightSemi Join benchmarks with Int32 keys // @@ -325,6 +341,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M_RightSemi", + isolate_partitioned_join: false, }, // Q17: RightSemi, Medium build (100K rows), 100% Hit rate // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -337,6 +354,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M_RightSemi", + isolate_partitioned_join: false, }, // Q18: RightSemi, Medium 
build (100K rows), 10% Hit rate // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -352,6 +370,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M_RightSemi", + isolate_partitioned_join: false, }, // RightAnti Join benchmarks with Int32 keys // @@ -377,6 +396,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "25", probe_size: "1.5M_RightAnti", + isolate_partitioned_join: false, }, // Q20: RightAnti, Medium build (100K rows), 100% Hit rate (no output) // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -389,6 +409,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 1.0, build_size: "100K", probe_size: "60M_RightAnti", + isolate_partitioned_join: false, }, // Q21: RightAnti, Medium build (100K rows), 10% Hit rate (90% output) // Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows) @@ -404,6 +425,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.1, build_size: "100K", probe_size: "60M_RightAnti", + isolate_partitioned_join: false, }, // Q22: RightSemi, Medium build (100K rows), ~1% Hit rate, fanout ~100 // @@ -425,6 +447,31 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ prob_hit: 0.01, build_size: "100K_(fanout_100)", probe_size: "60M_RightSemi", + isolate_partitioned_join: false, + }, + // Q23: high-fanout string-key inner join calibrated to a real production + // HashJoinExec: build ~32K rows / ~415 distinct keys (fanout ~78), + // probe ~2.3M rows (all carrying the dominant key), output ~176M pairs. + // Long keys (~26 chars) make the per-pair recheck expensive; count(*) + // isolates the match path. `isolate_partitioned_join` disables stats so + // the planner picks Partitioned mode. 
+    HashJoinQuery {
+        sql: r###"SELECT count(*)
+            FROM (
+                SELECT 'svc_workload_mtls_status_' || CAST((s_suppkey % 415) + 1 AS VARCHAR) as k
+                FROM supplier
+                WHERE s_suppkey <= 32340
+            ) s
+            JOIN (
+                SELECT 'svc_workload_mtls_status_1' as k
+                FROM lineitem
+                WHERE l_orderkey % 265 = 0
+            ) l ON s.k = l.k"###,
+        density: 1.0,
+        prob_hit: 1.0,
+        build_size: "32K_(fanout~78)",
+        probe_size: "2.3M_long_keys_count",
+        isolate_partitioned_join: true,
     },
 ];
 
@@ -487,6 +534,15 @@ impl RunOpt {
             );
             benchmark_run.start_new_case(&case_name);
 
+            // Queries flagged `isolate_partitioned_join` run without statistics so
+            // the planner can't prove the build side is small and repartitions both.
+            // NOTE(review): confirm this SET still affects tables registered earlier.
+            let collect_statistics = !query.isolate_partitioned_join;
+            ctx.sql(&format!(
+                "SET datafusion.execution.collect_statistics = {collect_statistics}"
+            ))
+            .await?;
+
             let query_run = self
                 .benchmark_query(query.sql, &query_id.to_string(), &ctx)
                 .await;
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index a5da391ee7635..a88852f4fc53a 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -216,9 +216,21 @@ pub(super) struct JoinLeftData {
     pub(super) probe_side_non_empty: AtomicBool,
     /// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins)
     pub(super) probe_side_has_null: AtomicBool,
+    /// `true` if any hash bucket holds build rows with differing join keys
+    /// (hash collisions). When `false`, every chain is "pure" and the
+    /// probe side can validate a chain with a single key check at its head
+    /// instead of re-checking every duplicate. Computed once at build time.
+    has_key_collisions: bool,
 }
 
 impl JoinLeftData {
+    /// Returns `true` if the build side has any real hash collisions (a bucket
+    /// holding rows with differing join keys). When `false`, the probe side can
+    /// skip the per-duplicate key recheck. Reads the `has_key_collisions` field.
+    pub(super) fn has_key_collisions(&self) -> bool {
+        self.has_key_collisions
+    }
+
     /// return a reference to the map
     pub(super) fn map(&self) -> &Map {
         &self.map
@@ -2088,6 +2100,17 @@ async fn collect_left_input(
         (Map::HashMap(hashmap), batch, left_values)
     };
 
+    // Detect whether the build side has real hash collisions (a bucket with
+    // differing keys). When it doesn't, the probe side can validate each chain
+    // with a single key check at its head instead of re-checking every
+    // duplicate.
+    let has_key_collisions = match &join_hash_map {
+        Map::HashMap(hashmap) => {
+            hashmap.has_key_collisions(&left_values, null_equality)?
+        }
+        Map::ArrayMap(_) => false,
+    };
+
     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
         let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
@@ -2144,6 +2167,7 @@ async fn collect_left_input(
         membership,
         probe_side_non_empty: AtomicBool::new(false),
         probe_side_has_null: AtomicBool::new(false),
+        has_key_collisions,
     };
 
     Ok(data)
@@ -4688,6 +4712,8 @@ mod tests {
             None,
             8192,
             (0, None),
+            // Exercise the per-pair recheck path.
+            true,
             &mut probe_indices_buffer,
             &mut build_indices_buffer,
         )?;
@@ -4750,6 +4776,8 @@ mod tests {
             None,
             8192,
             (0, None),
+            // Exercise the per-pair recheck path.
+            true,
             &mut probe_indices_buffer,
             &mut build_indices_buffer,
         )?;
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 2aa6e69dff807..83f84672e588c 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -33,7 +33,7 @@ use crate::joins::hash_join::shared_bounds::{
     PartitionBounds, PartitionBuildData, SharedBuildAccumulator,
 };
 use crate::joins::utils::{
-    OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, matchable_join_keys,
+    JoinKeyComparator, OnceFut, get_final_indices_from_shared_bitmap, matchable_join_keys,
 };
 use crate::stream::EmptyRecordBatchStream;
 use crate::{
@@ -402,6 +402,7 @@ pub(super) fn lookup_join_hashmap(
     valid_keys: Option<&NullBuffer>,
     limit: usize,
     offset: MapOffset,
+    has_key_collisions: bool,
     probe_indices_buffer: &mut Vec<u32>,
     build_indices_buffer: &mut Vec<u64>,
 ) -> Result<(UInt64Array, UInt32Array, Option<MapOffset>)> {
@@ -414,26 +415,53 @@ pub(super) fn lookup_join_hashmap(
         build_indices_buffer,
     );
 
-    let build_indices_unfiltered: UInt64Array =
-        std::mem::take(build_indices_buffer).into();
-    let probe_indices_unfiltered: UInt32Array =
-        std::mem::take(probe_indices_buffer).into();
-
-    // TODO: optimize equal_rows_arr to avoid allocation of intermediate arrays
-    // https://github.com/apache/datafusion/issues/12131
-    let (build_indices, probe_indices) = equal_rows_arr(
-        &build_indices_unfiltered,
-        &probe_indices_unfiltered,
+    // Validate the candidate (build, probe) pairs against the join key to drop
+    // hash collisions. We compare values in place via a prebuilt comparator,
+    // avoiding the take() + eq_dyn_null() + FilterBuilder allocations that
+    // equal_rows_arr performs at O(matched_pairs) scale.
+    // See: https://github.com/apache/datafusion/issues/12131
+    let comparator = JoinKeyComparator::for_equality(
         build_side_values,
         probe_side_values,
         null_equality,
     )?;
 
-    // Reclaim buffers
-    *build_indices_buffer = build_indices_unfiltered.into_parts().1.into();
-    *probe_indices_buffer = probe_indices_unfiltered.into_parts().1.into();
+    let mut build_out: Vec<u64> = Vec::with_capacity(build_indices_buffer.len());
+    let mut probe_out: Vec<u32> = Vec::with_capacity(probe_indices_buffer.len());
+
+    if has_key_collisions {
+        // A bucket may mix keys, so every candidate pair must be rechecked.
+        for (b, p) in build_indices_buffer.iter().zip(probe_indices_buffer.iter()) {
+            if comparator.is_equal(*b as usize, *p as usize) {
+                build_out.push(*b);
+                probe_out.push(*p);
+            }
+        }
+    } else {
+        // Collision-free build side: each chain holds one key, so all candidate
+        // pairs of a probe row (a contiguous run: the chain walk emits a probe
+        // row's matches consecutively) share one build key. Check the run's head
+        // once and accept/reject the whole run. Runs are found by a linear scan,
+        // which stays cheap for fanout 1 and needs no globally sorted probe ids.
+        let builds = build_indices_buffer.as_slice();
+        let probes = probe_indices_buffer.as_slice();
+        let mut start = 0;
+        while let Some(&probe_idx) = probes.get(start) {
+            let run = probes[start..].iter().take_while(|&&p| p == probe_idx);
+            let end = start + run.count();
+            if comparator.is_equal(builds[start] as usize, probe_idx as usize) {
+                build_out.extend_from_slice(&builds[start..end]);
+                probe_out.extend_from_slice(&probes[start..end]);
+            }
+            start = end;
+        }
+    }
+
+    // Reclaim buffers for the next call
+    build_indices_buffer.clear();
+    probe_indices_buffer.clear();
 
-    Ok((build_indices, probe_indices, next_offset))
+    Ok((build_out.into(), probe_out.into(), next_offset))
 }
 
 /// Counts the number of distinct elements in the input array.
@@ -808,6 +836,7 @@ impl HashJoinStream {
                     state.valid_keys.as_ref(),
                     self.batch_size,
                     state.offset,
+                    build_side.left_data.has_key_collisions(),
                     &mut self.probe_indices_buffer,
                     &mut self.build_indices_buffer,
                 )?,
diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs
index 454cc916aeb12..eafbec34ecdd7 100644
--- a/datafusion/physical-plan/src/joins/join_hash_map.rs
+++ b/datafusion/physical-plan/src/joins/join_hash_map.rs
@@ -22,9 +22,11 @@
 use std::fmt::{self, Debug};
 use std::ops::Sub;
 
-use arrow::array::BooleanArray;
+use crate::joins::utils::JoinKeyComparator;
+use arrow::array::{ArrayRef, BooleanArray};
 use arrow::buffer::{BooleanBuffer, NullBuffer};
 use arrow::datatypes::ArrowNativeType;
+use datafusion_common::{NullEquality, Result};
 use hashbrown::HashTable;
 use hashbrown::hash_table::Entry::{Occupied, Vacant};
 
@@ -131,6 +133,18 @@ pub trait JoinHashMapType: Send + Sync {
         match_indices: &mut Vec<u64>,
     ) -> Option<MapOffset>;
 
+    /// Returns `true` if any bucket holds build rows with differing join keys
+    /// (real hash collisions). When `false`, the probe can check once per
+    /// chain head and accept the whole run. Scanned once at build time.
+    fn has_key_collisions(
+        &self,
+        left_values: &[ArrayRef],
+        null_equality: NullEquality,
+    ) -> Result<bool> {
+        let _ = (left_values, null_equality);
+        Ok(true)
+    }
+
     /// Returns a BooleanArray indicating which of the provided hashes exist in the map.
     fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray;
 
@@ -208,6 +222,14 @@ impl JoinHashMapType for JoinHashMapU32 {
         )
     }
 
+    fn has_key_collisions(
+        &self,
+        left_values: &[ArrayRef],
+        null_equality: NullEquality,
+    ) -> Result<bool> {
+        detect_key_collisions(&self.next, left_values, null_equality)
+    }
+
     fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
         contain_hashes(&self.map, hash_values)
     }
@@ -288,6 +310,14 @@ impl JoinHashMapType for JoinHashMapU64 {
         )
     }
 
+    fn has_key_collisions(
+        &self,
+        left_values: &[ArrayRef],
+        null_equality: NullEquality,
+    ) -> Result<bool> {
+        detect_key_collisions(&self.next, left_values, null_equality)
+    }
+
     fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
         contain_hashes(&self.map, hash_values)
     }
@@ -491,6 +521,40 @@ pub fn contain_hashes<T>(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> Bool
     BooleanArray::new(buffer, None)
 }
 
+/// Scans the `next` chain to detect real hash collisions (two build rows in
+/// the same bucket with different keys). `next[i]` stores `prev_row + 1`
+/// (`0` = end of chain). Checking every linked pair is sufficient: if each
+/// row equals its predecessor, the whole chain shares a single key.
+///
+/// Example — keys `["cat", "cat", "dog"]`, next `[0, 1, 2]`:
+///   row 1 → prev 0: "cat"=="cat" ✓
+///   row 2 → prev 1: "dog"!="cat" → return true (collision found)
+fn detect_key_collisions<T>(
+    next: &[T],
+    left_values: &[ArrayRef],
+    null_equality: NullEquality,
+) -> Result<bool>
+where
+    T: ArrowNativeType + Into<u64>,
+{
+    if next.is_empty() {
+        return Ok(false);
+    }
+    let comparator =
+        JoinKeyComparator::for_equality(left_values, left_values, null_equality)?;
+    for (row, &link) in next.iter().enumerate() {
+        let link: u64 = link.into();
+        if link != 0 {
+            // `link` is `prev_row + 1`; both rows live in the same bucket.
+            let prev = (link - 1) as usize;
+            if !comparator.is_equal(row, prev) {
+                return Ok(true);
+            }
+        }
+    }
+    Ok(false)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -569,4 +633,31 @@ mod tests {
         assert_eq!(input_indices, vec![1, 1]);
         assert_eq!(match_indices, vec![3, 1]);
     }
+
+    #[test]
+    fn test_has_key_collisions_same_key() -> Result<()> {
+        // 5 build rows all with key 10 chained in the same bucket — no collision.
+        // next: [0, 1, 2, 3, 4] → chain 4→3→2→1→0→end
+        use arrow::array::Int32Array;
+        use std::sync::Arc;
+        let next: Vec<u32> = vec![0, 1, 2, 3, 4];
+        let map = JoinHashMapU32::new(HashTable::new(), next);
+        let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 10, 10, 10, 10]));
+        assert!(!map.has_key_collisions(&[keys], NullEquality::NullEqualsNothing)?);
+        Ok(())
+    }
+
+    #[test]
+    fn test_has_key_collisions_distinct_keys() -> Result<()> {
+        // 5 build rows, 4 with key 10 and 1 with key 20 buried in the chain.
+        // next: [0, 1, 2, 3, 4] → chain 4→3→2→1→0→end
+        // Row 2 has key 20 — adjacent pair (row 2, row 1) differs → collision.
+        use arrow::array::Int32Array;
+        use std::sync::Arc;
+        let next: Vec<u32> = vec![0, 1, 2, 3, 4];
+        let map = JoinHashMapU32::new(HashTable::new(), next);
+        let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 10, 20, 10, 10]));
+        assert!(map.has_key_collisions(&[keys], NullEquality::NullEqualsNothing)?);
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 39a4c178ca4b6..96f127879b41e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -2332,6 +2332,19 @@ impl JoinKeyComparator {
         Ok(Self { first, rest })
     }
 
+    /// Build equality-only comparators for each join key column pair.
+    ///
+    /// Unlike [`Self::new`], no `SortOptions` are required — `SortOptions::default()`
+    /// is used internally, which is correct because callers only test `== Equal`.
+    pub fn for_equality(
+        left_arrays: &[ArrayRef],
+        right_arrays: &[ArrayRef],
+        null_equality: NullEquality,
+    ) -> Result<Self> {
+        let sort_options = vec![SortOptions::default(); left_arrays.len()];
+        Self::new(left_arrays, right_arrays, &sort_options, null_equality)
+    }
+
     /// Compare row `left` (in the left arrays) with row `right` (in the right
     /// arrays). Returns the lexicographic ordering across all key columns.
     #[inline]