
Commit 25ff3cf

chore: remove writeFormat as it is no longer used (#491)
## Summary
- Remove `writeFormat`. We now just create the table with the proper provider, and write into it as specified by the table.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **Refactor**
  - Removed legacy support for custom data write formatting and configuration.
  - Eliminated extended data pointer handling for saving and loading operations.
  - Streamlined table write operations by discarding redundant prefix and format determination logic.
  - Enhanced input validation for table types in the table creation process.
  - Simplified `BigQueryFormat` and `GCSFormat` by removing unsupported methods.
  - Removed unnecessary methods from `DeltaLake`, `Hive`, and `Iceberg` format implementations.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent 24b70e9 commit 25ff3cf
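
To make the intent of the summary concrete, here is a rough sketch, not code from this commit, of the described flow: the table is created up front via `CreationUtils.createTableSql` with an explicit, validated table type, and the write is then a plain insert that inherits whatever format the table was created with. The object name, table name, partition column `ds`, and the `iceberg` table type are illustrative assumptions.

```scala
// Sketch only: "create the table with the proper provider, then write into it as
// specified by the table". Names and the chosen table type are assumptions.
import ai.chronon.spark.format.CreationUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

object WriteWithoutWriteFormat {

  def createAndInsert(spark: SparkSession, df: DataFrame, tableName: String): Unit = {
    // 1) Build the CREATE TABLE DDL once, with a validated table type (see CreationUtils diff below).
    val ddl = CreationUtils.createTableSql(
      tableName = tableName,
      schema = df.schema,
      partitionColumns = List("ds"),
      tableProperties = Map.empty,
      fileFormatString = "",
      tableTypeString = "iceberg"
    )
    spark.sql(ddl)

    // 2) Write into the table; no writeFormat lookup is needed anymore, since the
    //    table's own definition decides how the data is stored.
    df.write.insertInto(tableName)
  }
}
```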

File tree

13 files changed (+10, -251 lines)


cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 1 addition & 13 deletions
```diff
@@ -5,7 +5,7 @@ import ai.chronon.spark.format.Format
 import com.google.cloud.bigquery.BigQuery
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
 import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTableProvider
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{col, date_format, to_date}

 case class BigQueryFormat(project: String, bqClient: BigQuery, override val options: Map[String, String])
@@ -22,13 +22,6 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
       implicit sparkSession: SparkSession): List[String] =
     super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)
-  override def generateTableBuilder(df: DataFrame,
-                                    tableName: String,
-                                    partitionColumns: List[String],
-                                    tableProperties: Map[String, String],
-                                    fileFormat: String): (String => Unit) => Unit = {
-    throw new UnsupportedOperationException("generateTableBuilder not supported for BigQuery")
-  }

   override def partitions(tableName: String)(implicit sparkSession: SparkSession): List[Map[String, String]] = {
     import sparkSession.implicits._
@@ -99,10 +92,5 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti

   }

-  def createTableTypeString: String =
-    throw new UnsupportedOperationException("createTableTypeString not yet supported for BigQuery")
-  def fileFormatString(format: String): String =
-    throw new UnsupportedOperationException("fileFormatString not yet supported for BigQuery")
-
   override def supportSubPartitionsFilter: Boolean = true
 }
```

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala

Lines changed: 1 addition & 13 deletions
```diff
@@ -5,7 +5,7 @@ import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigq
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
+import org.apache.spark.sql.{Encoders, Row, SparkSession}
 import org.slf4j.LoggerFactory

 case class GCS(sourceUri: String, fileFormat: String) extends Format {
@@ -78,18 +78,6 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
       .toList
   }

-  override def generateTableBuilder(df: DataFrame,
-                                    tableName: String,
-                                    partitionColumns: List[String],
-                                    tableProperties: Map[String, String],
-                                    fileFormat: String): (String => Unit) => Unit = {
-    throw new UnsupportedOperationException("generateTableBuilder not supported for GCS")
-  }
-
-  def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
-
-  def fileFormatString(format: String): String = ""
-
   override def supportSubPartitionsFilter: Boolean = true

 }
```

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 0 additions & 8 deletions
```diff
@@ -36,14 +36,6 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider

   override def readFormat(tableName: String): scala.Option[Format] = format(tableName)

-  override def writeFormat(table: String): Format = {
-    val writePrefix = TableUtils(sparkSession).writePrefix
-    require(writePrefix.nonEmpty, "Please set conf 'spark.chronon.table_write.prefix' pointing to a data bucket.")
-
-    val path = writePrefix.get + table.sanitize //split("/").map(_.sanitize).mkString("/")
-    GCS(path, "PARQUET")
-  }
-
   private[cloud_gcp] def getFormat(table: Table): Format = {
     table.getDefinition.asInstanceOf[TableDefinition] match {
       case definition: ExternalTableDefinition =>
```

spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala

Lines changed: 0 additions & 39 deletions
This file was deleted.

spark/src/main/scala/ai/chronon/spark/Extensions.scala

Lines changed: 0 additions & 83 deletions
```diff
@@ -297,89 +297,6 @@ object Extensions {
       result
     }
   }
-
-  implicit class DataPointerAwareDataFrameWriter[T](dfw: DataFrameWriter[T]) {
-
-    def save(dataPointer: DataPointer): Unit = {
-
-      val optionDfw = dfw.options(dataPointer.writeOptions)
-      dataPointer.writeFormat
-        .map((wf) => {
-          val normalized = wf.toLowerCase
-          normalized match {
-            case "bigquery" | "bq" =>
-              optionDfw
-                .format("bigquery")
-                .save(dataPointer.tableOrPath)
-            case "snowflake" | "sf" =>
-              optionDfw
-                .format("net.snowflake.spark.snowflake")
-                .option("dbtable", dataPointer.tableOrPath)
-                .save()
-            case "parquet" | "csv" =>
-              optionDfw
-                .format(normalized)
-                .save(dataPointer.tableOrPath)
-            case "hive" | "delta" | "iceberg" =>
-              optionDfw
-                .format(normalized)
-                .partitionBy(null: _*)
-                .insertInto(dataPointer.tableOrPath)
-            case _ =>
-              throw new UnsupportedOperationException(s"Unsupported write catalog: ${normalized}")
-          }
-        })
-        .getOrElse(
-          // None case is just table against default catalog
-          optionDfw
-            .format("hive")
-            .partitionBy(null: _*)
-            .insertInto(dataPointer.tableOrPath))
-    }
-  }
-
-  implicit class DataPointerAwareDataFrameReader(dfr: DataFrameReader) {
-
-    def load(dataPointer: DataPointer): DataFrame = {
-      val tableOrPath = dataPointer.tableOrPath
-
-      val optionDfr = dfr.options(dataPointer.readOptions)
-
-      dataPointer.readFormat
-        .map { fmt =>
-          val fmtLower = fmt.toLowerCase
-
-          fmtLower match {
-
-            case "bigquery" | "bq" =>
-              optionDfr
-                .format("bigquery")
-                .load(tableOrPath)
-
-            case "snowflake" | "sf" =>
-              optionDfr
-                .format("net.snowflake.spark.snowflake")
-                .option("dbtable", tableOrPath)
-                .load()
-
-            case "parquet" | "csv" =>
-              optionDfr
-                .format(fmt)
-                .load(tableOrPath)
-
-            case "hive" | "delta" | "iceberg" => optionDfr.table(tableOrPath)
-
-            case _ =>
-              throw new UnsupportedOperationException(s"Unsupported read catalog: $fmtLower")
-
-          }
-        }
-        .getOrElse {
-          // None case is just table against default catalog
-          optionDfr.table(tableOrPath)
-        }
-    }
-  }
   implicit class SourceSparkOps(source: api.Source) {

     def partitionColumn(implicit tableUtils: TableUtils): String = {
```

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 0 additions & 14 deletions
```diff
@@ -76,20 +76,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable

   private val tableWriteFormat = sparkSession.conf.get("spark.chronon.table_write.format", "").toLowerCase

-  val writePrefix: Option[String] = {
-
-    val barePrefix = sparkSession.conf.get("spark.chronon.table_write.prefix", "")
-
-    if (barePrefix.isEmpty || barePrefix.toUpperCase() == "NONE") {
-      None
-    } else if (barePrefix.endsWith("/")) {
-      Some(barePrefix)
-    } else {
-      Some(barePrefix + "/")
-    }
-
-  }
-
   // transient because the format provider is not always serializable.
   // for example, BigQueryImpl during reflecting with bq flavor
   @transient private implicit lazy val tableFormatProvider: FormatProvider = FormatProvider.from(sparkSession)
```

spark/src/main/scala/ai/chronon/spark/format/CreationUtils.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -4,13 +4,20 @@ import org.apache.spark.sql.types.StructType

 object CreationUtils {

+  private val ALLOWED_TABLE_TYPES = List("iceberg", "delta", "hive", "parquet", "hudi")
+
   def createTableSql(tableName: String,
                      schema: StructType,
                      partitionColumns: List[String],
                      tableProperties: Map[String, String],
                      fileFormatString: String,
                      tableTypeString: String): String = {

+    require(
+      tableTypeString.isEmpty || ALLOWED_TABLE_TYPES.contains(tableTypeString.toLowerCase),
+      s"Invalid table type: ${tableTypeString}. Must be empty OR one of: ${ALLOWED_TABLE_TYPES}"
+    )
+
     val noPartitions = StructType(
       schema
         .filterNot(field => partitionColumns.contains(field.name)))
```
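
A small usage sketch of the new check, not part of the diff: a table type outside `ALLOWED_TABLE_TYPES` now fails fast inside `createTableSql`. The schema, table name, and property values below are made up for the example.

```scala
// Illustrative only: exercises the new require() in CreationUtils.createTableSql.
import scala.util.Try

import ai.chronon.spark.format.CreationUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object TableTypeValidationExample {

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("id", StringType), StructField("ds", StringType)))

    // "bigquery" is not in ALLOWED_TABLE_TYPES, so this fails fast with an IllegalArgumentException.
    val attempt = Try(
      CreationUtils.createTableSql("db.example_table", schema, List("ds"), Map.empty, "", "bigquery"))

    println(attempt.isFailure) // true
    // An empty table type, or one of iceberg/delta/hive/parquet/hudi, passes the check.
  }
}
```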

spark/src/main/scala/ai/chronon/spark/format/DefaultFormatProvider.scala

Lines changed: 0 additions & 27 deletions
```diff
@@ -50,31 +50,4 @@ case class DefaultFormatProvider(sparkSession: SparkSession) extends FormatProvi
       false
     }
   }
-
-  // Return the write format to use for the given table. The logic at a high level is:
-  // 1) If the user specifies the spark.chronon.table_write.iceberg - we go with Iceberg
-  // 2) If the user specifies a spark.chronon.table_write.format as Hive (parquet), Iceberg or Delta we go with their choice
-  // 3) Default to Hive (parquet)
-  // Note the table_write.iceberg is supported for legacy reasons. Specifying "iceberg" in spark.chronon.table_write.format
-  // is preferred as the latter conf also allows us to support additional formats
-  override def writeFormat(tableName: String): Format = {
-    val useIceberg: Boolean = sparkSession.conf.get("spark.chronon.table_write.iceberg", "false").toBoolean
-
-    // Default provider just looks for any default config.
-    // Unlike read table, these write tables might not already exist.
-    val maybeFormat = sparkSession.conf.getOption("spark.chronon.table_write.format").map(_.toLowerCase) match {
-      case Some("hive") => Some(Hive)
-      case Some("iceberg") => Some(Iceberg)
-      case Some("delta") => Some(DeltaLake)
-      case _ => None
-    }
-    (useIceberg, maybeFormat) match {
-      // if explicitly configured Iceberg - we go with that setting
-      case (true, _) => Iceberg
-      // else if there is a write format we pick that
-      case (false, Some(format)) => format
-      // fallback to hive (parquet)
-      case (false, None) => Hive
-    }
-  }
 }
```

spark/src/main/scala/ai/chronon/spark/format/DeltaLake.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -32,9 +32,5 @@ case object DeltaLake extends Format {

   }

-  def createTableTypeString: String = "DELTA"
-
-  def fileFormatString(format: String): String = ""
-
   override def supportSubPartitionsFilter: Boolean = true
 }
```

spark/src/main/scala/ai/chronon/spark/format/Format.scala

Lines changed: 1 addition & 40 deletions
```diff
@@ -1,11 +1,8 @@
 package ai.chronon.spark.format

 import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
-import ai.chronon.spark.format.CreationUtils.createTableSql
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}

 trait Format {
   @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -48,33 +45,6 @@ trait Format {
   // )
   def partitions(tableName: String)(implicit sparkSession: SparkSession): List[Map[String, String]]

-  def generateTableBuilder(df: DataFrame,
-                           tableName: String,
-                           partitionColumns: List[String],
-                           tableProperties: Map[String, String],
-                           fileFormat: String): (String => Unit) => Unit = {
-
-    def inner(df: DataFrame,
-              tableName: String,
-              partitionColumns: List[String],
-              tableProperties: Map[String, String],
-              fileFormat: String)(sqlEvaluator: String => Unit): Unit = {
-      val creationSql =
-        createTableSql(tableName,
-                       df.schema,
-                       partitionColumns,
-                       tableProperties,
-                       fileFormatString(fileFormat),
-                       createTableTypeString)
-
-      sqlEvaluator(creationSql)
-      ()
-    }
-
-    inner(df, tableName, partitionColumns, tableProperties, fileFormat)
-
-  }
-
   def alterTableProperties(tableName: String, tableProperties: Map[String, String]): (String => Unit) => Unit = {

     def inner(tableName: String, tableProperties: Map[String, String])(sqlEvaluator: String => Unit) = {
@@ -89,16 +59,7 @@ trait Format {

   }

-  // Help specify the appropriate table type to use in the Spark create table DDL query
-  def createTableTypeString: String
-
-  // Help specify the appropriate file format to use in the Spark create table DDL query
-  def fileFormatString(format: String): String
-
   // Does this format support sub partitions filters
   def supportSubPartitionsFilter: Boolean

-  // TODO: remove this once all formats implement table creation
-  val canCreateTable: Boolean = false
-
 }
```

spark/src/main/scala/ai/chronon/spark/format/FormatProvider.scala

Lines changed: 0 additions & 2 deletions
```diff
@@ -16,8 +16,6 @@ trait FormatProvider extends Serializable {

   def readFormat(tableName: String): Option[Format]

-  def writeFormat(tableName: String): Format
-
   def resolveTableName(tableName: String) = tableName

 }
```
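
With `writeFormat` gone, the provider only resolves formats on the read side. A minimal sketch of how a caller might use what remains, based on the signatures visible in the diffs above; `spark`, the object name, and the method name `partitionsOf` are placeholders.

```scala
// Sketch only: resolve a table's format via the provider and list its partitions.
import ai.chronon.spark.format.{Format, FormatProvider}
import org.apache.spark.sql.SparkSession

object ReadFormatExample {

  def partitionsOf(spark: SparkSession, tableName: String): List[Map[String, String]] = {
    implicit val session: SparkSession = spark
    val provider: FormatProvider = FormatProvider.from(spark) // factory seen in the TableUtils diff

    provider
      .readFormat(tableName)          // Option[Format]; None if the table cannot be resolved
      .map(_.partitions(tableName))   // Format.partitions, as declared in the Format trait
      .getOrElse(List.empty)
  }
}
```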

spark/src/main/scala/ai/chronon/spark/format/Hive.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -32,9 +32,5 @@ case object Hive extends Format {
       .toList
   }

-  def createTableTypeString: String = ""
-
-  def fileFormatString(format: String): String = s"STORED AS $format"
-
   override def supportSubPartitionsFilter: Boolean = true
 }
```

spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -52,9 +52,5 @@ case object Iceberg extends Format {
     }
   }

-  def createTableTypeString: String = "ICEBERG"
-
-  def fileFormatString(format: String): String = ""
-
   override def supportSubPartitionsFilter: Boolean = false
 }
```
