Skip to content

Commit 1aaed62

Browse files
committed
bq client native
1 parent 977be59 commit 1aaed62

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,28 @@ import com.google.cloud.bigquery.ExternalTableDefinition
1212
import com.google.cloud.bigquery.StandardTableDefinition
1313

1414
case class GCPFormatProvider(sparkSession: SparkSession) extends FormatProvider {
15-
lazy val bigQueryClient = BigQueryOptions.getDefaultInstance.getService
15+
lazy val bqOptions = BigQueryOptions.getDefaultInstance
16+
lazy val bigQueryClient = bqOptions.getService
1617
def readFormat(tableName: String): Format = {
1718

1819
val btTableIdentifier: TableId = BigQueryUtil.parseTableId(tableName)
20+
21+
// Order of Precedence for Default Project
22+
// Explicitly configured project in code (e.g., setProjectId()).
23+
// GOOGLE_CLOUD_PROJECT environment variable.
24+
// project_id from the ADC service account JSON file.
25+
// Active project in the gcloud CLI configuration.
26+
// No default project: An error will occur if no project ID is available.
27+
1928
val unshadedTI: BTableId =
20-
BTableId.of(btTableIdentifier.getProject, btTableIdentifier.getDataset, btTableIdentifier.getTable)
29+
BTableId.of(bqOptions.getProjectId, btTableIdentifier.getDataset, btTableIdentifier.getTable)
2130

2231
val tableOpt = Option(bigQueryClient.getTable(unshadedTI))
23-
2432
tableOpt match {
2533
case Some(table) => {
26-
table.getDefinition match {
27-
case ExternalTableDefinition => BQuery(unshadedTI.getProject)
28-
case StandardTableDefinition => GCS(unshadedTI.getProject)
29-
}
34+
if (table.getDefinition.isInstanceOf[ExternalTableDefinition]) GCS(unshadedTI.getProject)
35+
else if (table.getDefinition.isInstanceOf[StandardTableDefinition]) BQuery(unshadedTI.getProject)
36+
else throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
3037
}
3138
case None => Hive
3239
}

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
4848
table.show
4949
}
5050

51-
ignore("integration testing bigquery partitions") {
51+
test("integration testing bigquery partitions") {
5252
// TODO(tchow): This test is ignored because it requires a running instance of the bigquery. Need to figure out stubbing locally.
5353
// to run this:
5454
// 1. Set up a tunnel to dataproc federation proxy:

0 commit comments

Comments
 (0)