Spark 4.1: Fix async microbatch plan bugs #15670
Conversation
Regarding @yingjianwu98 comment on #15299 (comment)
I think no: fillQueue should run at least once per refresh, but we want to run it more frequently, since a refresh might bring in more snapshots that fillQueue will go through one by one.
Force-pushed from 8155a32 to ce9b293
@bryanck @singhpk234 please take a look and review, thank you!
queuedRowCount.get());

// Convert to list for indexed access
List<Pair<StreamingOffset, FileScanTask>> queueList = Lists.newArrayList(queue);
Why are we building this list? We can get the tail now with peekLast()
Nvm, I see the issues with this
Let me know if there's anything I can do (comments, etc) that could make it easier to understand
I would probably rename this something like "queueSnapshot" since we are basically trying to extract a point in time view of the queue, the indexing isn't important any more (or the list type)
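To illustrate the rename suggestion, here is a minimal sketch of the "point in time view" idea. The class and field names are hypothetical, not Iceberg's actual code: copying the concurrent queue yields a stable snapshot that later concurrent mutations cannot affect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

public class QueueSnapshotSketch {
  // Hypothetical stand-in for the planner's queue of (offset, task) pairs.
  static final ConcurrentLinkedDeque<String> queue = new ConcurrentLinkedDeque<>();

  // Copying the deque produces a point-in-time view: subsequent concurrent
  // adds/polls on `queue` do not change the returned list.
  static List<String> queueSnapshot() {
    return new ArrayList<>(queue);
  }

  public static void main(String[] args) {
    queue.add("task-1");
    queue.add("task-2");
    List<String> snapshot = queueSnapshot();
    queue.add("task-3"); // mutates the live queue only
    System.out.println(snapshot.size()); // prints 2
  }
}
```

A name like `queueSnapshot` then signals to callers that they hold a frozen copy, not the live queue.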
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
for (FileScanTask task : tasks) {
  expectedBatchCount += 1;
}
} catch (IOException e) {
We can just throw, we don't need to catch here
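A hedged sketch of what "just throw" means here, using plain stdlib types in place of Iceberg's CloseableIterable (all names hypothetical): declare `throws IOException` on the method and drop the catch block; try-with-resources still closes the resource on every path, and the test framework surfaces any failure.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public class ThrowDontCatch {
  // Instead of wrapping the scan in try/catch, declare `throws IOException`
  // and let the exception propagate to the caller (e.g. the test framework).
  static int countTasks(List<String> tasks) throws IOException {
    int count = 0;
    try (Closeable scan = () -> { /* stand-in for tasks.close() */ }) {
      for (String ignored : tasks) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(countTasks(List.of("f1", "f2", "f3"))); // prints 3
  }
}
```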
table.refresh();
int expectedBatchCount = 0;
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
  for (FileScanTask task : tasks) {
Could replace this with Iterables.size(tasks)
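Guava's `Iterables.size(Iterable)` does exactly this counting walk. For illustration, a small stdlib-only equivalent (class name is hypothetical):

```java
public class IterablesSizeSketch {
  // Mirrors Guava's Iterables.size(Iterable): walk the iterable once and
  // count elements, replacing a manual
  // `for (FileScanTask task : tasks) { expectedBatchCount += 1; }` loop.
  static int size(Iterable<?> iterable) {
    int count = 0;
    for (Object ignored : iterable) {
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    System.out.println(size(java.util.List.of("f1", "f2", "f3"))); // prints 3
  }
}
```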
// Synchronously add data to the queue to meet our initial constraints.
// For Trigger.AvailableNow, constructor-time preload is normally initialized from
// latestOffset(...) with no explicit end offset, so bounded preload must stop at the cap.
i'm not sure I understand this comment, what is the cap?
The cap here refers to the AvailableNowTrigger limit, which as I understand it prevents reading beyond what is available now, even if more data arrives later. I would check to make sure the initial preloading doesn't cross that.
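A minimal sketch of the capped-preload shape under discussion (all names are hypothetical and this is only the shape of the constraint, not Iceberg's implementation): stop enqueueing once the queued row count reaches the cap, leaving the remainder for later fills.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class BoundedPreloadSketch {
  // Hypothetical preload: enqueue per-task row counts, but never read past
  // `cap` queued rows, mirroring an AvailableNow-style bound on the
  // constructor-time fill. Later fillQueue runs may enqueue the rest.
  static Deque<Integer> preload(List<Integer> taskRowCounts, long cap) {
    Deque<Integer> queue = new ArrayDeque<>();
    long queuedRows = 0;
    for (int rows : taskRowCounts) {
      if (queuedRows >= cap) {
        break; // stop at the cap
      }
      queue.add(rows);
      queuedRows += rows;
    }
    return queue;
  }

  public static void main(String[] args) {
    // Four tasks of 10 rows each with a cap of 25: the third add crosses the
    // cap (20 < 25, so it is still taken), the fourth is not, leaving 3 tasks.
    System.out.println(preload(List.of(10, 10, 10, 10), 25).size()); // prints 3
  }
}
```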
Force-pushed from ce9b293 to 8cde4b2
Thank you @RussellSpitzer for the review! I addressed your comments; please let me know if I missed anything.
That's it for me, but I really think @bryanck should take a look before we merge.
This LGTM as well, thanks for the updates @RjLi13 and for the review @RussellSpitzer!
Also, thanks @yingjianwu98 for your review as well!
Thank you @bryanck @yingjianwu98 @singhpk234 @RussellSpitzer for the reviews and helping me get this feature in!
This PR contains fixes in response to @yingjianwu98's review; see #15299.
From that review, it addresses:
- Two more bug fixes
- One improvement: the async planner feature is disabled for users by default.