
Commit 40219d8

format and tests
Co-authored-by: Thomas Chow <[email protected]>
1 parent 9a3e311 commit 40219d8

File tree

2 files changed: +13 −10 lines changed


cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 4 additions & 2 deletions
@@ -82,7 +82,8 @@ class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
 
   override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
 
-  override def loadTable(ident: Identifier): Table = {
+  override def loadTable(rawIdent: Identifier): Table = {
+    val ident = Identifier.of(rawIdent.namespace.flatMap(_.split("\\.")), rawIdent.name)
     Try { icebergCatalog.loadTable(ident) }
       .recover {
         case _ => {
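The new loadTable thus flattens any dotted namespace segments before delegating the lookup. A minimal standalone sketch of that normalization, using Spark's Identifier API (the identifier values are illustrative, borrowed from the test change below):

    import org.apache.spark.sql.connector.catalog.Identifier

    // Spark can hand the catalog a namespace whose single segment still contains dots,
    // e.g. from a query against `canary-443022.data`.purchases.
    val rawIdent = Identifier.of(Array("canary-443022.data"), "purchases")

    // Split every namespace segment on "." so the parts become separate levels.
    val ident = Identifier.of(rawIdent.namespace.flatMap(_.split("\\.")), rawIdent.name)

    assert(ident.namespace.sameElements(Array("canary-443022", "data")))
    assert(ident.name == "purchases")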
@@ -113,7 +114,8 @@ class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
               Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
           }
           case _: StandardTableDefinition => {
-            val connectorTable = connectorCatalog.loadTable(ident)
+            // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
+            val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
             DelegatingTable(connectorTable, Map(TableCatalog.PROP_EXTERNAL -> "false"))
           }
           case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
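The second hunk works around the connector bug called out in the new comment: since the BigQuery catalog ignores the project portion of an identifier, the connector table is now looked up by dataset and table only (tId being the BigQuery TableId resolved earlier in loadTable), presumably leaving project selection to the connector's own configuration.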

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala

Lines changed: 9 additions & 8 deletions
@@ -28,7 +28,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
     Map(
       "spark.chronon.table.format_provider.class" -> classOf[GcpFormatProvider].getName,
      "hive.metastore.uris" -> "thrift://localhost:9083",
-      "spark.chronon.partition.column" -> "c",
+      "spark.chronon.partition.column" -> "ds",
      "spark.hadoop.fs.gs.impl" -> classOf[GoogleHadoopFileSystem].getName,
      "spark.hadoop.fs.AbstractFileSystem.gs.impl" -> classOf[GoogleHadoopFS].getName,
      "spark.hadoop.google.cloud.auth.service.account.enable" -> true.toString,
@@ -98,13 +98,14 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
 
   it should "integration testing bigquery partitions" ignore {
     // TODO(tchow): This test is ignored because it requires a running instance of the bigquery. Need to figure out stubbing locally.
-    val externalPartitions = tableUtils.partitions("data.checkouts_parquet")
-    println(externalPartitions)
-    val nativePartitions = tableUtils.partitions("data.sample_native")
-    println(nativePartitions)
-    val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
-    val partitions = tblFormat.partitions("data.purchases")(spark)
-    assertEquals(Set("ds"), partitions.flatMap(_.keys).toSet)
+    val externalPartitions = tableUtils.partitions("data.checkouts_parquet_partitioned")
+    assertEquals(Seq("2023-11-30"), externalPartitions)
+    val nativePartitions = tableUtils.partitions("data.purchases")
+    assertEquals(Set(20231118, 20231122, 20231125, 20231102, 20231123, 20231119, 20231130, 20231101, 20231117, 20231110, 20231108, 20231112, 20231115, 20231116, 20231113, 20231104, 20231103, 20231106, 20231121, 20231124, 20231128, 20231109, 20231127, 20231129, 20231126, 20231114, 20231107, 20231111, 20231120, 20231105).map(_.toString), nativePartitions.toSet)
+
+    val df = tableUtils.loadTable("`canary-443022.data`.purchases")
+    df.show
+
   }
 
   it should "kryo serialization for ResolvingFileIO" in {
