Check for final job status for gcp runs #588
Conversation
Actionable comments posted: 0
🧹 Nitpick comments (1)
api/python/ai/chronon/repo/gcp.py (1)
`444-461`: Consider enhancing error handling for job failures.

The job state checking logic is good, but could benefit from more detailed error information.

```diff
 if job_state != dataproc_v1.JobStatus.State.DONE:
     print(f"Job {job_id} is not in DONE state. Current state: {job_state.name}")
-    raise ValueError(f"Job {job_id} failed.")
+    error_details = job_info.status.details if job_info.status.details else "No details available"
+    raise ValueError(f"Job {job_id} failed. State: {job_state.name}. Details: {error_details}")
 else:
     print(f"Job {job_id} is in DONE state.")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- api/python/ai/chronon/repo/gcp.py (4 hunks)
- api/python/requirements/base.in (1 hunks)
- api/python/requirements/base.txt (3 hunks)
- api/python/requirements/dev.txt (3 hunks)
🧰 Additional context used
🪛 LanguageTool
api/python/requirements/base.txt
[duplication] ~58-~58: Possible typo: you repeated a word.
Context: ...ia # google-cloud-storage # google-resumable-media google-resumable-media==2.7.2 # via google-cloud-storage g...
(ENGLISH_WORD_REPEAT_RULE)
[duplication] ~73-~73: Possible typo: you repeated a word.
Context: ...otos # grpc-google-iam-v1 # grpcio-status grpcio-status==1.71.0 # via google-api-core idna=...
(ENGLISH_WORD_REPEAT_RULE)
api/python/requirements/dev.txt
[duplication] ~63-~63: Possible typo: you repeated a word.
Context: ...0 # via tox pytest==8.3.5 # via pytest-cov pytest-cov==6.1.0 # via -r requirements/dev.in...
(ENGLISH_WORD_REPEAT_RULE)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
api/python/requirements/base.in (1)

`11-12`: Dependency addition looks good. Added the required dependency for the Dataproc functionality.

api/python/ai/chronon/repo/gcp.py (4)

`8-8`: Import addition is correct. Added the Dataproc import to support the new functionality.

`318-323`: Assertion formatting looks good. Reformatted assertions for better readability.

`426-430`: String slicing adjustment looks good. String formatting changes for job ID extraction.

`462-462`: Error handling for missing job ID is appropriate. Good addition to catch cases where no job ID is found.

api/python/requirements/dev.txt (1)

`11-73`: Version updates for development dependencies look good. Updates to the latest versions improve security and add new features.
🧰 Tools
🪛 LanguageTool
[duplication] ~63-~63: Possible typo: you repeated a word.
Context: ...0 # via tox pytest==8.3.5 # via pytest-cov pytest-cov==6.1.0 # via -r requirements/dev.in...(ENGLISH_WORD_REPEAT_RULE)
api/python/requirements/base.txt (1)

`14-137`: Dependency updates are appropriate. Added the required Dataproc dependencies and updated versions of existing packages.
🧰 Tools
🪛 LanguageTool
[duplication] ~31-~31: Possible typo: you repeated a word.
Context: ...irements/base.in face==24.0.0 # via glom glom==24.11.0 # via -r requirements/base...(ENGLISH_WORD_REPEAT_RULE)
[duplication] ~58-~58: Possible typo: you repeated a word.
Context: ...ia # google-cloud-storage # google-resumable-media google-resumable-media==2.7.2 # via google-cloud-storage g...(ENGLISH_WORD_REPEAT_RULE)
[duplication] ~73-~73: Possible typo: you repeated a word.
Context: ...otos # grpc-google-iam-v1 # grpcio-status grpcio-status==1.71.0 # via google-api-core idna=...(ENGLISH_WORD_REPEAT_RULE)
Actionable comments posted: 0
🧹 Nitpick comments (1)
api/python/ai/chronon/repo/gcp.py (1)

`318-324`: Use an explicit error instead of `assert`.

Using `assert` in production code is risky because assertions can be disabled with optimizations (e.g. `python -O`). Consider raising a specific exception for clarity.

```diff
-assert ("check-partitions" in args), f"Must specify one of the following subcommands: {supported_subcommands}"
+if "check-partitions" not in args:
+    raise ValueError(f"Must specify one of the following subcommands: {supported_subcommands}")
```
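As a minimal, self-contained sketch of the suggested pattern (the helper name and subcommand list here are hypothetical, chosen for illustration and not taken from the PR), the validation can live in a small function that raises instead of asserting, so the check survives `python -O`:

```python
# Hypothetical helper illustrating the assert-to-exception change suggested above.
# The subcommand list is an assumption for this sketch.
SUPPORTED_SUBCOMMANDS = ["check-partitions"]

def validate_subcommand(args):
    """Raise ValueError when no supported subcommand is present.

    Unlike `assert`, this check is NOT stripped when Python runs
    with the -O (optimize) flag.
    """
    if not any(sub in args for sub in SUPPORTED_SUBCOMMANDS):
        raise ValueError(
            f"Must specify one of the following subcommands: {SUPPORTED_SUBCOMMANDS}"
        )
```

Calling `validate_subcommand(["check-partitions", "--foo"])` passes silently, while `validate_subcommand(["--foo"])` raises `ValueError` regardless of interpreter flags.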
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
api/python/ai/chronon/repo/gcp.py
(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: python_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
api/python/ai/chronon/repo/gcp.py (3)
`8-8`: Looks good. The import ensures we can manage Dataproc jobs. No issues here.

`426-430`: Parsing logic is fine. Ensure the log format remains stable so this index slicing works reliably.

`444-463`: Verify final job success. Currently, we only check whether the job is in the `DONE` state. Consider verifying that the job actually succeeded (check error details).
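A hedged sketch of what "check error details" could look like (the helper name is invented for illustration; it only parses JSON text already fetched, e.g. via `gcloud dataproc jobs describe --format=json`):

```python
import json

def parse_error_details(describe_json: str) -> str:
    """Return the failure reason from a `gcloud dataproc jobs describe --format=json`
    payload. Dataproc populates `status.details` when a job ends in ERROR; fall back
    to a placeholder when the field is absent."""
    status = json.loads(describe_json).get("status", {})
    return status.get("details") or "No details available"
```

This keeps the network call (the gcloud CLI) separate from the parsing, so the error-detail extraction itself is trivially unit-testable.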
```diff
@@ -436,4 +440,24 @@ def run(self):
             check_call(
                 f"gcloud dataproc jobs wait {job_id} --region={GcpRunner.get_gcp_region_id()}"
```
I thought about removing this and using the Python SDK to poll, but I decided against it because this `wait` also streams logs. I couldn't find a Python API in Dataproc that shows logs and waits like this.
api/python/ai/chronon/repo/gcp.py (outdated)

```diff
@@ -436,4 +440,24 @@ def run(self):
             check_call(
                 f"gcloud dataproc jobs wait {job_id} --region={GcpRunner.get_gcp_region_id()}"
             )
+
+            # Fetch the final job state
+            dataproc_client = dataproc_v1.JobControllerClient(
```
TBH, I would prefer that we stick to one client, even if we have to go with the gcloud CLI. It's simpler.
Actionable comments posted: 3
🧹 Nitpick comments (1)
api/python/ai/chronon/repo/gcp.py (1)

`9-9`: Remove unused import. The `dataproc_v1` import is not used in the code.

```diff
-from google.cloud import dataproc_v1, storage
+from google.cloud import storage
```

🧰 Tools
🪛 Ruff (0.8.2)

9-9: `google.cloud.dataproc_v1` imported but unused. Remove unused import: `google.cloud.dataproc_v1` (F401)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- api/python/ai/chronon/repo/gcp.py (4 hunks)
- scripts/distribution/run_gcp_quickstart.sh (0 hunks)

💤 Files with no reviewable changes (1)
- scripts/distribution/run_gcp_quickstart.sh

🧰 Additional context used
🧬 Code Definitions (1)
api/python/ai/chronon/repo/gcp.py (1)
- api/python/ai/chronon/repo/utils.py (1): `check_output` (68-70)
🪛 Ruff (0.8.2)
api/python/ai/chronon/repo/gcp.py
9-9: google.cloud.dataproc_v1
imported but unused
Remove unused import: google.cloud.dataproc_v1
(F401)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
🔇 Additional comments (1)
api/python/ai/chronon/repo/gcp.py (1)
`445-458`: LGTM on verification of job state.

Adding job state verification is good; it ensures jobs completed successfully.

```python
            # Fetch the final job state
            jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region=us-central1 --format=json").decode("utf-8")
            job_info = json.loads(jobs_info_str)
            job_state = job_info.get("status", {}).get("state", "")
```
🛠️ Refactor suggestion

Use a consistent region parameter.

The hardcoded `us-central1` region is inconsistent with `GcpRunner.get_gcp_region_id()` used elsewhere.

```diff
-jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region=us-central1 --format=json").decode("utf-8")
+jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region={GcpRunner.get_gcp_region_id()} --format=json").decode("utf-8")
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```diff
             # Fetch the final job state
-            jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region=us-central1 --format=json").decode("utf-8")
+            jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region={GcpRunner.get_gcp_region_id()} --format=json").decode("utf-8")
             job_info = json.loads(jobs_info_str)
             job_state = job_info.get("status", {}).get("state", "")
```
```python
            # Fetch the final job state
            jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region=us-central1 --format=json").decode("utf-8")
            job_info = json.loads(jobs_info_str)
            job_state = job_info.get("status", {}).get("state", "")

            print("<<<<<<<<<<<<<<<<-----------------JOB STATUS----------------->>>>>>>>>>>>>>>>>")
            if job_state != 'DONE':
                print(f"Job {job_id} is not in DONE state. Current state: {job_state}")
                raise RuntimeError(f"Job {job_id} failed.")
            else:
                print(f"Job {job_id} is in DONE state.")
            return job_id
        raise RuntimeError("No job id found from dataproc submitter logs.")
```
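To make the reviewers' points concrete, here is a hedged sketch (helper names are invented for illustration, not from the PR) that separates parsing the `gcloud dataproc jobs describe --format=json` output from the state check, and distinguishes terminal failures (`CANCELLED`, `ERROR`, per the Dataproc `JobStatus.State` enum) from in-flight states:

```python
import json

# Terminal states other than DONE, as called out in the review.
TERMINAL_FAILURE_STATES = {"CANCELLED", "ERROR"}

def extract_job_state(describe_json: str) -> str:
    """Parse `gcloud dataproc jobs describe --format=json` output and return status.state."""
    return json.loads(describe_json).get("status", {}).get("state", "")

def check_job_state(job_id: str, job_state: str) -> None:
    """Raise unless the job reached DONE, with a distinct message for terminal failures."""
    if job_state == "DONE":
        print(f"Job {job_id} is in DONE state.")
    elif job_state in TERMINAL_FAILURE_STATES:
        raise RuntimeError(f"Job {job_id} ended in terminal state: {job_state}")
    else:
        raise RuntimeError(f"Job {job_id} is not in DONE state. Current state: {job_state}")
```

Keeping the subprocess call out of these helpers means the success/failure decision logic can be unit tested without touching GCP.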
🛠️ Refactor suggestion
Add error handling for the job info command.

The `check_output` call lacks error handling if the command fails.
```diff
 # Fetch the final job state
-jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region=us-central1 --format=json").decode("utf-8")
-job_info = json.loads(jobs_info_str)
-job_state = job_info.get("status", {}).get("state", "")
+try:
+    jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region={GcpRunner.get_gcp_region_id()} --format=json").decode("utf-8")
+    job_info = json.loads(jobs_info_str)
+    job_state = job_info.get("status", {}).get("state", "")
+except Exception as e:
+    raise RuntimeError(f"Failed to get job status for job {job_id}: {str(e)}") from e
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
            # Fetch the final job state
            try:
                jobs_info_str = check_output(f"gcloud dataproc jobs describe {job_id} --region={GcpRunner.get_gcp_region_id()} --format=json").decode("utf-8")
                job_info = json.loads(jobs_info_str)
                job_state = job_info.get("status", {}).get("state", "")
            except Exception as e:
                raise RuntimeError(f"Failed to get job status for job {job_id}: {str(e)}") from e

            print("<<<<<<<<<<<<<<<<-----------------JOB STATUS----------------->>>>>>>>>>>>>>>>>")
            if job_state != 'DONE':
                print(f"Job {job_id} is not in DONE state. Current state: {job_state}")
                raise RuntimeError(f"Job {job_id} failed.")
            else:
                print(f"Job {job_id} is in DONE state.")
            return job_id
        raise RuntimeError("No job id found from dataproc submitter logs.")
```
```python
            if job_state != 'DONE':
                print(f"Job {job_id} is not in DONE state. Current state: {job_state}")
                raise RuntimeError(f"Job {job_id} failed.")
```
🛠️ Refactor suggestion

Consider other terminal states.

Only checking for the 'DONE' state might miss 'CANCELLED' or 'ERROR' states, which are also terminal.

```diff
-    if job_state != 'DONE':
-        print(f"Job {job_id} is not in DONE state. Current state: {job_state}")
-        raise RuntimeError(f"Job {job_id} failed.")
+    if job_state == 'DONE':
+        print(f"Job {job_id} is in DONE state.")
+    else:
+        print(f"Job {job_id} is not in DONE state. Current state: {job_state}")
+        raise RuntimeError(f"Job {job_id} failed with state: {job_state}")
```
Committable suggestion skipped: line range outside the PR's diff.
## Summary

https://linear.app/zipline-ai/issue/ZIP-666/update-runpy-to-check-the-status-at-the-end-of-the-wait-and-throw-a

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Enhanced cloud job monitoring now verifies the final status of processing jobs, displaying confirmation upon completion and providing clear error alerts when issues occur.
- **Refactor**
  - Streamlined the quickstart execution flow by removing redundant state checks, simplifying the overall process.