feat: use direct writes to bigquery #264
Conversation
Walkthrough: The pull request introduces modifications to the GCP integration and Spark table utilities, focusing on the data writing and table creation processes. The primary updates involve switching BigQuery writes to the direct write method, removing table reachability checks, and simplifying partition handling.
```scala
.select(saltedDf.columns.map {
  case c if c == partitionColumn && dataPointer.writeFormat.map(_.toUpperCase).exists("BIGQUERY".equals) =>
```
Forgetting why we had this conditional: `&& dataPointer.writeFormat.map(_.toUpperCase).exists("BIGQUERY".equals)`
To keep the behavior in all other cases the same, e.g. when we write to a Hive or Iceberg table. However, I think we should just try to keep the behavior the same everywhere instead of having a ton of branching.
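For reference, a minimal sketch of the branching under discussion, reusing the surrounding TableUtils names (`saltedDf`, `dataPointer`, `partitionColumn`, `partitionFormat`); treat it as a simplified illustration rather than the file's exact contents:

```scala
import org.apache.spark.sql.functions.to_date

// Only when the write format is BigQuery is the partition column cast to a
// DATE; Hive / Iceberg writes pass the column through unchanged.
saltedDf.select(saltedDf.columns.map {
  case c
      if c == partitionColumn &&
        dataPointer.writeFormat.map(_.toUpperCase).exists("BIGQUERY".equals) =>
    to_date(saltedDf.col(c), partitionFormat).as(c)
  case c => saltedDf.col(c)
}.toSeq: _*)
```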
```scala
}
if (autoExpand) {
  expandTable(tableName, df.schema)
if (writeFormat.name.toUpperCase != "BIGQUERY") {
```
Without this conditional it breaks, right?
Yes, correct.
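A rough sketch of the resulting guard, under the assumption that table properties are applied via a TableUtils helper (`alterTableProperties` here is assumed, not quoted from the diff):

```scala
// Table properties and schema auto-expansion are Hive-style operations that
// don't apply to BigQuery, where the table is managed natively, so both are
// skipped when the write format is BigQuery.
if (writeFormat.name.toUpperCase != "BIGQUERY") {
  if (tableProperties != null && tableProperties.nonEmpty) {
    alterTableProperties(tableName, tableProperties) // assumed helper name
  }
  if (autoExpand) {
    expandTable(tableName, df.schema)
  }
}
```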
```scala
  df
}
val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
val dfRearranged: DataFrame = df.select(colOrder.map {
```
I think we're losing the behavior, though, of making sure the partition column is last in the order?
It should be fine, right? `val colOrder` handles that for us.
Ah yes, it does. Thanks for pointing it out.
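A tiny self-contained illustration (hypothetical column names) of why `colOrder` always ends with the partition columns:

```scala
val columns = Seq("user_id", "ds", "amount") // hypothetical DataFrame columns
val partitionColumns = Seq("ds")
val colOrder = columns.diff(partitionColumns) ++ partitionColumns
// colOrder == Seq("user_id", "amount", "ds"): non-partition columns first,
// partition columns appended last, regardless of their original position.
```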
```scala
val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
val dfRearranged: DataFrame = df.select(colOrder.map {
  case c if c == partitionColumn =>
    to_date(df.col(c), partitionFormat).as(partitionColumn)
```
Did we want to convert the partition column to a date type only for BigQuery?
I decided to try to make this the same across the board to avoid any special casing.
The current unit test suite passes with this change, but we'll have to see if it affects Hive implementations anywhere. However, I do think supporting date partition columns is the right behavior for the engine.
Nvm, putting this back because it's too much of a lift to support this generally.
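For anyone following along, a self-contained example (hypothetical data and format string) of the `to_date` conversion that was being debated:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_date

val spark = SparkSession.builder().master("local[*]").appName("date-partition").getOrCreate()
import spark.implicits._

// A string-typed partition column is parsed into a proper DATE column.
val df = Seq(("a", "2025-01-24"), ("b", "2025-01-25")).toDF("key", "ds")
val converted = df.withColumn("ds", to_date($"ds", "yyyy-MM-dd"))
converted.printSchema() // ds is now DateType instead of StringType
```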
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1)
Line 39: Document CREATE_NEVER behavior. Add a comment explaining that jobs will fail if the table doesn't exist, as table creation is handled by TableUtils.
```diff
+ // Table must exist or job will fail. Table creation is handled by TableUtils
  "createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name
```
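For context, a sketch of how this disposition sits alongside the direct write method in the provider's options; the option keys follow the spark-bigquery-connector docs, and the exact shape of the map is an assumption, not the file's verbatim contents:

```scala
import com.google.cloud.bigquery.JobInfo

val bigQueryWriteOptions: Map[String, String] = Map(
  // Write through the BigQuery Storage Write API instead of staging to GCS.
  "writeMethod" -> "direct",
  // Table must exist or the job will fail; creation is handled by TableUtils.
  "createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name
)
```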
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3 hunks)
🧰 Additional context used
📓 Learnings (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (2)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1)
Lines 38-40: Verify BigQuery direct write configuration. The switch to direct writes looks good, but ensure the BigQuery Storage Write API quotas are sufficient for your write patterns.
✅ Verification successful
Direct write configuration is safe to proceed ✅
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Check if the BigQuery Storage Write API is enabled
gcloud services list --format="table(NAME)" | grep bigquerystorage.googleapis.com

# Look for any quota-related configurations or issues in the codebase
rg -i "quota|rate.?limit" --type scala
```
Length of output: 2129
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4)
Line 287: LGTM! Initialization moved outside condition. Moving writeFormat initialization before the tableReachable check prevents potential NPEs.
Lines 333-334: LGTM! Simplified column ordering. Clean simplification of the column reordering logic.
Lines 335-338: LGTM! Consistent date type handling. Uniform date type conversion for partition columns across all formats improves consistency.
Lines 312-318: Verify BigQuery table properties handling. Skipping table properties and autoExpand for BigQuery tables aligns with direct writes, but ensure this doesn't affect any existing functionality that relies on these properties.
✅ Verification successful
BigQuery table properties handling is correctly implemented
Skipping table properties for BigQuery is intentional and safe. BigQuery tables use direct writes and native property management through BigQuery's API instead of Hive-style table properties. No existing functionality relies on these properties for BigQuery tables.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for code that depends on BigQuery table properties
rg -A 5 "tableProperties.*BIGQUERY"
```
Length of output: 37
Script:
```bash
#!/bin/bash
# Search for BigQuery related code and table properties
echo "=== BigQuery Format Usage ==="
rg -i "bigquery" -A 5

echo -e "\n=== Table Properties Usage ==="
rg "tableProperties" -A 5

echo -e "\n=== Property Configuration Patterns ==="
ast-grep --pattern 'if ($format == "BIGQUERY") { $$$ }'
```
Length of output: 68758
Co-authored-by: Thomas Chow <[email protected]>
## Summary
- With #263 we control table creation ourselves. We don't need to rely on indirect writes to do the table creation (and partitioning) for us; we simply use the storage API to write directly into the table we created. This should be much more performant and preferred over indirect writes, because we don't need to stage data and then load it as a temp BQ table, and it uses the BigQuery storage API directly.
- Remove configs that are used only for indirect writes.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
### Release Notes
- **Improvements**
  - Enhanced BigQuery data writing process with more precise configuration options.
  - Simplified table creation and partition insertion logic.
  - Improved handling of DataFrame column arrangements during data operations.
- **Changes**
  - Updated the BigQuery write method to use a direct writing approach.
  - Introduced a new option to prevent table creation if the table does not exist.
  - Modified the table creation process to be more format-aware.
  - Streamlined the partition insertion mechanism.

These updates improve data management and writing efficiency in cloud data processing workflows.
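As a usage sketch (hypothetical table name; option keys per the spark-bigquery-connector), a direct write looks like this, with no staging bucket or temporary table involved:

```scala
df.write
  .format("bigquery")
  .option("writeMethod", "direct")             // BigQuery Storage Write API
  .option("createDisposition", "CREATE_NEVER") // table was created up front by TableUtils
  .mode("append")
  .save("my_project.my_dataset.my_table")      // hypothetical table name
```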