chore: refactor some existing methods #156
Conversation
Walkthrough
The pull request introduces a systematic refactoring of DataFrame saving methods across multiple Spark-related classes. The changes replace calls to `tableUtils.insertPartitions` and `tableUtils.insertUnPartitioned` with the DataFrame extension methods `save` and `saveUnPartitioned` in LogFlattenerJob, StagingQuery, CompareJob, and ConsistencyJob.
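To make the pattern concrete, here is a minimal sketch of the extension style the PR moves toward. The names mirror the PR (`save`, `saveUnPartitioned`, the `Extensions` import), but the bodies and signatures below are illustrative assumptions, not Chronon's actual implementation; in particular the `ds` partition column and the write modes are placeholders.

```scala
import org.apache.spark.sql.DataFrame

// Sketch only: a stand-in for ai.chronon.spark.Extensions so call sites can
// write df.save(...) instead of routing through tableUtils.insertPartitions.
object Extensions {
  implicit class DataFrameOps(df: DataFrame) {

    // Hypothetical partitioned save; the real method also handles table
    // properties, partition expansion, and schema checks via TableUtils.
    def save(tableName: String, tableProperties: Map[String, String] = Map.empty): Unit =
      df.write.mode("overwrite").partitionBy("ds").saveAsTable(tableName)

    // Hypothetical unpartitioned save.
    def saveUnPartitioned(tableName: String): Unit =
      df.write.mode("overwrite").saveAsTable(tableName)
  }
}

// A call site then shrinks from
//   tableUtils.insertPartitions(df, tableName, tableProperties)
// to
//   df.save(tableName, tableProperties)
```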
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/StagingQuery.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (0 hunks)
💤 Files with no reviewable changes (1)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
🔇 Additional comments (6)
spark/src/main/scala/ai/chronon/spark/StagingQuery.scala (1)
83-83: Check partition expansions — use a try/catch or confirm that the table exists.
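A hedged sketch of that guard, assuming a `SparkSession` is in scope (the helper name and the `SHOW PARTITIONS` approach are illustrative; Chronon's `TableUtils` has its own partition APIs):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical guard: enumerate partitions only when the table exists,
// instead of letting SHOW PARTITIONS throw on the first run.
def existingPartitions(spark: SparkSession, tableName: String): Seq[String] =
  if (spark.catalog.tableExists(tableName)) {
    spark.sql(s"SHOW PARTITIONS $tableName").collect().map(_.getString(0)).toSeq
  } else {
    Seq.empty // table not created yet: nothing to expand against
  }
```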
spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (1)
134-134: Validate table write — ensure correct partition data after saving.
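One possible shape for that validation, as a sketch (the `ds` partition column is an assumption, and the table name is whatever the job just wrote):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical post-write check: fail fast if the partition we just wrote
// came out empty, rather than letting downstream consumers discover it.
def assertPartitionWritten(spark: SparkSession, table: String, ds: String): Unit = {
  val rows = spark.table(table).where(s"ds = '$ds'").limit(1).count()
  require(rows > 0, s"Expected partition ds=$ds of $table to be non-empty after save")
}
```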
spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (2)
28-28: New import usage — Extensions needed for saveUnPartitioned. Looks fine.
91-91: Metrics table save — make sure metrics are consistent with compare logic.
✅ Verification successful
Let me gather information about the metrics and compare logic to verify their consistency. This will help us understand:
- the context around the metrics table save
- the compare logic implementation
- how metrics are calculated and used

Next, let me trace the compare logic implementation against the metrics: how the metrics are computed and what they contain, the metric transformations, and the final implementation of metrics computation and comparison.
Based on the code analysis, I can now provide a final response:
Metrics are correctly aligned with compare logic
The metrics computation is properly integrated with the comparison process (see the sketch after this list):
- Metrics are generated from value fields and mapped columns
- Comparison results feed into metrics through CompareBaseJob.compare
- Test cases verify metrics consistency with actual data mismatches
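As a rough illustration of that integration, here is a hypothetical mismatch metric over a keyed join; `CompareBaseJob.compare` and `CompareMetrics.compute` do considerably more (per-column transforms, null handling, mappings), so treat this purely as a sketch:

```scala
import org.apache.spark.sql.{DataFrame, functions => F}

// Hypothetical metric: count rows where a compared value column disagrees
// between the two sides after joining on the key columns. Null-vs-value
// differences are ignored here; the real metrics handle them explicitly.
def mismatchCount(leftDf: DataFrame,
                  rightDf: DataFrame,
                  keys: Seq[String],
                  valueCol: String): Long =
  leftDf.as("l")
    .join(rightDf.as("r"), keys)
    .where(F.col(s"l.$valueCol") =!= F.col(s"r.$valueCol"))
    .count()
```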
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:

```bash
#!/bin/bash
# Find the metrics table save implementation and surrounding context
rg -A 10 -B 10 "saveUnPartitioned.*metricsTableName"

# Find compare logic implementation
ast-grep --pattern 'def compare($$$) {
  $$$
}'

# Search for metric calculation logic
rg -A 5 "metrics" --type scala
```

Length of output: 76428
Script:

```bash
#!/bin/bash
# Find compare logic implementation
ast-grep --pattern 'def compare($$$) {
  $$$
  CompareBaseJob.compare($$$)
  $$$
}'

# Find metrics computation in CompareBaseJob
rg -A 10 "CompareMetrics.compute"
```

Length of output: 778
Script:

```bash
#!/bin/bash
# Find metrics computation implementation
rg -A 15 "def compute.*fields.*compareDf.*keys.*name.*mapping" --type scala

# Find metric transformations
ast-grep --pattern 'val metrics = $$$'
```

Length of output: 2642
Script:

```bash
#!/bin/bash
# Find metric transformations for numeric and any columns
rg -A 10 "def numericTransforms|def anyTransforms" --type scala

# Find CompareMetrics compute implementation
rg -A 20 "object CompareMetrics" --type scala
```

Length of output: 4228
Script:

```bash
#!/bin/bash
# Find CompareMetrics compute implementation
rg -B 5 -A 20 "def compute.*valueSchema.*compareDf.*keys.*name.*mapping" --type scala

# Find how metrics are used in comparison
rg -A 10 "CompareBaseJob.compare.*leftDf.*rightDf.*keys" --type scala
```

Length of output: 1468
spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala (2)
25-25: New import — required for the new save method. OK.
229-231: Auto-expand caution — check that columns added at runtime won't break downstream.
✅ Verification successful
Let me gather information about the codebase to understand the auto-expand feature and its potential impact.
Based on the search results, I can now generate a final response:
Auto-expand is safe here
The autoExpand parameter is used consistently across the codebase for schema evolution support, with proper validation in TableUtils. Tests show that incompatible schema changes are caught by IncompatibleSchemaException, protecting downstream dependencies.
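To show the shape of that validation, here is a self-contained sketch; the class and method below are hypothetical stand-ins, since only the `autoExpand` parameter and `IncompatibleSchemaException` are confirmed by the review above:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical stand-in so the sketch compiles on its own; Chronon's real
// exception type lives in the library and carries more context.
class IncompatibleSchemaException(msg: String) extends RuntimeException(msg)

// Illustrative guard in the spirit of TableUtils' validation: brand-new
// columns pass only when autoExpand is on; otherwise the write is rejected
// before it can break downstream readers.
def validateSchemaEvolution(df: DataFrame, tableName: String, autoExpand: Boolean): Unit = {
  val existing = df.sparkSession.table(tableName).schema.fieldNames.toSet
  val incoming = df.schema.fieldNames.toSet
  val added    = incoming -- existing
  if (added.nonEmpty && !autoExpand)
    throw new IncompatibleSchemaException(
      s"Columns ${added.mkString(", ")} are not in $tableName; rerun with autoExpand = true")
}
```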
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:

```bash
#!/bin/bash
# Search for references to autoExpand parameter and its usage
rg -A 5 "autoExpand\s*="

# Look for save method implementation
ast-grep --pattern 'def save($$$) { $$$ }'

# Search for any references to schema evolution or column addition
rg -i "schema.*evolution|add.*column"
```

Length of output: 18441
Force-pushed from 883c9a8 to 1ef9160 (Compare)
Force-pushed from 1ef9160 to e9a0d24 (Compare)
Force-pushed from cc65297 to 603c857 (Compare)
Actionable comments posted: 0
🧹 Nitpick comments (1)
build.sbt (1)
218-218: Check version alignment — consider matching the version with gcsio (3.0.3) for consistency.

```diff
- libraryDependencies += "com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.0"
+ libraryDependencies += "com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.3"
```
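If the alignment suggestion is taken, one way to keep the two artifacts from drifting again is a shared version value (a build.sbt sketch; the exact form of the `gcsio` line in this build is an assumption):

```scala
// Hypothetical build.sbt fragment: one version val for both bigdataoss artifacts.
val bigdataossVersion = "3.0.3"

libraryDependencies ++= Seq(
  "com.google.cloud.bigdataoss" % "gcsio"       % bigdataossVersion,
  "com.google.cloud.bigdataoss" % "util-hadoop" % bigdataossVersion
)
```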
Force-pushed from 7802e5f to d55e7a2 (Compare)
Co-authored-by: Thomas Chow <[email protected]>
Force-pushed from d55e7a2 to 8709e08 (Compare)
## Summary

Don't rely on `TableUtils` directly as much as possible; use the dataframe extensions.

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **Refactor**
  - Updated DataFrame saving methods across multiple Spark job classes
  - Replaced `tableUtils.insertPartitions` and `tableUtils.insertUnPartitioned` with direct DataFrame `save` and `saveUnPartitioned` methods
  - Simplified data persistence mechanism in LogFlattenerJob, StagingQuery, CompareJob, and ConsistencyJob

Co-authored-by: Thomas Chow <[email protected]>