Commit e5243d5
fix: properly detect bigquery catalog (#629)
## Summary

Now that we have configurable catalogs, we should rely on that to determine whether we're interacting with BigQuery.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Enhanced cloud integration logic to accurately determine data table catalogs, ensuring more reliable data format retrieval.
  - Improved error handling, providing clearer feedback when encountering issues with data format detection.
  - Introduced a new method for converting table names into identifiers, enhancing catalog detection functionality.
  - Added a method for retrieving the catalog name based on table names, improving the overall functionality.
- **Bug Fixes**
  - Corrected a typo in the error message for the `NoSuchTableException`.
- **Tests**
  - Added a test case to verify the functionality of catalog detection in the `GcpFormatProvider` class, increasing test coverage.
  - Introduced a new test case for validating catalog detection based on various input strings in the `TableUtilsTest` class.
  - Enhanced test coverage for format detection scenarios in the `BigQueryCatalogTest` class.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent 785ff7d commit e5243d5
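The change assumes catalogs are registered with the Spark session. A minimal sketch of that wiring, for orientation only: the catalog alias `default_iceberg` mirrors the integration tests below, while the project id and the `gcp_project` property key are assumptions about the Iceberg BigQuery metastore configuration, not part of this commit.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: register the delegating catalog under an alias so readFormat
// can resolve it by name. `default_iceberg` matches the integration tests;
// `my-gcp-project` and the `gcp_project` key are illustrative assumptions.
val spark: SparkSession = SparkSession
  .builder()
  .appName("bq-catalog-detection")
  .config("spark.sql.catalog.default_iceberg",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .config("spark.sql.catalog.default_iceberg.gcp_project", "my-gcp-project")
  .getOrCreate()
```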

7 files changed: +122 -50 lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 29 additions & 8 deletions

```diff
@@ -21,6 +21,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -89,6 +90,8 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
   private var catalogName: String =
     null // This corresponds to `spark_catalog in `spark.sql.catalog.spark_catalog`. This is necessary for spark to correctly choose which implementation to use.
 
+  private var catalogProps: Map[String, String] = Map.empty[String, String]
+
   override def listNamespaces: Array[Array[String]] = icebergCatalog.listNamespaces()
 
   override def listNamespaces(namespace: Array[String]): Array[Array[String]] = icebergCatalog.listNamespaces(namespace)
@@ -114,13 +117,27 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
   override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
 
   override def loadTable(rawIdent: Identifier): Table = {
-    val ident = Identifier.of(rawIdent.namespace.flatMap(_.split("\\.")), rawIdent.name)
-    Try { icebergCatalog.loadTable(ident) }
+    // Remove the catalog segment. We've already consumed it, now it's time to figure out the namespace.
+    val identNoCatalog = Identifier.of(
+      rawIdent.namespace.flatMap(_.split("\\.")).toList match {
+        case catalog :: namespace :: Nil => Array(namespace)
+        case namespace :: Nil            => Array(namespace)
+      },
+      rawIdent.name
+    )
+    Try {
+      val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
+      DelegatingTable(icebergSparkTable,
+                      additionalProperties =
+                        Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
+    }
       .recover {
         case _ => {
-          val tId = ident.namespace().toList match {
-            case database :: Nil            => TableId.of(database, ident.name())
-            case project :: database :: Nil => TableId.of(project, database, ident.name())
+          val project =
+            catalogProps.getOrElse(BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_PROJECT, bqOptions.getProjectId)
+          val tId = identNoCatalog.namespace().toList match {
+            case database :: Nil            => TableId.of(project, database, identNoCatalog.name())
+            case catalog :: database :: Nil => TableId.of(project, database, identNoCatalog.name())
             case Nil =>
               throw new IllegalArgumentException(s"Table identifier namespace ${rawIdent} must have at least one part.")
           }
@@ -143,7 +160,9 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
               None,
               classOf[ParquetFileFormat])
             DelegatingTable(fileBasedTable,
-                            Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
+                            Map(TableCatalog.PROP_EXTERNAL -> "true",
+                                TableCatalog.PROP_LOCATION -> uri,
+                                TableCatalog.PROP_PROVIDER -> "PARQUET"))
           }
           case _: StandardTableDefinition => {
             //todo(tchow): Support partitioning
@@ -153,13 +172,14 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
             val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
             // ideally it should be the below:
            // val connectorTable = connectorCatalog.loadTable(ident)
-            DelegatingTable(connectorTable, Map(TableCatalog.PROP_EXTERNAL -> "false"))
+            DelegatingTable(connectorTable,
+                            Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "BIGQUERY"))
           }
           case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
         }
       }
     }
-      .getOrElse(throw new NoSuchTableException(f"Tgable: ${ident} not found in bigquery catalog."))
+      .getOrElse(throw new NoSuchTableException(f"Table: ${identNoCatalog} not found in bigquery catalog."))
   }
 
   override def createTable(ident: Identifier,
@@ -187,6 +207,7 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
     icebergCatalog.initialize(name, options)
     connectorCatalog.initialize(name, options)
     catalogName = name
+    catalogProps = options.asCaseSensitiveMap.asScala.toMap
   }
 
   override def name(): String = catalogName
```
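To make the namespace handling in `loadTable` concrete, here is a small standalone sketch of the same collapse. It is illustrative only; note the match in the PR is deliberately limited to one- and two-part namespaces, so the fallback case below is an addition for the sketch, not part of the change.

```scala
import org.apache.spark.sql.connector.catalog.Identifier

// Sketch of the normalization loadTable performs: a two-part namespace
// (catalog.database) drops the already-consumed catalog segment, while a
// one-part namespace passes through unchanged.
def stripCatalog(raw: Identifier): Identifier =
  Identifier.of(
    raw.namespace.flatMap(_.split("\\.")).toList match {
      case _catalog :: namespace :: Nil => Array(namespace)
      case namespace :: Nil             => Array(namespace)
      case other                        => other.toArray // fallback added for this sketch only
    },
    raw.name
  )

// stripCatalog(Identifier.of(Array("default_iceberg", "data"), "checkouts_parquet"))
//   => namespace ["data"], name "checkouts_parquet"
// stripCatalog(Identifier.of(Array("data"), "checkouts_parquet"))
//   => namespace ["data"], name "checkouts_parquet"
```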
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 24 additions & 41 deletions

```diff
@@ -1,13 +1,13 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg}
 import com.google.cloud.bigquery._
-import com.google.cloud.iceberg.bigquery.relocated.com.google.api.services.bigquery.model.TableReference
-import org.apache.iceberg.exceptions.NoSuchIcebergTableException
-import org.apache.iceberg.gcp.bigquery.{BigQueryClient, BigQueryClientImpl}
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
+import org.apache.iceberg.spark.SparkCatalog
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.TableCatalog
 
 import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 
 class GcpFormatProvider(override val sparkSession: SparkSession) extends DefaultFormatProvider(sparkSession) {
 
@@ -18,46 +18,29 @@ class GcpFormatProvider(override val sparkSession: SparkSession) extends Default
    * - Active project in the gcloud CLI configuration.
    * - No default project: An error will occur if no project ID is available.
    */
-  private lazy val bqOptions = BigQueryOptions.getDefaultInstance
-  private lazy val bigQueryClient: BigQuery = bqOptions.getService
-  private lazy val icebergClient: BigQueryClient = new BigQueryClientImpl()
 
   override def readFormat(tableName: String): scala.Option[Format] = {
-    logger.info(s"Retrieving read format for table: ${tableName}")
-
-    // order is important here. we want the Hive case where we just check for table in catalog to be last
-    Try {
-      val btTableIdentifier = SparkBQUtils.toTableId(tableName)(sparkSession)
-      val bqTable = bigQueryClient.getTable(btTableIdentifier)
-      getFormat(bqTable)
-    } match {
-      case Success(format) => scala.Option(format)
-      case Failure(e) =>
-        logger.info(s"${tableName} is not a BigQuery table")
-        super.readFormat(tableName)
-    }
-  }
-
-  private[cloud_gcp] def getFormat(table: Table): Format = {
-    table.getDefinition.asInstanceOf[TableDefinition] match {
-      case _: ExternalTableDefinition =>
+    val parsedCatalog = getCatalog(tableName)
+    val identifier = SparkBQUtils.toIdentifier(tableName)(sparkSession)
+    val cat = sparkSession.sessionState.catalogManager.catalog(parsedCatalog)
+    cat match {
+      case delegating: DelegatingBigQueryMetastoreCatalog =>
         Try {
-          val tableRef = new TableReference()
-            .setProjectId(table.getTableId.getProject)
-            .setDatasetId(table.getTableId.getDataset)
-            .setTableId(table.getTableId.getTable)
-
-          icebergClient.getTable(tableRef) // Just try to load it. It'll fail if it's not an iceberg table.
-          Iceberg
-        }.recover {
-          case _: NoSuchIcebergTableException => BigQueryExternal
-          case e: Exception                   => throw e
-        }.get
-
-      case _: StandardTableDefinition => BigQueryNative
-
-      case _ =>
-        throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
+          delegating
+            .loadTable(identifier)
+            .properties
+            .asScala
+            .getOrElse(TableCatalog.PROP_PROVIDER, "")
+            .toUpperCase match {
+            case "ICEBERG"   => Iceberg
+            case "BIGQUERY"  => BigQueryNative
+            case "PARQUET"   => BigQueryExternal
+            case unsupported => throw new IllegalStateException(s"Unsupported provider type: ${unsupported}")
+          }
+        }.toOption
+      case iceberg: SparkCatalog if (iceberg.icebergCatalog().isInstanceOf[BigQueryMetastoreCatalog]) =>
+        scala.Option(Iceberg)
+      case _ => super.readFormat(tableName)
     }
   }
 }
```
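A usage sketch of the rewritten `readFormat`, assuming a session wired as in the sketch near the top; the expected results mirror the `BigQueryCatalogTest` cases below.

```scala
// The provider resolves the catalog by name, loads the table through it, and
// maps the PROP_PROVIDER property stamped by DelegatingBigQueryMetastoreCatalog
// onto a Chronon format.
val provider = new GcpFormatProvider(spark)

provider.readFormat("default_iceberg.data.checkouts_parquet") // => Some(BigQueryExternal)
provider.readFormat("default_iceberg.data.checkouts_native")  // => Some(BigQueryNative)
provider.readFormat("default_iceberg.data.dne")               // => None: loadTable fails, Try.toOption
```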

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/SparkBQUtils.scala

Lines changed: 7 additions & 0 deletions

```diff
@@ -2,6 +2,7 @@ package ai.chronon.integrations.cloud_gcp
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
 import org.apache.spark.sql.SparkSession
 import com.google.cloud.bigquery.TableId
+import org.apache.spark.sql.connector.catalog.Identifier
 
 object SparkBQUtils {
 
@@ -14,4 +15,10 @@ object SparkBQUtils {
       .getOrElse(TableId.of(shadedTid.getDataset, shadedTid.getTable))
   }
 
+  def toIdentifier(tableName: String)(implicit spark: SparkSession): Identifier = {
+    val parseIdentifier = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName).reverse
+    Identifier.of(parseIdentifier.tail.reverse.toArray, parseIdentifier.head)
+
+  }
+
 }
```
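A short worked example of `toIdentifier`: the parser splits on unquoted dots, the last part becomes the name, and the remaining parts, in order, become the namespace. Inputs and outputs below are illustrative.

```scala
import org.apache.spark.sql.SparkSession

implicit val session: SparkSession = spark // any active session; its SQL parser does the splitting

SparkBQUtils.toIdentifier("cat.data.tbl") // => Identifier.of(Array("cat", "data"), "tbl")
SparkBQUtils.toIdentifier("data.tbl")     // => Identifier.of(Array("data"), "tbl")
SparkBQUtils.toIdentifier("tbl")          // => Identifier.of(Array(), "tbl")
```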

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala

Lines changed: 33 additions & 0 deletions

```diff
@@ -10,6 +10,8 @@ import com.google.cloud.hadoop.fs.gcs.{
   GoogleHadoopFileSystemConfiguration,
   HadoopConfigurationProperty
 }
+import ai.chronon.spark.format.Iceberg
+
 import com.google.cloud.spark.bigquery.SparkBigQueryUtil
 import org.apache.iceberg.gcp.bigquery.{BigQueryMetastoreCatalog => BQMSCatalog}
 import org.apache.iceberg.gcp.gcs.GCSFileIO
@@ -113,6 +115,37 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
     println(allParts)
   }
 
+  it should "integration testing formats" ignore {
+    val externalTable = "default_iceberg.data.checkouts_parquet"
+    val externalFormat = FormatProvider.from(spark).readFormat(externalTable)
+    assertEquals(Some(BigQueryExternal), externalFormat)
+
+    val externalTableNoCat = "data.checkouts_parquet"
+    val externalFormatNoCat = FormatProvider.from(spark).readFormat(externalTableNoCat)
+    assertEquals(Some(BigQueryExternal), externalFormatNoCat)
+
+    val nativeTable = "default_iceberg.data.checkouts_native"
+    val nativeFormat = FormatProvider.from(spark).readFormat(nativeTable)
+    assertEquals(Some(BigQueryNative), nativeFormat)
+
+    val nativeTableNoCat = "data.checkouts_native"
+    val nativeFormatNoCat = FormatProvider.from(spark).readFormat(nativeTableNoCat)
+    assertEquals(Some(BigQueryNative), nativeFormatNoCat)
+
+    val icebergTable = "default_iceberg.data.quickstart_purchases_davidhan_v1_dev_davidhan"
+    val icebergFormat = FormatProvider.from(spark).readFormat(icebergTable)
+    assertEquals(Some(Iceberg), icebergFormat)
+
+    val icebergTableNoCat = "data.quickstart_purchases_davidhan_v1_dev_davidhan"
+    val icebergFormatNoCat = FormatProvider.from(spark).readFormat(icebergTableNoCat)
+    assertEquals(Some(Iceberg), icebergFormatNoCat)
+
+    val dneTable = "default_iceberg.data.dne"
+    val dneFormat = FormatProvider.from(spark).readFormat(dneTable)
+    assertTrue(dneFormat.isEmpty)
+  }
+
+
   it should "integration testing bigquery partitions" ignore {
     // TODO(tchow): This test is ignored because it requires a running instance of the bigquery. Need to figure out stubbing locally.
     // to run, set `GOOGLE_APPLICATION_CREDENTIALS=<path_to_application_default_credentials.json>
```

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProviderTest.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,6 +33,6 @@ class GcpFormatProviderTest extends AnyFlatSpec with MockitoSugar {
       .build())
     when(mockTable.getTableId).thenReturn(TableId.of("project", "dataset", "table"))
 
-    val gcsFormat = gcpFormatProvider.getFormat(mockTable)
+    val gcsFormat = gcpFormatProvider.readFormat(tableName)
   }
 }
```

spark/src/main/scala/ai/chronon/spark/format/DefaultFormatProvider.scala

Lines changed: 12 additions & 0 deletions

```diff
@@ -22,6 +22,18 @@ class DefaultFormatProvider(val sparkSession: SparkSession) extends FormatProvid
     } else { null })
   }
 
+  def getCatalog(tableName: String): String = {
+    logger.info(s"Retrieving read format for table: ${tableName}")
+    val parsed = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+    val parsedCatalog = parsed.toList match {
+      case catalog :: namespace :: tableName :: Nil => catalog
+      case namespace :: tableName :: Nil            => sparkSession.catalog.currentCatalog()
+      case tableName :: Nil                         => sparkSession.catalog.currentCatalog()
+      case _ => throw new IllegalStateException(s"Invalid table naming convention specified: ${tableName}")
+    }
+    parsedCatalog
+  }
+
   private def isIcebergTable(tableName: String): Boolean =
     Try {
       sparkSession.read.format("iceberg").load(tableName)
```
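In words: a three-part name yields its leading catalog; one- and two-part names fall back to the session's current catalog; longer names are rejected. A behavior sketch, mirroring the `TableUtilsTest` cases added below (where the current catalog is `spark_catalog`):

```scala
import ai.chronon.spark.format.{DefaultFormatProvider, FormatProvider}

val fp = FormatProvider.from(spark).asInstanceOf[DefaultFormatProvider]

fp.getCatalog("catalogA.foo.bar")   // => "catalogA": first of three parts
fp.getCatalog("foo.bar")            // => current catalog, e.g. "spark_catalog"
fp.getCatalog("`catalogA.foo.bar`") // => current catalog: backticks collapse it to a single part
fp.getCatalog("")                   // throws ParseException from the SQL parser
```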

spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala

Lines changed: 16 additions & 0 deletions

```diff
@@ -25,6 +25,10 @@ import org.apache.spark.sql.functions.col
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.scalatest.flatspec.AnyFlatSpec
 
+import ai.chronon.spark.format.FormatProvider
+import ai.chronon.spark.format.DefaultFormatProvider
+import org.apache.spark.sql.catalyst.parser.ParseException
+
 import scala.util.Try
 
 case class TestRecord(ds: String, id: String)
@@ -36,6 +40,7 @@ class SimpleAddUDF extends UDF {
 }
 
 class TableUtilsTest extends AnyFlatSpec {
+
   lazy val spark: SparkSession = SparkSessionBuilder.build("TableUtilsTest", local = true)
   private val tableUtils = TableTestUtils(spark)
   private implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec
@@ -639,4 +644,15 @@ class TableUtilsTest extends AnyFlatSpec {
     }
   }
 
+  it should "test catalog detection" in {
+    val fp = FormatProvider.from(spark).asInstanceOf[DefaultFormatProvider]
+    assertEquals("catalogA", fp.getCatalog("catalogA.foo.bar"))
+    assertEquals("catalogA", fp.getCatalog("`catalogA`.foo.bar"))
+    assertEquals("spark_catalog", fp.getCatalog("`catalogA.foo`.bar"))
+    assertEquals("spark_catalog", fp.getCatalog("`catalogA.foo.bar`"))
+    assertEquals("spark_catalog", fp.getCatalog("foo.bar"))
+    assertEquals("spark_catalog", fp.getCatalog("bar"))
+    assertThrows[ParseException](fp.getCatalog(""))
+  }
+
 }
```
