
Commit 711b759

feat: refactor for alter tables
Co-authored-by: Thomas Chow <[email protected]>
1 parent 44983bc commit 711b759

3 files changed, +22 -1 lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 5 additions & 0 deletions
@@ -18,6 +18,11 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
     extends Format {
   override def name: String = "bigquery"
 
+  override def alterTableProperties(tableName: String,
+                                    tableProperties: Map[String, String]): (String => Unit) => Unit = {
+    throw new NotImplementedError("alterTableProperties not yet supported for BigQuery")
+  }
+
   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
       implicit sparkSession: SparkSession): Seq[String] =
     super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -311,7 +311,8 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     // https://app.asana.com/0/1208949807589885/1209111629687568/f
     if (writeFormat.name.toUpperCase != "BIGQUERY") {
       if (tableProperties != null && tableProperties.nonEmpty) {
-        sql(alterTablePropertiesSql(tableName, tableProperties))
+        val alterTblPropsOperation = writeFormat.alterTableProperties(tableName, tableProperties)
+        alterTblPropsOperation(sql)
       }
       if (autoExpand) {
         expandTable(tableName, df.schema)

spark/src/main/scala/ai/chronon/spark/format/Format.scala

Lines changed: 15 additions & 0 deletions
@@ -1,5 +1,6 @@
 package ai.chronon.spark.format
 
+import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
 import ai.chronon.spark.format.CreationUtils.createTableSql
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
@@ -73,6 +74,20 @@ trait Format {
 
   }
 
+  def alterTableProperties(tableName: String, tableProperties: Map[String, String]): (String => Unit) => Unit = {
+
+    def inner(tableName: String, tableProperties: Map[String, String])(sqlEvaluator: String => Unit) = {
+      val creationSql =
+        alterTablePropertiesSql(tableName, tableProperties)
+
+      sqlEvaluator(creationSql)
+
+    }
+
+    inner(tableName, tableProperties)
+
+  }
+
   // Help specify the appropriate table type to use in the Spark create table DDL query
   def createTableTypeString: String
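For readers following the new API shape: alterTableProperties now returns a deferred operation of type (String => Unit) => Unit — the format decides which statement to run, while the caller (TableUtils in this commit) supplies the SQL evaluator that actually executes it. Below is a minimal, self-contained sketch of that pattern, not the Chronon code itself: buildAlterTablePropertiesSql is a hypothetical stand-in for CreationUtils.alterTablePropertiesSql, and the println-based runSql stands in for TableUtils' sql method.

// Minimal sketch of the deferred-execution shape introduced in this commit.
object AlterTablePropertiesSketch {

  // Hypothetical SQL builder standing in for CreationUtils.alterTablePropertiesSql;
  // the real statement text may differ.
  def buildAlterTablePropertiesSql(tableName: String, props: Map[String, String]): String = {
    val kvs = props.map { case (k, v) => s"'$k' = '$v'" }.mkString(", ")
    s"ALTER TABLE $tableName SET TBLPROPERTIES ($kvs)"
  }

  // Same curried shape as Format.alterTableProperties: build the statement now,
  // run it only when a SQL evaluator is supplied.
  def alterTableProperties(tableName: String,
                           tableProperties: Map[String, String]): (String => Unit) => Unit = {
    sqlEvaluator => sqlEvaluator(buildAlterTablePropertiesSql(tableName, tableProperties))
  }

  def main(args: Array[String]): Unit = {
    // In TableUtils the evaluator is its own `sql` method; here we just print.
    val runSql: String => Unit = stmt => println(s"would execute: $stmt")

    val alterTblPropsOperation = alterTableProperties("db.my_table", Map("owner" -> "chronon"))
    alterTblPropsOperation(runSql)
  }
}

A format that cannot express the statement at all (as in BigQueryFormat above) can override the method to fail fast instead of returning a runnable operation.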
