Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion benchmarks/src/hj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct HashJoinQuery {
prob_hit: f64,
build_size: &'static str,
probe_size: &'static str,
isolate_partitioned_join: bool,
}

/// Inline SQL queries for Hash Join benchmarks
Expand All @@ -69,6 +70,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "25",
probe_size: "1.5M",
isolate_partitioned_join: false,
},
// Q2: Very Small Build Side (Sparse, range < 1024)
// Build Side: nation (25 rows, range 961) | Probe Side: customer (1.5M rows)
Expand All @@ -85,6 +87,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "25",
probe_size: "1.5M",
isolate_partitioned_join: false,
},
// Q3: 100% Density, 100% Hit rate
HashJoinQuery {
Expand All @@ -93,6 +96,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q4: 100% Density, 10% Hit rate
HashJoinQuery {
Expand All @@ -108,6 +112,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q5: 75% Density, 100% Hit rate
HashJoinQuery {
Expand All @@ -123,6 +128,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q6: 75% Density, 10% Hit rate
HashJoinQuery {
Expand All @@ -142,6 +148,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q7: 50% Density, 100% Hit rate
HashJoinQuery {
Expand All @@ -157,6 +164,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q8: 50% Density, 10% Hit rate
HashJoinQuery {
Expand All @@ -176,6 +184,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q9: 20% Density, 100% Hit rate
HashJoinQuery {
Expand All @@ -191,6 +200,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q10: 20% Density, 10% Hit rate
HashJoinQuery {
Expand All @@ -210,6 +220,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q11: 10% Density, 100% Hit rate
HashJoinQuery {
Expand All @@ -225,6 +236,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q12: 10% Density, 10% Hit rate
HashJoinQuery {
Expand All @@ -244,6 +256,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q13: 1% Density, 100% Hit rate
HashJoinQuery {
Expand All @@ -259,6 +272,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q14: 1% Density, 10% Hit rate
HashJoinQuery {
Expand All @@ -278,6 +292,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M",
isolate_partitioned_join: false,
},
// Q15: 20% Density, 10% Hit rate, 20% Duplicates in Build Side
HashJoinQuery {
Expand All @@ -300,6 +315,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K_(20%_dups)",
probe_size: "60M",
isolate_partitioned_join: false,
},
// RightSemi Join benchmarks with Int32 keys
//
Expand All @@ -325,6 +341,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "25",
probe_size: "1.5M_RightSemi",
isolate_partitioned_join: false,
},
// Q17: RightSemi, Medium build (100K rows), 100% Hit rate
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
Expand All @@ -337,6 +354,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M_RightSemi",
isolate_partitioned_join: false,
},
// Q18: RightSemi, Medium build (100K rows), 10% Hit rate
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
Expand All @@ -352,6 +370,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M_RightSemi",
isolate_partitioned_join: false,
},
// RightAnti Join benchmarks with Int32 keys
//
Expand All @@ -377,6 +396,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "25",
probe_size: "1.5M_RightAnti",
isolate_partitioned_join: false,
},
// Q20: RightAnti, Medium build (100K rows), 100% Hit rate (no output)
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
Expand All @@ -389,6 +409,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M_RightAnti",
isolate_partitioned_join: false,
},
// Q21: RightAnti, Medium build (100K rows), 10% Hit rate (90% output)
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
Expand All @@ -404,6 +425,7 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M_RightAnti",
isolate_partitioned_join: false,
},
// Q22: RightSemi, Medium build (100K rows), ~1% Hit rate, fanout ~100
//
Expand All @@ -425,6 +447,31 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
prob_hit: 0.01,
build_size: "100K_(fanout_100)",
probe_size: "60M_RightSemi",
isolate_partitioned_join: false,
},
// Q23: high-fanout string-key inner join calibrated to a real production
// HashJoinExec: build ~32K rows / ~415 distinct keys (fanout ~78),
// probe ~2.3M rows (all carrying the dominant key), output ~176M pairs.
// Long keys (~26 chars) make the per-pair recheck expensive; count(*)
// isolates the match path. `isolate_partitioned_join` disables stats so
// the planner picks Partitioned mode.
HashJoinQuery {
sql: r###"SELECT count(*)
FROM (
SELECT 'svc_workload_mtls_status_' || CAST((s_suppkey % 415) + 1 AS VARCHAR) as k
FROM supplier
WHERE s_suppkey <= 32340
) s
JOIN (
SELECT 'svc_workload_mtls_status_1' as k
FROM lineitem
WHERE l_orderkey % 265 = 0
) l ON s.k = l.k"###,
density: 1.0,
prob_hit: 1.0,
build_size: "32K_(fanout~78)",
probe_size: "2.3M_long_keys_count",
isolate_partitioned_join: true,
},
];

Expand All @@ -447,7 +494,6 @@ impl RunOpt {
};

let mut config = self.common.config()?;
// Disable join reordering to ensure the optimizer doesn't swap join sides
config.options_mut().optimizer.join_reordering = false;
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);
Expand Down Expand Up @@ -487,6 +533,14 @@ impl RunOpt {
);
benchmark_run.start_new_case(&case_name);

// Disabling stats for Q23 makes the planner pick Partitioned mode
// (can't prove small build → repartitions both sides).
let collect_statistics = !query.isolate_partitioned_join;
ctx.sql(&format!(
"SET datafusion.execution.collect_statistics = {collect_statistics}"
))
.await?;

let query_run = self
.benchmark_query(query.sql, &query_id.to_string(), &ctx)
.await;
Expand Down
28 changes: 28 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,21 @@ pub(super) struct JoinLeftData {
pub(super) probe_side_non_empty: AtomicBool,
/// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins)
pub(super) probe_side_has_null: AtomicBool,
/// `true` if any hash bucket holds build rows with differing join keys
/// (hash collisions). When `false`, every chain is "pure" and the
/// probe side can validate a chain with a single key check at its head
/// instead of re-checking every duplicate. Computed once at build time.
has_key_collisions: bool,
}

impl JoinLeftData {
/// Returns `true` if the build side has any real hash collisions (a bucket
/// holding rows with differing join keys). When `false`, the probe side can
/// skip the per-duplicate key recheck. See [`Self::has_key_collisions`].
pub(super) fn has_key_collisions(&self) -> bool {
self.has_key_collisions
}

/// return a reference to the map
pub(super) fn map(&self) -> &Map {
&self.map
Expand Down Expand Up @@ -2088,6 +2100,17 @@ async fn collect_left_input(
(Map::HashMap(hashmap), batch, left_values)
};

// Detect whether the build side has real hash collisions (a bucket with
// differing keys). When it doesn't, the probe side can validate each chain
// with a single key check at its head instead of re-checking every
// duplicate.
let has_key_collisions = match &join_hash_map {
Map::HashMap(hashmap) => {
hashmap.has_key_collisions(&left_values, null_equality)?
}
Map::ArrayMap(_) => false,
};

// Reserve additional memory for visited indices bitmap and create shared builder
let visited_indices_bitmap = if with_visited_indices_bitmap {
let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
Expand Down Expand Up @@ -2144,6 +2167,7 @@ async fn collect_left_input(
membership,
probe_side_non_empty: AtomicBool::new(false),
probe_side_has_null: AtomicBool::new(false),
has_key_collisions,
};

Ok(data)
Expand Down Expand Up @@ -4688,6 +4712,8 @@ mod tests {
None,
8192,
(0, None),
// Exercise the per-pair recheck path.
true,
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;
Expand Down Expand Up @@ -4750,6 +4776,8 @@ mod tests {
None,
8192,
(0, None),
// Exercise the per-pair recheck path.
true,
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;
Expand Down
59 changes: 44 additions & 15 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::joins::hash_join::shared_bounds::{
PartitionBounds, PartitionBuildData, SharedBuildAccumulator,
};
use crate::joins::utils::{
OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, matchable_join_keys,
JoinKeyComparator, OnceFut, get_final_indices_from_shared_bitmap, matchable_join_keys,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
Expand Down Expand Up @@ -402,6 +402,7 @@ pub(super) fn lookup_join_hashmap(
valid_keys: Option<&NullBuffer>,
limit: usize,
offset: MapOffset,
has_key_collisions: bool,
probe_indices_buffer: &mut Vec<u32>,
build_indices_buffer: &mut Vec<u64>,
) -> Result<(UInt64Array, UInt32Array, Option<MapOffset>)> {
Expand All @@ -414,26 +415,53 @@ pub(super) fn lookup_join_hashmap(
build_indices_buffer,
);

let build_indices_unfiltered: UInt64Array =
std::mem::take(build_indices_buffer).into();
let probe_indices_unfiltered: UInt32Array =
std::mem::take(probe_indices_buffer).into();

// TODO: optimize equal_rows_arr to avoid allocation of intermediate arrays
// https://github.com/apache/datafusion/issues/12131
let (build_indices, probe_indices) = equal_rows_arr(
&build_indices_unfiltered,
&probe_indices_unfiltered,
// Validate the candidate (build, probe) pairs against the join key to drop
// hash collisions. We compare values in place via a prebuilt comparator,
// avoiding the take() + eq_dyn_null() + FilterBuilder allocations that
// equal_rows_arr performs at O(matched_pairs) scale.
// See: https://github.com/apache/datafusion/issues/12131
let comparator = JoinKeyComparator::for_equality(
build_side_values,
probe_side_values,
null_equality,
)?;

// Reclaim buffers
*build_indices_buffer = build_indices_unfiltered.into_parts().1.into();
*probe_indices_buffer = probe_indices_unfiltered.into_parts().1.into();
let mut build_out: Vec<u64> = Vec::with_capacity(build_indices_buffer.len());
let mut probe_out: Vec<u32> = Vec::with_capacity(probe_indices_buffer.len());

if has_key_collisions {
// A bucket may mix keys, so every candidate pair must be rechecked.
for (b, p) in build_indices_buffer.iter().zip(probe_indices_buffer.iter()) {
if comparator.is_equal(*b as usize, *p as usize) {
build_out.push(*b);
probe_out.push(*p);
}
}
} else {
// Collision-free build side: every bucket holds a single key, so all
// pairs sharing one probe row (a contiguous run, since the chain walk
// emits a probe row's matches consecutively) have identical build
// keys. Check the key once per run at its head and accept or reject
// the whole run — turning F key comparisons per probe row into 1.
let builds = build_indices_buffer.as_slice();
let probes = probe_indices_buffer.as_slice();
let mut start = 0;
while start < probes.len() {
let probe_idx = probes[start];
let end = start + probes[start..].partition_point(|&p| p == probe_idx);
if comparator.is_equal(builds[start] as usize, probe_idx as usize) {
build_out.extend_from_slice(&builds[start..end]);
probe_out.extend_from_slice(&probes[start..end]);
}
start = end;
}
}

// Reclaim buffers for the next call
build_indices_buffer.clear();
probe_indices_buffer.clear();

Ok((build_indices, probe_indices, next_offset))
Ok((build_out.into(), probe_out.into(), next_offset))
}

/// Counts the number of distinct elements in the input array.
Expand Down Expand Up @@ -808,6 +836,7 @@ impl HashJoinStream {
state.valid_keys.as_ref(),
self.batch_size,
state.offset,
build_side.left_data.has_key_collisions(),
&mut self.probe_indices_buffer,
&mut self.build_indices_buffer,
)?,
Expand Down
Loading
Loading