Skip to content

Commit 9802baf

Browse files
fix: Respect coalesce factor again (#372)
## Summary ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Bug Fixes** - Improved handling of date-based partition columns during table processing to ensure data is formatted and consolidated accurately. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
1 parent 9eac6a7 commit 9802baf

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -768,15 +768,13 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
768768
val parallelism = sparkSession.sparkContext.getConf.getInt("spark.default.parallelism", 1000)
769769
val coalesceFactor = sparkSession.sparkContext.getConf.getInt("spark.chronon.coalesce.factor", 10)
770770

771-
df.coalesce(coalesceFactor * parallelism)
772-
773771
// TODO: this is a temporary fix to handle the case where the partition column is a DATE type and not a string.
774772
// This is the case for partitioned BigQuery native tables.
775-
if (df.schema.fieldNames.contains(partitionColumn) && df.schema(partitionColumn).dataType == DateType) {
776-
df.withColumn(partitionColumn, date_format(df.col(partitionColumn), partitionFormat))
777-
} else {
778-
df
779-
}
773+
(if (df.schema.fieldNames.contains(partitionColumn) && df.schema(partitionColumn).dataType == DateType) {
774+
df.withColumn(partitionColumn, date_format(df.col(partitionColumn), partitionFormat))
775+
} else {
776+
df
777+
}).coalesce(coalesceFactor * parallelism)
780778
}
781779

782780
def whereClauses(partitionRange: PartitionRange, partitionColumn: String = partitionColumn): Seq[String] = {

0 commit comments

Comments
 (0)