Skip to content

Commit b0b604e

Browse files
vertex-sdk-bot authored and copybara-github committed
feat: Add batch delete method in preview pipeline job class and unit test.
PiperOrigin-RevId: 599720798
1 parent 066f32d commit b0b604e

File tree

2 files changed

+175
-2
lines changed

2 files changed

+175
-2
lines changed

google/cloud/aiplatform/preview/pipelinejob/pipeline_jobs.py

+70-2
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
# limitations under the License.
1616
#
1717

18-
from typing import Optional
18+
from typing import List, Optional
1919

2020
from google.cloud.aiplatform.pipeline_jobs import (
2121
PipelineJob as PipelineJobGa,
2222
)
23-
from google.cloud.aiplatform import pipeline_job_schedules
23+
from google.cloud.aiplatform_v1.services.pipeline_service import (
24+
PipelineServiceClient as PipelineServiceClientGa,
25+
)
26+
from google.cloud import aiplatform_v1beta1
27+
from google.cloud.aiplatform import compat, pipeline_job_schedules
28+
from google.cloud.aiplatform import initializer
29+
from google.cloud.aiplatform import utils
2430

2531
from google.cloud.aiplatform.metadata import constants as metadata_constants
2632
from google.cloud.aiplatform.metadata import experiment_resources
@@ -112,3 +118,65 @@ def create_schedule(
112118
network=network,
113119
create_request_timeout=create_request_timeout,
114120
)
121+
122+
@classmethod
def batch_delete(
    cls,
    names: List[str],
    project: Optional[str] = None,
    location: Optional[str] = None,
) -> aiplatform_v1beta1.BatchDeletePipelineJobsResponse:
    """Deletes a batch of PipelineJobs in one request.

    Example Usage:
        pipeline_job = aiplatform.PipelineJob(
            display_name='job_display_name',
            template_path='your_pipeline.yaml',
        )
        pipeline_job.batch_delete(
            names=['pipeline_job_name', 'pipeline_job_name2']
        )

    Args:
        names (List[str]):
            Required. The fully-qualified resource name or ID of the
            Pipeline Jobs to batch delete. Example:
            "projects/123/locations/us-central1/pipelineJobs/456"
            or "456" when project and location are initialized or passed.
        project (str):
            Optional. Project containing the Pipeline Jobs to
            batch delete. If not set, the project given to `aiplatform.init`
            will be used.
        location (str):
            Optional. Location containing the Pipeline Jobs to
            batch delete. If not set, the location given to `aiplatform.init`
            will be used.

    Returns:
        BatchDeletePipelineJobsResponse contains PipelineJobs deleted.
    """
    # Fall back to the globally-initialized project/location when not given.
    user_project = project or initializer.global_config.project
    user_location = location or initializer.global_config.location
    parent = initializer.global_config.common_location_path(
        project=user_project, location=user_location
    )
    # Normalize each entry to a fully-qualified pipelineJobs resource name,
    # accepting either bare IDs or already-qualified names.
    pipeline_jobs_names = [
        utils.full_resource_name(
            resource_name=name,
            resource_noun="pipelineJobs",
            parse_resource_name_method=PipelineServiceClientGa.parse_pipeline_job_path,
            format_resource_name_method=PipelineServiceClientGa.pipeline_job_path,
            project=user_project,
            location=user_location,
        )
        for name in names
    ]
    request = aiplatform_v1beta1.BatchDeletePipelineJobsRequest(
        parent=parent, names=pipeline_jobs_names
    )
    client = cls._instantiate_client(
        location=user_location,
        appended_user_agent=["preview-pipeline-jobs-batch-delete"],
    )
    # batch delete is only available on the v1beta1 surface.
    v1beta1_client = client.select_version(compat.V1BETA1)
    operation = v1beta1_client.batch_delete_pipeline_jobs(request)
    # Block until the long-running operation completes and return its result.
    return operation.result()

tests/unit/aiplatform/test_pipeline_jobs.py

+105
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from urllib import request
2525
from datetime import datetime
2626

27+
from google.api_core import operation as ga_operation
2728
from google.auth import credentials as auth_credentials
2829
from google.cloud import aiplatform
2930
from google.cloud.aiplatform import base
@@ -43,6 +44,20 @@
4344
from google.cloud.aiplatform.compat.services import (
4445
pipeline_service_client,
4546
)
47+
from google.cloud.aiplatform_v1beta1.types import (
48+
pipeline_service as PipelineServiceV1Beta1,
49+
)
50+
from google.cloud.aiplatform_v1beta1.services import (
51+
pipeline_service as v1beta1_pipeline_service,
52+
)
53+
from google.cloud.aiplatform_v1beta1.types import (
54+
pipeline_job as v1beta1_pipeline_job,
55+
pipeline_state as v1beta1_pipeline_state,
56+
context as v1beta1_context,
57+
)
58+
from google.cloud.aiplatform.preview.pipelinejob import (
59+
pipeline_jobs as preview_pipeline_jobs,
60+
)
4661
from google.cloud.aiplatform.compat.types import (
4762
pipeline_job as gca_pipeline_job,
4863
pipeline_state as gca_pipeline_state,
@@ -52,7 +67,9 @@
5267
_TEST_PROJECT = "test-project"
5368
_TEST_LOCATION = "us-central1"
5469
_TEST_PIPELINE_JOB_DISPLAY_NAME = "sample-pipeline-job-display-name"
70+
_TEST_PIPELINE_JOB_DISPLAY_NAME_2 = "sample-pipeline-job-display-name-2"
5571
_TEST_PIPELINE_JOB_ID = "sample-test-pipeline-202111111"
72+
_TEST_PIPELINE_JOB_ID_2 = "sample-test-pipeline-202111112"
5673
_TEST_GCS_BUCKET_NAME = "my-bucket"
5774
_TEST_GCS_OUTPUT_DIRECTORY = f"gs://{_TEST_GCS_BUCKET_NAME}/output_artifacts/"
5875
_TEST_CREDENTIALS = auth_credentials.AnonymousCredentials()
@@ -66,6 +83,7 @@
6683
_TEST_RESERVED_IP_RANGES = ["vertex-ai-ip-range"]
6784

6885
_TEST_PIPELINE_JOB_NAME = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID}"
86+
_TEST_PIPELINE_JOB_NAME_2 = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID_2}"
6987
_TEST_PIPELINE_JOB_LIST_READ_MASK = field_mask.FieldMask(
7088
paths=pipeline_constants._READ_MASK_FIELDS
7189
)
@@ -237,6 +255,52 @@ def mock_pipeline_service_create():
237255
yield mock_create_pipeline_job
238256

239257

258+
@pytest.fixture
def mock_pipeline_v1beta1_service_batch_delete():
    """Mocks the v1beta1 batch_delete_pipeline_jobs RPC with a finished LRO."""
    with mock.patch.object(
        v1beta1_pipeline_service.PipelineServiceClient, "batch_delete_pipeline_jobs"
    ) as mock_batch_pipeline_jobs:
        # The real RPC returns a long-running operation; model that with a
        # mock Operation whose .result() yields the final response. (The
        # original code also assigned the response directly to return_value
        # first, which was dead code — immediately overwritten below.)
        mock_lro = mock.Mock(ga_operation.Operation)
        mock_lro.result.return_value = make_batch_delete_pipeline_jobs_response()
        mock_batch_pipeline_jobs.return_value = mock_lro
        yield mock_batch_pipeline_jobs
270+
271+
272+
def make_v1beta1_pipeline_job(name: str, state: v1beta1_pipeline_state.PipelineState):
    """Builds a v1beta1 PipelineJob proto with the given name and state."""
    # The run context reuses the job name so lookups by context name resolve.
    run_context = v1beta1_context.Context(name=name)
    job_detail = v1beta1_pipeline_job.PipelineJobDetail(
        pipeline_run_context=run_context,
    )
    return v1beta1_pipeline_job.PipelineJob(
        name=name,
        state=state,
        create_time=_TEST_PIPELINE_CREATE_TIME,
        service_account=_TEST_SERVICE_ACCOUNT,
        network=_TEST_NETWORK,
        job_detail=job_detail,
    )
285+
286+
287+
def make_batch_delete_pipeline_jobs_response():
    """Builds a BatchDeletePipelineJobsResponse containing two deleted jobs."""
    response = PipelineServiceV1Beta1.BatchDeletePipelineJobsResponse()
    # One succeeded and one failed job, appended in this order.
    deleted_jobs = (
        (
            _TEST_PIPELINE_JOB_NAME,
            v1beta1_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
        ),
        (
            _TEST_PIPELINE_JOB_NAME_2,
            v1beta1_pipeline_state.PipelineState.PIPELINE_STATE_FAILED,
        ),
    )
    for job_name, job_state in deleted_jobs:
        response.pipeline_jobs.append(make_v1beta1_pipeline_job(job_name, job_state))
    return response
302+
303+
240304
@pytest.fixture
241305
def mock_pipeline_bucket_exists():
242306
def mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
@@ -1974,3 +2038,44 @@ def test_get_associated_experiment_from_pipeline_returns_experiment(
19742038
assert associated_experiment.resource_name == _TEST_CONTEXT_NAME
19752039

19762040
assert add_context_children_mock.call_count == 1
2041+
2042+
@pytest.mark.usefixtures(
    "mock_pipeline_service_get",
    "mock_pipeline_v1beta1_service_batch_delete",
)
@pytest.mark.parametrize(
    "job_spec",
    [
        _TEST_PIPELINE_SPEC_JSON,
        _TEST_PIPELINE_SPEC_YAML,
        _TEST_PIPELINE_JOB,
        _TEST_PIPELINE_SPEC_LEGACY_JSON,
        _TEST_PIPELINE_SPEC_LEGACY_YAML,
        _TEST_PIPELINE_JOB_LEGACY,
    ],
)
def test_create_two_and_batch_delete_pipeline_jobs_returns_response(
    self,
    mock_load_yaml_and_json,
    mock_pipeline_v1beta1_service_batch_delete,
):
    """batch_delete issues exactly one RPC and surfaces both deleted jobs."""
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_GCS_BUCKET_NAME,
        credentials=_TEST_CREDENTIALS,
    )

    pipeline_job = preview_pipeline_jobs._PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        template_path=_TEST_TEMPLATE_PATH,
        job_id=_TEST_PIPELINE_JOB_ID,
    )

    batch_delete_response = pipeline_job.batch_delete(
        names=[_TEST_PIPELINE_JOB_ID, _TEST_PIPELINE_JOB_ID_2],
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
    )

    # The mocked LRO result (see the batch-delete fixture) carries two jobs.
    assert mock_pipeline_v1beta1_service_batch_delete.call_count == 1
    assert len(batch_delete_response.pipeline_jobs) == 2

0 commit comments

Comments
 (0)