Move FileIO close from RecordWriter to RecordWriterManager#37782
Move FileIO close from RecordWriter to RecordWriterManager#37782dejii wants to merge 3 commits intoapache:masterfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the management of Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
f4590e7 to
5235f9b
Compare
|
Assigning reviewers: R: @Abacn for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
Did some digging to verify some assumptions. It looks like the catalog creates a new TableOperations instance per table: Update: Ahh but RESTCatalog indeed uses the same FileIO instance across each table: I think your change is good, this is just some further nuance: But your change still covers this case because it iterates over each table and checks for unique IOs to close |
ahmedabu98
left a comment
There was a problem hiding this comment.
Thanks Deji! Had a few nits regarding the comments, and one request to add an extra check. Overall looks great
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
Outdated
Show resolved
Hide resolved
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
Outdated
Show resolved
Hide resolved
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
Show resolved
Hide resolved
|
Edited some of my suggestions. It's hard to say with certainty whether it is one FileIO per table, or if a FileIO is shared across all tables. It appears the catalog has the freedom to implement it either way |
|
@ahmedabu98 Thanks for the review! I've addressed your comments:
|
Summary
RecordWriter- it no longer owns the FileIO lifecycleRecordWriterManager.close(), which collects unique FileIO instances across all destinations and closes each once in a finally blockRoot Cause
table.io()returns the catalog's sharedFileIOinstance. Catalogs likeRESTSessionCatalogtrack and reuse a single FileIO across tables viaFileIOTracker(ref). WhenRecordWriter.close()calledio.close(), it shut down the shared connection pool.RecordWriterManager.close()iterates destinations and callscache.invalidateAll()on each, which triggersRecordWriter.close()via the removal listener. The first destination's writers close successfully but kill the shared pool - subsequent destinations fail with:The fix moves FileIO close to
RecordWriterManager.close(), which deduplicates by identity and closes each shared instance exactly once after all writers have flushed.Related: #36438
I can confirm this works fix works great when running in Dataflow.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.