[SPARK-57091][SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization#56101
[SPARK-57091][SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization#56101yadavay-amzn wants to merge 1 commit into
Conversation
8cabc34 to
bc2be19
Compare
72c424a to
83eebbf
Compare
83eebbf to
80696c1
Compare
Implements StreamingNearestByJoinExec that uses a broadcast right side + k-sized heap per left row, avoiding the N*M cross-product materialization. Memory benchmark results (30K x 30K, k=5): - Streaming Heap: 31s, ~208 MB memory delta - Cross-product: 404s, ~1733 MB memory delta - Memory ratio: 8.3x less memory for streaming heap - Time ratio: 12.9x faster At constrained heap sizes (<=1GB), cross-product OOMs while streaming heap completes with ~200MB.
80696c1 to
abc2136
Compare
|
@dilipbiswal @cloud-fan Could you please take a look at this change when you get a chance? This improvement is inspired by @sarutak's SortMergeAsOfJoinExec for AS-OF join (#55912) and is similar in spirit to it by using a dedicated physical operator to replace an expensive rewrite for a specialized join type. The operator broadcasts the right side and maintains a bounded k-heap per left row to avoid the shuffling cost and full materialization when the right table fits within Does this approach align with the planned evolution of the feature? Any concerns about adding a dedicated physical operator vs. optimizing the existing rewrite? |
What changes were proposed in this pull request?
Add
BroadcastNearestByJoinExec, a dedicated physical operator for NearestByJoin that replaces the existing cross-product rewrite (RewriteNearestByJoin) whenspark.sql.join.nearestBy.broadcast.enabledis set totrue(defaultfalse).The operator broadcasts the right side and performs a single-pass iteration per left row with a bounded priority queue of size k. It exploits the asymmetric pattern (small reference table, large fact table) by never materializing the N×M cross product.
Changes:
BroadcastNearestByJoinExecNearestByJoinSelectioninSparkStrategiesRewriteNearestByJoinwhen the conf is enabled and right fits in broadcast thresholdspark.sql.join.nearestBy.broadcast.enabledWhy are the changes needed?
The current NearestByJoin implementation rewrites to a cross-join + aggregate. This materializes all N×M row pairs, shuffles them by synthetic UUID, and then the aggregate discards the vast majority. At moderate scale this becomes the bottleneck.
Benchmark results (k=5, JDK 17, 8g heap, single machine):
The speedup is consistently ≥5x across scales. At smaller scales the ratio appears higher (8x at 10K) because Spark scheduling overhead is a larger fraction of the cross-product's total time. Under memory-constrained configurations (1-2g heap), the speedup is larger as the cross-product approach incurs significant GC pressure from intermediate materialization.
Why a dedicated operator instead of optimizing the existing rewrite?
The cross-join materializes all N×M rows before the aggregate can bound them. There is no "streaming top-k aggregate" that can short-circuit the cross-join mid-execution. The operator avoids this by never materializing the full product — for each left row, only k heap entries exist at any time.
Does this PR introduce any user-facing change?
No. The feature is opt-in via a new SQLConf that defaults to
false. When disabled, the existingRewriteNearestByJoinpath is used unchanged.How was this patch tested?
BroadcastNearestByJoinExecSuite: 14 tests covering empty tables, k boundary cases, NaN/null exclusion, integer ranking types, similarity direction, tie-breaking, asymmetric sizes, broadcast threshold fallback, and plan verificationNearestByJoinBenchmark: comparative benchmark (cross-product vs broadcast-heap)DataFrameNearestByJoinSuite: passes with default conf (no regression)Was this patch authored or co-authored using generative AI tooling?
Yes.
JIRA: SPARK-57091