Skip to content

Commit 0c1bb9d

Browse files
authored
Check for final job status for gcp runs (#588)
## Summary

https://linear.app/zipline-ai/issue/ZIP-666/update-runpy-to-check-the-status-at-the-end-of-the-wait-and-throw-a

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Enhanced cloud job monitoring now verifies the final status of processing jobs, displaying confirmation upon completion and providing clear error alerts when issues occur.
- **Refactor**
  - Streamlined the quickstart execution flow by removing redundant state checks, simplifying the overall process.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 4b7df51 commit 0c1bb9d

File tree

1 file changed

+24
-5
lines changed
  • api/python/ai/chronon/repo

1 file changed

+24
-5
lines changed

api/python/ai/chronon/repo/gcp.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import base64
2+
import json
23
import logging
34
import multiprocessing
45
import os
@@ -315,8 +316,12 @@ def run(self):
315316
# for now only poking for a particular partition is supported.
316317
args = self._args.get("args")
317318
supported_subcommands = ["check-partitions"]
318-
assert "check-partitions" in args, f"Must specify one of the following subcommands: {supported_subcommands}"
319-
assert "--partition-names" in args, "Must specify a list of `--partition-names=schema.table/pk1=pv1/pk2=pv2"
319+
assert (
320+
"check-partitions" in args
321+
), f"Must specify one of the following subcommands: {supported_subcommands}"
322+
assert (
323+
"--partition-names" in args
324+
), "Must specify a list of `--partition-names=schema.table/pk1=pv1/pk2=pv2"
320325

321326
dataproc_args = self.generate_dataproc_submitter_args(
322327
# for now, self.conf is the only local file that requires uploading to gcs
@@ -419,11 +424,11 @@ def run(self):
419424
]
420425
if dataproc_submitter_logs:
421426
log = dataproc_submitter_logs[0]
422-
job_id = log[
427+
job_id = (log[
423428
log.index(dataproc_submitter_id_str)
424429
+ len(dataproc_submitter_id_str)
425-
+ 1:
426-
]
430+
+ 1 :
431+
]).strip()
427432
print(
428433
"""
429434
<-----------------------------------------------------------------------------------
@@ -436,4 +441,18 @@ def run(self):
436441
check_call(
437442
f"gcloud dataproc jobs wait {job_id} --region={GcpRunner.get_gcp_region_id()}"
438443
)
444+
445+
# Fetch the final job state
446+
jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region=us-central1 --format=json").decode("utf-8")
447+
job_info = json.loads(jobs_info_str)
448+
job_state = job_info.get("status", {}).get("state", "")
449+
450+
451+
print("<<<<<<<<<<<<<<<<-----------------JOB STATUS----------------->>>>>>>>>>>>>>>>>")
452+
if job_state != 'DONE':
453+
print(f"Job {job_id} is not in DONE state. Current state: {job_state}")
454+
raise RuntimeError(f"Job {job_id} failed.")
455+
else:
456+
print(f"Job {job_id} is in DONE state.")
439457
return job_id
458+
raise RuntimeError("No job id found from dataproc submitter logs.")

0 commit comments

Comments
 (0)