Commit 1ef9160

chore: refactor some existing methods
1 parent 6f9342d

File tree

4 files changed, 10 insertions(+), 13 deletions(-)


spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala

Lines changed: 5 additions & 5 deletions
@@ -22,6 +22,7 @@ import ai.chronon.api._
 import ai.chronon.online.OnlineDerivationUtil.timeFields
 import ai.chronon.online._
 import ai.chronon.spark.Extensions.StructTypeOps
+import ai.chronon.spark.Extensions._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Dataset
@@ -224,11 +225,10 @@ class LogFlattenerJob(session: SparkSession,
     val schemaTblProps = buildTableProperties(schemaStringsMap)
     logger.info("======= Log table schema =======")
     logger.info(flattenedDf.schema.pretty)
-    tableUtils.insertPartitions(flattenedDf,
-                                joinConf.metaData.loggedTable,
-                                tableProperties =
-                                  joinTblProps ++ schemaTblProps ++ Map(Constants.ChrononLogTable -> true.toString),
-                                autoExpand = true)
+
+    flattenedDf.save(joinConf.metaData.loggedTable,
+                     joinTblProps ++ schemaTblProps ++ Map(Constants.ChrononLogTable -> true.toString),
+                     autoExpand = true)
 
     val inputRowCount = rawDf.count()
     // read from output table to avoid recomputation
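Every file in this commit makes the same swap: writes stop going through tableUtils.insertPartitions(df, ...) and instead call df.save(...), an extension method pulled into scope by the new import ai.chronon.spark.Extensions._. The sketch below shows how such an implicit extension could be wired up; the class name DataframeOps, the defaults, and the TableUtils construction are assumptions inferred from the call sites in this diff, not a copy of Chronon's actual Extensions.scala.

import ai.chronon.spark.TableUtils // assumed constructible from a SparkSession
import org.apache.spark.sql.DataFrame

object Extensions {
  // Hypothetical implicit wrapper: lets a DataFrame save itself so call sites
  // no longer need a TableUtils reference in hand. The parameter order matches
  // the call sites in this diff, e.g.
  // df.save(outputTable, tableProps, partitionCols, autoExpand = ...).
  implicit class DataframeOps(df: DataFrame) {
    def save(tableName: String,
             tableProperties: Map[String, String] = null,
             partitionColumns: Seq[String] = Seq("ds"), // assumed default partition column
             autoExpand: Boolean = false): Unit =
      TableUtils(df.sparkSession).insertPartitions(df, tableName, tableProperties, partitionColumns, autoExpand = autoExpand)
  }
}

With that in scope, the new flattenedDf.save(...) call performs the same partitioned insert the removed block did, with the TableUtils plumbing hidden behind the DataFrame.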

spark/src/main/scala/ai/chronon/spark/StagingQuery.scala

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
        StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
      logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
      val df = tableUtils.sql(renderedQuery)
-     tableUtils.insertPartitions(df, outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
+     df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
      logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
    }
    logger.info(s"Finished writing Staging Query data to $outputTable")

spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala

Lines changed: 3 additions & 3 deletions
@@ -25,12 +25,12 @@ import ai.chronon.online.DataMetrics
 import ai.chronon.online.PartitionRange
 import ai.chronon.online.SparkConversions
 import ai.chronon.spark.Analyzer
+import ai.chronon.spark.Extensions._
 import ai.chronon.spark.StagingQuery
 import ai.chronon.spark.TableUtils
 import ai.chronon.spark.TimedKvRdd
 import ai.chronon.spark.stats.CompareJob.getJoinKeys
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SaveMode
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -82,13 +82,13 @@ class CompareJob(
     logger.info("Saving comparison output..")
     logger.info(
       s"Comparison schema ${compareDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
-    tableUtils.insertUnPartitioned(compareDf, comparisonTableName, tableProps, saveMode = SaveMode.Overwrite)
+    compareDf.saveUnPartitioned(comparisonTableName, tableProps)
 
     // Save the metrics table
     logger.info("Saving metrics output..")
     val metricsDf = metricsTimedKvRdd.toFlatDf
     logger.info(s"Metrics schema ${metricsDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
-    tableUtils.insertUnPartitioned(metricsDf, metricsTableName, tableProps, saveMode = SaveMode.Overwrite)
+    metricsDf.saveUnPartitioned(metricsTableName, tableProps)
 
     logger.info("Printing basic comparison results..")
     logger.info("(Note: This is just an estimation and not a detailed analysis of results)")

spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala

Lines changed: 1 addition & 4 deletions
@@ -131,10 +131,7 @@ class ConsistencyJob(session: SparkSession, joinConf: Join, endDate: String) ext
     logger.info("Saving output.")
     val outputDf = metricsKvRdd.toFlatDf.withTimeBasedColumn("ds")
     logger.info(s"output schema ${outputDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
-    tableUtils.insertPartitions(outputDf,
-                                joinConf.metaData.consistencyTable,
-                                tableProperties = tblProperties,
-                                autoExpand = true)
+    outputDf.save(joinConf.metaData.consistencyTable, tableProperties = tblProperties, autoExpand = true)
     metricsKvRdd.toAvroDf
       .withTimeBasedColumn(tableUtils.partitionColumn)
       .save(joinConf.metaData.consistencyUploadTable, tblProperties)
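Note that the adjacent metricsKvRdd.toAvroDf chain in the unchanged context was already written in the extension style (.save(joinConf.metaData.consistencyUploadTable, tblProperties)), so this change brings the consistency-table write in line with the upload-table write directly below it.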

0 commit comments