[SPARK-57091][SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization #56101

Open
yadavay-amzn wants to merge 1 commit into apache:master from yadavay-amzn:proto/nearestby-streaming-heap

@yadavay-amzn yadavay-amzn commented May 25, 2026

What changes were proposed in this pull request?

Add BroadcastNearestByJoinExec, a dedicated physical operator for NearestByJoin that replaces the existing cross-product rewrite (RewriteNearestByJoin) when spark.sql.join.nearestBy.broadcast.enabled is set to true (default false).

The operator broadcasts the right side and, for each left row, makes a single pass over the broadcast rows while maintaining a bounded priority queue of size k. It targets the asymmetric pattern (small reference table on the right, large fact table on the left) and never materializes the N×M cross product.
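The per-row pass can be sketched in plain Scala as follows. This is a minimal illustration of a bounded max-heap top-k, not the code in this PR: `Candidate`, `NearestKSketch`, and `nearestK` are hypothetical names, distances are assumed to be "smaller is nearer", NaN distances are skipped, and no tie-breaking rule is defined.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a right-side row paired with its distance to the current left row.
final case class Candidate(id: Int, distance: Double)

object NearestKSketch {
  // Explicit ordering by distance, so the sketch does not rely on an implicit Ordering[Double].
  private val byDistance: Ordering[Candidate] = new Ordering[Candidate] {
    def compare(a: Candidate, b: Candidate): Int =
      java.lang.Double.compare(a.distance, b.distance)
  }

  // Returns the k candidates with the smallest distance, nearest first, in one pass.
  def nearestK(candidates: Iterator[Candidate], k: Int): List[Candidate] = {
    if (k <= 0) {
      Nil
    } else {
      // Max-heap on distance: the head is the worst candidate currently retained.
      val heap = mutable.PriorityQueue.empty[Candidate](byDistance)
      candidates.foreach { c =>
        if (!c.distance.isNaN) { // assumption: NaN distances are excluded
          if (heap.size < k) {
            heap.enqueue(c)
          } else if (c.distance < heap.head.distance) {
            heap.dequeue() // evict the current worst
            heap.enqueue(c)
          }
        }
      }
      // At most k entries were ever held; sort them ascending for output.
      heap.toList.sorted(byDistance)
    }
  }
}
```

At any moment the heap holds at most k entries for the current left row, so memory per left row is O(k) regardless of how many right-side rows are scanned.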

Changes:

  • New physical operator: BroadcastNearestByJoinExec
  • New planner strategy: NearestByJoinSelection in SparkStrategies
  • Conditional skip of RewriteNearestByJoin when the conf is enabled and the right side fits within the broadcast threshold
  • New SQLConf: spark.sql.join.nearestBy.broadcast.enabled
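The condition for skipping the rewrite can be pictured roughly as the predicate below. This is only a sketch of the rule as described in the list above: the object, function, and parameter names are hypothetical, and the actual check in the PR may look at plan statistics or hints differently.

```scala
object NearestByPlanChoice {
  // Hypothetical helper: true when the broadcast operator would be chosen
  // over the cross-product rewrite.
  def useBroadcastOperator(
      confEnabled: Boolean,          // value of spark.sql.join.nearestBy.broadcast.enabled
      rightSizeInBytes: BigInt,      // estimated size of the right side
      broadcastThresholdBytes: Long  // value of spark.sql.autoBroadcastJoinThreshold
  ): Boolean = {
    // In Spark, a threshold of -1 disables size-based broadcasting.
    confEnabled &&
      broadcastThresholdBytes >= 0 &&
      rightSizeInBytes >= 0 &&
      rightSizeInBytes <= BigInt(broadcastThresholdBytes)
  }
}
```

When the predicate is false, the existing RewriteNearestByJoin path would apply unchanged.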

Why are the changes needed?

The current NearestByJoin implementation rewrites the join to a cross join followed by an aggregate. This materializes all N×M row pairs and shuffles them by a synthetic UUID, after which the aggregate discards the vast majority of them. At moderate scale this becomes the bottleneck.

Benchmark results (k=5, JDK 17, 8g heap, single machine):

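| Scale     | BroadcastNearestByJoin | Cross-Product Rewrite | Speedup |
|-----------|------------------------|-----------------------|---------|
| 10K×10K   | 4.9s                   | 41s                   | 8.4x    |
| 30K×30K   | 78s                    | 415s                  | 5.3x    |
| 50K×50K   | 234s                   | >20min (timed out)    | ≥5x     |
| 200K×200K | ~62min                 | ~5h (extrapolated)    | ~5x     |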

The speedup is about 5x or better at every measured scale. At smaller scales the ratio appears higher (8.4x at 10K×10K) because Spark scheduling overhead is a larger fraction of the cross-product's total time. Under memory-constrained configurations (1-2g heap), the speedup is larger because the cross-product approach incurs significant GC pressure from intermediate materialization.
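For reference, the measured speedup ratios can be recomputed from the timings in the benchmark table with a few lines of Scala. The helper names are illustrative only, the 200K×200K row is left out because its cross-product time is extrapolated, and the 50K×50K cross-product time is taken as the 20-minute timeout, which makes that ratio a lower bound.

```scala
object SpeedupCheck {
  // (scale, broadcast seconds, cross-product seconds), copied from the benchmark table.
  val measured: Seq[(String, Double, Double)] = Seq(
    ("10K x 10K", 4.9, 41.0),
    ("30K x 30K", 78.0, 415.0),
    ("50K x 50K", 234.0, 20 * 60.0) // timed out: 20 min is a lower bound
  )

  def speedup(broadcastSec: Double, crossSec: Double): Double = crossSec / broadcastSec

  // Round to one decimal place, matching the precision used in the table.
  def rounded(x: Double): Double = math.round(x * 10) / 10.0

  def main(args: Array[String]): Unit =
    measured.foreach { case (scale, b, c) =>
      println(s"$scale: ${rounded(speedup(b, c))}x")
    }
}
```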

Why a dedicated operator instead of optimizing the existing rewrite?

The cross join materializes all N×M rows before the aggregate can bound them. There is no "streaming top-k aggregate" that can short-circuit the cross join mid-execution. The operator avoids this by never materializing the full product: for each left row, only k heap entries exist at any time.
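To put numbers on the gap, the following sketch counts the row pairs the rewrite materializes against the rows that survive the top-k step. The names are hypothetical, and `rowsKept` is an upper bound that assumes every left row has at least min(k, M) valid candidates.

```scala
object PairCounts {
  // Row pairs produced by the cross join for N left rows and M right rows.
  def crossProductPairs(n: Long, m: Long): Long = n * m

  // Upper bound on rows kept after top-k: at most min(k, M) per left row.
  def rowsKept(n: Long, m: Long, k: Int): Long = n * math.min(k.toLong, m)

  def main(args: Array[String]): Unit = {
    val (n, m, k) = (30000L, 30000L, 5)
    val pairs = crossProductPairs(n, m)
    val kept = rowsKept(n, m, k)
    println(s"pairs materialized by the rewrite: $pairs")
    println(s"rows kept after top-k:             $kept")
    println(s"pairs per kept row:                ${pairs / kept}")
  }
}
```

At 30K×30K with k=5 this gives 900,000,000 pairs against 150,000 kept rows, so the rewrite produces 6,000 pairs for every row it ultimately emits.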

Does this PR introduce any user-facing change?

No. The feature is opt-in via a new SQLConf that defaults to false. When disabled, the existing RewriteNearestByJoin path is used unchanged.
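Opting in would look roughly like the fragment below, assuming a Spark build that includes this patch. The conf name is the one proposed in this PR, `spark.sql.autoBroadcastJoinThreshold` is the existing Spark setting, and the app name and the 64 MB value are arbitrary illustrations.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("nearest-by-broadcast-demo") // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// Proposed in this PR; defaults to false, so the rewrite path is used unless this is set.
spark.conf.set("spark.sql.join.nearestBy.broadcast.enabled", "true")

// Existing Spark conf, in bytes. The right side must fit under this threshold
// for the broadcast operator to be considered; 64 MB here is only an example.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (64L * 1024 * 1024).toString)
```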

How was this patch tested?

  • BroadcastNearestByJoinExecSuite: 14 tests covering empty tables, k boundary cases, NaN/null exclusion, integer ranking types, similarity direction, tie-breaking, asymmetric sizes, broadcast threshold fallback, and plan verification
  • NearestByJoinBenchmark: comparative benchmark (cross-product vs broadcast-heap)
  • Existing DataFrameNearestByJoinSuite: passes with default conf (no regression)

Was this patch authored or co-authored using generative AI tooling?

Yes.


JIRA: SPARK-57091

@yadavay-amzn yadavay-amzn force-pushed the proto/nearestby-streaming-heap branch 3 times, most recently from 8cabc34 to bc2be19 Compare May 26, 2026 18:14
@yadavay-amzn yadavay-amzn changed the title [SPARK-XXXXX][SQL] Add streaming heap operator for NearestByJoin [SPARK-XXXXX][SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization May 26, 2026
@yadavay-amzn yadavay-amzn force-pushed the proto/nearestby-streaming-heap branch 3 times, most recently from 72c424a to 83eebbf Compare May 26, 2026 20:38
@yadavay-amzn yadavay-amzn changed the title [SPARK-XXXXX][SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization [SPARK-57091][SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization May 26, 2026
@yadavay-amzn yadavay-amzn force-pushed the proto/nearestby-streaming-heap branch from 83eebbf to 80696c1 Compare May 27, 2026 00:00
Implements StreamingNearestByJoinExec that uses a broadcast right side
+ k-sized heap per left row, avoiding the N*M cross-product materialization.

Memory benchmark results (30K x 30K, k=5):
- Streaming Heap: 31s, ~208 MB memory delta
- Cross-product:  404s, ~1733 MB memory delta
- Memory ratio:   8.3x less memory for streaming heap
- Time ratio:     12.9x faster

At constrained heap sizes (<=1GB), cross-product OOMs while
streaming heap completes with ~200MB.
@yadavay-amzn yadavay-amzn force-pushed the proto/nearestby-streaming-heap branch from 80696c1 to abc2136 Compare May 27, 2026 00:35
@yadavay-amzn yadavay-amzn marked this pull request as ready for review May 27, 2026 01:54
@yadavay-amzn
Contributor Author

@dilipbiswal @cloud-fan Could you please take a look at this change when you get a chance?
This PR adds a dedicated BroadcastNearestByJoinExec operator that avoids the N * M cross-product materialization in RewriteNearestByJoin (noted as an out-of-scope improvement for SPARK-56395).

This improvement is inspired by @sarutak's SortMergeAsOfJoinExec for AS-OF join (#55912) and is similar in spirit: it uses a dedicated physical operator to replace an expensive rewrite for a specialized join type.

The operator broadcasts the right side and maintains a bounded k-heap per left row to avoid the shuffling cost and full materialization when the right table fits within autoBroadcastJoinThreshold. Benchmark numbers have been promising (see description). I've also added several unit tests covering correctness and edge cases.

Does this approach align with the planned evolution of the feature? Any concerns about adding a dedicated physical operator vs. optimizing the existing rewrite?
Happy to collaborate and adjust the approach based on your feedback. Thanks!
