Skip to content

Commit cb495e7

Browse files
vertex-sdk-bot authored and copybara-github committed
feat: Add Batch Delete and Batch Cancel Pipeline Jobs and unit tests.
PiperOrigin-RevId: 615930540
1 parent c2ba7d7 commit cb495e7

File tree

2 files changed

+250
-0
lines changed

2 files changed

+250
-0
lines changed

google/cloud/aiplatform/pipeline_jobs.py

+108
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import tempfile
2222
import time
2323
from typing import Any, Callable, Dict, List, Optional, Union
24+
from google.api_core import operation
2425

2526
from google.auth import credentials as auth_credentials
2627
from google.cloud import aiplatform
28+
from google.cloud import aiplatform_v1
2729
from google.cloud.aiplatform import base
2830
from google.cloud.aiplatform import initializer
2931
from google.cloud.aiplatform import utils
@@ -44,6 +46,12 @@
4446
pipeline_job as gca_pipeline_job,
4547
pipeline_state as gca_pipeline_state,
4648
)
49+
from google.cloud.aiplatform_v1.types import (
50+
pipeline_service as PipelineServiceV1,
51+
)
52+
from google.cloud.aiplatform_v1.services.pipeline_service import (
53+
PipelineServiceClient as PipelineServiceClientGa,
54+
)
4755

4856
_LOGGER = base.Logger(__name__)
4957

@@ -551,6 +559,106 @@ def wait(self):
551559
else:
552560
super().wait()
553561

562+
def batch_delete(
    self,
    project: str,
    location: str,
    names: List[str],
) -> PipelineServiceV1.BatchDeletePipelineJobsResponse:
    """Batch-deletes PipelineJobs and waits for the operation to complete.

    Example Usage:
        pipeline_job = aiplatform.PipelineJob(
            display_name='job_display_name',
            template_path='your_pipeline.yaml',
        )
        pipeline_job.batch_delete(
            project='your_project_id',
            location='your_location',
            names=['pipeline_job_name',
            'pipeline_job_name2']
        )

    Args:
        project: Required. The project id of the PipelineJobs to batch delete.
        location: Required. The location of the PipelineJobs to batch delete.
        names: Required. The names of the PipelineJobs to delete. A
            maximum of 32 PipelineJobs can be deleted in a batch.

    Returns:
        BatchDeletePipelineJobsResponse contains PipelineJobs deleted.
    """
    # Fall back to the globally initialized project/location when the
    # caller passes a falsy value, mirroring other SDK methods.
    user_project = project or initializer.global_config.project
    user_location = location or initializer.global_config.location
    parent = initializer.global_config.common_location_path(
        project=user_project, location=user_location
    )
    # Accept either short job ids or fully-qualified resource names and
    # normalize everything to full resource names.
    pipeline_jobs_names = [
        utils.full_resource_name(
            resource_name=name,
            resource_noun="pipelineJobs",
            parse_resource_name_method=PipelineServiceClientGa.parse_pipeline_job_path,
            format_resource_name_method=PipelineServiceClientGa.pipeline_job_path,
            project=user_project,
            location=user_location,
        )
        for name in names
    ]
    request = aiplatform_v1.BatchDeletePipelineJobsRequest(
        parent=parent, names=pipeline_jobs_names
    )
    # Named `delete_lro` (not `operation`) to avoid shadowing the
    # module-level `google.api_core.operation` import used elsewhere.
    delete_lro = self.api_client.batch_delete_pipeline_jobs(request)
    # Block until the long-running operation finishes and return its
    # BatchDeletePipelineJobsResponse payload.
    return delete_lro.result()
611+
612+
def batch_cancel(
    self,
    project: str,
    location: str,
    names: List[str],
) -> operation.Operation:
    """Starts a batch cancel of PipelineJobs and returns the LRO.

    Unlike ``batch_delete``, this does not wait for completion; callers
    may call ``.result()`` on the returned operation to block.

    Example Usage:
        pipeline_job = aiplatform.PipelineJob(
            display_name='job_display_name',
            template_path='your_pipeline.yaml',
        )
        pipeline_job.batch_cancel(
            project='your_project_id',
            location='your_location',
            names=['pipeline_job_name',
            'pipeline_job_name2']
        )

    Args:
        project: Required. The project id of the PipelineJobs to batch cancel.
        location: Required. The location of the PipelineJobs to batch cancel.
        names: Required. The names of the PipelineJobs to cancel. A
            maximum of 32 PipelineJobs can be cancelled in a batch.

    Returns:
        operation (Operation):
            An object representing a long-running operation.
    """
    # Fall back to the globally initialized project/location when the
    # caller passes a falsy value, mirroring other SDK methods.
    user_project = project or initializer.global_config.project
    user_location = location or initializer.global_config.location
    parent = initializer.global_config.common_location_path(
        project=user_project, location=user_location
    )
    # Accept either short job ids or fully-qualified resource names and
    # normalize everything to full resource names.
    pipeline_jobs_names = [
        utils.full_resource_name(
            resource_name=name,
            resource_noun="pipelineJobs",
            parse_resource_name_method=PipelineServiceClientGa.parse_pipeline_job_path,
            format_resource_name_method=PipelineServiceClientGa.pipeline_job_path,
            project=user_project,
            location=user_location,
        )
        for name in names
    ]
    request = aiplatform_v1.BatchCancelPipelineJobsRequest(
        parent=parent, names=pipeline_jobs_names
    )
    return self.api_client.batch_cancel_pipeline_jobs(request)
661+
554662
@property
555663
def pipeline_spec(self):
556664
return self._gca_resource.pipeline_spec

tests/unit/aiplatform/test_pipeline_jobs.py

+142
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
from google.cloud.aiplatform_v1beta1.types import (
4848
pipeline_service as PipelineServiceV1Beta1,
4949
)
50+
from google.cloud.aiplatform_v1.types import (
51+
pipeline_service as PipelineServiceV1,
52+
)
5053
from google.cloud.aiplatform_v1beta1.services import (
5154
pipeline_service as v1beta1_pipeline_service,
5255
)
@@ -255,6 +258,46 @@ def mock_pipeline_service_create():
255258
yield mock_create_pipeline_job
256259

257260

261+
@pytest.fixture
def mock_pipeline_v1_service_batch_cancel():
    """Patches the v1 client's batch_cancel_pipeline_jobs with a mock LRO."""
    with patch.object(
        pipeline_service_client.PipelineServiceClient, "batch_cancel_pipeline_jobs"
    ) as cancel_mock:
        # The SDK returns this value directly, so a bare Operation mock is enough.
        cancel_mock.return_value = mock.Mock(ga_operation.Operation)
        yield cancel_mock
268+
269+
270+
@pytest.fixture
def mock_pipeline_v1_service_batch_delete():
    """Patches batch_delete_pipeline_jobs to return a completed mock LRO.

    The method under test calls ``.result()`` on the returned LRO, so the
    mock resolves to a canned BatchDeletePipelineJobsResponse. (The original
    fixture assigned ``return_value`` twice; the first assignment was dead
    code immediately overwritten by the mock LRO and has been removed.)
    """
    with mock.patch.object(
        pipeline_service_client.PipelineServiceClient, "batch_delete_pipeline_jobs"
    ) as mock_batch_pipeline_jobs:
        mock_lro = mock.Mock(ga_operation.Operation)
        mock_lro.result.return_value = make_v1_batch_delete_pipeline_jobs_response()
        mock_batch_pipeline_jobs.return_value = mock_lro
        yield mock_batch_pipeline_jobs
282+
283+
284+
def make_v1_batch_delete_pipeline_jobs_response():
    """Builds a canned v1 BatchDeletePipelineJobsResponse with two jobs."""
    succeeded_job = make_pipeline_job_with_name(
        _TEST_PIPELINE_JOB_NAME,
        gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
    )
    failed_job = make_pipeline_job_with_name(
        _TEST_PIPELINE_JOB_NAME_2,
        gca_pipeline_state.PipelineState.PIPELINE_STATE_FAILED,
    )
    response = PipelineServiceV1.BatchDeletePipelineJobsResponse()
    response.pipeline_jobs.extend([succeeded_job, failed_job])
    return response
299+
300+
258301
@pytest.fixture
259302
def mock_pipeline_v1beta1_service_batch_delete():
260303
with mock.patch.object(
@@ -342,6 +385,22 @@ def make_pipeline_job(state):
342385
)
343386

344387

388+
def make_pipeline_job_with_name(name, state):
    """Returns a GAPIC PipelineJob proto with the given resource name and state."""
    # The run context carries the same resource name as the job itself.
    run_context = gca_context.Context(name=name)
    job_detail = gca_pipeline_job.PipelineJobDetail(
        pipeline_run_context=run_context,
    )
    return gca_pipeline_job.PipelineJob(
        name=name,
        state=state,
        create_time=_TEST_PIPELINE_CREATE_TIME,
        service_account=_TEST_SERVICE_ACCOUNT,
        network=_TEST_NETWORK,
        reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
        job_detail=job_detail,
    )
403+
345404
@pytest.fixture
346405
def mock_pipeline_service_get():
347406
with mock.patch.object(
@@ -2079,3 +2138,86 @@ def test_create_two_and_batch_delete_pipeline_jobs_returns_response(
20792138

20802139
assert mock_pipeline_v1beta1_service_batch_delete.call_count == 1
20812140
assert len(response.pipeline_jobs) == 2
2141+
2142+
@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize(
    "job_spec",
    [
        _TEST_PIPELINE_SPEC_JSON,
        _TEST_PIPELINE_SPEC_YAML,
        _TEST_PIPELINE_JOB,
        _TEST_PIPELINE_SPEC_LEGACY_JSON,
        _TEST_PIPELINE_SPEC_LEGACY_YAML,
        _TEST_PIPELINE_JOB_LEGACY,
    ],
)
def test_create_two_and_batch_delete_v1_pipeline_jobs_returns_response(
    self,
    mock_load_yaml_and_json,
    mock_pipeline_v1_service_batch_delete,
):
    """batch_delete calls the v1 client once and returns the LRO's response.

    Note: the batch-delete fixture is requested once, as a parameter; the
    original also listed it redundantly in ``usefixtures``.
    """
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_GCS_BUCKET_NAME,
        location=_TEST_LOCATION,
        credentials=_TEST_CREDENTIALS,
    )

    job = pipeline_jobs.PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        template_path=_TEST_TEMPLATE_PATH,
        job_id=_TEST_PIPELINE_JOB_ID,
    )

    response = job.batch_delete(
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
        names=[_TEST_PIPELINE_JOB_ID, _TEST_PIPELINE_JOB_ID_2],
    )

    # One RPC for the batch, and the canned response carries both jobs.
    assert mock_pipeline_v1_service_batch_delete.call_count == 1
    assert len(response.pipeline_jobs) == 2
2183+
2184+
@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize(
    "job_spec",
    [
        _TEST_PIPELINE_SPEC_JSON,
        _TEST_PIPELINE_SPEC_YAML,
        _TEST_PIPELINE_JOB,
        _TEST_PIPELINE_SPEC_LEGACY_JSON,
        _TEST_PIPELINE_SPEC_LEGACY_YAML,
        _TEST_PIPELINE_JOB_LEGACY,
    ],
)
def test_create_two_and_batch_cancel_v1_pipeline_jobs_returns_response(
    self,
    mock_load_yaml_and_json,
    mock_pipeline_v1_service_batch_cancel,
):
    """batch_cancel issues exactly one RPC to the v1 client.

    Note: the batch-cancel fixture is requested once, as a parameter; the
    original also listed it redundantly in ``usefixtures``.
    """
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_GCS_BUCKET_NAME,
        location=_TEST_LOCATION,
        credentials=_TEST_CREDENTIALS,
    )

    job = pipeline_jobs.PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        template_path=_TEST_TEMPLATE_PATH,
        job_id=_TEST_PIPELINE_JOB_ID,
    )

    job.batch_cancel(
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
        names=[_TEST_PIPELINE_JOB_ID, _TEST_PIPELINE_JOB_ID_2],
    )

    assert mock_pipeline_v1_service_batch_cancel.call_count == 1

0 commit comments

Comments
 (0)