Add flag to skip repartition before writing. #239
Conversation
```scala
  (df.count(), 1)
}
val skipRepartitionBeforeWriting =
  sparkSession.conf.get("spark.chronon.skip_repartition_before_writing", "false").toBoolean
```
Suggested change:

```diff
-sparkSession.conf.get("spark.chronon.skip_repartition_before_writing", "false").toBoolean
+sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean
```
```scala
  .drop(saltCol)
  .sortWithinPartitions(partitionSortCols.map(col): _*)
  .write
if (skipRepartitionBeforeWriting) {
```
we should add this switch in the insert(Partitions) or insert(UnPartitioned) methods
moved it to those methods
Actionable comments posted: 0
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)
`374-384`: **Extract configuration flag to a class-level constant.** The configuration flag is duplicated; consider extracting it to avoid repetition.

```diff
 class TableUtils(@transient val sparkSession: SparkSession) extends Serializable {
+  private val skipRepartitionBeforeWriting =
+    sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean
+
   // ... existing code ...
-  val skipRepartitionBeforeWriting =
-    sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean
```
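The hoisting the nitpick describes can be sketched without a Spark dependency. `ConfLike` below is a hypothetical stand-in for `sparkSession.conf`, and the member name mirrors the flag in this PR; the point is that the config is read once at construction rather than inside every write method.

```scala
// Hypothetical stand-in for sparkSession.conf: string settings with defaults.
class ConfLike(settings: Map[String, String]) {
  def get(key: String, default: String): String = settings.getOrElse(key, default)
}

class TableUtilsSketch(conf: ConfLike) {
  // Class-level member: evaluated once, shared by all write paths
  // instead of being re-read in each insert method.
  private val repartitionBeforeWrite: Boolean =
    conf.get("spark.chronon.write.repartition", "false").toBoolean

  def writePlan: String =
    if (repartitionBeforeWrite) "repartition -> sort -> write" else "write"
}
```

With the flag unset the plan falls back to a plain write; setting `spark.chronon.write.repartition` to `"true"` enables the repartition stage for every subsequent call on the same instance.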
`564-566`: **Fix indentation.** The indentation is inconsistent with the surrounding code.

```diff
-    } else {
-      (Seq(saltCol), sortByCols)
-    }
+  } else {
+    (Seq(saltCol), sortByCols)
+  }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)

- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (5 hunks)
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
`444-453`: 🛠️ **Refactor suggestion: Remove duplicated configuration check.** This block duplicates the same logic as in `insertPartitions`. Use the extracted class-level constant.

```diff
-val skipRepartitionBeforeWriting =
-  sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean
 if (skipRepartitionBeforeWriting) {
```

Likely invalid or redundant comment.
```diff
@@ -431,7 +441,16 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
   tableProperties,
   fileFormat)

 repartitionAndWrite(df, tableName, saveMode, None)
 val skipRepartitionBeforeWriting =
```
Wait, let's just put this branch in the `repartitionAndWrite` method; there's other transformation logic we'd skip out on if you have it here.
do you mean the wrapWithCache on line 475 (old file)?
```scala
val skipRepartitionBeforeWriting =
  sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean
```
Make this a class-level member?
```diff
@@ -431,7 +441,16 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
   tableProperties,
   fileFormat)

 repartitionAndWrite(df, tableName, saveMode, None)
 val skipRepartitionBeforeWriting =
   sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean
```
move this to be class level
Force-pushed from `ef9d272` to `7edd765`.
```scala
repartitionAndWriteInternal(df, tableName, saveMode, stats, sortByCols)
val dataPointer = DataPointer.from(tableName, sparkSession)
val repartitioned =
  if (sparkSession.conf.get("spark.chronon.write.repartition", "false").toBoolean)
```
@nikhil-zlai @david-zlai here's the conf guard.
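The guard pattern in that excerpt can be sketched without Spark: repartitioning (and the sort that goes with it) becomes an optional stage enabled by the flag, while the write itself always happens. The stage names below are illustrative, not the actual method's internals.

```scala
// Sketch of the conf guard: the repartition/sort stages are prepended
// only when the flag is enabled; the write stage runs either way.
object ConfGuardSketch {
  def stages(conf: Map[String, String]): Seq[String] = {
    val repartition =
      conf.getOrElse("spark.chronon.write.repartition", "false").toBoolean
    val pre =
      if (repartition) Seq("repartition", "sortWithinPartitions") else Seq.empty
    pre :+ "write"
  }
}
```

This matches the thread's conclusion that the branch belongs inside the write path itself, so callers can't accidentally bypass the other transformation logic.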
Force-pushed from `48150b2` to `bec3da7`.
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
`484-498`: **Consider making repartitioning enabled by default.** The new flag defaults to false, which means repartitioning is skipped. This could lead to suboptimal write performance for large datasets.

```diff
-if (sparkSession.conf.get("spark.chronon.write.repartition", false.toString).toBoolean)
+if (sparkSession.conf.get("spark.chronon.write.repartition", true.toString).toBoolean)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)

- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
Line range hint `503-574`: **LGTM! Clean method refactoring.** The method has been correctly refactored to separate concerns between repartitioning and writing operations.
@david-zlai @nikhil-zlai the main difference in functionality here is that an empty dataframe will still trigger a Spark action to attempt the write, rather than no action at all. I think that's better and more semantically correct; we would expect a new version of the table to be generated.
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1)
`488-501`: Nice test. Consider a quick schema check after the first insert.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)

- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (5)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2)
`26-26`: Looks good.

`35-36`: Fine addition.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3)

`484-499`: Config default differs from prior suggestion. Should it be false by default?

```shell
#!/bin/bash
# Check references and merges for "spark.chronon.write.repartition" defaults
rg "spark\.chronon\.write\.repartition"
```

`503-506`: Return type improvement is good.

`573-573`: Zero-row shortcut is logical.
Force-pushed from `dfe2ad8` to `f3f4a2e`.
Co-authored-by: Thomas Chow <[email protected]>
Force-pushed from `f3f4a2e` to `20ef1dc`.
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1)
`487-501`: **LGTM! Good edge case coverage.** Test verifies empty DataFrame handling and subsequent data insertion. However, consider adding assertions for the schema and partition structure:

```diff
 assertEquals(0, res.count)
+assertEquals(Seq("ds", "id"), res.schema.fieldNames.toSeq)
+assertTrue(tableUtils.allPartitions(tableName).nonEmpty)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)

- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (1)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1)
`34-35`: **LGTM! Well-structured case class.** Simple and clean case class definition with appropriate field types.
## Summary

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added configurable repartitioning option for DataFrame writes.
  - Introduced a new configuration setting to control repartitioning behavior.
  - Enhanced test suite with functionality to handle empty DataFrames.
- **Chores**
  - Improved code formatting and logging for DataFrame writing process.

Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>