Skip to content

Commit 9eb680d

Browse files
feat: BigQuery views (#698)
## Summary

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Added support for handling BigQuery views when loading tables, improving compatibility with a wider range of BigQuery table types.
- **Bug Fixes**
  - Updated internal handling of partition column aliases to ensure accurate retrieval of partition data from BigQuery tables.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent df7673a commit 9eb680d

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ case object BigQueryNative extends Format {
1212
private val bqFormat = classOf[Spark35BigQueryTableProvider].getName
1313
private lazy val bqOptions = BigQueryOptions.getDefaultInstance
1414

15-
private val internalBQCol = "__chronon_internal_bq_col__"
15+
private val internalBQPartitionCol = "__chronon_internal_bq_partition_col__"
1616

1717
// TODO(tchow): use the cache flag
1818
override def table(tableName: String, partitionFilters: String, cacheDf: Boolean = false)(implicit
@@ -50,13 +50,14 @@ case object BigQueryNative extends Format {
5050
// Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
5151
val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
5252
val partitionFormat = TableUtils(sparkSession).partitionFormat
53-
val select = s"SELECT distinct(${partColName}) AS ${internalBQCol} FROM ${bqFriendlyName} ${partitionWheres}"
53+
val select =
54+
s"SELECT distinct(${partColName}) AS ${internalBQPartitionCol} FROM ${bqFriendlyName} ${partitionWheres}"
5455
val selectedParts = sparkSession.read
5556
.format(bqFormat)
5657
.option("viewsEnabled", true)
5758
.option("materializationDataset", bqTableId.getDataset)
5859
.load(select)
59-
.select(date_format(col(internalBQCol), partitionFormat))
60+
.select(date_format(col(internalBQPartitionCol), partitionFormat))
6061
.as[String]
6162
.collect
6263
.toList

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.google.cloud.bigquery.{
55
BigQueryOptions,
66
ExternalTableDefinition,
77
StandardTableDefinition,
8+
ViewDefinition,
89
TableDefinition,
910
TableId
1011
}
@@ -95,6 +96,9 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
9596
.Option(bigQueryClient.getTable(tId))
9697
.getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
9798
table.getDefinition.asInstanceOf[TableDefinition] match {
99+
case view: ViewDefinition => {
100+
connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
101+
}
98102
case externalTable: ExternalTableDefinition => {
99103
val uris = externalTable.getSourceUris.asScala
100104
val uri = scala

0 commit comments

Comments
 (0)