Share per-chunk JoinLeftData across right partitions in NLJ memory-limited fallback #22038
Open
viirya wants to merge 4 commits into apache:main from
Conversation
Adds five tests reproducing the cross-partition coordination bug in NestedLoopJoinExec's memory-limited fallback path: each output partition independently constructs a per-chunk JoinLeftData with AtomicUsize::new(1), so the left visited bitmap and probe-thread counter are not shared across right partitions. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and LEFT MARK, this causes wrong results when target_partitions > 1 (duplicate unmatched rows; for LEFT SEMI, duplicate matched rows when multiple right partitions match the same left row). The tests are #[ignore]'d for now (with reason) and re-enabled in a follow-up commit that implements the shared-state fix. Discussed in apache#21833 (comment) and apache#21833 (comment) Co-authored-by: Claude Code
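The per-partition `AtomicUsize::new(1)` counter described above can be reduced to a small sketch. Everything here is illustrative, not DataFusion's actual code: `finish_probe` is a hypothetical stand-in for the probe-thread bookkeeping, where a partition may emit unmatched left rows only if its decrement is the final one.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A partition "finishes probing" by decrementing the shared counter; it may
// emit unmatched left rows only if it observed the final decrement (1 -> 0).
fn finish_probe(counter: &AtomicUsize) -> bool {
    counter.fetch_sub(1, Ordering::SeqCst) == 1
}

fn main() {
    // Buggy shape: each of two right partitions gets its own counter of 1,
    // so both observe the "final" decrement and both emit unmatched rows.
    let buggy_emitters = (0..2)
        .filter(|_| finish_probe(&AtomicUsize::new(1)))
        .count();

    // Fixed shape: one counter shared by both partitions, initialized to the
    // right partition count; exactly one observes the final decrement.
    let shared = AtomicUsize::new(2);
    let fixed_emitters = (0..2).filter(|_| finish_probe(&shared)).count();

    println!("buggy={buggy_emitters} fixed={fixed_emitters}");
}
```

Running this prints `buggy=2 fixed=1`: with per-partition counters every partition emits the unmatched left rows (the duplication the tests reproduce), while a shared counter makes emission happen exactly once.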
Share per-chunk JoinLeftData across right partitions in NLJ memory-limited fallback

When `target_partitions > 1`, the memory-limited fallback path was building a per-output-partition `JoinLeftData` with `AtomicUsize::new(1)` for each left chunk, so each partition emitted unmatched left rows based only on its own right-side matches. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and LEFT MARK, this produced wrong results (duplicate unmatched rows; for LEFT SEMI, duplicate matched rows when multiple right partitions matched the same left row).

This change introduces a plan-level `FallbackCoordinator` that:

- Owns the left spill stream and a single chunk-sized `MemoryReservation`,
- Has the first partition reaching a chunk become its "leader": it loads the chunk and publishes an `Arc<JoinLeftData>` (with `probe_threads_counter == right_partition_count`) into a shared slot,
- Lets every other right partition take an `Arc` clone of the same `JoinLeftData`, so the visited bitmap and probe-thread counter are shared exactly as in the single-pass `collect_left_input` path,
- Releases the slot only after the partition that brings the counter to zero finishes emitting unmatched left rows for the chunk, then notifies waiters so the next chunk can be loaded.

The per-chunk in-flight fetch and release are driven through `BoxFuture` fields on `SpillStateActive`, polled across `poll_next` iterations. The FULL-join multi-partition guard added in apache#21833 is removed; FULL joins now use the shared coordination path.

The five `#[ignore]`-d multi-partition correctness tests added in the previous commit are unignored and now pass. `test_overallocation` is updated to expect FULL multi-partition to spill (not OOM).

Discussed in apache#21833 (comment) and apache#21833 (comment)

Co-authored-by: Claude Code
Adds end-to-end SLT cases under target_partitions=4 + tight memory_limit, covering LEFT, FULL, LEFT SEMI, and LEFT ANTI joins. Each query uses a non-equi predicate that forces NLJ and verifies that left-row counts (matched/unmatched) match the single-partition expectation, exercising the cross-partition shared-state path introduced in the previous commit. Co-authored-by: Claude Code
The previous SLT cases verified output correctness under target_partitions=4 + tight memory_limit, but did not assert that the memory-limited fallback path was actually taken. Add an EXPLAIN ANALYZE assertion that the NestedLoopJoinExec line shows spill_count=2, confirming both the left-side and right-side spills fired. Co-authored-by: Claude Code
Which issue does this PR close?
Rationale for this change
When `target_partitions > 1`, the memory-limited fallback path was building a per-output-partition `JoinLeftData` with `AtomicUsize::new(1)` for each left chunk, so each partition emitted unmatched left rows based only on its own right-side matches. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and LEFT MARK, this produced wrong results (duplicate unmatched rows; for LEFT SEMI, duplicate matched rows when multiple right partitions matched the same left row).

What changes are included in this PR?
This change introduces a plan-level `FallbackCoordinator` that:

- Owns the left spill stream and a single chunk-sized `MemoryReservation`,
- Has the first partition reaching a chunk become its "leader": it loads the chunk and publishes an `Arc<JoinLeftData>` (with `probe_threads_counter == right_partition_count`) into a shared slot,
- Lets every other right partition take an `Arc` clone of the same `JoinLeftData`, so the visited bitmap and probe-thread counter are shared exactly as in the single-pass `collect_left_input` path,
- Releases the slot only after the partition that brings the counter to zero finishes emitting unmatched left rows for the chunk, then notifies waiters so the next chunk can be loaded.

The per-chunk in-flight fetch and release are driven through `BoxFuture` fields on `SpillStateActive`, polled across `poll_next` iterations. The FULL-join multi-partition guard added in #21833 is removed; FULL joins now use the shared coordination path.
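The leader/follower flow can be sketched with standard-library primitives. All names here (`ChunkLeftData`, `Coordinator`, `get_or_load`) are assumptions for illustration, and the sketch uses threads where the real implementation drives loading and release through `BoxFuture`s polled in `poll_next`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative stand-in for the per-chunk left-side state: a visited bitmap
// plus the shared probe-thread counter.
struct ChunkLeftData {
    visited: Mutex<Vec<bool>>,
    probe_threads: AtomicUsize,
}

// Minimal shared slot: the first partition to arrive ("leader") loads the
// chunk and publishes it; later partitions clone the same Arc.
struct Coordinator {
    slot: Mutex<Option<Arc<ChunkLeftData>>>,
}

impl Coordinator {
    fn get_or_load(&self, left_rows: usize, right_partitions: usize) -> Arc<ChunkLeftData> {
        let mut slot = self.slot.lock().unwrap();
        slot.get_or_insert_with(|| {
            Arc::new(ChunkLeftData {
                visited: Mutex::new(vec![false; left_rows]),
                probe_threads: AtomicUsize::new(right_partitions),
            })
        })
        .clone()
    }
}

fn main() {
    let coord = Arc::new(Coordinator { slot: Mutex::new(None) });
    let right_partitions = 3;
    let handles: Vec<_> = (0..right_partitions)
        .map(|p| {
            let coord = Arc::clone(&coord);
            thread::spawn(move || {
                let chunk = coord.get_or_load(4, right_partitions);
                // Probe phase: partition p "matches" left row p.
                chunk.visited.lock().unwrap()[p] = true;
                // Only the partition that brings the counter to zero emits
                // unmatched left rows, then releases the slot so the next
                // chunk can be loaded.
                if chunk.probe_threads.fetch_sub(1, Ordering::SeqCst) == 1 {
                    let unmatched =
                        chunk.visited.lock().unwrap().iter().filter(|v| !**v).count();
                    *coord.slot.lock().unwrap() = None;
                    Some(unmatched)
                } else {
                    None
                }
            })
        })
        .collect();

    let emitted: Vec<usize> =
        handles.into_iter().filter_map(|h| h.join().unwrap()).collect();
    println!("emitters={} unmatched={}", emitted.len(), emitted[0]);
}
```

With 4 left rows and 3 right partitions each matching one distinct row, exactly one partition reports the single unmatched row (`emitters=1 unmatched=1`), mirroring how the shared counter makes unmatched-row emission happen once per chunk rather than once per partition.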
Discussed in #21833 (comment) and #21833 (comment).
Are these changes tested?
Added five multi-partition correctness tests.
`test_overallocation` is updated to expect FULL multi-partition to spill (not OOM). Added multi-partition NLJ spill SLT cases.

Are there any user-facing changes?
No