
Commit 2010a0b

nikhil-zlai, tchow-zlai, and thomaschow authored
fix: write to gcs parquet instead of bq native (#371)
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Enhanced the table creation process to return clear, detailed statuses, improving feedback during table building.
  - Introduced a new method for generating table builders that integrates with BigQuery, including error handling for partitioning.
  - Streamlined data writing operations to cloud storage with automatic path configuration and Parquet integration.
  - Added explicit partitioning for DataFrame saves in Hive, Delta, and Iceberg formats.
- **Refactor**
  - Overhauled logic to enforce partition restrictions and incorporate robust error handling for a smoother user experience.

---------

Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
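For context, a minimal sketch of the write path this change introduces. The SparkSession, DataFrame, bucket name, and `ds` partition column are hypothetical; the config key and the Parquet-then-external-table flow come from the diffs below.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: approximates what GCS.generateTableBuilder (below) does end to end.
def writeViaGcsParquet(spark: SparkSession, df: DataFrame, tableName: String): Unit = {
  // 1. Chronon now expects a GCS prefix for table data (bucket name is hypothetical).
  spark.conf.set("spark.chronon.table_write.prefix", "gs://example-chronon-data/")

  // 2. Data lands as Hive-partitioned Parquet under <prefix><sanitized table name>/
  //    (roughly approximating StringOps.sanitize with a regex here).
  val path = "gs://example-chronon-data/" + tableName.replaceAll("[^a-zA-Z0-9_]", "_") + "/"
  df.write
    .partitionBy("ds")
    .mode("overwrite")
    .parquet(path)

  // 3. A BigQuery external table is then registered over path + "*"
  //    (see GCSFormat.generateTableBuilder below) instead of loading a native table.
}
```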
1 parent 9802baf commit 2010a0b

File tree

6 files changed: +190 -81 lines changed


cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 10 additions & 7 deletions
@@ -1,5 +1,7 @@
 package ai.chronon.integrations.cloud_gcp
 
+import ai.chronon.spark.TableUtils
+import ai.chronon.spark.TableUtils.TableCreationStatus
 import ai.chronon.spark.format.Format
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
 import com.google.cloud.spark.bigquery.SchemaConverters
@@ -26,13 +28,14 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
       implicit sparkSession: SparkSession): Seq[String] =
     super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)
-  override def createTable(df: DataFrame,
-                           tableName: String,
-                           partitionColumns: Seq[String],
-                           tableProperties: Map[String, String],
-                           fileFormat: String): (String => Unit) => Unit = {
+  override def generateTableBuilder(df: DataFrame,
+                                    tableName: String,
+                                    partitionColumns: Seq[String],
+                                    tableProperties: Map[String, String],
+                                    fileFormat: String): (String => Unit) => TableCreationStatus = {
 
-    def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(sqlEvaluator: String => Unit) = {
+    def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(
+        sqlEvaluator: String => Unit): TableCreationStatus = {
 
       // See: https://cloud.google.com/bigquery/docs/partitioned-tables#limitations
       // "BigQuery does not support partitioning by multiple columns. Only one column can be used to partition a table."
@@ -57,8 +60,8 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
       val tableInfoBuilder = TableInfo.newBuilder(shadedTableId, tableDefinition.build)
 
       val tableInfo = tableInfoBuilder.build
-
       bqClient.create(tableInfo)
+      TableUtils.TableCreatedWithoutInitialData
     }
 
     inner(df, tableName, partitionColumns)
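A hedged sketch of how a caller might consume the new curried builder and its `TableCreationStatus`: the status values and the `generateTableBuilder` signature come from this PR, while the surrounding helper and the `runSql` evaluator are hypothetical, and it assumes the base `Format` trait exposes `generateTableBuilder` as these overrides imply.

```scala
import ai.chronon.spark.TableUtils
import ai.chronon.spark.TableUtils.TableCreationStatus
import ai.chronon.spark.format.Format
import org.apache.spark.sql.DataFrame

// Hypothetical caller: `runSql` stands in for whatever evaluates DDL on engines
// that need it (the BigQuery builder above never invokes it).
def createTable(format: Format, df: DataFrame, tableName: String, runSql: String => Unit): TableCreationStatus = {
  val builder: (String => Unit) => TableCreationStatus =
    format.generateTableBuilder(df, tableName, Seq("ds"), Map.empty, "PARQUET")

  val status = builder(runSql)
  status match {
    case TableUtils.TableCreatedWithInitialData    => println(s"$tableName created with data already written")
    case TableUtils.TableCreatedWithoutInitialData => println(s"$tableName created; a separate data load is still required")
    case _                                         => ()
  }
  status
}
```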

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala

Lines changed: 77 additions & 3 deletions
@@ -1,15 +1,33 @@
 package ai.chronon.integrations.cloud_gcp
 
+import ai.chronon.api.Extensions.StringOps
+import ai.chronon.api.ScalaJavaConversions.JListOps
+import ai.chronon.spark.TableUtils
+import ai.chronon.spark.TableUtils.{TableCreatedWithInitialData, TableCreationStatus}
 import ai.chronon.spark.format.Format
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
+import com.google.cloud.bigquery.connector.common.BigQueryUtil
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{
+  BigQuery,
+  BigQueryOptions,
+  ExternalTableDefinition,
+  FormatOptions,
+  HivePartitioningOptions,
+  TableInfo
+}
+import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
+import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.slf4j.LoggerFactory
 
 case class GCS(sourceUri: String, fileFormat: String) extends Format {
 
+  private lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
+
+  private lazy val bqOptions = BigQueryOptions.getDefaultInstance
+  lazy val bigQueryClient: BigQuery = bqOptions.getService
+
   override def name: String = fileFormat
 
   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
@@ -72,6 +90,62 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
     }.toMap)
   }
 
+  override def generateTableBuilder(df: DataFrame,
+                                    tableName: String,
+                                    partitionColumns: Seq[String],
+                                    tableProperties: Map[String, String],
+                                    fileFormat: String): (String => Unit) => TableCreationStatus = {
+
+    def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(sqlEvaluator: String => Unit) = {
+
+      // See: https://cloud.google.com/bigquery/docs/partitioned-tables#limitations
+      // "BigQuery does not support partitioning by multiple columns. Only one column can be used to partition a table."
+      require(partitionColumns.size < 2,
+              s"BigQuery only supports at most one partition column, incoming spec: ${partitionColumns}")
+
+      val shadedTableId = BigQueryUtil.parseTableId(tableName)
+
+      val writePrefix = TableUtils(df.sparkSession).writePrefix
+      require(writePrefix.nonEmpty, "Please set conf 'spark.chronon.table_write.prefix' pointing to a data bucket.")
+
+      val path = writePrefix.get + tableName.sanitize + "/" // split("/").map(_.sanitize).mkString("/")
+      val dataGlob = path + "*"
+
+      logger.info(s"""
+                     |table source uri: $dataGlob
+                     |partition uri: $path
+                     |""".stripMargin)
+
+      df.write
+        .partitionBy(partitionColumns: _*)
+        .mode("overwrite") // or "append" based on your needs
+        .parquet(path)
+
+      val baseTableDef = ExternalTableDefinition
+        .newBuilder(dataGlob, FormatOptions.parquet())
+        .setAutodetect(true)
+
+      if (partitionColumns.nonEmpty) {
+        val timePartitioning = HivePartitioningOptions
+          .newBuilder()
+          .setFields(partitionColumns.toJava)
+          .setSourceUriPrefix(path)
+          .setMode("STRINGS")
+          .build()
+        baseTableDef.setHivePartitioningOptions(timePartitioning)
+      }
+
+      val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
+      val createdTable = bigQueryClient.create(tableInfo)
+
+      println(s"Created external table ${createdTable.getTableId}")
+
+      TableCreatedWithInitialData
+    }
+
+    inner(df, tableName, partitionColumns)
+  }
+
   def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
 
   def fileFormatString(format: String): String = ""
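For orientation, a hedged illustration of the layout the builder above produces and how it maps back to the `GCS` format descriptor; the bucket, table, and partition column names are hypothetical.

```scala
// Assuming spark.chronon.table_write.prefix = "gs://example-chronon-data/" and a
// single partition column "ds", the builder above ends up with roughly:
//
//   Parquet data:    gs://example-chronon-data/project_dataset_my_table/ds=2025-01-01/part-*.parquet
//   External table:  source URIs             = gs://example-chronon-data/project_dataset_my_table/*
//                    sourceUriPrefix         = gs://example-chronon-data/project_dataset_my_table/
//                    hive partitioning mode  = STRINGS (partition values exposed as STRING columns)
//
// Reads over that data can then be described by the same case class defined above:
val gcsFormat = GCS(sourceUri = "gs://example-chronon-data/project_dataset_my_table/", fileFormat = "PARQUET")
```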

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 13 additions & 31 deletions
@@ -1,18 +1,9 @@
 package ai.chronon.integrations.cloud_gcp
-import ai.chronon.spark.format.Format
-import ai.chronon.spark.format.FormatProvider
+import ai.chronon.api.Extensions.StringOps
+import ai.chronon.spark.TableUtils
+import ai.chronon.spark.format.{Format, FormatProvider}
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
-import com.google.cloud.spark.bigquery.SparkBigQueryConfig
-import com.google.cloud.spark.bigquery.SparkBigQueryConfig.IntermediateFormat
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQuery
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.ExternalTableDefinition
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.FormatOptions
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.JobInfo
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.StandardTableDefinition
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Table
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableDefinition
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
 import org.apache.spark.sql.SparkSession
 
 import scala.jdk.CollectionConverters._
@@ -38,24 +29,14 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
       }
       .getOrElse(tableName)
 
-  override def readFormat(tableName: String): Option[Format] = format(tableName)
+  override def readFormat(tableName: String): scala.Option[Format] = format(tableName)
 
   override def writeFormat(table: String): Format = {
-    val tableId = BigQueryUtil.parseTableId(table)
-    assert(scala.Option(tableId.getProject).isDefined, s"project required for ${table}")
-    assert(scala.Option(tableId.getDataset).isDefined, s"dataset required for ${table}")
+    val writePrefix = TableUtils(sparkSession).writePrefix
+    require(writePrefix.nonEmpty, "Please set conf 'spark.chronon.table_write.prefix' pointing to a data bucket.")
 
-    val sparkOptions: Map[String, String] = Map(
-      "temporaryGcsBucket" -> sparkSession.conf.get("spark.chronon.table.gcs.temporary_gcs_bucket"),
-      "writeMethod" -> "indirect",
-      SparkBigQueryConfig.INTERMEDIATE_FORMAT_OPTION -> IntermediateFormat.PARQUET.getDataSource,
-      SparkBigQueryConfig.ENABLE_LIST_INFERENCE -> true.toString,
-      "materializationProject" -> tableId.getProject,
-      "materializationDataset" -> tableId.getDataset,
-      "createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name
-    )
-
-    BigQueryFormat(tableId.getProject, bigQueryClient, sparkOptions)
+    val path = writePrefix.get + table.sanitize // split("/").map(_.sanitize).mkString("/")
+    GCS(path, "PARQUET")
   }
 
   private[cloud_gcp] def getFormat(table: Table): Format =
@@ -65,7 +46,8 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
       val formatOptions = definition.getFormatOptions
         .asInstanceOf[FormatOptions]
       val externalTable = table.getDefinition.asInstanceOf[ExternalTableDefinition]
-      val uri = Option(externalTable.getHivePartitioningOptions)
+      val uri = scala
+        .Option(externalTable.getHivePartitioningOptions)
         .map(_.getSourceUriPrefix)
         .getOrElse {
           val uris = externalTable.getSourceUris.asScala
@@ -81,10 +63,10 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
     case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
   }
 
-  private def format(tableName: String): Option[Format] = {
+  private def format(tableName: String): scala.Option[Format] = {
 
     val btTableIdentifier: TableId = BigQueryUtil.parseTableId(tableName)
-    val table = Option(bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
+    val table = scala.Option(bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
     table
       .map(getFormat)
 
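A brief hedged sketch of the new write-side resolution, assuming a hypothetical bucket and table name and that `StringOps.sanitize` replaces non-alphanumeric characters with underscores.

```scala
import org.apache.spark.sql.SparkSession

// Previously writeFormat configured the Spark BigQuery connector's indirect write
// into a native table; after this change it resolves straight to the GCS/Parquet format.
def resolveWriteFormat(spark: SparkSession): Unit = {
  spark.conf.set("spark.chronon.table_write.prefix", "gs://example-chronon-data/")
  val format = GcpFormatProvider(spark).writeFormat("project.dataset.my_table")
  // Expected, per the diff above: GCS("gs://example-chronon-data/project_dataset_my_table", "PARQUET")
  println(format)
}
```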

spark/src/main/scala/ai/chronon/spark/Extensions.scala

Lines changed: 2 additions & 0 deletions
@@ -326,6 +326,7 @@ object Extensions {
       case "hive" | "delta" | "iceberg" =>
         optionDfw
           .format(normalized)
+          .partitionBy(null: _*)
           .insertInto(dataPointer.tableOrPath)
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported write catalog: ${normalized}")
@@ -335,6 +336,7 @@
         // None case is just table against default catalog
         optionDfw
           .format("hive")
+          .partitionBy(null: _*)
           .insertInto(dataPointer.tableOrPath))
   }
 }
