
Forward reserved_ip_ranges through Vertex AI pipeline_job hook (#62733) #66625

Draft

jbbqqf wants to merge 1 commit into apache:main from jbbqqf:feat/62733-reserved-ip-ranges-pipeline-job

Conversation

@jbbqqf
Contributor

@jbbqqf jbbqqf commented May 9, 2026

closes: #62733

Summary

PipelineJobHook.run_pipeline_job and PipelineJobHook.submit_pipeline_job
accept service_account, network, create_request_timeout, and
experiment, but never forward reserved_ip_ranges to the
underlying PipelineJob.submit() call, even though
google-cloud-aiplatform supports the kwarg and the Vertex AI Ray cluster
operator already plumbs it through.

Users running Vertex AI pipelines on a VPC with reserved peering ranges had
no way to constrain a pipeline to those ranges through
RunPipelineJobOperator.

This PR adds the optional reserved_ip_ranges: list[str] | None parameter
to both hook entry points and to RunPipelineJobOperator, and forwards it
to PipelineJob.submit() only when it is explicitly set, so that users still
on an older aiplatform release that doesn't accept the kwarg keep working.
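The conditional-forwarding pattern can be sketched as follows. This is a simplified illustration, not the actual diff; build_submit_kwargs is a hypothetical helper standing in for the hook's internal kwargs assembly.

```python
# Simplified sketch of the hook-side pattern (build_submit_kwargs is a
# hypothetical helper, not the actual diff): assemble the always-present
# kwargs, then add reserved_ip_ranges only when the caller set it, so
# older google-cloud-aiplatform releases that reject the kwarg keep working.
def build_submit_kwargs(service_account=None, network=None,
                        reserved_ip_ranges=None):
    kwargs = {
        "service_account": service_account,
        "network": network,
    }
    if reserved_ip_ranges is not None:
        # Note this includes []: an explicit empty list is still forwarded.
        kwargs["reserved_ip_ranges"] = reserved_ip_ranges
    return kwargs
```

The hook would then call pipeline_job.submit(**kwargs), leaving the kwarg entirely absent when the user never set it.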

Context

  • aiplatform reference: PipelineJob.submit()
    reserved_ip_ranges: Optional[List[str]] = None.
  • Existing precedent in this provider:
    providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/ray.py:189
    already plumbs reserved_ip_ranges through to the Ray cluster create call.

Changes

  • providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/pipeline_job.py
    — add reserved_ip_ranges to run_pipeline_job and
    submit_pipeline_job. The submit() call uses a kwargs dict that only
    includes reserved_ip_ranges when it's set; a one-line code comment
    documents why (older aiplatform versions reject unknown kwargs).
  • providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/pipeline_job.py
    — add reserved_ip_ranges to RunPipelineJobOperator.__init__ and
    forward it to submit_pipeline_job(...).
  • providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py
    — three regression tests:
    • test_submit_pipeline_job_forwards_reserved_ip_ranges: the value lands
      on the SDK boundary;
    • test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset: backwards
      compat, the kwarg is absent when None;
    • test_run_pipeline_job_forwards_reserved_ip_ranges: same for the
      synchronous variant, and asserts wait() is called.
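The shape of those regression tests can be sketched against a self-contained stand-in for the hook. This is hypothetical code for illustration; the real tests patch PipelineJobHook and assert on the mocked SDK object instead.

```python
from unittest import mock

# Stand-in for the hook entry point (hypothetical; mirrors the
# conditional-forwarding behavior the PR adds).
def submit_pipeline_job(job, reserved_ip_ranges=None):
    kwargs = {}
    if reserved_ip_ranges is not None:
        kwargs["reserved_ip_ranges"] = reserved_ip_ranges
    job.submit(**kwargs)

def test_forwards_reserved_ip_ranges():
    job = mock.MagicMock()
    submit_pipeline_job(job, reserved_ip_ranges=["range-1", "range-2"])
    # Assert by keyword so the value provably lands on the SDK boundary.
    _, submit_kwargs = job.submit.call_args
    assert submit_kwargs["reserved_ip_ranges"] == ["range-1", "range-2"]

def test_omits_reserved_ip_ranges_when_unset():
    job = mock.MagicMock()
    submit_pipeline_job(job)
    _, submit_kwargs = job.submit.call_args
    assert "reserved_ip_ranges" not in submit_kwargs
```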

Reproduce BEFORE/AFTER yourself (copy-paste)

# --- one-time setup (skip if you already have a working Airflow dev env) ---
git clone https://github.com/apache/airflow.git /tmp/repro-62733 && cd /tmp/repro-62733
python -m venv .venv && source .venv/bin/activate
pip install -e ./task-sdk -e ./airflow-core -e ./providers/google
pip install pytest google-cloud-aiplatform google-auth-httplib2 \
    google-api-python-client google-cloud-secret-manager \
    gcloud-aio-bigquery gcloud-aio-storage gcloud-aio-auth time-machine

# --- BEFORE (origin/main) — should FAIL ---
git checkout origin/main
git checkout feat/62733-reserved-ip-ranges-pipeline-job -- \
    providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py
cd providers/google
PYTHONPATH=../../devel-common/src:. python -m pytest \
    tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py \
    -k reserved_ip_ranges -xvs --rootdir=.
# Expected: TypeError: PipelineJobHook.submit_pipeline_job() got an unexpected
#           keyword argument 'reserved_ip_ranges'  → FAILED.

# --- AFTER (this PR) — should PASS ---
cd /tmp/repro-62733
git checkout feat/62733-reserved-ip-ranges-pipeline-job
cd providers/google
PYTHONPATH=../../devel-common/src:. python -m pytest \
    tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py \
    -k reserved_ip_ranges -xvs --rootdir=.
# Expected: 3 passed.

The reviewer can also run the full hook test file to confirm no regression
elsewhere: same command without -k reserved_ip_ranges (22 tests).

What I ran locally

  • pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -k reserved_ip_ranges -xvs: 3/3 passed
  • pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -v: 22/22 passed (full file, no regression)
  • BEFORE, on a stash of the implementation (tests only): the three new tests fail with TypeError: ... unexpected keyword argument 'reserved_ip_ranges', which is the bug behavior reported in the issue.

Edge cases tested

| # | Scenario | Input | Expected | Verified by |
|---|----------|-------|----------|-------------|
| 1 | Default (unset) | reserved_ip_ranges=None | kwarg is not forwarded to submit(), so older aiplatform releases keep working | test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset |
| 2 | Single range | ["range-1"] | value lands on submit() unchanged; wait() is also called for the sync variant | test_run_pipeline_job_forwards_reserved_ip_ranges |
| 3 | Multiple ranges | ["range-1", "range-2"] | full list lands on submit() unchanged | test_submit_pipeline_job_forwards_reserved_ip_ranges |

Risk / blast radius

Additive only. No existing call site changes behavior:
reserved_ip_ranges defaults to None, and the submit() call only
forwards it when non-None. Users on the deferrable path (deferrable=True)
are unaffected: submit_pipeline_job is what feeds the deferrable trigger,
and it already returns the same PipelineJob.

Release note

Add ``reserved_ip_ranges`` parameter to ``RunPipelineJobOperator`` and the
underlying Vertex AI ``PipelineJobHook.run_pipeline_job`` /
``submit_pipeline_job`` methods, allowing users to pin a Vertex AI Pipeline
Job to specific reserved IP ranges of a peered VPC.
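Assuming a google provider release that includes this change, operator usage would look roughly like the sketch below. All project, bucket, network, and range names are placeholders, not values from the PR.

```python
# Hypothetical operator usage (illustrative only; assumes a provider
# release with this change). Every identifier below is a placeholder.
run_pipeline_kwargs = dict(
    task_id="run_pipeline",
    project_id="my-project",
    region="us-central1",
    display_name="my-pipeline",
    template_path="gs://my-bucket/pipeline.json",
    # reserved_ip_ranges only takes effect alongside a peered VPC network
    network="projects/1234567890/global/networks/my-vpc",
    reserved_ip_ranges=["my-reserved-range"],
)
# Inside a DAG definition: RunPipelineJobOperator(**run_pipeline_kwargs)
```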

PR drafted with assistance from Claude Code (per
contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions).
The change was reviewed manually against google-cloud-aiplatform's
PipelineJob.submit signature and the precedent set by
providers/google/.../operators/vertex_ai/ray.py:189. The reproducer
block above was used during development; it is the same one a reviewer can
paste verbatim. The contributor takes full responsibility for the patch.

…e#62733)

`PipelineJobHook.run_pipeline_job` and `submit_pipeline_job` accepted
`service_account`, `network`, `create_request_timeout`, and `experiment`
but never plumbed `reserved_ip_ranges` to the underlying
`PipelineJob.submit()` call, even though google-cloud-aiplatform supports
the kwarg. Users had no way to pin a Pipeline Job to a reserved IP range
through `RunPipelineJobOperator`. Add the optional parameter end-to-end
(operator → hook → SDK) and only forward it to `submit()` when set, so
deployments still pinned to older aiplatform releases keep working.
Add three regression tests covering the forward, the omit-when-unset
case, and the synchronous variant.

Closes: apache#62733
Signed-off-by: jbbqqf <jbbqqf@users.noreply.github.com>
@jbbqqf jbbqqf requested a review from shahar1 as a code owner May 9, 2026 16:05
@boring-cyborg boring-cyborg Bot added area:providers provider:google Google (including GCP) related issues labels May 9, 2026
Contributor

@SameerMesiah97 SameerMesiah97 left a comment


Left a few comments. I think this just needs some refinement before merge.

}

def execute(self, context: Context):
    self.log.info("Running Pipeline job")

Looks like you will need to add the check you added above here too:

submit_kwargs = {
    "project_id": self.project_id,
    "region": self.region,
    "display_name": self.display_name,
    "template_path": self.template_path,
    "job_id": self.job_id,
    "pipeline_root": self.pipeline_root,
    "parameter_values": self.parameter_values,
    "input_artifacts": self.input_artifacts,
    "enable_caching": self.enable_caching,
    "encryption_spec_key_name": self.encryption_spec_key_name,
    "labels": self.labels,
    "failure_policy": self.failure_policy,
    "service_account": self.service_account,
    "network": self.network,
    "create_request_timeout": self.create_request_timeout,
    "experiment": self.experiment,
}

if self.reserved_ip_ranges is not None:
    submit_kwargs["reserved_ip_ranges"] = self.reserved_ip_ranges

Right now, it is always passing None for reserved_ip_ranges which is causing TestVertexAIRunPipelineJobOperator::test_execute to fail.

# through both submit_pipeline_job and run_pipeline_job, to keep the kwarg
# forwarded to PipelineJob.submit() in lockstep with the operator surface.
# We assert by-keyword to make sure the value lands on the SDK boundary
# rather than being silently swallowed (the bug behavior on origin/main).

I think this would be better communicated as shorter comments and/or docstrings on the individual tests. Right now the large block reads more like PR rationale than test-specific context.

assert submit_kwargs.get("reserved_ip_ranges") == ["range-1"]
# run_pipeline_job is the synchronous variant: it must also block on wait()
mock_job.wait.assert_called_once()


I would add a test for empty reserved_ip_ranges. Please refer to the below test for guidance:

@mock.patch(PIPELINE_JOB_STRING.format("PipelineJobHook.get_pipeline_job_object"))
def test_submit_pipeline_job_forwards_empty_reserved_ip_ranges(self, mock_get_job) -> None:
    mock_job = mock_get_job.return_value

    self.hook.submit_pipeline_job(
        project_id=TEST_PROJECT_ID,
        region=TEST_REGION,
        display_name="display",
        template_path="gs://bucket/template.json",
        reserved_ip_ranges=[],
    )

    mock_job.submit.assert_called_once()
    _, submit_kwargs = mock_job.submit.call_args
    assert submit_kwargs["reserved_ip_ranges"] == []

that can be used for this PipelineJob. If set, only IP addresses from these reserved ranges will
be used; otherwise, all IPs in the VPC are eligible. Requires ``network`` to be configured.
See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.

The link could be removed to make it consistent with the other docstrings for the same parameter.

@potiuk potiuk marked this pull request as draft May 11, 2026 03:55
@potiuk
Member

potiuk commented May 11, 2026

@jbbqqf Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.

  • Provider tests — Failing: 4 provider distributions tests / Compat runs against Airflow 2.11.1, 3.0.6, 3.1.8, 3.2.1 (P3.10). See docs.

See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.


Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting


Development

Successfully merging this pull request may close these issues.

Include reserved_ip_ranges to airflow.providers.google.cloud.operators.vertex_ai.pipeline_job.RunPipelineJobTrigger
