
Commit 31c78af

feat: use direct writes to bigquery (#264)
## Summary

- With #263 we control table creation ourselves, so we no longer need to rely on indirect writes to do the table creation (and partitioning) for us; we simply use the BigQuery Storage API to write directly into the table we created. Direct writes should be much more performant and preferred over indirect writes, because we don't need to stage data and then load it as a temporary BQ table.
- Remove configs that are used only for indirect writes.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

### Release Notes

- **Improvements**
  - Enhanced the BigQuery data writing process with more precise configuration options.
  - Simplified table creation and partition insertion logic.
  - Improved handling of DataFrame column arrangements during data operations.
- **Changes**
  - Updated the BigQuery write method to use a direct writing approach.
  - Introduced a new option to prevent table creation if the table does not exist.
  - Made the table creation process format-aware.
  - Streamlined the partition insertion mechanism.

These updates improve data management and writing efficiency in cloud data processing workflows.

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent 7697215 commit 31c78af
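
For context, a minimal sketch (not code from this commit) of what a direct write through the spark-bigquery-connector looks like with the two options this change configures. The table and column names are made up; `writeMethod` and `createDisposition` are the connector options the diff below actually sets:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("direct-write-sketch").getOrCreate()
val df = spark.range(10).toDF("id")

// Direct write via the BigQuery Storage Write API: no temporary GCS bucket
// and no intermediate load job. CREATE_NEVER assumes the target table was
// already created (and partitioned) up front, as #263 arranges.
df.write
  .format("bigquery")
  .option("writeMethod", "direct")
  .option("createDisposition", "CREATE_NEVER")
  .mode(SaveMode.Append)
  .save("my-project.my_dataset.my_table")
```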

File tree: 2 files changed, +18 −29 lines

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 3 additions & 12 deletions
```diff
@@ -1,6 +1,4 @@
 package ai.chronon.integrations.cloud_gcp
-
-import ai.chronon.spark.TableUtils
 import ai.chronon.spark.format.Format
 import ai.chronon.spark.format.FormatProvider
 import ai.chronon.spark.format.Hive
@@ -36,17 +34,10 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
     assert(scala.Option(tableId.getProject).isDefined, s"project required for ${table}")
     assert(scala.Option(tableId.getDataset).isDefined, s"dataset required for ${table}")
 
-    val tu = TableUtils(sparkSession)
-    val partitionColumnOption =
-      if (tu.tableReachable(table)) Map.empty else Map("partitionField" -> tu.partitionColumn)
-
     val sparkOptions: Map[String, String] = Map(
-      // todo(tchow): No longer needed after https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1320
-      "temporaryGcsBucket" -> sparkSession.conf.get("spark.chronon.table.gcs.temporary_gcs_bucket"),
-      "writeMethod" -> "indirect", // writeMethod direct does not output partitioned tables. keep as indirect.
-      "materializationProject" -> tableId.getProject,
-      "materializationDataset" -> tableId.getDataset
-    ) ++ partitionColumnOption
+      "writeMethod" -> "direct",
+      "createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name
+    )
 
     BigQueryFormat(tableId.getProject, bigQueryClient, sparkOptions)
   }
```
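
`JobInfo.CreateDisposition` is the enum from the BigQuery Java client (`com.google.cloud.bigquery`), so `CREATE_NEVER.name` resolves to the string `"CREATE_NEVER"`. A minimal sketch of how these options would behave downstream, assuming `BigQueryFormat` ultimately hands them to the connector (its write path is outside this diff, and `writeDirect` is a hypothetical helper):

```scala
import com.google.cloud.bigquery.JobInfo
import org.apache.spark.sql.{DataFrame, SaveMode}

val sparkOptions: Map[String, String] = Map(
  "writeMethod" -> "direct", // use the Storage Write API; no GCS staging
  "createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name // fail if the table is missing
)

// Hypothetical forwarding helper; BigQueryFormat's write path is not shown here.
def writeDirect(df: DataFrame, fqtn: String): Unit =
  df.write
    .format("bigquery")
    .options(sparkOptions)
    .mode(SaveMode.Append)
    .save(fqtn) // e.g. "my-project.my_dataset.my_table"
```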

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 15 additions & 17 deletions
```diff
@@ -284,12 +284,12 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
                   tableProperties: Map[String, String] = null,
                   fileFormat: String,
                   autoExpand: Boolean = false): Unit = {
+    val writeFormat = tableFormatProvider.writeFormat(tableName)
 
     if (!tableReachable(tableName)) {
 
       try {
 
-        val writeFormat = tableFormatProvider.writeFormat(tableName)
         val createTableOperation =
           writeFormat.createTable(df, tableName, partitionColumns, tableProperties, fileFormat)
 
@@ -309,11 +309,13 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
 
       // TODO: we need to also allow for bigquery tables to have their table properties (or tags) to be persisted too.
       // https://app.asana.com/0/1208949807589885/1209111629687568/f
-      if (tableProperties != null && tableProperties.nonEmpty) {
-        sql(alterTablePropertiesSql(tableName, tableProperties))
-      }
-      if (autoExpand) {
-        expandTable(tableName, df.schema)
+      if (writeFormat.name.toUpperCase != "BIGQUERY") {
+        if (tableProperties != null && tableProperties.nonEmpty) {
+          sql(alterTablePropertiesSql(tableName, tableProperties))
+        }
+        if (autoExpand) {
+          expandTable(tableName, df.schema)
+        }
       }
     }
 
@@ -328,12 +330,13 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
                       stats: Option[DfStats] = None,
                       sortByCols: Seq[String] = Seq.empty): Unit = {
     // partitions to the last
-    val dfRearranged: DataFrame = if (!df.columns.endsWith(partitionColumns)) {
-      val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
-      df.select(colOrder.map(df.col): _*)
-    } else {
-      df
-    }
+    val dataPointer = DataPointer.from(tableName, sparkSession)
+    val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
+    val dfRearranged: DataFrame = df.select(colOrder.map {
+      case c if c == partitionColumn && dataPointer.writeFormat.map(_.toUpperCase).exists("BIGQUERY".equals) =>
+        to_date(df.col(c), partitionFormat).as(partitionColumn)
+      case c => df.col(c)
+    }: _*)
 
     createTable(dfRearranged, tableName, partitionColumns, tableProperties, fileFormat, autoExpand)
 
@@ -526,11 +529,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     val dataPointer = DataPointer.from(tableName, sparkSession)
 
     saltedDf
-      .select(saltedDf.columns.map {
-        case c if c == partitionColumn && dataPointer.writeFormat.map(_.toUpperCase).exists("BIGQUERY".equals) =>
-          to_date(saltedDf.col(c), partitionFormat).as(partitionColumn)
-        case c => saltedDf.col(c)
-      }.toList: _*)
       .repartition(shuffleParallelism, repartitionCols.map(saltedDf.col): _*)
       .drop(saltCol)
      .sortWithinPartitions(partitionSortCols.map(col): _*)
```
0 commit comments