Skip to content

Commit 4e7d11a

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
fix: Fix bug where scheduled pipeline jobs were not running.
PiperOrigin-RevId: 542024449
1 parent a7d92e5 commit 4e7d11a

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def __init__(
104104
"parent": self._parent,
105105
"pipeline_job": {
106106
"runtime_config": runtime_config,
107-
"pipeline_spec": {"fields": pipeline_job.pipeline_spec},
107+
"pipeline_spec": pipeline_job.pipeline_spec,
108108
},
109109
}
110110
pipeline_job_schedule_args = {

tests/system/aiplatform/test_pipeline_job_schedule.py

+16-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
#
1717

1818
from google.cloud import aiplatform
19-
from google.cloud.aiplatform.compat.types import schedule_v1beta1 as gca_schedule
20-
from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules
19+
from google.cloud.aiplatform.compat.types import (
20+
schedule_v1beta1 as gca_schedule,
21+
)
22+
from google.cloud.aiplatform.preview.pipelinejobschedule import (
23+
pipeline_job_schedules,
24+
)
2125
from tests.system.aiplatform import e2e_base
2226

2327
from kfp import components
@@ -61,7 +65,7 @@ def training_pipeline(number_of_epochs: int = 2):
6165
compiler.Compiler().compile(
6266
pipeline_func=training_pipeline,
6367
package_path=ir_file,
64-
pipeline_name="training-pipeline",
68+
pipeline_name="system-test-training-pipeline",
6569
)
6670
job = aiplatform.PipelineJob(
6771
template_path=ir_file,
@@ -72,21 +76,28 @@ def training_pipeline(number_of_epochs: int = 2):
7276
pipeline_job=job, display_name="pipeline_job_schedule_display_name"
7377
)
7478

75-
pipeline_job_schedule.create(cron_expression="*/2 * * * *", max_run_count=2)
79+
max_run_count = 2
80+
pipeline_job_schedule.create(
81+
cron_expression="*/5 * * * *",
82+
max_run_count=max_run_count,
83+
max_concurrent_run_count=2,
84+
)
7685

7786
shared_state.setdefault("resources", []).append(pipeline_job_schedule)
7887

7988
pipeline_job_schedule.pause()
8089
assert pipeline_job_schedule.state == gca_schedule.Schedule.State.PAUSED
8190

82-
pipeline_job_schedule.resume()
91+
pipeline_job_schedule.resume(catch_up=True)
8392
assert pipeline_job_schedule.state == gca_schedule.Schedule.State.ACTIVE
8493

8594
pipeline_job_schedule.wait()
8695

8796
list_jobs_with_read_mask = pipeline_job_schedule.list_jobs(
8897
enable_simple_view=True
8998
)
99+
assert len(list_jobs_with_read_mask) == max_run_count
100+
90101
list_jobs_without_read_mask = pipeline_job_schedule.list_jobs()
91102

92103
# enable_simple_view=True should apply the `read_mask` filter to limit PipelineJob fields returned

tests/unit/aiplatform/test_pipeline_job_schedules.py

+15-7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from datetime import datetime
1919
from importlib import reload
2020
import json
21+
from typing import Any, Dict
2122
from unittest import mock
2223
from unittest.mock import patch
2324
from urllib import request
@@ -48,6 +49,7 @@
4849
import pytest
4950
import yaml
5051

52+
from google.protobuf import struct_pb2
5153
from google.protobuf import json_format
5254

5355
_TEST_PROJECT = "test-project"
@@ -405,6 +407,12 @@ def mock_request_urlopen(job_spec):
405407
yield mock_urlopen
406408

407409

410+
def dict_to_struct(d: Dict[str, Any]) -> struct_pb2.Struct:
411+
s = struct_pb2.Struct()
412+
s.update(d)
413+
return s
414+
415+
408416
@pytest.mark.usefixtures("google_auth_mock")
409417
class TestPipelineJobSchedule:
410418
def setup_method(self):
@@ -481,7 +489,7 @@ def test_call_schedule_service_create(
481489
"parent": _TEST_PARENT,
482490
"pipeline_job": {
483491
"runtime_config": runtime_config,
484-
"pipeline_spec": {"fields": pipeline_spec},
492+
"pipeline_spec": dict_to_struct(pipeline_spec),
485493
"service_account": _TEST_SERVICE_ACCOUNT,
486494
"network": _TEST_NETWORK,
487495
},
@@ -565,7 +573,7 @@ def test_call_schedule_service_create_with_different_timezone(
565573
"parent": _TEST_PARENT,
566574
"pipeline_job": {
567575
"runtime_config": runtime_config,
568-
"pipeline_spec": {"fields": pipeline_spec},
576+
"pipeline_spec": dict_to_struct(pipeline_spec),
569577
"service_account": _TEST_SERVICE_ACCOUNT,
570578
"network": _TEST_NETWORK,
571579
},
@@ -647,7 +655,7 @@ def test_call_schedule_service_create_artifact_registry(
647655
"parent": _TEST_PARENT,
648656
"pipeline_job": {
649657
"runtime_config": runtime_config,
650-
"pipeline_spec": {"fields": pipeline_spec},
658+
"pipeline_spec": dict_to_struct(pipeline_spec),
651659
"service_account": _TEST_SERVICE_ACCOUNT,
652660
"network": _TEST_NETWORK,
653661
},
@@ -729,7 +737,7 @@ def test_call_schedule_service_create_https(
729737
"parent": _TEST_PARENT,
730738
"pipeline_job": {
731739
"runtime_config": runtime_config,
732-
"pipeline_spec": {"fields": pipeline_spec},
740+
"pipeline_spec": dict_to_struct(pipeline_spec),
733741
"service_account": _TEST_SERVICE_ACCOUNT,
734742
"network": _TEST_NETWORK,
735743
},
@@ -810,7 +818,7 @@ def test_call_schedule_service_create_with_timeout(
810818
"parent": _TEST_PARENT,
811819
"pipeline_job": {
812820
"runtime_config": runtime_config,
813-
"pipeline_spec": {"fields": pipeline_spec},
821+
"pipeline_spec": dict_to_struct(pipeline_spec),
814822
"service_account": _TEST_SERVICE_ACCOUNT,
815823
"network": _TEST_NETWORK,
816824
},
@@ -890,7 +898,7 @@ def test_call_schedule_service_create_with_timeout_not_explicitly_set(
890898
"parent": _TEST_PARENT,
891899
"pipeline_job": {
892900
"runtime_config": runtime_config,
893-
"pipeline_spec": {"fields": pipeline_spec},
901+
"pipeline_spec": dict_to_struct(pipeline_spec),
894902
"service_account": _TEST_SERVICE_ACCOUNT,
895903
"network": _TEST_NETWORK,
896904
},
@@ -958,7 +966,7 @@ def test_call_pipeline_job_create_schedule(
958966
"parent": _TEST_PARENT,
959967
"pipeline_job": {
960968
"runtime_config": runtime_config,
961-
"pipeline_spec": {"fields": pipeline_spec},
969+
"pipeline_spec": dict_to_struct(pipeline_spec),
962970
"service_account": _TEST_SERVICE_ACCOUNT,
963971
"network": _TEST_NETWORK,
964972
},

0 commit comments

Comments
 (0)