Connect GroupByUploadToKVBulkLoad from Driver.scala to run.py #221
Conversation
Walkthrough

This pull request introduces enhancements to Chronon's infrastructure, focusing on Google Cloud Platform (GCP) integration, job submission, and configuration management. The changes span multiple files.
Actionable comments posted: 0
🧹 Nitpick comments (4)
api/py/ai/chronon/repo/run.py (2)
684-685: Consider using os.path for better path handling

Replace with os.path.basename for more robust path handling.

-def extract_filename_from_path(path):
-    return path.split("/")[-1]
+def extract_filename_from_path(path):
+    return os.path.basename(path)
639-645: Enhance error handling for GCS uploads

Consider adding error handling for the case when teams.json doesn't exist.

+if not os.path.exists(get_teams_json_file_path(self.repo)):
+    raise ValueError(f"teams.json not found at {get_teams_json_file_path(self.repo)}")
 local_files_to_upload_to_gcs.append(get_teams_json_file_path(self.repo))

spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
784-787: Improve error message clarity

The error message could be more descriptive.

-throw new Exception("Must supply both srcOfflineTable and groupbyName if confPath is not supplied")
+throw new Exception("Either provide confPath OR both srcOfflineTable and groupbyName parameters")

distribution/build_and_upload_gcp_artifacts.sh (1)
3-3: Make error handling consistent

Consider using the same strict error handling throughout the script.

-set -e
+set -euxo pipefail
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (8)
- api/py/ai/chronon/repo/run.py (20 hunks)
- api/py/test/sample/teams.json (1 hunks)
- chronon_dataproc_submitter.env (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1 hunks)
- distribution/build_and_upload_gcp_artifacts.sh (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
🔇 Additional comments (9)
chronon_dataproc_submitter.env (1)
5-5: Verify the instance ID value for different environments.

The instance ID appears to be hardcoded for the canary environment. Ensure this value is appropriately configured for other environments.
✅ Verification successful
Instance ID configuration is correctly implemented
The value "zipline-canary-instance" is consistently used in both the configuration and test data, indicating this is the intended instance ID for the environment.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for environment-specific instance IDs in configuration files
rg -g '*.{json,yaml,env}' 'ZIPLINE_GCP_INSTANCE_ID'

Length of output: 219
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
26-28: LGTM! Environment variable keys updated consistently.

Environment variable keys now include the ZIPLINE_ prefix, maintaining consistency with other configurations.
Also applies to: 31-33
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
157-164: LGTM! Improved GCS files handling.

Better error handling with assertion and empty case handling for GCS files arguments.
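For illustration, a minimal sketch of the described handling, assuming the submitter receives its arguments as an Array[String] and the flag has the shape --gcs_files=a,b,c (names here are illustrative, not the PR's exact code):

```scala
// Sketch only: allow at most one --gcs_files flag and treat a missing flag as "no files".
def parseGcsFiles(args: Array[String]): Array[String] = {
  val gcsFilesArgs = args.filter(_.startsWith("--gcs_files"))
  assert(gcsFilesArgs.length <= 1, "At most one --gcs_files argument is expected")
  if (gcsFilesArgs.isEmpty) Array.empty[String]
  else gcsFilesArgs(0).split("=")(1).split(",")
}
```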
spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1)
260-261: LGTM! Using tableUtils as recommended.

Changed to tableUtils.loadTable to ensure proper BigQuery configuration.
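For context, a minimal sketch of the pattern being approved, assuming TableUtils.loadTable(tableName) returns a DataFrame (the surrounding GroupByUpload code is not reproduced here):

```scala
import org.apache.spark.sql.DataFrame
import ai.chronon.spark.TableUtils

// Read through TableUtils instead of spark.table so the BigQuery-aware
// catalog/connector settings configured on TableUtils are applied.
def loadUploadTable(tableUtils: TableUtils, uploadTable: String): DataFrame =
  tableUtils.loadTable(uploadTable)
```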
api/py/ai/chronon/repo/run.py (3)
50-50: LGTM: New mode integration

The "upload-to-kv" mode is correctly added with appropriate ONLINE_WRITE_ARGS.
Also applies to: 74-74
94-94: LGTM: Route mapping

The "upload-to-kv" mode is correctly mapped to "groupby-upload-bulk-load".
130-133: LGTM: GCP constants

The GCP-specific constants are properly defined for jar and class defaults.
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
518-522: LGTM: Debug option

The jsonPercent option is well-documented with a sensible default value.
api/py/test/sample/teams.json (1)
8-12: Verify TODO placeholders

Replace TODO placeholders with actual values before deployment.
✅ Verification successful
Configuration placeholders need replacement
The following placeholders must be replaced with actual values:
- [TODO]/path/to/spark-submit in SPARK_SUBMIT_PATH
- [STREAMING-TODO]/path/to/folder/containing in HADOOP_DIR
- [ONLINE-TODO]your.online.class in CHRONON_ONLINE_CLASS
- [ONLINE-TODO] host and port values in CHRONON_ONLINE_ARGS

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for remaining TODOs in teams.json
rg "\[.*TODO.*\]" teams.json

Length of output: 81
Script:
#!/bin/bash
# Find TODOs in teams.json
rg "\[.*TODO.*\]" api/py/test/sample/teams.json

Length of output: 421
api/py/ai/chronon/repo/run.py
Outdated
-online_jar=self.online_jar,
-online_class=self.online_class,
+online_jar=self.online_jar if not self.dataproc else ZIPLINE_ONLINE_JAR_DEFAULT,
+online_class=self.online_class if not self.dataproc else ZIPLINE_ONLINE_CLASS_DEFAULT,
ooc why do we need to modify how this works? Could we just set a default for self.online_class and similar for the other?
true that. changing
I'm curious also about why we need these defaults - we might want to support multiple KV potentially in the same cloud provider so ensuring it has to be injected and not having defaults point to the wrong thing is nice..
are we ok with having this exposed and set in teams.json instead?
Actionable comments posted: 0
🧹 Nitpick comments (4)
api/py/ai/chronon/repo/run.py (4)
130-133: Consider making jar version configurable.

The hardcoded version "0.1.0-SNAPSHOT" in the default jar name could make version updates difficult.
Consider moving the version to a configuration file or environment variable:
-ZIPLINE_ONLINE_JAR_DEFAULT = "cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
+ZIPLINE_ONLINE_JAR_DEFAULT = f"cloud_gcp-assembly-{os.getenv('ZIPLINE_JAR_VERSION', '0.1.0-SNAPSHOT')}.jar"
678-679: Use string formatting instead of concatenation.

String concatenation with + is less readable and less efficient.
-final_args = base_args + " " + \
-    str(self.args) + override_start_partition_arg
+final_args = f"{base_args} {str(self.args)} {override_start_partition_arg}"
684-685: Use os.path.basename instead of manual path splitting.

Python's os.path module provides a more robust way to extract filenames.
-def extract_filename_from_path(path):
-    return path.split("/")[-1]
+def extract_filename_from_path(path):
+    return os.path.basename(path)
854-858: Improve help text for CLI options.

The help text should mention that these are default values from constants.
- help="Jar containing Online KvStore & Deserializer Impl. " + help=f"Jar containing Online KvStore & Deserializer Impl (default: {ZIPLINE_ONLINE_JAR_DEFAULT}). " - help="Class name of Online Impl. Used for streaming and metadata-upload mode.") + help=f"Class name of Online Impl (default: {ZIPLINE_ONLINE_CLASS_DEFAULT}). Used for streaming and metadata-upload mode.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- api/py/ai/chronon/repo/run.py (21 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (1)
api/py/ai/chronon/repo/run.py (1)
50-50: LGTM: New mode and route additions are consistent.

The addition of "upload-to-kv" mode and its corresponding route "groupby-upload-bulk-load" aligns with the PR objectives.
Also applies to: 74-74, 94-94
Actionable comments posted: 0
🧹 Nitpick comments (2)
.tool-versions (2)
Line range hint 1-10: Fix inconsistent indentation.

-java
- corretto-11.0.25.9.1
- corretto-17.0.9.8.1
+java
+ corretto-11.0.25.9.1
+ corretto-17.0.9.8.1
scala 2.12.18
asdf-plugin-manager 1.4.0
sbt 1.8.2
python
- 3.7.17
- 3.11.0
+ 3.7.17
+ 3.11.0
gcloud 504.0.1
1-3: Document Java version requirements.

Add a comment explaining why both Java versions are needed.
+# Java 11 for Dataproc compatibility, Java 17 for local development
 java corretto-11.0.25.9.1 corretto-17.0.9.8.1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- .tool-versions (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: no_spark_scala_tests
- GitHub Check: mutation_spark_tests
🔇 Additional comments (1)
.tool-versions (1)
1-3: Verify Java version requirements for Dataproc.

✅ Verification successful
Both Java versions are required for different components.
The dual Java version setup is intentional:
- Java 11 is required for GCP Dataproc operations
- Java 17 is used for other components
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check Java version requirements in Dataproc-related code
rg -l 'dataproc' | xargs rg -i 'java.*version|jdk|jre'

Length of output: 218
Script:
#!/bin/bash
# Find Dataproc configs
rg -l 'dataproc|cluster' | xargs rg -i 'runtime|version|java|jdk'
# Check build files for Java version
fd 'build.sbt|pom.xml|build.gradle' --exec rg -i 'java|jdk|scala|version'

Length of output: 20281
48a106e to 3862baa
Actionable comments posted: 1
🧹 Nitpick comments (4)
api/py/ai/chronon/repo/run.py (4)
132-133: Consider moving hardcoded defaults to environment variables.

Move ZIPLINE_ONLINE_JAR_DEFAULT and ZIPLINE_ONLINE_CLASS_DEFAULT values to environment variables for better configurability.
684-685: Use os.path.basename instead of manual path splitting.

Replace custom path splitting with the standard library function.
-def extract_filename_from_path(path):
-    return path.split("/")[-1]
+def extract_filename_from_path(path):
+    return os.path.basename(path)
800-801: Fix inconsistent indentation in string formatting.

Align the indentation with the opening parenthesis.
-download_gcs_blob(bucket_name, source_blob_name,
-  dataproc_jar_destination_path)
+download_gcs_blob(bucket_name, source_blob_name,
+                  dataproc_jar_destination_path)
757-758: Use double quotes consistently for string literals.

Match the quote style used in the rest of the file.
-raise ValueError(
-    'Please set ZIPLINE_GCP_PROJECT_ID environment variable')
+raise ValueError(
+    "Please set ZIPLINE_GCP_PROJECT_ID environment variable")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- .tool-versions (1 hunks)
- api/py/ai/chronon/repo/run.py (21 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- .tool-versions
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: no_spark_scala_tests
Actionable comments posted: 3
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
586-589: Track the TODO for removing additionalConfPath.

Create a ticket to track the removal of this configuration path after migration.
Would you like me to create a GitHub issue to track this technical debt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- api/py/ai/chronon/repo/run.py (21 hunks)
- api/py/test/sample/teams.json (1 hunks)
- chronon_dataproc_submitter.env (0 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
💤 Files with no reviewable changes (1)
- chronon_dataproc_submitter.env
🚧 Files skipped from review as they are similar to previous changes (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
- api/py/ai/chronon/repo/run.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: fetcher_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
🔇 Additional comments (5)
spark/src/main/scala/ai/chronon/spark/Driver.scala (4)
516-522: LGTM: New jsonPercent option.

The option is well-documented with a sensible default value.
526-529: LGTM: jsonPercent parameter passed correctly.
598-613: LGTM: Robust teams.json loading.

The implementation safely handles the following (a sketch of the pattern appears after this list):
- File existence check
- JSON parsing
- Fallback to empty map
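An illustrative sketch of that pattern, not the actual Driver.scala code; the value types and the use of Jackson here are assumptions:

```scala
import java.io.File
import scala.collection.JavaConverters._
import scala.util.Try
import com.fasterxml.jackson.databind.ObjectMapper

// Missing file or malformed JSON both degrade to an empty map instead of failing the run.
def loadTeamsJson(path: String): Map[String, Any] = {
  val file = new File(path)
  if (!file.exists()) Map.empty[String, Any]
  else
    Try {
      val parsed = new ObjectMapper().readValue(file, classOf[java.util.Map[String, Any]])
      parsed.asScala.toMap
    }.getOrElse(Map.empty[String, Any])
}
```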
768-799: LGTM: Simplified configuration and improved error handling.

Good improvements:
- Removed redundant options
- Added comprehensive logging
- Added proper error handling
api/py/test/sample/teams.json (1)
58-69: Review namespace configuration.

Multiple teams sharing the 'default' namespace could lead to resource conflicts.
"SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit", | ||
"JOB_MODE": "local[*]", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", | ||
"CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host=<YOUR_HOST> -Zkv-port=<YOUR_PORT>", |
Remove TODO placeholders before production deployment.
These placeholder values could cause runtime issues if not properly configured.
"sample_team": { | ||
"description": "Team description", | ||
"namespace": "chronon_db", | ||
"user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler", |
Replace TODO with actual LDAP username.
Placeholder LDAP username needs to be configured before deployment.
api/py/test/sample/teams.json
Outdated
"CUSTOMER_ID": "canary", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"GCP_DATAPROC_CLUSTER_NAME": "canary-2", | ||
"GCP_INSTANCE_ID": "zipline-canary-instance" |
Move sensitive GCP configuration to environment variables.
Hardcoded GCP configuration should be externalized for security and flexibility.
- "CUSTOMER_ID": "canary",
- "GCP_PROJECT_ID": "canary-443022",
- "GCP_REGION": "us-central1",
- "GCP_DATAPROC_CLUSTER_NAME": "canary-2",
- "GCP_INSTANCE_ID": "zipline-canary-instance"
+ "CUSTOMER_ID": "${CUSTOMER_ID}",
+ "GCP_PROJECT_ID": "${GCP_PROJECT_ID}",
+ "GCP_REGION": "${GCP_REGION}",
+ "GCP_DATAPROC_CLUSTER_NAME": "${GCP_DATAPROC_CLUSTER_NAME}",
+ "GCP_INSTANCE_ID": "${GCP_INSTANCE_ID}"
"SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit", | ||
"JOB_MODE": "local[*]", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", |
change to ZIPLINE_?
i can change. but CHRONON_ONLINE_CLASS is set throughout run.py like here https://github.com/zipline-ai/chronon/blob/main/api/py/ai/chronon/repo/run.py#L684. still want to change?
ah got it okay maybe a followup.
"CUSTOMER_ID": "canary", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"GCP_DATAPROC_CLUSTER_NAME": "canary-2", |
no changes needed for the PR but I think at some point we'll want to hide some of these away from the customer (just making a note)
yeah i'm going to clean up the teams.json in the etsy directory https://github.com/etsy/zipline
LGTM, some minor things
1fd2e04 to 2a91430
Actionable comments posted: 1
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
157-164: Add error handling for malformed --gcs_files argument.

Current split operations could throw ArrayIndexOutOfBoundsException if argument format is incorrect.
-gcsFilesArgs(0).split("=")(1).split(",")
+try {
+  val parts = gcsFilesArgs(0).split("=")
+  if (parts.length != 2) throw new IllegalArgumentException("Invalid format")
+  parts(1).split(",")
+} catch {
+  case e: Exception =>
+    throw new IllegalArgumentException(s"Invalid --gcs_files format: ${gcsFilesArgs(0)}", e)
+}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (9)
- .tool-versions (1 hunks)
- api/py/ai/chronon/repo/run.py (22 hunks)
- api/py/test/sample/teams.json (1 hunks)
- chronon_dataproc_submitter.env (0 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1 hunks)
- distribution/build_and_upload_gcp_artifacts.sh (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
💤 Files with no reviewable changes (1)
- chronon_dataproc_submitter.env
🚧 Files skipped from review as they are similar to previous changes (6)
- distribution/build_and_upload_gcp_artifacts.sh
- .tool-versions
- spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
- api/py/test/sample/teams.json
- api/py/ai/chronon/repo/run.py
- spark/src/main/scala/ai/chronon/spark/Driver.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)
169-171: LGTM! Clean environment variable handling.

Good use of early validation and consistent naming convention.
Also applies to: 178-181
197-200: LGTM! Clean args handling and submission.

Good separation of concerns and helpful debug output.
Also applies to: 207-207
189-195: Consider validating GCP identifiers.

Empty Bigtable instance ID and unvalidated project ID might cause issues downstream.
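A hedged sketch of the kind of check being suggested; the identifier rules below are assumptions based on common GCP naming constraints, not something defined in this PR:

```scala
// Fail fast on empty or obviously malformed identifiers before submitting the Dataproc job.
def requireGcpIdentifier(name: String, value: String): String = {
  require(value != null && value.trim.nonEmpty, s"$name must not be empty")
  require(value.matches("[a-z][a-z0-9-]{2,61}[a-z0-9]"), s"$name looks malformed: $value")
  value
}

// Example usage (values illustrative):
// val instanceId = requireGcpIdentifier("GCP_BIGTABLE_INSTANCE_ID", sys.env.getOrElse("GCP_BIGTABLE_INSTANCE_ID", ""))
```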
.get("GCP_BIGTABLE_INSTANCE_ID") | ||
.orElse(conf.get("GCP_BIGTABLE_INSTANCE_ID")) | ||
.getOrElse(throw new IllegalArgumentException("GCP_BIGTABLE_INSTANCE_ID environment variable not set")) |
💡 Codebase verification
Environment variable rename is incomplete
The variable name change from GCP_INSTANCE_ID to GCP_BIGTABLE_INSTANCE_ID needs to be updated in:
- quickstart/cloud_gcp/scripts/load_data.sh
- quickstart/cloud_gcp/gcp-docker-compose.yml
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreBulkLoadIntegrationTest.scala
🔗 Analysis chain
LGTM! More descriptive environment variable name.
The rename from GCP_INSTANCE_ID to GCP_BIGTABLE_INSTANCE_ID improves clarity.
Let's verify the variable name is updated everywhere:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining instances of the old variable name
rg "GCP_INSTANCE_ID"
# Search for the new variable name to ensure consistent usage
rg "GCP_BIGTABLE_INSTANCE_ID"
Length of output: 1835
Minor comment on the -is-Gcp flag (think we don't need it beyond the driver), otherwise lgtm..
Actionable comments posted: 1
🧹 Nitpick comments (5)
api/py/ai/chronon/repo/run.py (5)
133-134: Consider externalizing version numbers.

The hardcoded version "0.1.0-SNAPSHOT" in the defaults could make updates difficult.
-ZIPLINE_ONLINE_JAR_DEFAULT = "cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
+ZIPLINE_ONLINE_JAR_DEFAULT = f"cloud_gcp-assembly-{os.getenv('CLOUD_GCP_VERSION', '0.1.0-SNAPSHOT')}.jar"
Line range hint 186-202: Use f-strings for better readability.

Multiple string concatenations make the code harder to read.
-content_length = re.search(
-    "(content-length:\\s)(\\d+)", content_output.lower())
+content_length = re.search(r"content-length:\s(\d+)", content_output.lower())
-print("Different file from remote at local: " +
-      path + ". Re-downloading..")
+print(f"Different file from remote at local: {path}. Re-downloading..")
672-673: Make path extraction more robust.

The current implementation might fail with Windows paths or URLs.
-def extract_filename_from_path(path):
-    return path.split("/")[-1]
+def extract_filename_from_path(path):
+    return os.path.basename(path)
736-738: Enhance error messages for environment variables.

The error messages could be more helpful by suggesting where to set these variables.
-raise ValueError('Please set CUSTOMER_ID environment variable')
+raise ValueError('CUSTOMER_ID environment variable is not set. Set it in your .env file or export it in your shell.')
-raise ValueError(
-    'Please set GCP_PROJECT_ID environment variable')
+raise ValueError('GCP_PROJECT_ID environment variable is not set. Set it in your .env file or export it in your shell.')

Also applies to: 743-746
842-846: Enhance CLI help text for GCP options.

The help text could better explain the GCP integration aspects.
- help="Jar containing Online KvStore & Deserializer Impl. " - "Used for streaming and metadata-upload mode.") + help="GCP implementation JAR for KvStore & Deserializer. " + "Required for streaming, metadata-upload, and upload-to-kv modes.") - help="Class name of Online Impl. Used for streaming and metadata-upload mode.") + help="GCP implementation class name. Required for streaming, metadata-upload, and upload-to-kv modes.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- api/py/ai/chronon/repo/run.py (22 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
api/py/ai/chronon/repo/run.py
218-218: Found useless expression. Either assign it to a variable or remove it.
(B018)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (1)
api/py/ai/chronon/repo/run.py (1)
51-51: LGTM: Mode additions are consistent.

The new "upload-to-kv" mode is properly integrated into SPARK_MODES, MODE_ARGS, and ROUTES.
Also applies to: 75-75, 95-95
…ls since we need the bigquery datapointer stuff configured.
…y builds with java 11
3bfc5e6 to 4acbd0b
Actionable comments posted: 0
🧹 Nitpick comments (3)
api/py/ai/chronon/repo/run.py (1)
587-590: Extract duplicated GCS upload logic.

The conf file upload logic is duplicated. Extract it into a helper method.
+def prepare_gcs_upload_files(conf_path):
+    local_files = []
+    if conf_path:
+        local_files.append(conf_path)
+    return local_files

-local_files_to_upload_to_gcs = []
-if self.conf:
-    local_files_to_upload_to_gcs.append(self.conf)
+local_files_to_upload_to_gcs = prepare_gcs_upload_files(self.conf)

Also applies to: 632-634
spark/src/main/scala/ai/chronon/spark/Driver.scala (2)
769-800: LGTM: Bulk load implementation is solid.

Good error handling and logging. Consider adding metrics for monitoring upload performance.
+private val metrics = new BulkLoadMetrics()
 def run(args: Args): Unit = {
   val startTime = System.currentTimeMillis()
+  metrics.recordStart()
   try {
     kvStore.bulkPut(...)
+    metrics.recordSuccess()
   } catch {
     case e: Exception =>
+      metrics.recordFailure(e)
       throw e
   }
 }
596-598: Track the TODO about additional-conf-path migration.

This technical debt should be tracked for future cleanup.
Would you like me to create a GitHub issue to track the migration away from additional-conf-path?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (9)
- .tool-versions (1 hunks)
- api/py/ai/chronon/repo/run.py (23 hunks)
- api/py/test/sample/teams.json (1 hunks)
- chronon_dataproc_submitter.env (0 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1 hunks)
- distribution/build_and_upload_gcp_artifacts.sh (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
💤 Files with no reviewable changes (1)
- chronon_dataproc_submitter.env
🚧 Files skipped from review as they are similar to previous changes (6)
- distribution/build_and_upload_gcp_artifacts.sh
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
- .tool-versions
- spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
- api/py/test/sample/teams.json
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: join_spark_tests
🔇 Additional comments (5)
api/py/ai/chronon/repo/run.py (3)
51-51: LGTM: Mode and route correctly configured.

The new mode "upload-to-kv" is properly mapped to the "groupby-upload-bulk-load" route.
Also applies to: 95-95
131-134: LGTM: GCP constants properly defined.

The constants follow naming conventions and provide appropriate defaults.
842-846: LGTM: Online jar and class options properly configured.

The options have appropriate defaults and clear help text.
spark/src/main/scala/ai/chronon/spark/Driver.scala (2)
89-97: LGTM: GCP args trait well-designed.

The trait provides all necessary GCP configuration options with proper optionality.
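For readers without the diff open, a trait of this shape might look like the hypothetical sketch below; the option names are illustrative and only show the everything-optional pattern the comment refers to:

```scala
import org.rogach.scallop.{ScallopConf, ScallopOption}

// Hypothetical GCP args mix-in: every setting is optional so non-GCP runs can leave them unset.
trait GcpArgs { this: ScallopConf =>
  val gcpProjectId: ScallopOption[String] =
    opt[String](required = false, descr = "GCP project id")
  val gcpRegion: ScallopOption[String] =
    opt[String](required = false, descr = "Dataproc region, e.g. us-central1")
  val gcpDataprocClusterName: ScallopOption[String] =
    opt[String](required = false, descr = "Dataproc cluster name")
  val gcpBigtableInstanceId: ScallopOption[String] =
    opt[String](required = false, descr = "Bigtable instance id for the KV store")
}
```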
587-587: LGTM: OnlineSubcommand properly integrates GCP configuration.

Clean implementation that follows the discussed patterns.
Also applies to: 607-615
fetch to be called from zipline run outside of Driver.scala so that spark is not required #306
## Summary
^^^

### Testing
This testing assumes the gbu job (this one is just the job that generates the data to bigquery) has run.

Tested with this command:

```
(dev_chronon) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (main) $ python api/py/ai/chronon/repo/run.py --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1 --partition-string=2023-11-30 --dataproc --repo=/Users/davidhan/zipline/chronon/api/py/test/sample
```

and led to a successful run of this job: https://console.cloud.google.com/dataproc/jobs/2f6b0b81-7b34-4a92-840d-cb90059f3d42/monitoring?region=us-central1&project=canary-443022

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added support for a new upload mode to Google Cloud Platform.
  - Enhanced configuration handling for GCP Dataproc clusters.
  - Introduced new GCP-related options in the command-line interface.
  - Updated JSON configuration with new GCP parameters.
- **Bug Fixes**
  - Improved error handling and argument processing in various components.
- **Refactor**
  - Updated environment variable naming conventions.
  - Restructured configuration management across multiple files.
  - Enhanced clarity and organization in code structure.
- **Chores**
  - Added support for multiple Java versions.
  - Updated build and deployment scripts for improved reliability.