
Commit bd30e04

tchow-zlai and thomaschow authored and committed
feat: Make delegating BigQueryMetastore just a SparkCatalog (#520)
## Summary

- Do not use the `DelegatingBigQueryMetastore` as a session catalog; have it be a custom catalog only. This changes the configuration set from:

```bash
spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-etsy/data/tables/"
spark.sql.catalog.spark_catalog.gcp_location: "us"
spark.sql.catalog.spark_catalog.gcp_project: "etsy-zipline-dev"
spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO
spark.sql.catalog.default_iceberg: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
spark.sql.catalog.default_iceberg.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
spark.sql.catalog.default_iceberg.io-impl: org.apache.iceberg.io.ResolvingFileIO
spark.sql.catalog.default_iceberg.warehouse: "gs://zipline-warehouse-etsy/data/tables/"
spark.sql.catalog.default_iceberg.gcp_location: "us"
spark.sql.catalog.default_iceberg.gcp_project: "etsy-zipline-dev"
spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.ChrononIcebergKryoRegistrator"
```

to:

```bash
spark.sql.defaultCatalog: "default_iceberg"
spark.sql.catalog.default_iceberg: "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog"
spark.sql.catalog.default_iceberg.catalog-impl: "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
spark.sql.catalog.default_iceberg.io-impl: "org.apache.iceberg.io.ResolvingFileIO"
spark.sql.catalog.default_iceberg.warehouse: "gs://zipline-warehouse-etsy/data/tables/"
spark.sql.catalog.default_iceberg.gcp_location: "us"
spark.sql.catalog.default_iceberg.gcp_project: "etsy-zipline-dev"
spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.ChrononIcebergKryoRegistrator"
spark.sql.catalog.default_bigquery: "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog"
```

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **Refactor**
  - Improved internal table processing by restructuring class integrations and enhancing the error message when a table isn't found.
- **Tests**
  - Updated integration settings and adjusted reference parameters to keep validations aligned with the new catalog implementation.

---------

Co-authored-by: Thomas Chow <[email protected]>
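For context, here is a minimal sketch of what the post-change configuration might look like when applied programmatically to a Spark session. It simply mirrors the "to" block above; the warehouse bucket, project, and location values are the examples from this summary, and the table identifier queried at the end is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: mirrors the "to" configuration block above. The bucket,
// project, and location values are the examples from this PR's summary.
val spark = SparkSession
  .builder()
  .appName("delegating-bqms-sketch")
  .config("spark.sql.defaultCatalog", "default_iceberg")
  .config("spark.sql.catalog.default_iceberg",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .config("spark.sql.catalog.default_iceberg.catalog-impl",
          "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
  .config("spark.sql.catalog.default_iceberg.io-impl", "org.apache.iceberg.io.ResolvingFileIO")
  .config("spark.sql.catalog.default_iceberg.warehouse", "gs://zipline-warehouse-etsy/data/tables/")
  .config("spark.sql.catalog.default_iceberg.gcp_location", "us")
  .config("spark.sql.catalog.default_iceberg.gcp_project", "etsy-zipline-dev")
  .config("spark.sql.catalog.default_bigquery",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .getOrCreate()

// With spark.sql.defaultCatalog set, unqualified identifiers resolve through
// the delegating catalog rather than the built-in spark_catalog.
// "data.sample_table" is a hypothetical identifier for illustration.
spark.sql("SELECT * FROM data.sample_table LIMIT 10").show()
```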
1 parent 18b9988 commit bd30e04

File tree

2 files changed (+5, -11 lines)


cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 3 additions & 9 deletions

```diff
@@ -20,6 +20,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -77,16 +78,14 @@ object DelegatingTable {
  * NOTE that this abstraction currently only supports querying tables that all belong to the same GCP project. Multi-project
  * support will depend on underlying libraries to support them.
  */
-class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
+class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNamespaces with FunctionCatalog {
 
   @transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance
   @transient private lazy val bigQueryClient: BigQuery = bqOptions.getService
 
   @transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
   @transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()
 
-  // Some stupid spark settings.
-  private var defaultSessionCatalog: CatalogPlugin = null
   private var catalogName: String =
     null // This corresponds to `spark_catalog` in `spark.sql.catalog.spark_catalog`. This is necessary for spark to correctly choose which implementation to use.
 
@@ -160,7 +159,7 @@ class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
         }
       }
     }
-      .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(rawIdent))
+      .getOrElse(throw new NoSuchTableException(f"Table: ${ident} not found in bigquery catalog."))
   }
 
   override def createTable(ident: Identifier,
@@ -192,12 +191,7 @@ class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
 
   override def name(): String = catalogName
 
-  override def setDelegateCatalog(delegate: CatalogPlugin): Unit = {
-    defaultSessionCatalog = delegate
-  }
-
   override def listFunctions(namespace: Array[String]): Array[Identifier] = icebergCatalog.listFunctions(namespace)
 
   override def loadFunction(ident: Identifier): UnboundFunction = icebergCatalog.loadFunction(ident)
-
 }
```
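The core behavioral change is in `loadTable`: instead of falling back to a delegated session catalog, the catalog now surfaces a `NoSuchTableException` when neither backend knows the identifier. Below is a minimal sketch of that fallback shape. The `icebergCatalog` and `connectorCatalog` parameters stand in for the real class members; the actual `loadTable` also handles other table formats, so this is an illustration, not the implementation.

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

// Sketch of the try-Iceberg-then-BigQuery resolution order implied by the diff:
// attempt the Iceberg catalog, fall back to the BigQuery connector catalog, and
// throw instead of delegating to a session catalog when both lookups fail.
def loadTableSketch(icebergCatalog: TableCatalog,
                    connectorCatalog: TableCatalog,
                    ident: Identifier): Table =
  Try(icebergCatalog.loadTable(ident))
    .orElse(Try(connectorCatalog.loadTable(ident)))
    .getOrElse(throw new NoSuchTableException(s"Table: ${ident} not found in bigquery catalog."))
```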

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -41,7 +41,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
     "spark.sql.catalogImplementation" -> "in-memory",
 
     // "spark.sql.defaultCatalog" -> "default_iceberg",
-    // "spark.sql.catalog.default_iceberg" -> classOf[SparkCatalog].getName,
+    // "spark.sql.catalog.default_iceberg" -> classOf[DelegatingBigQueryMetastoreCatalog].getName,
     // "spark.sql.catalog.default_iceberg.catalog-impl" -> classOf[BQMSCatalog].getName,
     // "spark.sql.catalog.default_iceberg.io-impl" -> classOf[ResolvingFileIO].getName,
     // "spark.sql.catalog.default_iceberg.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
@@ -95,7 +95,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
   }
 
   it should "integration testing bigquery native table" ignore {
-    val nativeTable = "data.sample_native"
+    val nativeTable = "data.checkouts"
     val table = tableUtils.loadTable(nativeTable)
     table.show
     // val database = tableUtils.createDatabase("test_database")
```
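As a usage note, with the new configuration the test's table can be addressed either through the default catalog or with an explicit catalog qualifier, since `default_bigquery` registers a second instance of the delegating catalog. A hedged sketch, assuming the session from the earlier example:

```scala
// "data.checkouts" is the table name from the updated test; "default_bigquery"
// is the second delegating-catalog registration from the new configuration.
spark.sql("SELECT * FROM data.checkouts LIMIT 5").show()                  // via spark.sql.defaultCatalog
spark.sql("SELECT * FROM default_bigquery.data.checkouts LIMIT 5").show() // explicit catalog qualifier
```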
