Skip to content

Commit cbacae3

Browse files
feat: do partition filtering on bq native tables by union individual partitions (#690)
## Summary - Querying a range of partitions in BigQuery native tables returns a 403 error: ``` Response too large to return. Consider specifying a destination table in your job configuration ``` - Instead, query individual partitions of data as separate dataframes and union them together. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Bug Fixes** - Improved handling of BigQuery partitioned tables, ensuring more accurate partition filtering and data retrieval. - **Refactor** - Streamlined the process for reading partitioned data from BigQuery, resulting in a clearer and more consistent approach for users working with partitioned tables. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Thomas Chow <[email protected]>
1 parent 0e3c2e4 commit cbacae3

File tree

1 file changed

+31
-26
lines changed

1 file changed

+31
-26
lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import ai.chronon.spark.catalog.Format
55
import com.google.cloud.bigquery.BigQueryOptions
66
import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTableProvider
77
import org.apache.spark.sql.{DataFrame, SparkSession}
8-
import org.apache.spark.sql.functions.{col, date_format, to_date}
8+
import org.apache.spark.sql.functions.{col, date_format, to_date, lit}
99

1010
case object BigQueryNative extends Format {
1111

@@ -16,56 +16,61 @@ case object BigQueryNative extends Format {
1616

1717
override def table(tableName: String, partitionFilters: String)(implicit sparkSession: SparkSession): DataFrame = {
1818
import sparkSession.implicits._
19+
20+
// First, need to clean the spark-based table name for the bigquery queries below.
1921
val bqTableId = SparkBQUtils.toTableId(tableName)
2022
val providedProject = scala.Option(bqTableId.getProject).getOrElse(bqOptions.getProjectId)
2123
val bqFriendlyName = f"${providedProject}.${bqTableId.getDataset}.${bqTableId.getTable}"
2224

25+
// Then, we query the BQ information schema to grab the table's partition column.
2326
val partColsSql =
2427
s"""
25-
|SELECT column_name, IS_SYSTEM_DEFINED FROM `${providedProject}.${bqTableId.getDataset}.INFORMATION_SCHEMA.COLUMNS`
28+
|SELECT column_name FROM `${providedProject}.${bqTableId.getDataset}.INFORMATION_SCHEMA.COLUMNS`
2629
|WHERE table_name = '${bqTableId.getTable}' AND is_partitioning_column = 'YES'
2730
|
2831
|""".stripMargin
2932

30-
val (partColName, systemDefined) = sparkSession.read
33+
val partColName = sparkSession.read
3134
.format(bqFormat)
3235
.option("project", providedProject)
3336
// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
3437
// and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
3538
.option("viewsEnabled", true)
3639
.option("materializationDataset", bqTableId.getDataset)
3740
.load(partColsSql)
38-
.as[(String, String)]
41+
.as[String]
3942
.collect
4043
.headOption
41-
.getOrElse(throw new UnsupportedOperationException(s"No partition column for table ${tableName} found."))
42-
43-
val isPseudoColumn = systemDefined match {
44-
case "YES" => true
45-
case "NO" => false
46-
case _ => throw new IllegalArgumentException(s"Unknown partition column system definition: ${systemDefined}")
47-
}
48-
49-
logger.info(
50-
s"Found bigquery partition column: ${partColName} with system defined status: ${systemDefined} for table: ${tableName}")
44+
.getOrElse(
45+
throw new UnsupportedOperationException(s"No partition column for table ${tableName} found.")
46+
) // TODO: support unpartitioned tables (uncommon case).
5147

48+
// Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
5249
val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
5350
val partitionFormat = TableUtils(sparkSession).partitionFormat
54-
val dfw = sparkSession.read
51+
val select = s"SELECT distinct(${partColName}) AS ${internalBQCol} FROM ${bqFriendlyName} ${partitionWheres}"
52+
val selectedParts = sparkSession.read
5553
.format(bqFormat)
5654
.option("viewsEnabled", true)
5755
.option("materializationDataset", bqTableId.getDataset)
58-
if (isPseudoColumn) {
59-
val select = s"SELECT ${partColName} AS ${internalBQCol}, * FROM ${bqFriendlyName} ${partitionWheres}"
60-
logger.info(s"BQ select: ${select}")
61-
dfw
62-
.load(select)
63-
.withColumn(partColName, date_format(col(internalBQCol), partitionFormat))
64-
.drop(internalBQCol)
65-
} else {
66-
dfw
67-
.load(s"SELECT * FROM ${bqFriendlyName} ${partitionWheres}")
68-
}
56+
.load(select)
57+
.select(date_format(col(internalBQCol), partitionFormat))
58+
.as[String]
59+
.collect
60+
.toList
61+
logger.info(s"Part values: ${selectedParts}")
62+
63+
// Finally, we query the BQ table for each of the selected partition values and union them together.
64+
selectedParts
65+
.map((partValue) => {
66+
val pFilter = f"${partColName} = '${partValue}'"
67+
sparkSession.read
68+
.format(bqFormat)
69+
.option("filter", pFilter)
70+
.load(bqFriendlyName)
71+
.withColumn(partColName, lit(partValue))
72+
}) // todo: make it nullable
73+
.reduce(_ unionByName _)
6974
}
7075

7176
override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(

0 commit comments

Comments
 (0)