
Spark 4.1: Fix async microbatch plan bugs#15670

Merged
bryanck merged 9 commits into apache:main from RjLi13:fix-async-microbatch-plan-bugs
Apr 1, 2026
Conversation

@RjLi13
Contributor

@RjLi13 RjLi13 commented Mar 17, 2026

This PR contains fixes in response to @yingjianwu98's review. See #15299.

It addresses the following points from that review:

  • Fix the Iceberg snapshot ID comparison (changed to an equality check once we hit the AvailableNow cap) so we do not stop early; Iceberg snapshot IDs are random, so ordering comparisons are unreliable
  • Handle fillQueueFailedThrowable in planFiles to mirror latestOffset

Two more bug fixes:

  • Handle the AvailableNow cap during queue preload so that we do not load more into the queue than the AvailableNowTrigger cap allows.
  • Handle a race condition where the queue can keep filling while we iterate over the copied queueList, yet the code ended up using the live queue's tail instead of the copy's, which can differ.

One improvement:

  • Remove queue synchronization by using LinkedBlockingDeque natively (and remove the tracked tail variable)

Tests are included to cover all of the above.

The async planner feature is disabled by default.
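A rough sketch of the deque change described above (class name and element type are illustrative, not the actual Iceberg code):

```java
import java.util.concurrent.LinkedBlockingDeque;

// LinkedBlockingDeque is thread-safe, so the producer (the async fill thread)
// and the consumer need no external synchronization, and the tail is available
// via peekLast() instead of a separately tracked tail variable.
class TaskQueueSketch {
  private final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>();

  void enqueue(String task) {
    queue.addLast(task); // safe to call concurrently from the fill thread
  }

  String tail() {
    return queue.peekLast(); // no lock, no tracked tail field
  }

  String poll() {
    return queue.pollFirst(); // non-blocking; returns null if empty
  }
}
```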

@github-actions github-actions bot added the spark label Mar 17, 2026
@RjLi13
Contributor Author

RjLi13 commented Mar 17, 2026

Regarding @yingjianwu98 comment on #15299 (comment)

Does it make sense to combine refresh and fillQueue in the single thread?

I feel fillQueue will probably want the latest state of the table

I think not: fillQueue should run at least once per refresh, but we want to run it more often, since each refresh may bring in more snapshots that fillQueue works through one by one.

@RjLi13 RjLi13 marked this pull request as ready for review March 20, 2026 17:41
@RjLi13 RjLi13 force-pushed the fix-async-microbatch-plan-bugs branch from 8155a32 to ce9b293 on March 26, 2026 00:58
@RjLi13
Contributor Author

RjLi13 commented Mar 26, 2026

@bryanck @singhpk234 please take a look and review, thank you!

queuedRowCount.get());

// Convert to list for indexed access
List<Pair<StreamingOffset, FileScanTask>> queueList = Lists.newArrayList(queue);
Member

Why are we building this list? We can get the tail now with peekLast()

Member

Nvm, I see the issues with this

Contributor Author

Let me know if there's anything I can do (comments, etc) that could make it easier to understand

Member

I would probably rename this something like "queueSnapshot" since we are basically trying to extract a point in time view of the queue, the indexing isn't important any more (or the list type)
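A hypothetical sketch of the point-in-time copy being discussed (names are illustrative; the real queue holds Pair&lt;StreamingOffset, FileScanTask&gt;):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

class QueueSnapshotSketch {
  // Copying iterates the live deque once, producing a fixed point-in-time
  // view that later addLast() calls from the fill thread cannot change.
  static List<String> snapshotOf(LinkedBlockingDeque<String> queue) {
    return new ArrayList<>(queue);
  }
}
```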

for (FileScanTask task : tasks) {
expectedBatchCount += 1;
}
} catch (IOException e) {
Member

We can just throw; we don't need to catch here

table.refresh();
int expectedBatchCount = 0;
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
for (FileScanTask task : tasks) {
Member

Could replace this with Iterables.size(tasks)
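For reference, Guava's Iterables.size iterates and counts when the argument is not a Collection; a minimal stdlib equivalent of the loop it would replace:

```java
class IterableSizeSketch {
  // Counts elements by consuming the iterator once, which is what
  // Guava's Iterables.size does for a non-Collection Iterable such as
  // CloseableIterable<FileScanTask>.
  static int size(Iterable<?> iterable) {
    int count = 0;
    for (Object ignored : iterable) {
      count++;
    }
    return count;
  }
}
```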


// Synchronously add data to the queue to meet our initial constraints.
// For Trigger.AvailableNow, constructor-time preload is normally initialized from
// latestOffset(...) with no explicit end offset, so bounded preload must stop at the cap.
Member

I'm not sure I understand this comment; what is the cap?

Contributor Author

The cap here refers to the AvailableNowTrigger limit, which as I understand it prevents reading beyond what's available now, even if more data arrives later. I check to make sure the initial preload doesn't cross that boundary.
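A hedged sketch of what cap-aware preloading might look like (names and the cap mechanism are illustrative; the real code is bounded by the Trigger.AvailableNow end offset rather than a plain count):

```java
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;

class BoundedPreloadSketch {
  // Preload at most `cap` tasks into the queue, then stop, even if the
  // planner could produce more. Mirrors the idea of not loading past the
  // Trigger.AvailableNow boundary during constructor-time preload.
  static int preload(Iterator<String> plannedTasks,
                     LinkedBlockingDeque<String> queue,
                     int cap) {
    int loaded = 0;
    while (loaded < cap && plannedTasks.hasNext()) {
      queue.addLast(plannedTasks.next());
      loaded++;
    }
    return loaded;
  }
}
```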

@RjLi13 RjLi13 force-pushed the fix-async-microbatch-plan-bugs branch from ce9b293 to 8cde4b2 on March 31, 2026 19:12
@RjLi13
Contributor Author

RjLi13 commented Mar 31, 2026

Thank you @RussellSpitzer for the review! I addressed your comments; please let me know if I missed anything.

@RussellSpitzer
Member

That's it for me, but I really think @bryanck should take a look before we merge

@RussellSpitzer RussellSpitzer requested a review from bryanck March 31, 2026 21:50
@bryanck
Contributor

bryanck commented Mar 31, 2026

This LGTM as well, thanks for the updates @RjLi13 and for the review @RussellSpitzer !

@bryanck bryanck merged commit 8504800 into apache:main Apr 1, 2026
25 checks passed
@bryanck
Contributor

bryanck commented Apr 1, 2026

Also, thanks @yingjianwu98 for your review as well!

@RjLi13 RjLi13 deleted the fix-async-microbatch-plan-bugs branch April 1, 2026 17:49
@RjLi13
Contributor Author

RjLi13 commented Apr 1, 2026

Thank you @bryanck @yingjianwu98 @singhpk234 @RussellSpitzer for the reviews and helping me get this feature in!
