Commit 6d4c5bd

direct write and rebase
Co-authored-by: Thomas Chow <[email protected]>
1 parent 76466ad commit 6d4c5bd

File tree

3 files changed: 26 additions & 37 deletions

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 11 additions & 19 deletions
@@ -1,17 +1,16 @@
 package ai.chronon.integrations.cloud_gcp
-
-import ai.chronon.spark.TableUtils
 import ai.chronon.spark.format.Format
 import ai.chronon.spark.format.FormatProvider
 import ai.chronon.spark.format.Hive
-import com.google.cloud.bigquery.BigQuery
-import com.google.cloud.bigquery.BigQueryOptions
-import com.google.cloud.bigquery.ExternalTableDefinition
-import com.google.cloud.bigquery.FormatOptions
-import com.google.cloud.bigquery.StandardTableDefinition
-import com.google.cloud.bigquery.Table
-import com.google.cloud.bigquery.TableDefinition
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQuery
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.ExternalTableDefinition
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.FormatOptions
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.JobInfo
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.StandardTableDefinition
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Table
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableDefinition
 import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
 import org.apache.spark.sql.SparkSession
@@ -43,17 +42,10 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
     assert(scala.Option(tableId.getProject).isDefined, s"project required for ${table}")
     assert(scala.Option(tableId.getDataset).isDefined, s"dataset required for ${table}")

-    val tu = TableUtils(sparkSession)
-    val partitionColumnOption =
-      if (tu.tableReachable(table)) Map.empty else Map("partitionField" -> tu.partitionColumn)
-
     val sparkOptions: Map[String, String] = Map(
-      // todo(tchow): No longer needed after https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1320
-      "temporaryGcsBucket" -> sparkSession.conf.get("spark.chronon.table.gcs.temporary_gcs_bucket"),
-      "writeMethod" -> "indirect", // writeMethod direct does not output partitioned tables. keep as indirect.
-      "materializationProject" -> tableId.getProject,
-      "materializationDataset" -> tableId.getDataset
-    ) ++ partitionColumnOption
+      "writeMethod" -> "direct",
+      "createDisposition" -> JobInfo.CreateDisposition.CREATE_NEVER.name
+    )

     BigQueryFormat(tableId.getProject, bigQueryClient, sparkOptions)
   }
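
Note: as a rough sketch (not part of this commit), this is how an options map like the one above is typically handed to the spark-bigquery-connector when writing a DataFrame; the method name and table string below are hypothetical.

// Minimal sketch, assuming the spark-bigquery-connector is on the classpath.
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeDirect(df: DataFrame, table: String): Unit = {
  df.write
    .format("bigquery")
    // "direct" writes via the BigQuery Storage Write API, so the old
    // temporaryGcsBucket staging option from the indirect path is not needed.
    .option("writeMethod", "direct")
    // CREATE_NEVER expects the target table to exist already; the connector
    // fails fast instead of implicitly creating an unpartitioned table.
    .option("createDisposition", "CREATE_NEVER")
    .mode(SaveMode.Append)
    .save(table) // e.g. "project.dataset.table"
}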

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProviderTest.scala

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 package ai.chronon.integrations.cloud_gcp

 import ai.chronon.spark.SparkSessionBuilder
-import com.google.cloud.bigquery._
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
 import org.apache.spark.sql.SparkSession
 import org.mockito.Mockito.when
 import org.scalatest.flatspec.AnyFlatSpec

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 14 additions & 17 deletions
@@ -284,12 +284,12 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
                   tableProperties: Map[String, String] = null,
                   fileFormat: String,
                   autoExpand: Boolean = false): Unit = {
+    val writeFormat = tableFormatProvider.writeFormat(tableName)

     if (!tableReachable(tableName)) {

       try {

-        val writeFormat = tableFormatProvider.writeFormat(tableName)
         val createTableOperation =
           writeFormat.createTable(df, tableName, partitionColumns, tableProperties, fileFormat)
@@ -309,11 +309,13 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable

     // TODO: we need to also allow for bigquery tables to have their table properties (or tags) to be persisted too.
     // https://app.asana.com/0/1208949807589885/1209111629687568/f
-    if (tableProperties != null && tableProperties.nonEmpty) {
-      sql(alterTablePropertiesSql(tableName, tableProperties))
-    }
-    if (autoExpand) {
-      expandTable(tableName, df.schema)
+    if (writeFormat.name.toUpperCase != "BIGQUERY") {
+      if (tableProperties != null && tableProperties.nonEmpty) {
+        sql(alterTablePropertiesSql(tableName, tableProperties))
+      }
+      if (autoExpand) {
+        expandTable(tableName, df.schema)
+      }
     }
   }
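Note: the new guard skips Hive-style DDL for BigQuery-backed tables. The exact string built by alterTablePropertiesSql is not shown in this diff, so the rendering below is an assumption, sketched only to show the kind of statement a BigQuery table would reject.

// Hypothetical shape of the DDL that alterTablePropertiesSql emits;
// skipped above for BigQuery because it is Hive/Spark-catalog syntax.
def alterTablePropertiesSqlSketch(tableName: String, props: Map[String, String]): String = {
  val kvs = props.map { case (k, v) => s"'$k' = '$v'" }.mkString(", ")
  s"ALTER TABLE $tableName SET TBLPROPERTIES ($kvs)"
}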
@@ -328,12 +330,12 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
                      stats: Option[DfStats] = None,
                      sortByCols: Seq[String] = Seq.empty): Unit = {
     // partitions to the last
-    val dfRearranged: DataFrame = if (!df.columns.endsWith(partitionColumns)) {
-      val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
-      df.select(colOrder.map(df.col): _*)
-    } else {
-      df
-    }
+    val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
+    val dfRearranged: DataFrame = df.select(colOrder.map {
+      case c if c == partitionColumn =>
+        to_date(df.col(c), partitionFormat).as(partitionColumn)
+      case c => df.col(c)
+    }: _*)

     createTable(dfRearranged, tableName, partitionColumns, tableProperties, fileFormat, autoExpand)
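Note: a self-contained illustration (not from this commit) of what the rearrangement above does; the column names and data are made up. Non-partition columns are kept first, and the string-typed partition column is cast to DATE and moved last.

// Runnable sketch of the createTable column rearrangement, assuming a
// local SparkSession and a "yyyy-MM-dd"-formatted partition column "ds".
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_date

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._

val df = Seq(("2024-01-01", 1), ("2024-01-02", 2)).toDF("ds", "value")

val partitionColumns = Seq("ds")
val colOrder = df.columns.diff(partitionColumns) ++ partitionColumns
val rearranged = df.select(colOrder.map {
  case c if c == "ds" => to_date(df.col(c), "yyyy-MM-dd").as("ds")
  case c              => df.col(c)
}: _*)

rearranged.printSchema() // value: integer, ds: date -- partition column last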
@@ -526,11 +528,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     val dataPointer = DataPointer.from(tableName, sparkSession)

     saltedDf
-      .select(saltedDf.columns.map {
-        case c if c == partitionColumn && dataPointer.writeFormat.map(_.toUpperCase).exists("BIGQUERY".equals) =>
-          to_date(saltedDf.col(c), partitionFormat).as(partitionColumn)
-        case c => saltedDf.col(c)
-      }.toList: _*)
       .repartition(shuffleParallelism, repartitionCols.map(saltedDf.col): _*)
       .drop(saltCol)
       .sortWithinPartitions(partitionSortCols.map(col): _*)
