-
Notifications
You must be signed in to change notification settings - Fork 0
fix: properly detect bigquery catalog #629
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
eb08cb1
8822ede
7af162a
484368a
7ace96b
527333f
22fae64
b7b44d4
1ba3541
ef6213a
565bdd0
a423261
a22660f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,10 @@ package ai.chronon.integrations.cloud_gcp | |
import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg} | ||
import com.google.cloud.bigquery._ | ||
import com.google.cloud.iceberg.bigquery.relocated.com.google.api.services.bigquery.model.TableReference | ||
import com.google.cloud.spark.bigquery.BigQueryCatalog | ||
import org.apache.iceberg.exceptions.NoSuchIcebergTableException | ||
import org.apache.iceberg.gcp.bigquery.{BigQueryClient, BigQueryClientImpl} | ||
import org.apache.iceberg.gcp.bigquery.{BigQueryClient, BigQueryClientImpl, BigQueryMetastoreCatalog} | ||
import org.apache.iceberg.spark.SparkCatalog | ||
import org.apache.spark.sql.SparkSession | ||
|
||
import scala.jdk.CollectionConverters._ | ||
|
@@ -23,19 +25,47 @@ class GcpFormatProvider(override val sparkSession: SparkSession) extends Default | |
private lazy val icebergClient: BigQueryClient = new BigQueryClientImpl() | ||
|
||
override def readFormat(tableName: String): scala.Option[Format] = { | ||
logger.info(s"Retrieving read format for table: ${tableName}") | ||
val parsedCatalog = getCatalog(tableName) | ||
|
||
if (isBigQueryCatalog(parsedCatalog)) { | ||
logger.info(s"Detected BigQuery catalog: $parsedCatalog") | ||
Try { | ||
val btTableIdentifier = SparkBQUtils.toTableId(tableName)(sparkSession) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Comment: Remind me again — why do we need to convert tableName to a BigTable identifier?

Reply: That's just what the bigQueryClient accepts — see the line below.

Reply: To be honest, this isn't even entirely correct just yet. We actually need to find the project_id associated with the catalog and replace that in the table name before we pass it off to the BQ client, but we'll do that in a follow-up.

Reply: Alright — I've adjusted this to actually make it work with BigQuery.
||
val bqTable = bigQueryClient.getTable(btTableIdentifier) | ||
getFormat(bqTable) | ||
} match { | ||
case Success(format) => scala.Option(format) | ||
case Failure(e) => | ||
throw new IllegalStateException( | ||
s"${tableName} belongs to bigquery catalog ${parsedCatalog} but could not be found", | ||
e) | ||
} | ||
} else { | ||
|
||
logger.info(s"Detected non-BigQuery catalog: $parsedCatalog") | ||
super.readFormat(tableName) | ||
} | ||
} | ||
|
||
// order is important here. we want the Hive case where we just check for table in catalog to be last | ||
Try { | ||
val btTableIdentifier = SparkBQUtils.toTableId(tableName)(sparkSession) | ||
val bqTable = bigQueryClient.getTable(btTableIdentifier) | ||
getFormat(bqTable) | ||
} match { | ||
case Success(format) => scala.Option(format) | ||
case Failure(e) => | ||
logger.info(s"${tableName} is not a BigQuery table") | ||
super.readFormat(tableName) | ||
private[cloud_gcp] def getCatalog(tableName: String): String = { | ||
logger.info(s"Retrieving read format for table: ${tableName}") | ||
val parsed = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) | ||
val parsedCatalog = parsed.toList match { | ||
case catalog :: namespace :: tableName :: Nil => catalog | ||
case namespace :: tableName :: Nil => sparkSession.catalog.currentCatalog() | ||
case tableName :: Nil => sparkSession.catalog.currentCatalog() | ||
case _ => throw new IllegalStateException(s"Invalid table naming convention specified: ${tableName}") | ||
} | ||
parsedCatalog | ||
} | ||
|
||
private[cloud_gcp] def isBigQueryCatalog(catalog: String): Boolean = { | ||
val cat = sparkSession.sessionState.catalogManager.catalog(catalog) | ||
cat.isInstanceOf[DelegatingBigQueryMetastoreCatalog] || cat | ||
.isInstanceOf[BigQueryCatalog] || (cat.isInstanceOf[SparkCatalog] && cat | ||
.asInstanceOf[SparkCatalog] | ||
.icebergCatalog() | ||
.isInstanceOf[BigQueryMetastoreCatalog]) | ||
} | ||
|
||
private[cloud_gcp] def getFormat(table: Table): Format = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Missing getCatalog implementation.
Method is called but appears to be missing implementation.
🏁 Script executed:
Length of output: 171
Implement or Inherit getCatalog
The call to `getCatalog` in `GcpFormatProvider.scala` (line 23) doesn’t resolve locally. Although a similar method exists in `DefaultFormatProvider.scala`, it isn’t automatically available here. Please either inherit from a common base that provides the implementation or add a GCP-specific `getCatalog` method.