Skip to content

Commit ee65917

Browse files
vertex-sdk-bot authored and
copybara-github committed
feat: Add max_wait_duration option to custom jobs.
PiperOrigin-RevId: 671592468
1 parent 0f91257 commit ee65917

File tree

7 files changed

+181
-7
lines changed

7 files changed

+181
-7
lines changed

google/cloud/aiplatform/jobs.py

+44-2
Original file line numberDiff line numberDiff line change
@@ -2215,6 +2215,7 @@ def run(
22152215
disable_retries: bool = False,
22162216
persistent_resource_id: Optional[str] = None,
22172217
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
2218+
max_wait_duration: Optional[int] = None,
22182219
) -> None:
22192220
"""Run this configured CustomJob.
22202221
@@ -2285,6 +2286,10 @@ def run(
22852286
PersistentResource, otherwise, the job will be rejected.
22862287
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
22872288
Optional. Indicates the job scheduling strategy.
2289+
max_wait_duration (int):
2290+
This is the maximum duration that a job will wait for the
2291+
requested resources to be provisioned in seconds. If set to 0,
2292+
the job will wait indefinitely. The default is 1 day.
22882293
"""
22892294
network = network or initializer.global_config.network
22902295
service_account = service_account or initializer.global_config.service_account
@@ -2303,6 +2308,7 @@ def run(
23032308
disable_retries=disable_retries,
23042309
persistent_resource_id=persistent_resource_id,
23052310
scheduling_strategy=scheduling_strategy,
2311+
max_wait_duration=max_wait_duration,
23062312
)
23072313

23082314
@base.optional_sync()
@@ -2321,6 +2327,7 @@ def _run(
23212327
disable_retries: bool = False,
23222328
persistent_resource_id: Optional[str] = None,
23232329
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
2330+
max_wait_duration: Optional[int] = None,
23242331
) -> None:
23252332
"""Helper method to ensure network synchronization and to run the configured CustomJob.
23262333
@@ -2389,6 +2396,10 @@ def _run(
23892396
PersistentResource, otherwise, the job will be rejected.
23902397
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
23912398
Optional. Indicates the job scheduling strategy.
2399+
max_wait_duration (int):
2400+
This is the maximum duration that a job will wait for the
2401+
requested resources to be provisioned in seconds. If set to 0,
2402+
the job will wait indefinitely. The default is 1 day.
23922403
"""
23932404
self.submit(
23942405
service_account=service_account,
@@ -2403,6 +2414,7 @@ def _run(
24032414
disable_retries=disable_retries,
24042415
persistent_resource_id=persistent_resource_id,
24052416
scheduling_strategy=scheduling_strategy,
2417+
max_wait_duration=max_wait_duration,
24062418
)
24072419

24082420
self._block_until_complete()
@@ -2422,6 +2434,7 @@ def submit(
24222434
disable_retries: bool = False,
24232435
persistent_resource_id: Optional[str] = None,
24242436
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
2437+
max_wait_duration: Optional[int] = None,
24252438
) -> None:
24262439
"""Submit the configured CustomJob.
24272440
@@ -2487,6 +2500,10 @@ def submit(
24872500
PersistentResource, otherwise, the job will be rejected.
24882501
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
24892502
Optional. Indicates the job scheduling strategy.
2503+
max_wait_duration (int):
2504+
This is the maximum duration that a job will wait for the
2505+
requested resources to be provisioned in seconds. If set to 0,
2506+
the job will wait indefinitely. The default is 1 day.
24902507
24912508
Raises:
24922509
ValueError:
@@ -2514,13 +2531,20 @@ def submit(
25142531
or restart_job_on_worker_restart
25152532
or disable_retries
25162533
or scheduling_strategy
2534+
or max_wait_duration
25172535
):
25182536
timeout = duration_pb2.Duration(seconds=timeout) if timeout else None
2537+
max_wait_duration = (
2538+
duration_pb2.Duration(seconds=max_wait_duration)
2539+
if max_wait_duration
2540+
else None
2541+
)
25192542
self._gca_resource.job_spec.scheduling = gca_custom_job_compat.Scheduling(
25202543
timeout=timeout,
25212544
restart_job_on_worker_restart=restart_job_on_worker_restart,
25222545
disable_retries=disable_retries,
25232546
strategy=scheduling_strategy,
2547+
max_wait_duration=max_wait_duration,
25242548
)
25252549

25262550
if enable_web_access:
@@ -2886,6 +2910,7 @@ def run(
28862910
create_request_timeout: Optional[float] = None,
28872911
disable_retries: bool = False,
28882912
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
2913+
max_wait_duration: Optional[int] = None, # seconds
28892914
) -> None:
28902915
"""Run this configured CustomJob.
28912916
@@ -2936,6 +2961,10 @@ def run(
29362961
`restart_job_on_worker_restart` to False.
29372962
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
29382963
Optional. Indicates the job scheduling strategy.
2964+
max_wait_duration (int):
2965+
This is the maximum duration that a job will wait for the
2966+
requested resources to be provisioned in seconds. If set to 0,
2967+
the job will wait indefinitely. The default is 1 day.
29392968
"""
29402969
network = network or initializer.global_config.network
29412970
service_account = service_account or initializer.global_config.service_account
@@ -2951,6 +2980,7 @@ def run(
29512980
create_request_timeout=create_request_timeout,
29522981
disable_retries=disable_retries,
29532982
scheduling_strategy=scheduling_strategy,
2983+
max_wait_duration=max_wait_duration,
29542984
)
29552985

29562986
@base.optional_sync()
@@ -2966,6 +2996,7 @@ def _run(
29662996
create_request_timeout: Optional[float] = None,
29672997
disable_retries: bool = False,
29682998
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
2999+
max_wait_duration: Optional[int] = None, # seconds
29693000
) -> None:
29703001
"""Helper method to ensure network synchronization and to run the configured CustomJob.
29713002
@@ -3014,6 +3045,10 @@ def _run(
30143045
`restart_job_on_worker_restart` to False.
30153046
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
30163047
Optional. Indicates the job scheduling strategy.
3048+
max_wait_duration (int):
3049+
This is the maximum duration that a job will wait for the
3050+
requested resources to be provisioned in seconds. If set to 0,
3051+
the job will wait indefinitely. The default is 1 day.
30173052
"""
30183053
if service_account:
30193054
self._gca_resource.trial_job_spec.service_account = service_account
@@ -3025,15 +3060,22 @@ def _run(
30253060
timeout
30263061
or restart_job_on_worker_restart
30273062
or disable_retries
3063+
or max_wait_duration
30283064
or scheduling_strategy
30293065
):
3030-
duration = duration_pb2.Duration(seconds=timeout) if timeout else None
3066+
timeout = duration_pb2.Duration(seconds=timeout) if timeout else None
3067+
max_wait_duration = (
3068+
duration_pb2.Duration(seconds=max_wait_duration)
3069+
if max_wait_duration
3070+
else None
3071+
)
30313072
self._gca_resource.trial_job_spec.scheduling = (
30323073
gca_custom_job_compat.Scheduling(
3033-
timeout=duration,
3074+
timeout=timeout,
30343075
restart_job_on_worker_restart=restart_job_on_worker_restart,
30353076
disable_retries=disable_retries,
30363077
strategy=scheduling_strategy,
3078+
max_wait_duration=max_wait_duration,
30373079
)
30383080
)
30393081

google/cloud/aiplatform/preview/jobs.py

+36-4
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ def submit(
266266
tensorboard: Optional[str] = None,
267267
create_request_timeout: Optional[float] = None,
268268
disable_retries: bool = False,
269+
max_wait_duration: Optional[int] = None,
269270
) -> None:
270271
"""Submit the configured CustomJob.
271272
@@ -322,6 +323,10 @@ def submit(
322323
Indicates if the job should retry for internal errors after the
323324
job starts running. If True, overrides
324325
`restart_job_on_worker_restart` to False.
326+
max_wait_duration (int):
327+
This is the maximum duration that a job will wait for the
328+
requested resources to be provisioned in seconds. If set to 0,
329+
the job will wait indefinitely. The default is 30 minutes.
325330
326331
Raises:
327332
ValueError:
@@ -342,12 +347,23 @@ def submit(
342347
if network:
343348
self._gca_resource.job_spec.network = network
344349

345-
if timeout or restart_job_on_worker_restart or disable_retries:
350+
if (
351+
timeout
352+
or restart_job_on_worker_restart
353+
or disable_retries
354+
or max_wait_duration
355+
):
346356
timeout = duration_pb2.Duration(seconds=timeout) if timeout else None
357+
max_wait_duration = (
358+
duration_pb2.Duration(seconds=max_wait_duration)
359+
if max_wait_duration
360+
else None
361+
)
347362
self._gca_resource.job_spec.scheduling = gca_custom_job_compat.Scheduling(
348363
timeout=timeout,
349364
restart_job_on_worker_restart=restart_job_on_worker_restart,
350365
disable_retries=disable_retries,
366+
max_wait_duration=max_wait_duration,
351367
)
352368

353369
if enable_web_access:
@@ -741,6 +757,7 @@ def _run(
741757
sync: bool = True,
742758
create_request_timeout: Optional[float] = None,
743759
disable_retries: bool = False,
760+
max_wait_duration: Optional[int] = None,
744761
) -> None:
745762
"""Helper method to ensure network synchronization and to run the configured CustomJob.
746763
@@ -787,20 +804,35 @@ def _run(
787804
Indicates if the job should retry for internal errors after the
788805
job starts running. If True, overrides
789806
`restart_job_on_worker_restart` to False.
807+
max_wait_duration (int):
808+
This is the maximum duration that a job will wait for the
809+
requested resources to be provisioned in seconds. If set to 0,
810+
the job will wait indefinitely. The default is 30 minutes.
790811
"""
791812
if service_account:
792813
self._gca_resource.trial_job_spec.service_account = service_account
793814

794815
if network:
795816
self._gca_resource.trial_job_spec.network = network
796817

797-
if timeout or restart_job_on_worker_restart or disable_retries:
798-
duration = duration_pb2.Duration(seconds=timeout) if timeout else None
818+
if (
819+
timeout
820+
or restart_job_on_worker_restart
821+
or disable_retries
822+
or max_wait_duration
823+
):
824+
timeout = duration_pb2.Duration(seconds=timeout) if timeout else None
825+
max_wait_duration = (
826+
duration_pb2.Duration(seconds=max_wait_duration)
827+
if max_wait_duration
828+
else None
829+
)
799830
self._gca_resource.trial_job_spec.scheduling = (
800831
gca_custom_job_compat.Scheduling(
801-
timeout=duration,
832+
timeout=timeout,
802833
restart_job_on_worker_restart=restart_job_on_worker_restart,
803834
disable_retries=disable_retries,
835+
max_wait_duration=max_wait_duration,
804836
)
805837
)
806838

0 commit comments

Comments
 (0)