[Python] Add UnboundedSource ValidatesRunner test on portable runners#38892
Eliaaazzz wants to merge 1 commit into
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces portable ValidatesRunner coverage for the UnboundedSource SDF wrapper in the Python SDK. By integrating these tests into the PortableRunnerTest suite, the changes ensure that unbounded data sources function correctly across various portable runner environments, including Flink and Prism, while validating watermark propagation and runner-API round-trip consistency.
Code Review
This pull request introduces experimental support for UnboundedSource in the Python SDK by implementing an SDF-based wrapper. It includes the necessary base classes, restriction providers, and DoFn logic to handle unbounded data streams, along with comprehensive unit tests and integration tests. My review identified two potential issues: one regarding the potential breakage of legacy unbounded sources due to hardcoded boundedness in iobase.py, and another concerning the brittle use of private attributes to close readers in unbounded_source.py.
-   is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
-   if self.source.is_bounded() else
-   beam_runner_api_pb2.IsBounded.UNBOUNDED))
+   is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED))
Hardcoding is_bounded to BOUNDED seems incorrect. This code path is taken by legacy sources that are not handled by the SDF-based BoundedSource or the new UnboundedSource paths. These legacy sources can be unbounded, and they rely on the is_bounded() method to signal this. This change appears to break support for such legacy unbounded sources by always treating them as bounded.
Suggested change:
is_bounded=(beam_runner_api_pb2.IsBounded.BOUNDED if self.source.is_bounded() else beam_runner_api_pb2.IsBounded.UNBOUNDED))
The branch condition is unchanged from master (isinstance(self.source, BoundedSource)), so legacy non-BoundedSource sources are unaffected: _PubSubSource keeps its dedicated branch above, and other sources raised NotImplementedError on master as well. The only actual delta was for a BoundedSource subclass that overrides is_bounded() to return False; master's conditional is restored in 6e6a40c.
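For readers following this thread, the difference between the two variants can be sketched with stand-in types. This is only an illustration: IsBounded below is a local enum standing in for beam_runner_api_pb2.IsBounded, and the Fake* source classes and both helper functions are hypothetical names, not code from the PR.

```python
import enum


class IsBounded(enum.Enum):
  """Local stand-in for beam_runner_api_pb2.IsBounded (assumption)."""
  BOUNDED = 1
  UNBOUNDED = 2


class FakeBoundedSource:
  """Hypothetical source that reports itself as bounded."""
  def is_bounded(self):
    return True


class FakeOverridingSource(FakeBoundedSource):
  """Hypothetical subclass that overrides is_bounded() to return False."""
  def is_bounded(self):
    return False


def boundedness_conditional(source):
  # Mirrors the conditional from master: defer to the source's own answer.
  return IsBounded.BOUNDED if source.is_bounded() else IsBounded.UNBOUNDED


def boundedness_hardcoded(source):
  # Mirrors the hardcoded variant flagged in review: the source is ignored.
  return IsBounded.BOUNDED
```

The two helpers agree for an ordinary bounded source and differ only for the overriding subclass, which is the delta described in the reply above.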
inner_tracker = tracker
if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
  inner_tracker = inner_tracker._threadsafe_restriction_tracker
if hasattr(inner_tracker, '_restriction_tracker'):
  inner_tracker = inner_tracker._restriction_tracker
if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
  inner_tracker._close_reader_if_open()
else:
  _LOGGER.warning(
      'UnboundedSource DoFn could not close a reader because the SDF '
      'tracker wrapper did not expose '
      '_UnboundedSourceRestrictionTracker (got %s). Reader resources '
      'may remain open until garbage collection.',
      type(inner_tracker).__name__)
This logic to release the reader by accessing private attributes (_threadsafe_restriction_tracker, _restriction_tracker) is brittle and breaks encapsulation. It creates a tight coupling with the internal implementation of the SDF tracker wrappers. If the wrapper implementation changes, this code is likely to break.
Consider exploring alternatives that don't rely on implementation details. For example, could the RestrictionTracker interface be extended with a close() method that the framework would call automatically at the end of a bundle?
The unwrapping is guarded by hasattr at each level and an isinstance check before the close call, with a warning fallback that names the unexpected tracker type, so a wrapper change degrades gracefully (the reader is closed at GC) rather than breaking. A framework-invoked close() on RestrictionTracker would be an SDK-wide interface change; deferring that from this experimental feature.
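The graceful-degradation argument can be illustrated with a small self-contained sketch. It assumes stand-in classes: ReaderOwningTracker plays the role of _UnboundedSourceRestrictionTracker, Wrapper is a hypothetical wrapper layer, and close_reader is an illustrative helper rather than the PR's code. It uses getattr with a default in place of the hasattr checks, which behaves the same way for this purpose.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class ReaderOwningTracker:
  """Hypothetical stand-in for _UnboundedSourceRestrictionTracker."""
  def __init__(self):
    self.closed = False

  def close_reader_if_open(self):
    self.closed = True


class Wrapper:
  """Hypothetical wrapper holding an inner tracker in a private attribute."""
  def __init__(self, attr_name, inner):
    setattr(self, attr_name, inner)


def close_reader(tracker):
  """Peels known wrapper layers, then closes the reader if the type matches.

  Returns True if the reader was closed, False if the fallback was taken.
  """
  inner = tracker
  for attr in ('_threadsafe_restriction_tracker', '_restriction_tracker'):
    # getattr with a default leaves `inner` unchanged when a layer is absent.
    inner = getattr(inner, attr, inner)
  if isinstance(inner, ReaderOwningTracker):
    inner.close_reader_if_open()
    return True
  _LOGGER.warning(
      'Could not close reader: unexpected tracker type %s',
      type(inner).__name__)
  return False
```

If a wrapper layer is renamed or removed, the isinstance check fails and the helper only logs a warning, so the failure mode is a late close rather than an exception.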
893f040 to 1ae3fe7
Add test_unbounded_source_read to PortableRunnerTest so the portable ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to end: read a self-terminating source through the job service, assert the elements and that the EOF MAX_TIMESTAMP watermark lets a downstream FixedWindows + GroupByKey fire. The embedded portable runner variants, the Flink suites, and PrismRunnerTest inherit the test. SparkRunnerTest skips it because portable Spark does not execute SDFs (apache#19468), matching its other SDF test skips. Includes the UnboundedSource SDF wrapper from apache#38724, which this PR is stacked on.
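Why the final watermark matters for the FixedWindows + GroupByKey assertion can be shown with a toy model. This is a simplified sketch, not Beam's trigger implementation: MAX_TIMESTAMP here is a float stand-in, and window_start and fired_panes are hypothetical helpers that treat a window as closed once the watermark reaches its end.

```python
MAX_TIMESTAMP = float('inf')  # stand-in for Beam's MAX_TIMESTAMP (assumption)


def window_start(timestamp, size):
  """Start of the fixed window containing `timestamp`."""
  return timestamp - (timestamp % size)


def fired_panes(elements, watermark, size=100):
  """Groups (key, value, timestamp) by key and window; emits closed windows.

  A window [start, start + size) counts as closed once the watermark has
  reached its end. This is a toy model, not Beam's trigger implementation.
  """
  grouped = {}
  for key, value, timestamp in elements:
    start = window_start(timestamp, size)
    grouped.setdefault((key, start), []).append(value)
  return {
      k: sorted(v) for k, v in grouped.items() if watermark >= k[1] + size
  }
```

In this model, five elements at timestamp 0 stay buffered while the watermark sits at 0, and are emitted as one group once the watermark jumps to the maximum, which is the behavior the test relies on at end of input.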
1ae3fe7 to d764862
Assigning reviewers: R: @damccorm for label python.
The PR bot will only process comments in the main thread (not review comments).
Stacked on #38724, squashed into a single commit. The new change relative to #38724 is test_unbounded_source_read in sdks/python/apache_beam/runners/portability/portable_runner_test.py; the rest of the diff is the UnboundedSource implementation under review there. Will rebase onto master once #38724 merges. Addresses #19137.

This adds portable ValidatesRunner coverage for the UnboundedSource SDF wrapper introduced in #38724. test_unbounded_source_read on PortableRunnerTest reads a self-terminating UnboundedCountingSource(5) through a real job service and asserts:
- [0..4] arrive,
- MAX_TIMESTAMP watermark propagates so a downstream FixedWindows(100) + GroupByKey fires.

This complements the DirectRunner tests in apache_beam.io.unbounded_source_test by exercising the runner-API round trip and watermark propagation through the portable job service path.

Suites inheriting the test:
- PortableRunnerTest and its embedded variants (external env, subprocesses, subprocesses + multi-worker, local Docker)
- FlinkRunnerTest, FlinkRunnerTestOptimized, FlinkRunnerTestStreaming
- PrismRunnerTest

SparkRunnerTest skips it: portable Spark does not execute SDFs (Spark portable runner: support SDF #19468), matching its existing test_sdf* skips.

Local verification (Windows, Python 3.11):
- pytest apache_beam/runners/portability/portable_runner_test.py -k test_unbounded_source_read → 6 passed, 1 skipped (PortableRunnerOptimized is class-level skipped, Support "beam:runner:executable_stage:v1" on fn_api_runner #19422)
- (:runners:flink:1.20:job-server:shadowJar, --environment_type=LOOPBACK): FlinkRunnerTest, FlinkRunnerTestOptimized, FlinkRunnerTestStreaming all pass
- (--environment_type=LOOPBACK): passes

CHANGES.md is intentionally left untouched; the announcement is deferred until the ValidatesRunner milestones complete, per review on #38724.
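The inherit-or-skip arrangement described in this PR can be sketched with plain unittest. The class names below are hypothetical stand-ins for the real suites, the test body is a placeholder, and raising unittest.SkipTest in an override is shown as one possible way to opt out, assumed here rather than taken from the diff.

```python
import unittest


class BasePortableSuite(unittest.TestCase):
  """Hypothetical stand-in for PortableRunnerTest."""
  def test_unbounded_source_read(self):
    # Placeholder body; the real test runs a pipeline through a job service.
    self.assertEqual(list(range(5)), [0, 1, 2, 3, 4])


class InheritingSuite(BasePortableSuite):
  """Hypothetical runner suite that inherits the test unchanged."""


class SkippingSuite(BasePortableSuite):
  """Hypothetical runner suite that opts out of the inherited test."""
  def test_unbounded_source_read(self):
    raise unittest.SkipTest('portable Spark does not execute SDFs (#19468)')


def run_suite(case_class):
  """Runs one suite and returns (tests_run, skipped_count, was_successful)."""
  suite = unittest.defaultTestLoader.loadTestsFromTestCase(case_class)
  result = unittest.TestResult()
  suite.run(result)
  return result.testsRun, len(result.skipped), result.wasSuccessful()
```

Adding the test once on the base class gives every subclass coverage by default, so a runner that cannot execute SDFs has to skip explicitly and state the reason.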