Skip to content

Commit ab99e00

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
feat: Support "reservedIpRanges" parameter in PipelineJob run() and submit() methods.
PiperOrigin-RevId: 592024354
1 parent 17dc9b7 commit ab99e00

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed

google/cloud/aiplatform/pipeline_job_schedules.py

+4
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ def __init__(
110110
create_pipeline_job_request["pipeline_job"][
111111
"encryption_spec"
112112
] = pipeline_job._gca_resource.encryption_spec
113+
if "reserved_ip_ranges" in pipeline_job._gca_resource:
114+
create_pipeline_job_request["pipeline_job"][
115+
"reserved_ip_ranges"
116+
] = pipeline_job._gca_resource.reserved_ip_ranges
113117
pipeline_job_schedule_args = {
114118
"display_name": display_name,
115119
"create_pipeline_job_request": create_pipeline_job_request,

google/cloud/aiplatform/pipeline_jobs.py

+20
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ def run(
294294
self,
295295
service_account: Optional[str] = None,
296296
network: Optional[str] = None,
297+
reserved_ip_ranges: Optional[List[str]] = None,
297298
sync: Optional[bool] = True,
298299
create_request_timeout: Optional[float] = None,
299300
) -> None:
@@ -309,6 +310,9 @@ def run(
309310
Private services access must already be configured for the network.
310311
If left unspecified, the network set in aiplatform.init will be used.
311312
Otherwise, the job is not peered with any network.
313+
reserved_ip_ranges (List[str]):
314+
Optional. A list of names for the reserved IP ranges under the VPC network that can be used for this PipelineJob's workload.
315+
For example: ['vertex-ai-ip-range'].
312316
sync (bool):
313317
Optional. Whether to execute this method synchronously. If False, this method will unblock and it will be executed in a concurrent Future.
314318
create_request_timeout (float):
@@ -319,6 +323,7 @@ def run(
319323
self._run(
320324
service_account=service_account,
321325
network=network,
326+
reserved_ip_ranges=reserved_ip_ranges,
322327
sync=sync,
323328
create_request_timeout=create_request_timeout,
324329
)
@@ -328,6 +333,7 @@ def _run(
328333
self,
329334
service_account: Optional[str] = None,
330335
network: Optional[str] = None,
336+
reserved_ip_ranges: Optional[List[str]] = None,
331337
sync: Optional[bool] = True,
332338
create_request_timeout: Optional[float] = None,
333339
) -> None:
@@ -342,6 +348,9 @@ def _run(
342348
Optional. The full name of the Compute Engine network to which the job
343349
should be peered. For example, projects/12345/global/networks/myVPC.
344350
Private services access must already be configured for the network.
351+
reserved_ip_ranges (List[str]):
352+
Optional. A list of names for the reserved IP ranges under the VPC network that can be used for this PipelineJob's workload.
353+
For example: ['vertex-ai-ip-range'].
345354
sync (bool):
346355
Optional. Whether to execute this method synchronously. If False, this method will unblock and it will be executed in a concurrent Future.
347356
create_request_timeout (float):
@@ -350,6 +359,7 @@ def _run(
350359
self.submit(
351360
service_account=service_account,
352361
network=network,
362+
reserved_ip_ranges=reserved_ip_ranges,
353363
create_request_timeout=create_request_timeout,
354364
)
355365

@@ -359,6 +369,7 @@ def submit(
359369
self,
360370
service_account: Optional[str] = None,
361371
network: Optional[str] = None,
372+
reserved_ip_ranges: Optional[List[str]] = None,
362373
create_request_timeout: Optional[float] = None,
363374
*,
364375
experiment: Optional[Union[str, experiment_resources.Experiment]] = None,
@@ -376,6 +387,12 @@ def submit(
376387
Private services access must already be configured for the network.
377388
If left unspecified, the network set in aiplatform.init will be used.
378389
Otherwise, the job is not peered with any network.
390+
reserved_ip_ranges (List[str]):
391+
Optional. A list of names for the reserved IP ranges under the VPC
392+
network that can be used for this PipelineJob's workload. For example: ['vertex-ai-ip-range'].
393+
394+
If left unspecified, the job will be deployed to any IP ranges under
395+
the provided VPC network.
379396
create_request_timeout (float):
380397
Optional. The timeout for the create request in seconds.
381398
experiment (Union[str, experiments_resource.Experiment]):
@@ -396,6 +413,9 @@ def submit(
396413
if network:
397414
self._gca_resource.network = network
398415

416+
if reserved_ip_ranges:
417+
self._gca_resource.reserved_ip_ranges = reserved_ip_ranges
418+
399419
try:
400420
output_artifacts_gcs_dir = (
401421
self._gca_resource.runtime_config.gcs_output_directory

tests/unit/aiplatform/test_pipeline_jobs.py

+87
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
_TEST_HTTPS_TEMPLATE_PATH = "https://raw.githubusercontent.com/repo/pipeline.json"
6464
_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"
6565
_TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_PIPELINE_JOB_ID}"
66+
_TEST_RESERVED_IP_RANGES = ["vertex-ai-ip-range"]
6667

6768
_TEST_PIPELINE_JOB_NAME = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID}"
6869
_TEST_PIPELINE_JOB_LIST_READ_MASK = field_mask.FieldMask(
@@ -231,6 +232,7 @@ def mock_pipeline_service_create():
231232
create_time=_TEST_PIPELINE_CREATE_TIME,
232233
service_account=_TEST_SERVICE_ACCOUNT,
233234
network=_TEST_NETWORK,
235+
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
234236
)
235237
yield mock_create_pipeline_job
236238

@@ -267,6 +269,7 @@ def make_pipeline_job(state):
267269
create_time=_TEST_PIPELINE_CREATE_TIME,
268270
service_account=_TEST_SERVICE_ACCOUNT,
269271
network=_TEST_NETWORK,
272+
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
270273
job_detail=gca_pipeline_job.PipelineJobDetail(
271274
pipeline_run_context=gca_context.Context(
272275
name=_TEST_PIPELINE_JOB_NAME,
@@ -548,6 +551,90 @@ def test_run_call_pipeline_service_create(
548551
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
549552
)
550553

554+
@pytest.mark.parametrize(
555+
"job_spec",
556+
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
557+
)
558+
@pytest.mark.parametrize("sync", [True, False])
559+
def test_run_call_pipeline_service_run_with_reserved_ip_ranges(
560+
self,
561+
mock_pipeline_service_create,
562+
mock_pipeline_service_get,
563+
mock_pipeline_bucket_exists,
564+
job_spec,
565+
mock_load_yaml_and_json,
566+
sync,
567+
):
568+
import yaml
569+
570+
aiplatform.init(
571+
project=_TEST_PROJECT,
572+
staging_bucket=_TEST_GCS_BUCKET_NAME,
573+
location=_TEST_LOCATION,
574+
credentials=_TEST_CREDENTIALS,
575+
service_account=_TEST_SERVICE_ACCOUNT,
576+
network=_TEST_NETWORK,
577+
)
578+
579+
job = pipeline_jobs.PipelineJob(
580+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
581+
template_path=_TEST_TEMPLATE_PATH,
582+
job_id=_TEST_PIPELINE_JOB_ID,
583+
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
584+
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
585+
enable_caching=True,
586+
)
587+
588+
job.run(
589+
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
590+
sync=sync,
591+
create_request_timeout=None,
592+
)
593+
594+
if not sync:
595+
job.wait()
596+
597+
expected_runtime_config_dict = {
598+
"gcsOutputDirectory": _TEST_GCS_BUCKET_NAME,
599+
"parameterValues": _TEST_PIPELINE_PARAMETER_VALUES,
600+
"inputArtifacts": {"vertex_model": {"artifactId": "456"}},
601+
}
602+
runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
603+
json_format.ParseDict(expected_runtime_config_dict, runtime_config)
604+
605+
job_spec = yaml.safe_load(job_spec)
606+
pipeline_spec = job_spec.get("pipelineSpec") or job_spec
607+
608+
# Construct expected request
609+
expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob(
610+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
611+
pipeline_spec={
612+
"components": {},
613+
"pipelineInfo": pipeline_spec["pipelineInfo"],
614+
"root": pipeline_spec["root"],
615+
"schemaVersion": "2.1.0",
616+
},
617+
runtime_config=runtime_config,
618+
service_account=_TEST_SERVICE_ACCOUNT,
619+
network=_TEST_NETWORK,
620+
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
621+
)
622+
623+
mock_pipeline_service_create.assert_called_once_with(
624+
parent=_TEST_PARENT,
625+
pipeline_job=expected_gapic_pipeline_job,
626+
pipeline_job_id=_TEST_PIPELINE_JOB_ID,
627+
timeout=None,
628+
)
629+
630+
mock_pipeline_service_get.assert_called_with(
631+
name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
632+
)
633+
634+
assert job._gca_resource == make_pipeline_job(
635+
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
636+
)
637+
551638
@pytest.mark.parametrize(
552639
"job_spec",
553640
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],

0 commit comments

Comments
 (0)