Skip to content

Commit 1b335d7

Browse files
remove other stuff
Co-authored-by: Thomas Chow <[email protected]>
1 parent 916c39a commit 1b335d7

File tree

6 files changed

+3
-78
lines changed

6 files changed

+3
-78
lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import ai.chronon.spark.format.Format
55
import com.google.cloud.bigquery.BigQuery
66
import com.google.cloud.bigquery.connector.common.BigQueryUtil
77
import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTableProvider
8-
import org.apache.spark.sql.{DataFrame, SparkSession}
8+
import org.apache.spark.sql.SparkSession
99
import org.apache.spark.sql.functions.{col, date_format, to_date}
1010

1111
case class BigQueryFormat(project: String, bqClient: BigQuery, override val options: Map[String, String])
@@ -22,13 +22,6 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
2222
override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
2323
implicit sparkSession: SparkSession): List[String] =
2424
super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)
25-
override def generateTableBuilder(df: DataFrame,
26-
tableName: String,
27-
partitionColumns: List[String],
28-
tableProperties: Map[String, String],
29-
fileFormat: String): (String => Unit) => Unit = {
30-
throw new UnsupportedOperationException("generateTableBuilder not supported for BigQuery")
31-
}
3225

3326
override def partitions(tableName: String)(implicit sparkSession: SparkSession): List[Map[String, String]] = {
3427
import sparkSession.implicits._
@@ -99,10 +92,5 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
9992

10093
}
10194

102-
def createTableTypeString: String =
103-
throw new UnsupportedOperationException("createTableTypeString not yet supported for BigQuery")
104-
def fileFormatString(format: String): String =
105-
throw new UnsupportedOperationException("fileFormatString not yet supported for BigQuery")
106-
10795
override def supportSubPartitionsFilter: Boolean = true
10896
}

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
55
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
66
import org.apache.spark.sql.execution.FileSourceScanExec
77
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
8-
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
8+
import org.apache.spark.sql.{Encoders, Row, SparkSession}
99
import org.slf4j.LoggerFactory
1010

1111
case class GCS(sourceUri: String, fileFormat: String) extends Format {
@@ -78,18 +78,6 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
7878
.toList
7979
}
8080

81-
override def generateTableBuilder(df: DataFrame,
82-
tableName: String,
83-
partitionColumns: List[String],
84-
tableProperties: Map[String, String],
85-
fileFormat: String): (String => Unit) => Unit = {
86-
throw new UnsupportedOperationException("generateTableBuilder not supported for GCS")
87-
}
88-
89-
def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
90-
91-
def fileFormatString(format: String): String = ""
92-
9381
override def supportSubPartitionsFilter: Boolean = true
9482

9583
}

spark/src/main/scala/ai/chronon/spark/format/DeltaLake.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,5 @@ case object DeltaLake extends Format {
3232

3333
}
3434

35-
def createTableTypeString: String = "DELTA"
36-
37-
def fileFormatString(format: String): String = ""
38-
3935
override def supportSubPartitionsFilter: Boolean = true
4036
}

spark/src/main/scala/ai/chronon/spark/format/Format.scala

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package ai.chronon.spark.format
22

33
import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
4-
import ai.chronon.spark.format.CreationUtils.createTableSql
5-
import org.apache.spark.sql.DataFrame
64
import org.apache.spark.sql.SparkSession
7-
import org.slf4j.Logger
8-
import org.slf4j.LoggerFactory
5+
import org.slf4j.{Logger, LoggerFactory}
96

107
trait Format {
118
@transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -48,33 +45,6 @@ trait Format {
4845
// )
4946
def partitions(tableName: String)(implicit sparkSession: SparkSession): List[Map[String, String]]
5047

51-
def generateTableBuilder(df: DataFrame,
52-
tableName: String,
53-
partitionColumns: List[String],
54-
tableProperties: Map[String, String],
55-
fileFormat: String): (String => Unit) => Unit = {
56-
57-
def inner(df: DataFrame,
58-
tableName: String,
59-
partitionColumns: List[String],
60-
tableProperties: Map[String, String],
61-
fileFormat: String)(sqlEvaluator: String => Unit): Unit = {
62-
val creationSql =
63-
createTableSql(tableName,
64-
df.schema,
65-
partitionColumns,
66-
tableProperties,
67-
fileFormatString(fileFormat),
68-
createTableTypeString)
69-
70-
sqlEvaluator(creationSql)
71-
()
72-
}
73-
74-
inner(df, tableName, partitionColumns, tableProperties, fileFormat)
75-
76-
}
77-
7848
def alterTableProperties(tableName: String, tableProperties: Map[String, String]): (String => Unit) => Unit = {
7949

8050
def inner(tableName: String, tableProperties: Map[String, String])(sqlEvaluator: String => Unit) = {
@@ -89,16 +59,7 @@ trait Format {
8959

9060
}
9161

92-
// Help specify the appropriate table type to use in the Spark create table DDL query
93-
def createTableTypeString: String
94-
95-
// Help specify the appropriate file format to use in the Spark create table DDL query
96-
def fileFormatString(format: String): String
97-
9862
// Does this format support sub partitions filters
9963
def supportSubPartitionsFilter: Boolean
10064

101-
// TODO: remove this once all formats implement table creation
102-
val canCreateTable: Boolean = false
103-
10465
}

spark/src/main/scala/ai/chronon/spark/format/Hive.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,5 @@ case object Hive extends Format {
3232
.toList
3333
}
3434

35-
def createTableTypeString: String = ""
36-
37-
def fileFormatString(format: String): String = s"STORED AS $format"
38-
3935
override def supportSubPartitionsFilter: Boolean = true
4036
}

spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,5 @@ case object Iceberg extends Format {
5252
}
5353
}
5454

55-
def createTableTypeString: String = "ICEBERG"
56-
57-
def fileFormatString(format: String): String = ""
58-
5955
override def supportSubPartitionsFilter: Boolean = false
6056
}

0 commit comments

Comments (0)