[#19711] Fix missing step_id in Python SDK worker logs during DoFn setup #37413
+16
−6
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #19711
This PR addresses the issue where
step_id(instruction ID) was consistently missing or empty in worker logs generated during theDoFn.setup()lifecycle method.Rationale
The
FnApiLogRecordHandlerrelies onstatesamplerthread-local storage to populate theinstruction_idin log entries. Previously, theBundleProcessorexecuted thesetup()method for operations before the thread-local context was fully initialized for that instruction, causing logs emitted during setup to become orphaned (missing metadata).Changes
sdks/python/apache_beam/runners/worker/sdk_worker.py: Updatedcreate_bundle_processorto pass the activeinstruction_idinto theBundleProcessorconstructor.sdks/python/apache_beam/runners/worker/bundle_processor.py:__init__to acceptinstruction_id.instruction_idinto thestatesamplercontext specifically while iterating through operations to callop.setup().sdks/python/apache_beam/runners/worker/log_handler.py: Updatedemit()to checkrecord.instruction_idbefore falling back to thread-local storage, ensuring explicitly injected IDs are respected.Verification
I verified this fix locally using a reproduction script which forces a log during
setup().setup()hadinstruction_id: None.setup()correctly display theinstruction_id(e.g.,bundle_...).fixes #19711).CHANGES.mdwith noteworthy changes.