Forward reserved_ip_ranges through Vertex AI pipeline_job hook (#62733) #66625
jbbqqf wants to merge 1 commit into apache:main
Conversation
Forward reserved_ip_ranges through Vertex AI pipeline_job hook (apache#62733)

`PipelineJobHook.run_pipeline_job` and `submit_pipeline_job` accepted `service_account`, `network`, `create_request_timeout`, and `experiment` but never plumbed `reserved_ip_ranges` to the underlying `PipelineJob.submit()` call, even though google-cloud-aiplatform supports the kwarg. Users had no way to pin a Pipeline Job to a reserved IP range through `RunPipelineJobOperator`. Add the optional parameter end-to-end (operator → hook → SDK) and only forward it to `submit()` when set, so deployments still pinned to older aiplatform releases keep working. Add three regression tests covering the forward, the omit-when-unset case, and the synchronous variant.

Closes: apache#62733
Signed-off-by: jbbqqf <jbbqqf@users.noreply.github.com>
SameerMesiah97 left a comment
Left a few comments. I think this just needs some refinement before merge.
```python
    def execute(self, context: Context):
        self.log.info("Running Pipeline job")
```
Looks like you will need to add the check you added above here too:

```python
submit_kwargs = {
    "project_id": self.project_id,
    "region": self.region,
    "display_name": self.display_name,
    "template_path": self.template_path,
    "job_id": self.job_id,
    "pipeline_root": self.pipeline_root,
    "parameter_values": self.parameter_values,
    "input_artifacts": self.input_artifacts,
    "enable_caching": self.enable_caching,
    "encryption_spec_key_name": self.encryption_spec_key_name,
    "labels": self.labels,
    "failure_policy": self.failure_policy,
    "service_account": self.service_account,
    "network": self.network,
    "create_request_timeout": self.create_request_timeout,
    "experiment": self.experiment,
}
if self.reserved_ip_ranges is not None:
    submit_kwargs["reserved_ip_ranges"] = self.reserved_ip_ranges
```
Right now, it is always passing `None` for `reserved_ip_ranges`, which is causing `TestVertexAIRunPipelineJobOperator::test_execute` to fail.
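The failure mode being flagged here can be shown with a minimal, self-contained sketch. `old_submit` is an illustrative stand-in for a callee that predates the kwarg (such as an older aiplatform `PipelineJob.submit()`), not the actual SDK:

```python
# Stand-in for a callee that does not know about reserved_ip_ranges.
def old_submit(service_account=None, network=None):
    return {"service_account": service_account, "network": network}

# Passing the kwarg explicitly fails even when its value is None.
try:
    old_submit(network="vpc", reserved_ip_ranges=None)
except TypeError as exc:
    print("rejected:", exc)

# Building the kwargs dict conditionally omits the kwarg entirely,
# so the old callee never sees it.
submit_kwargs = {"network": "vpc"}
reserved_ip_ranges = None
if reserved_ip_ranges is not None:
    submit_kwargs["reserved_ip_ranges"] = reserved_ip_ranges
print("accepted:", old_submit(**submit_kwargs))
```

This is why "forward only when set" differs from "always forward `None`": the kwarg's mere presence is what an older signature rejects.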
```python
# through both submit_pipeline_job and run_pipeline_job, to keep the kwarg
# forwarded to PipelineJob.submit() in lockstep with the operator surface.
# We assert by-keyword to make sure the value lands on the SDK boundary
# rather than being silently swallowed (the bug behavior on origin/main).
```
I think this would be better communicated as shorter comments and/or docstrings on the individual tests. Right now the large block reads more like PR rationale than test-specific context.
```python
assert submit_kwargs.get("reserved_ip_ranges") == ["range-1"]
# run_pipeline_job is the synchronous variant: it must also block on wait()
mock_job.wait.assert_called_once()
```
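As an aside, the by-keyword assertion quoted above relies on `unittest.mock` call introspection; a self-contained illustration of the pattern (names mirror the quoted test but the wiring is simplified):

```python
from unittest import mock

# A MagicMock records every call; call_args unpacks into
# (positional args, keyword args) for the most recent call.
mock_job = mock.MagicMock()
mock_job.submit(reserved_ip_ranges=["range-1"])

_, submit_kwargs = mock_job.submit.call_args
assert submit_kwargs.get("reserved_ip_ranges") == ["range-1"]

# The synchronous variant additionally blocks on wait().
mock_job.wait()
mock_job.wait.assert_called_once()
print("assertions passed")
```

Asserting on `call_args` keywords (rather than just `assert_called_once()`) is what guarantees the value actually reaches the SDK boundary.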
|
|
I would add a test for empty `reserved_ip_ranges`. Please refer to the below test for guidance:

```python
@mock.patch(PIPELINE_JOB_STRING.format("PipelineJobHook.get_pipeline_job_object"))
def test_submit_pipeline_job_forwards_empty_reserved_ip_ranges(self, mock_get_job) -> None:
    mock_job = mock_get_job.return_value
    self.hook.submit_pipeline_job(
        project_id=TEST_PROJECT_ID,
        region=TEST_REGION,
        display_name="display",
        template_path="gs://bucket/template.json",
        reserved_ip_ranges=[],
    )
    mock_job.submit.assert_called_once()
    _, submit_kwargs = mock_job.submit.call_args
    assert submit_kwargs["reserved_ip_ranges"] == []
```
    that can be used for this PipelineJob. If set, only IP addresses from these reserved ranges will
    be used; otherwise, all IPs in the VPC are eligible. Requires ``network`` to be configured.
    See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
The link could be removed to make it consistent with the other docstrings for the same parameter.
|
@jbbqqf Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.
See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush.

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting
closes: #62733
Summary

`PipelineJobHook.run_pipeline_job` and `PipelineJobHook.submit_pipeline_job` accept `service_account`, `network`, `create_request_timeout`, and `experiment`, but they never forwarded `reserved_ip_ranges` to the underlying `PipelineJob.submit()` call — even though `google-cloud-aiplatform` supports the kwarg and the Vertex AI Ray cluster operator already plumbs it through.

Users running Vertex AI pipelines on a VPC with reserved peering ranges had no way to constrain a pipeline to those ranges through `RunPipelineJobOperator`. This PR adds the optional `reserved_ip_ranges: list[str] | None` parameter to both hook entry points and to `RunPipelineJobOperator`, and forwards it to `PipelineJob.submit()` only when explicitly set, so that users still on an older aiplatform release that doesn't accept the kwarg keep working.
Context

- `PipelineJob.submit()` — `reserved_ip_ranges: Optional[List[str]] = None`.
- `providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/ray.py:189` already plumbs `reserved_ip_ranges` through to the Ray cluster create call.

Changes
- `providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/pipeline_job.py` — add `reserved_ip_ranges` to `run_pipeline_job` and `submit_pipeline_job`. The `submit()` call uses a kwargs dict that only includes `reserved_ip_ranges` when it's set; a one-line code comment documents why (older aiplatform versions reject unknown kwargs).
- `providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/pipeline_job.py` — add `reserved_ip_ranges` to `RunPipelineJobOperator.__init__` and forward it to `submit_pipeline_job(...)`.
- `providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py` — three regression tests: `test_submit_pipeline_job_forwards_reserved_ip_ranges` — value lands on the SDK boundary; `test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset` — backwards-compat: kwarg absent when `None`; `test_run_pipeline_job_forwards_reserved_ip_ranges` — same for the synchronous variant + asserts `wait()` is called.

Reproduce BEFORE/AFTER yourself (copy-paste)
The reviewer can also run the full hook test file to confirm no regression elsewhere: same command without `-k reserved_ip_ranges` (22 tests).

What I ran locally
- `pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -k reserved_ip_ranges -xvs` → 3/3 passed
- `pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -v` → 22/22 passed (full file, no regression)
- Before the fix: `TypeError: ... unexpected keyword argument 'reserved_ip_ranges'` — which is the bug behavior reported in the issue.

Edge cases tested
reserved_ip_ranges=Nonesubmit()so older aiplatform releases keep workingtest_submit_pipeline_job_omits_reserved_ip_ranges_when_unset["range-1"]submit()unchanged;wait()is also called for the sync varianttest_run_pipeline_job_forwards_reserved_ip_ranges["range-1", "range-2"]submit()unchangedtest_submit_pipeline_job_forwards_reserved_ip_rangesRisk / blast radius
Additive only. No existing call site changes signature semantics: `reserved_ip_ranges` defaults to `None`, and the `submit()` call only forwards it when non-`None`. Users on the deferrable path (`deferrable=True`) are unaffected — `submit_pipeline_job` is what feeds the deferrable trigger and it already returns the same `PipelineJob`.

Release note
PR drafted with assistance from Claude Code (per `contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions`). The change was reviewed manually against `google-cloud-aiplatform`'s `PipelineJob.submit` signature and the precedent set by `providers/google/.../operators/vertex_ai/ray.py:189`. The reproducer block above was used during development; it is the same one a reviewer can paste verbatim. The contributor takes full responsibility for the patch.