Commit 570d9c1
chore: refactor some existing methods (#156)
## Summary

Rely on the DataFrame extensions instead of calling `TableUtils` directly wherever possible.

## Checklist

- [ ] Added Unit Tests
- [x] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit (auto-generated release notes)

- **Refactor**
  - Updated DataFrame saving methods across multiple Spark job classes
  - Replaced `tableUtils.insertPartitions` and `tableUtils.insertUnPartitioned` with direct DataFrame `save` and `saveUnPartitioned` methods
  - Simplified data persistence mechanism in LogFlattenerJob, StagingQuery, CompareJob, and ConsistencyJob

---------

Co-authored-by: Thomas Chow <[email protected]>

Parent: 8d20689

4 files changed: +10 −13 lines

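The extension methods themselves are outside this diff. For context, here is a minimal sketch of the pattern that `ai.chronon.spark.Extensions._` presumably provides: an implicit class wrapping `DataFrame` that delegates to `TableUtils`, so call sites can write `df.save(...)` instead of threading a `tableUtils` instance through. The `DataframeOps` name, signatures, and defaults below are assumptions inferred from the call sites in this commit, not copied from the repository.

```scala
import ai.chronon.spark.TableUtils
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: names, signatures, and defaults are inferred from the
// call sites in this commit, not from the actual Extensions.scala.
object Extensions {

  implicit class DataframeOps(df: DataFrame) {

    // Partitioned write: delegates to TableUtils.insertPartitions, so
    // callers can write `df.save(table, props, autoExpand = true)`.
    def save(tableName: String,
             tableProperties: Map[String, String] = null,
             partitionColumns: Seq[String] = Seq("ds"), // assumed default
             autoExpand: Boolean = false): Unit =
      TableUtils(df.sparkSession).insertPartitions(df,
                                                   tableName,
                                                   tableProperties,
                                                   partitionColumns,
                                                   autoExpand = autoExpand)

    // Un-partitioned write: assumed to default to overwrite semantics,
    // since CompareJob drops its explicit `SaveMode.Overwrite` argument.
    def saveUnPartitioned(tableName: String,
                          tableProperties: Map[String, String] = null,
                          saveMode: SaveMode = SaveMode.Overwrite): Unit =
      TableUtils(df.sparkSession).insertUnPartitioned(df,
                                                      tableName,
                                                      tableProperties,
                                                      saveMode = saveMode)
  }
}
```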

**spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala** (5 additions & 5 deletions)

```diff
@@ -22,6 +22,7 @@ import ai.chronon.api._
 import ai.chronon.online.OnlineDerivationUtil.timeFields
 import ai.chronon.online._
 import ai.chronon.spark.Extensions.StructTypeOps
+import ai.chronon.spark.Extensions._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Dataset
@@ -224,11 +225,10 @@ class LogFlattenerJob(session: SparkSession,
     val schemaTblProps = buildTableProperties(schemaStringsMap)
     logger.info("======= Log table schema =======")
     logger.info(flattenedDf.schema.pretty)
-    tableUtils.insertPartitions(flattenedDf,
-                                joinConf.metaData.loggedTable,
-                                tableProperties =
-                                  joinTblProps ++ schemaTblProps ++ Map(Constants.ChrononLogTable -> true.toString),
-                                autoExpand = true)
+
+    flattenedDf.save(joinConf.metaData.loggedTable,
+                     joinTblProps ++ schemaTblProps ++ Map(Constants.ChrononLogTable -> true.toString),
+                     autoExpand = true)
 
     val inputRowCount = rawDf.count()
     // read from output table to avoid recomputation
```

**spark/src/main/scala/ai/chronon/spark/StagingQuery.scala** (1 addition & 1 deletion)

```diff
@@ -80,7 +80,7 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
         StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
       logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
       val df = tableUtils.sql(renderedQuery)
-      tableUtils.insertPartitions(df, outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
+      df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
       logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
     }
     logger.info(s"Finished writing Staging Query data to $outputTable")
```

**spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala** (3 additions & 3 deletions)

```diff
@@ -25,12 +25,12 @@ import ai.chronon.online.DataMetrics
 import ai.chronon.online.PartitionRange
 import ai.chronon.online.SparkConversions
 import ai.chronon.spark.Analyzer
+import ai.chronon.spark.Extensions._
 import ai.chronon.spark.StagingQuery
 import ai.chronon.spark.TableUtils
 import ai.chronon.spark.TimedKvRdd
 import ai.chronon.spark.stats.CompareJob.getJoinKeys
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SaveMode
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -82,13 +82,13 @@ class CompareJob(
     logger.info("Saving comparison output..")
     logger.info(
       s"Comparison schema ${compareDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
-    tableUtils.insertUnPartitioned(compareDf, comparisonTableName, tableProps, saveMode = SaveMode.Overwrite)
+    compareDf.saveUnPartitioned(comparisonTableName, tableProps)
 
     // Save the metrics table
     logger.info("Saving metrics output..")
     val metricsDf = metricsTimedKvRdd.toFlatDf
     logger.info(s"Metrics schema ${metricsDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
-    tableUtils.insertUnPartitioned(metricsDf, metricsTableName, tableProps, saveMode = SaveMode.Overwrite)
+    metricsDf.saveUnPartitioned(metricsTableName, tableProps)
 
     logger.info("Printing basic comparison results..")
     logger.info("(Note: This is just an estimation and not a detailed analysis of results)")
```

**spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala** (1 addition & 4 deletions)

```diff
@@ -131,10 +131,7 @@ class ConsistencyJob(session: SparkSession, joinConf: Join, endDate: String) ext
     logger.info("Saving output.")
     val outputDf = metricsKvRdd.toFlatDf.withTimeBasedColumn("ds")
     logger.info(s"output schema ${outputDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
-    tableUtils.insertPartitions(outputDf,
-                                joinConf.metaData.consistencyTable,
-                                tableProperties = tblProperties,
-                                autoExpand = true)
+    outputDf.save(joinConf.metaData.consistencyTable, tableProperties = tblProperties, autoExpand = true)
     metricsKvRdd.toAvroDf
       .withTimeBasedColumn(tableUtils.partitionColumn)
       .save(joinConf.metaData.consistencyUploadTable, tblProperties)
```
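Worth noting: the untouched `metricsKvRdd.toAvroDf ... .save(...)` chain in the surrounding context already used the extension method, so this change brings the flat-DataFrame path in line with the Avro upload path.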
