Skip to content

Commit 44d0afd

Browse files
fix: use insertInto instead of saveAsTable to preserve original TableUtils behavior (#173)
## Summary - #157 introduced CatalogAwareDataPointer and also introduced a regression in how we write tables. We need to use `insertInto` for Hive-based tables, which have different write semantics from `saveAsTable`. ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> Co-authored-by: Thomas Chow <[email protected]>
1 parent 8ccd760 commit 44d0afd

File tree

2 files changed

+18
-15
lines changed

2 files changed

+18
-15
lines changed

spark/src/main/scala/ai/chronon/spark/Extensions.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -325,7 +325,7 @@ object Extensions {
325325
case "hive" | "delta" | "iceberg" =>
326326
dfw
327327
.format(normalized)
328-
.saveAsTable(dataPointer.tableOrPath)
328+
.insertInto(dataPointer.tableOrPath)
329329
case _ =>
330330
throw new UnsupportedOperationException(s"Unsupported write catalog: ${normalized}")
331331
}
@@ -334,7 +334,7 @@ object Extensions {
334334
// None case is just table against default catalog
335335
dfw
336336
.format("hive")
337-
.saveAsTable(dataPointer.tableOrPath))
337+
.insertInto(dataPointer.tableOrPath))
338338
}
339339
}
340340

spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -170,7 +170,8 @@ class GroupByTest {
170170
val computed = resultDf.select("user", "ts", "listing_view_last30", "listing_view_count")
171171
computed.show()
172172

173-
val expected = eventDf.sqlContext.sql("""
173+
val expected = eventDf.sqlContext.sql(
174+
"""
174175
|SELECT
175176
| events_last_k.user as user,
176177
| queries_last_k.ts as ts,
@@ -351,9 +352,10 @@ class GroupByTest {
351352

352353
val columns = aggregationsMetadata.map(a => a.name -> a.columnType).toMap
353354
assertEquals(Map(
354-
"time_spent_ms" -> LongType,
355-
"price" -> DoubleType
356-
), columns)
355+
"time_spent_ms" -> LongType,
356+
"price" -> DoubleType
357+
),
358+
columns)
357359
}
358360

359361
// test that OrderByLimit and OrderByLimitTimed serialization works well with Spark's data type
@@ -423,8 +425,8 @@ class GroupByTest {
423425
tableUtils.createDatabase(namespace)
424426
DataFrameGen.events(spark, sourceSchema, count = 1000, partitions = 200).save(sourceTable)
425427
val source = Builders.Source.events(
426-
query =
427-
Builders.Query(selects = Builders.Selects("ts", "item", "time_spent_ms", "price"), startPartition = startPartition),
428+
query = Builders.Query(selects = Builders.Selects("ts", "item", "time_spent_ms", "price"),
429+
startPartition = startPartition),
428430
table = sourceTable
429431
)
430432
(source, endPartition)
@@ -560,7 +562,8 @@ class GroupByTest {
560562
val joinSource = TestUtils.getParentJoin(spark, namespace, "parent_join_table", "parent_gb")
561563
val query = Builders.Query(startPartition = today)
562564
val chainingGroupBy = TestUtils.getTestGBWithJoinSource(joinSource, query, namespace, "chaining_gb")
563-
val newGroupBy = GroupBy.replaceJoinSource(chainingGroupBy, PartitionRange(today, today), tableUtils, computeDependency = false)
565+
val newGroupBy =
566+
GroupBy.replaceJoinSource(chainingGroupBy, PartitionRange(today, today), tableUtils, computeDependency = false)
564567

565568
assertEquals(joinSource.metaData.outputTable, newGroupBy.sources.get(0).table)
566569
assertEquals(joinSource.left.topic + Constants.TopicInvalidSuffix, newGroupBy.sources.get(0).topic)
@@ -656,13 +659,13 @@ class GroupByTest {
656659
new Window(15, TimeUnit.DAYS),
657660
new Window(60, TimeUnit.DAYS)
658661
)
659-
),
662+
)
660663
)
661664
backfill(name = "unit_test_group_by_descriptive_stats",
662-
source = source,
663-
endPartition = endPartition,
664-
namespace = namespace,
665-
tableUtils = tableUtils,
666-
additionalAgg = aggs)
665+
source = source,
666+
endPartition = endPartition,
667+
namespace = namespace,
668+
tableUtils = tableUtils,
669+
additionalAgg = aggs)
667670
}
668671
}

0 commit comments

Comments
 (0)