
Commit 1fda417

vertex-sdk-bot authored and copybara-github committed
chore: Add scheduled pipelines client system test.
fix: Remove Schedule read mask because ListSchedules does not support it.

PiperOrigin-RevId: 539819661
1 parent 69c5f60 commit 1fda417
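
In terms of the public surface, the change removes the enable_simple_view flag (and the read_mask it produced) from PipelineJobSchedule.list(). A minimal before/after sketch, assuming aiplatform.init() has already been run against a valid project:

    from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules

    # Before this commit (parameter no longer accepted):
    # pipeline_job_schedules.PipelineJobSchedule.list(enable_simple_view=True)

    # After this commit: no read_mask is sent, so each returned
    # PipelineJobSchedule carries its full set of fields.
    schedules = pipeline_job_schedules.PipelineJobSchedule.list()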

File tree

4 files changed: +100 -98 lines changed


google/cloud/aiplatform/preview/constants/schedules.py

-15
@@ -41,18 +41,3 @@
 
 # Pattern for any JSON or YAML file over HTTPS.
 _VALID_HTTPS_URL = pipeline_constants._VALID_HTTPS_URL
-
-# Fields to include in returned PipelineJobSchedule when enable_simple_view=True in PipelineJobSchedule.list()
-_PIPELINE_JOB_SCHEDULE_READ_MASK_FIELDS = [
-    "name",
-    "display_name",
-    "start_time",
-    "end_time",
-    "max_run_count",
-    "started_run_count",
-    "state",
-    "create_time",
-    "update_time",
-    "cron",
-    "catch_up",
-]
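
For context, the deleted constant existed only to seed a protobuf FieldMask for the old enable_simple_view path (removed from pipeline_job_schedules.py below). A sketch of what that path did, using the standard field_mask_pb2 API:

    from google.protobuf import field_mask_pb2 as field_mask

    # The removed constant listed the Schedule fields to retain in list responses.
    read_mask_fields = [
        "name", "display_name", "start_time", "end_time", "max_run_count",
        "started_run_count", "state", "create_time", "update_time", "cron", "catch_up",
    ]

    # The old list(enable_simple_view=True) path built the mask like this and attached it
    # to the ListSchedules request as read_mask, which ListSchedules does not support
    # (the reason for this fix).
    read_mask = field_mask.FieldMask(paths=read_mask_fields)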

google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py

-23
@@ -54,8 +54,6 @@
 
 _SCHEDULE_ERROR_STATES = schedule_constants._SCHEDULE_ERROR_STATES
 
-_READ_MASK_FIELDS = schedule_constants._PIPELINE_JOB_SCHEDULE_READ_MASK_FIELDS
-
 
 class PipelineJobSchedule(
     _Schedule,
@@ -264,7 +262,6 @@ def list(
         cls,
         filter: Optional[str] = None,
         order_by: Optional[str] = None,
-        enable_simple_view: bool = True,
         project: Optional[str] = None,
         location: Optional[str] = None,
         credentials: Optional[auth_credentials.Credentials] = None,
@@ -286,16 +283,6 @@ def list(
                 Optional. A comma-separated list of fields to order by, sorted in
                 ascending order. Use "desc" after a field name for descending.
                 Supported fields: `display_name`, `create_time`, `update_time`
-            enable_simple_view (bool):
-                Optional. Whether to pass the `read_mask` parameter to the list call.
-                Defaults to False if not provided. This will improve the performance of calling
-                list(). However, the returned PipelineJobSchedule list will not include all fields for
-                each PipelineJobSchedule. Setting this to True will exclude the following fields in your
-                response: 'create_pipeline_job_request', 'next_run_time', 'last_pause_time',
-                'last_resume_time', 'max_concurrent_run_count', 'allow_queueing', 'last_scheduled_run_response'.
-                The following fields will be included in each PipelineJobSchedule resource in your
-                response: 'name', 'display_name', 'start_time', 'end_time', 'max_run_count',
-                'started_run_count', 'state', 'create_time', 'update_time', 'cron', 'catch_up'.
             project (str):
                 Optional. Project to retrieve list from. If not set, project
                 set in aiplatform.init will be used.
@@ -309,19 +296,9 @@ def list(
         Returns:
             List[PipelineJobSchedule] - A list of PipelineJobSchedule resource objects.
         """
-
-        read_mask_fields = None
-
-        if enable_simple_view:
-            read_mask_fields = field_mask.FieldMask(paths=_READ_MASK_FIELDS)
-            _LOGGER.warn(
-                "By enabling simple view, the PipelineJobSchedule resources returned from this method will not contain all fields."
-            )
-
         return cls._list_with_local_order(
             filter=filter,
             order_by=order_by,
-            read_mask=read_mask_fields,
             project=project,
             location=location,
             credentials=credentials,
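
With the read_mask logic gone, list() simply forwards filter, order_by, project, location, and credentials to _list_with_local_order and always returns fully populated resources. A usage sketch; the project, region, and display name below are placeholders:

    from google.cloud import aiplatform
    from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules

    aiplatform.init(project="my-project", location="us-central1")  # placeholders

    schedules = pipeline_job_schedules.PipelineJobSchedule.list(
        filter='display_name="training-pipeline-schedule"',  # hypothetical display name
        order_by="create_time desc",  # one of the docstring-supported sort fields
    )
    for schedule in schedules:
        # Full resources come back, so fields such as state are always populated.
        print(schedule.display_name, schedule.state)
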
tests/system/aiplatform/ (new scheduled pipelines client system test)

+98

@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from google.cloud import aiplatform
+from google.cloud.aiplatform.compat.types import schedule_v1beta1 as gca_schedule
+from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules
+from tests.system.aiplatform import e2e_base
+
+from kfp import components
+from kfp.v2 import compiler
+
+import pytest
+from google.protobuf.json_format import MessageToDict
+
+
+@pytest.mark.usefixtures(
+    "tear_down_resources", "prepare_staging_bucket", "delete_staging_bucket"
+)
+class TestPreviewPipelineJobSchedule(e2e_base.TestEndToEnd):
+    _temp_prefix = "tmpvrtxsdk-e2e-pjs"
+
+    def test_create_get_list(self, shared_state):
+        # Components:
+        def train(
+            number_of_epochs: int,
+            learning_rate: float,
+        ):
+            print(f"number_of_epochs={number_of_epochs}")
+            print(f"learning_rate={learning_rate}")
+
+        train_op = components.create_component_from_func(train)
+
+        # Pipeline:
+        def training_pipeline(number_of_epochs: int = 2):
+            train_op(
+                number_of_epochs=number_of_epochs,
+                learning_rate="0.1",
+            )
+
+        # Creating the pipeline job schedule.
+        aiplatform.init(
+            project=e2e_base._PROJECT,
+            location=e2e_base._LOCATION,
+        )
+
+        ir_file = "pipeline.json"
+        compiler.Compiler().compile(
+            pipeline_func=training_pipeline,
+            package_path=ir_file,
+            pipeline_name="training-pipeline",
+        )
+        job = aiplatform.PipelineJob(
+            template_path=ir_file,
+            display_name="display_name",
+        )
+
+        pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
+            pipeline_job=job, display_name="pipeline_job_schedule_display_name"
+        )
+
+        pipeline_job_schedule.create(cron_expression="*/2 * * * *", max_run_count=2)
+
+        shared_state.setdefault("resources", []).append(pipeline_job_schedule)
+
+        pipeline_job_schedule.pause()
+        assert pipeline_job_schedule.state == gca_schedule.Schedule.State.PAUSED
+
+        pipeline_job_schedule.resume()
+        assert pipeline_job_schedule.state == gca_schedule.Schedule.State.ACTIVE
+
+        pipeline_job_schedule.wait()
+
+        list_jobs_with_read_mask = pipeline_job_schedule.list_jobs(
+            enable_simple_view=True
+        )
+        list_jobs_without_read_mask = pipeline_job_schedule.list_jobs()
+
+        # enable_simple_view=True should apply the `read_mask` filter to limit PipelineJob fields returned
+        assert "serviceAccount" in MessageToDict(
+            list_jobs_without_read_mask[0].gca_resource._pb
+        )
+        assert "serviceAccount" not in MessageToDict(
+            list_jobs_with_read_mask[0].gca_resource._pb
+        )
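
Outside the e2e harness, the lifecycle exercised by this test looks roughly like the sketch below; the project, region, and display names are placeholders, and pipeline.json stands in for the compiled KFP IR produced as in the test above:

    from google.cloud import aiplatform
    from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules

    aiplatform.init(project="my-project", location="us-central1")  # placeholders

    job = aiplatform.PipelineJob(
        template_path="pipeline.json",  # compiled pipeline IR
        display_name="training-pipeline",
    )
    schedule = pipeline_job_schedules.PipelineJobSchedule(
        pipeline_job=job, display_name="training-pipeline-schedule"
    )

    # "*/2 * * * *" fires every two minutes; max_run_count=2 ends the schedule after two runs.
    schedule.create(cron_expression="*/2 * * * *", max_run_count=2)

    schedule.pause()   # state -> PAUSED
    schedule.resume()  # state -> ACTIVE
    schedule.wait()    # block until the scheduled runs finish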

tests/unit/aiplatform/test_pipeline_job_schedules.py

+2-60
@@ -27,6 +27,7 @@
 from google.cloud import aiplatform
 from google.cloud.aiplatform import base
 from google.cloud.aiplatform import initializer
+from google.cloud.aiplatform import pipeline_jobs
 from google.cloud.aiplatform.compat.services import (
     pipeline_service_client,
     schedule_service_client_v1beta1 as schedule_service_client,
@@ -37,21 +38,16 @@
     pipeline_state_v1beta1 as gca_pipeline_state,
     schedule_v1beta1 as gca_schedule,
 )
-from google.cloud.aiplatform.preview.constants import (
-    schedules as schedule_constants,
-)
 from google.cloud.aiplatform.preview.pipelinejob import (
     pipeline_jobs as preview_pipeline_jobs,
 )
-from google.cloud.aiplatform import pipeline_jobs
 from google.cloud.aiplatform.preview.pipelinejobschedule import (
     pipeline_job_schedules,
 )
 from google.cloud.aiplatform.utils import gcs_utils
 import pytest
 import yaml
 
-from google.protobuf import field_mask_pb2 as field_mask
 from google.protobuf import json_format
 
 _TEST_PROJECT = "test-project"
@@ -1129,66 +1125,12 @@ def test_list_schedules(self, mock_schedule_service_list, mock_load_yaml_and_json
             create_request_timeout=None,
         )
 
-        pipeline_job_schedule.list(enable_simple_view=False)
+        pipeline_job_schedule.list()
 
         mock_schedule_service_list.assert_called_once_with(
             request={"parent": _TEST_PARENT}
         )
 
-    @pytest.mark.usefixtures(
-        "mock_schedule_service_create",
-        "mock_schedule_service_get",
-        "mock_schedule_bucket_exists",
-    )
-    @pytest.mark.parametrize(
-        "job_spec",
-        [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
-    )
-    def test_list_schedules_with_read_mask(
-        self, mock_schedule_service_list, mock_load_yaml_and_json
-    ):
-        aiplatform.init(
-            project=_TEST_PROJECT,
-            staging_bucket=_TEST_GCS_BUCKET_NAME,
-            location=_TEST_LOCATION,
-            credentials=_TEST_CREDENTIALS,
-        )
-
-        job = pipeline_jobs.PipelineJob(
-            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
-            template_path=_TEST_TEMPLATE_PATH,
-            parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
-            input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
-            enable_caching=True,
-        )
-
-        pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
-            pipeline_job=job,
-            display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
-        )
-
-        pipeline_job_schedule.create(
-            cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION,
-            max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
-            max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
-            service_account=_TEST_SERVICE_ACCOUNT,
-            network=_TEST_NETWORK,
-            create_request_timeout=None,
-        )
-
-        pipeline_job_schedule.list(enable_simple_view=True)
-
-        test_pipeline_job_schedule_list_read_mask = field_mask.FieldMask(
-            paths=schedule_constants._PIPELINE_JOB_SCHEDULE_READ_MASK_FIELDS
-        )
-
-        mock_schedule_service_list.assert_called_once_with(
-            request={
-                "parent": _TEST_PARENT,
-                "read_mask": test_pipeline_job_schedule_list_read_mask,
-            },
-        )
-
     @pytest.mark.usefixtures(
         "mock_schedule_service_create",
         "mock_schedule_service_get",
