@@ -5,7 +5,7 @@ import ai.chronon.spark.catalog.Format
5
5
import com .google .cloud .bigquery .BigQueryOptions
6
6
import com .google .cloud .spark .bigquery .v2 .Spark35BigQueryTableProvider
7
7
import org .apache .spark .sql .{DataFrame , SparkSession }
8
- import org .apache .spark .sql .functions .{col , date_format , to_date }
8
+ import org .apache .spark .sql .functions .{col , date_format , to_date , lit }
9
9
10
10
case object BigQueryNative extends Format {
11
11
@@ -16,56 +16,59 @@ case object BigQueryNative extends Format {
16
16
17
17
override def table(tableName: String, partitionFilters: String)(implicit sparkSession: SparkSession): DataFrame = {
  import sparkSession.implicits._

  // First, need to clean the spark-based table name for the bigquery queries below.
  val bqTableId = SparkBQUtils.toTableId(tableName)
  // Fall back to the connector's default project when the table reference doesn't carry one.
  val providedProject = scala.Option(bqTableId.getProject).getOrElse(bqOptions.getProjectId)
  val bqFriendlyName = f"${providedProject}.${bqTableId.getDataset}.${bqTableId.getTable}"

  // Then, we query the BQ information schema to grab the table's partition column.
  val partColsSql =
    s"""
       |SELECT column_name FROM `${providedProject}.${bqTableId.getDataset}.INFORMATION_SCHEMA.COLUMNS`
       |WHERE table_name = '${bqTableId.getTable}' AND is_partitioning_column = 'YES'
       |
       |""".stripMargin

  val partColName = sparkSession.read
    .format(bqFormat)
    .option("project", providedProject)
    // INFORMATION_SCHEMA queries must be materialized through a view by the connector.
    // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
    // and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
    .option("viewsEnabled", true)
    .option("materializationDataset", bqTableId.getDataset)
    .load(partColsSql)
    .as[String]
    .collect
    .headOption
    .getOrElse(throw new UnsupportedOperationException(
      s"No partition column for table ${tableName} found.")) // TODO: support unpartitioned tables (uncommon case).

  // Next, we query the BQ table using the requested partitionFilter to grab all the distinct
  // partition values that match the filter.
  val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
  val partitionFormat = TableUtils(sparkSession).partitionFormat
  val select = s"SELECT distinct(${partColName}) AS ${internalBQCol} FROM ${bqFriendlyName} ${partitionWheres}"
  val selectedParts = sparkSession.read
    .format(bqFormat)
    .option("viewsEnabled", true)
    .option("materializationDataset", bqTableId.getDataset)
    .load(select)
    // NOTE(review): partition values are rendered with Chronon's partitionFormat here but compared
    // back against the raw BQ partition column in the per-partition filter below — this assumes the
    // formatted string round-trips in BQ's comparison semantics for that column type. TODO confirm.
    .select(date_format(col(internalBQCol), partitionFormat))
    .as[String]
    .collect
    .toList
  logger.info(s"Part values: ${selectedParts}")

  // Finally, we query the BQ table for each of the selected partition values and union them together.
  selectedParts
    .map((partValue) => {
      val pFilter = f"${partColName} = '${partValue}'"
      sparkSession.read
        .format(bqFormat)
        // Push the partition predicate down to the connector so each read scans a single partition.
        .option("filter", pFilter)
        .load(bqFriendlyName)
        // Re-attach the partition value as a plain column (pseudo-columns aren't returned by the read).
        .withColumn(partColName, lit(partValue))
    }) // todo: make it nullable
    // reduceOption: an empty partition list would make bare `reduce` throw an opaque
    // "empty.reduceLeft" error — surface an actionable message instead (same exception type).
    .reduceOption(_ unionByName _)
    .getOrElse(throw new UnsupportedOperationException(
      s"No partitions of table ${tableName} matched filter '${partitionFilters}'."))
}
70
73
71
74
override def primaryPartitions (tableName : String , partitionColumn : String , subPartitionsFilter : Map [String , String ])(
0 commit comments