Revert "chore: Remove use of DelegatingTable and bubble up exceptions properly" #661

Closed · wants to merge 1 commit
@@ -9,21 +9,61 @@ import com.google.cloud.bigquery.{
TableId
}
import com.google.cloud.spark.bigquery.BigQueryCatalog
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog

import java.util
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import scala.util.{Failure, Success, Try}
import scala.util.Try

/** A table that delegates all operations to an internal table, but with additional properties.
* This is mostly for enriching SparkTables with metadata that cannot be accessed by spark directly.
* For example, we can use a bigquery client to fetch table metadata / properties and then hydrate the Spark table
* with that information, before we pass it back to the Spark compute engine.
*
* Down the line, we could also support custom partition management.
*/
class DelegatingTable(internalTable: Table,
additionalProperties: Map[String, String],
partitioning: Option[Array[Transform]] = None)
extends Table
with SupportsRead
with SupportsWrite {

override def name(): String = internalTable.name

override def schema(): StructType = internalTable.schema

override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)

Comment on lines +50 to +55
⚠️ Potential issue

Guard the casts to SupportsRead/Write.

asInstanceOf will blow up if internalTable lacks SupportsRead/SupportsWrite.
Prefer pattern‑match or collect { case t: SupportsRead => … } to fail fast with a clearer message.
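
A rough sketch of what that guarded version could look like, assuming the same imports as this file; the exception type and messages are illustrative only, not part of the patch:

// Sketch only: fail fast with a clear message when the wrapped table does not
// support reads/writes, instead of letting asInstanceOf throw a ClassCastException.
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
  internalTable match {
    case readable: SupportsRead => readable.newScanBuilder(options)
    case other =>
      throw new UnsupportedOperationException(s"Table ${other.name} does not implement SupportsRead")
  }

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
  internalTable match {
    case writable: SupportsWrite => writable.newWriteBuilder(info)
    case other =>
      throw new UnsupportedOperationException(s"Table ${other.name} does not implement SupportsWrite")
  }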

override def properties(): util.Map[String, String] =
(internalTable.properties().asScala ++ additionalProperties).asJava

override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())

}

object DelegatingTable {
def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
new DelegatingTable(table, additionalProperties = additionalProperties)
}
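
To make the doc comment above concrete, here is a small hypothetical usage sketch; someSparkTable and the property value are illustrative only, not taken from this patch:

// Hypothetical usage: hydrate a Spark table with properties that Spark cannot
// discover on its own (e.g. metadata fetched separately via a BigQuery client).
val enriched: Table = DelegatingTable(
  someSparkTable, // any Table that also implements SupportsRead / SupportsWrite
  additionalProperties = Map(TableCatalog.PROP_PROVIDER -> "ICEBERG")
)
// enriched delegates scans and writes to the wrapped table, but exposes the
// merged property map through properties().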

/** Galactus catalog that allows us to interact with BigQuery metastore as a spark catalog. This allows for
* querying of a variety of table types directly in spark sql or the dataframe api.
@@ -78,10 +118,13 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames

override def loadTable(identNoCatalog: Identifier): Table = {
Try {
icebergCatalog.loadTable(identNoCatalog)
val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
DelegatingTable(icebergSparkTable,
additionalProperties =
Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
}
.recover {
Comment on lines 120 to 126
🛠️ Refactor suggestion

Too broad fallback.

recover { case _ => … } hides non‑existence and real errors alike.
Match _: NoSuchTableException explicitly to avoid masking genuine failures.
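
A minimal sketch of the narrower fallback under the same Try block as in the diff; loadFromBigQuery is a hypothetical helper standing in for the inlined BigQuery lookup that follows:

// Sketch only: fall back to BigQuery solely when the Iceberg catalog reports a
// missing table; any other exception is rethrown unchanged by .get.
Try(icebergCatalog.loadTable(identNoCatalog))
  .map(t => DelegatingTable(t, Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG")))
  .recover { case _: NoSuchTableException => loadFromBigQuery(identNoCatalog) } // hypothetical helper
  .get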

case noIcebergTableEx: NoSuchTableException => {
case _ => {
val project =
catalogProps.getOrElse(BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_PROJECT, bqOptions.getProjectId)
val tId = identNoCatalog.namespace().toList match {
@@ -91,9 +134,7 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
throw new IllegalArgumentException(
s"Table identifier namespace ${identNoCatalog} must have at least one part.")
}
val table = scala
.Option(bigQueryClient.getTable(tId))
.getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
val table = bigQueryClient.getTable(tId)
table.getDefinition.asInstanceOf[TableDefinition] match {
Comment on lines +137 to 138

⚠️ Potential issue

Null‑safe BigQuery lookup needed.

getTable returns null when absent → NPE later.
Wrap with Option(...) and throw NoSuchTableException early.

-          val table = bigQueryClient.getTable(tId)
+          val table = Option(bigQueryClient.getTable(tId))
+            .getOrElse(throw new NoSuchTableException(s"Table $tId not found in BigQuery"))

case externalTable: ExternalTableDefinition => {
val uris = externalTable.getSourceUris.asScala
@@ -105,36 +146,33 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
uris.head.replaceAll("/\\*\\.parquet$", "")
}

val fileBasedTable = ParquetTable(
tId.toString,
SparkSession.active,
new CaseInsensitiveStringMap(
Map(TableCatalog.PROP_EXTERNAL -> "true",
TableCatalog.PROP_LOCATION -> uri,
TableCatalog.PROP_PROVIDER -> "PARQUET").asJava),
List(uri),
None,
classOf[ParquetFileFormat]
)
fileBasedTable
val fileBasedTable = ParquetTable(tId.toString,
SparkSession.active,
CaseInsensitiveStringMap.empty(),
List(uri),
None,
classOf[ParquetFileFormat])
DelegatingTable(fileBasedTable,
Map(TableCatalog.PROP_EXTERNAL -> "true",
TableCatalog.PROP_LOCATION -> uri,
TableCatalog.PROP_PROVIDER -> "PARQUET"))
}
case _: StandardTableDefinition => {
//todo(tchow): Support partitioning

// Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
// ideally it should be the below:
// val connectorTable = connectorCatalog.loadTable(ident)
connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
DelegatingTable(connectorTable,
Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "BIGQUERY"))
}
case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
}
}
case other: Throwable => throw other
} match {
case Success(table) => table
case Failure(exception) => throw exception
}
}
.getOrElse(throw new NoSuchTableException(f"Table: ${identNoCatalog} not found in bigquery catalog."))
}

override def createTable(ident: Identifier,
@@ -1,15 +1,13 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg}
import com.google.cloud.bigquery._
import com.google.cloud.spark.bigquery.v2.Spark31BigQueryTable
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.connector.catalog.TableCatalog

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
import scala.util.Try

class GcpFormatProvider(override val sparkSession: SparkSession) extends DefaultFormatProvider(sparkSession) {

@@ -28,17 +26,18 @@ class GcpFormatProvider(override val sparkSession: SparkSession) extends Default
cat match {
case delegating: DelegatingBigQueryMetastoreCatalog =>
Try {
val tbl = delegating.loadTable(identifier)
tbl match {
case iceberg: SparkTable => Iceberg
case bigquery: Spark31BigQueryTable => BigQueryNative
case parquet: ParquetTable => BigQueryExternal
delegating
.loadTable(identifier)
.properties
.asScala
.getOrElse(TableCatalog.PROP_PROVIDER, "")
.toUpperCase match {
case "ICEBERG" => Iceberg
case "BIGQUERY" => BigQueryNative
case "PARQUET" => BigQueryExternal
case unsupported => throw new IllegalStateException(s"Unsupported provider type: ${unsupported}")
}
} match {
case s @ Success(_) => s.toOption
case Failure(exception) => throw exception
}
}.toOption
case iceberg: SparkCatalog if (iceberg.icebergCatalog().isInstanceOf[BigQueryMetastoreCatalog]) =>
scala.Option(Iceberg)
case _ => super.readFormat(tableName)