
Commit 6b482ea

chore: Remove use of DelegatingTable and bubble up exceptions properly (#638)
## Summary

- It's better to return the underlying table; that way we let Spark understand what capabilities are possible based on class-type matching.
- Also modify the catalog to conform to the TableCatalog contract by throwing NoSuchTableException. Improve logging in TableUtils.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **Refactor**
  - Simplified and streamlined table loading and error handling for BigQuery and external tables.
  - Improved reliability by updating exception handling and type-based logic for table formats.
- **Chores**
  - Removed unused classes, objects, and imports to reduce code complexity.

Co-authored-by: Thomas Chow <[email protected]>
1 parent 3cfd5eb commit 6b482ea
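
The summary's first point is easiest to see in a small standalone sketch: once `loadTable` hands back the underlying table, a caller can branch on the concrete class rather than on a synthetic provider property, as the GcpFormatProvider change below does. `formatOf` is a hypothetical helper written only for illustration; the three table classes are the ones this commit imports.

```scala
// Hedged sketch of the motivation: with the wrapper gone, callers (and Spark
// itself) see the table's real class and capabilities directly. `formatOf`
// is hypothetical; the class names mirror this commit's GcpFormatProvider change.
import com.google.cloud.spark.bigquery.v2.Spark31BigQueryTable
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable

def formatOf(table: Table): String = table match {
  case _: SparkTable           => "ICEBERG"  // Iceberg table registered in the BigQuery metastore
  case _: Spark31BigQueryTable => "BIGQUERY" // native BigQuery table via the connector
  case _: ParquetTable         => "PARQUET"  // external table backed by Parquet files
  case other =>
    throw new IllegalStateException(s"Unsupported table type: ${other.getClass.getName}")
}
```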

File tree

2 files changed (+40, -77 lines)

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 27 additions & 65 deletions
@@ -9,61 +9,21 @@ import com.google.cloud.bigquery.{
   TableId
 }
 import com.google.cloud.spark.bigquery.BigQueryCatalog
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
 import org.apache.iceberg.spark.SparkCatalog
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog

 import java.util
 import scala.jdk.CollectionConverters._
-import scala.util.Try
-
-/** A table that delegates all operations to an internal table, but with additional properties.
-  * This is mostly for enriching SparkTables with metadata that cannot be accessed by spark directly.
-  * For example, we can use a bigquery client to fetch table metadata / properties and then hydrate the Spark table
-  * with that information, before we pass it back to the Spark compute engine.
-  *
-  * Down the line, we could also support custom partition management.
-  */
-class DelegatingTable(internalTable: Table,
-                      additionalProperties: Map[String, String],
-                      partitioning: Option[Array[Transform]] = None)
-    extends Table
-    with SupportsRead
-    with SupportsWrite {
-
-  override def name(): String = internalTable.name
-
-  override def schema(): StructType = internalTable.schema
-
-  override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()
-
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
-
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
-
-  override def properties(): util.Map[String, String] =
-    (internalTable.properties().asScala ++ additionalProperties).asJava
-
-  override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())
-
-}
-
-object DelegatingTable {
-  def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
-    new DelegatingTable(table, additionalProperties = additionalProperties)
-}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import scala.util.{Failure, Success, Try}

 /** Galactus catalog that allows us to interact with BigQuery metastore as a spark catalog. This allows for
   * querying of a variety of table types directly in spark sql or the dataframe api.
@@ -118,13 +78,10 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames

   override def loadTable(identNoCatalog: Identifier): Table = {
     Try {
-      val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
-      DelegatingTable(icebergSparkTable,
-                      additionalProperties =
-                        Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
+      icebergCatalog.loadTable(identNoCatalog)
     }
       .recover {
-        case _ => {
+        case noIcebergTableEx: NoSuchTableException => {
           val project =
             catalogProps.getOrElse(BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_PROJECT, bqOptions.getProjectId)
           val tId = identNoCatalog.namespace().toList match {
@@ -134,7 +91,9 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
              throw new IllegalArgumentException(
                s"Table identifier namespace ${identNoCatalog} must have at least one part.")
          }
-         val table = bigQueryClient.getTable(tId)
+         val table = scala
+           .Option(bigQueryClient.getTable(tId))
+           .getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
          table.getDefinition.asInstanceOf[TableDefinition] match {
            case externalTable: ExternalTableDefinition => {
              val uris = externalTable.getSourceUris.asScala
@@ -146,33 +105,36 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
                uris.head.replaceAll("/\\*\\.parquet$", "")
              }

-             val fileBasedTable = ParquetTable(tId.toString,
-                                               SparkSession.active,
-                                               CaseInsensitiveStringMap.empty(),
-                                               List(uri),
-                                               None,
-                                               classOf[ParquetFileFormat])
-             DelegatingTable(fileBasedTable,
-                             Map(TableCatalog.PROP_EXTERNAL -> "true",
-                                 TableCatalog.PROP_LOCATION -> uri,
-                                 TableCatalog.PROP_PROVIDER -> "PARQUET"))
+             val fileBasedTable = ParquetTable(
+               tId.toString,
+               SparkSession.active,
+               new CaseInsensitiveStringMap(
+                 Map(TableCatalog.PROP_EXTERNAL -> "true",
+                     TableCatalog.PROP_LOCATION -> uri,
+                     TableCatalog.PROP_PROVIDER -> "PARQUET").asJava),
+               List(uri),
+               None,
+               classOf[ParquetFileFormat]
+             )
+             fileBasedTable
            }
            case _: StandardTableDefinition => {
              //todo(tchow): Support partitioning

              // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
              // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
-             val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
              // ideally it should be the below:
              // val connectorTable = connectorCatalog.loadTable(ident)
-             DelegatingTable(connectorTable,
-                             Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "BIGQUERY"))
+             connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
            }
-           case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
+           case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
          }
        }
-      }
-        .getOrElse(throw new NoSuchTableException(f"Table: ${identNoCatalog} not found in bigquery catalog."))
+        case other: Throwable => throw other
+      } match {
+        case Success(table)     => table
+        case Failure(exception) => throw exception
+      }
  }

  override def createTable(ident: Identifier,
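
The reworked `loadTable` above follows the TableCatalog contract: a table that cannot be found surfaces as Spark's `NoSuchTableException`, and every other failure is rethrown instead of being collapsed into a generic not-found error. Below is a minimal standalone sketch of that shape, with `loadIceberg` and `lookupBigQuery` as hypothetical stand-ins for the Iceberg catalog and BigQuery client calls; the `NoSuchTableException(String)` constructor mirrors its usage in this diff.

```scala
// Hedged sketch of the bubbled-up exception flow; the two loader functions
// are hypothetical stand-ins for icebergCatalog.loadTable and
// bigQueryClient.getTable.
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.connector.catalog.Table

import scala.util.{Failure, Success, Try}

def loadOrFail(name: String,
               loadIceberg: String => Table,
               lookupBigQuery: String => Table): Table =
  Try {
    loadIceberg(name) // first choice: the Iceberg view of the table
  }.recover {
    case _: NoSuchTableException =>
      // Not an Iceberg table: fall back to BigQuery, and treat a null lookup
      // result as "table does not exist" per the TableCatalog contract.
      Option(lookupBigQuery(name))
        .getOrElse(throw new NoSuchTableException(s"Table $name not found."))
    case other: Throwable => throw other // bubble up everything else untouched
  } match {
    case Success(table)     => table
    case Failure(exception) => throw exception
  }
```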

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 13 additions & 12 deletions
@@ -1,13 +1,15 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg}
 import com.google.cloud.bigquery._
+import com.google.cloud.spark.bigquery.v2.Spark31BigQueryTable
 import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
 import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable

 import scala.jdk.CollectionConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}

 class GcpFormatProvider(override val sparkSession: SparkSession) extends DefaultFormatProvider(sparkSession) {

@@ -26,18 +28,17 @@ class GcpFormatProvider(override val sparkSession: SparkSession) extends Default
     cat match {
       case delegating: DelegatingBigQueryMetastoreCatalog =>
         Try {
-          delegating
-            .loadTable(identifier)
-            .properties
-            .asScala
-            .getOrElse(TableCatalog.PROP_PROVIDER, "")
-            .toUpperCase match {
-            case "ICEBERG"  => Iceberg
-            case "BIGQUERY" => BigQueryNative
-            case "PARQUET"  => BigQueryExternal
+          val tbl = delegating.loadTable(identifier)
+          tbl match {
+            case iceberg: SparkTable            => Iceberg
+            case bigquery: Spark31BigQueryTable => BigQueryNative
+            case parquet: ParquetTable          => BigQueryExternal
             case unsupported => throw new IllegalStateException(s"Unsupported provider type: ${unsupported}")
           }
-        }.toOption
+        } match {
+          case s @ Success(_)     => s.toOption
+          case Failure(exception) => throw exception
+        }
       case iceberg: SparkCatalog if (iceberg.icebergCatalog().isInstanceOf[BigQueryMetastoreCatalog]) =>
         scala.Option(Iceberg)
       case _ => super.readFormat(tableName)
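
A usage sketch for the provider after this change. It assumes `readFormat(tableName: String): Option[Format]`, which is not shown in this hunk, and a placeholder table name; the practical difference is that the old `.toOption` swallowed every failure into `None`, while the new `match` rethrows real errors from the catalog.

```scala
// Hedged usage sketch; the readFormat signature and the table name below are
// assumptions, not taken from this diff.
import ai.chronon.spark.format.Format
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val provider = new GcpFormatProvider(spark)

// With this change, a failure inside the catalog (for example a permission
// error) is rethrown here instead of being silently converted to None.
provider.readFormat("data_project.dataset.table") match {
  case Some(format: Format) => println(s"Resolved format: $format")
  case None                 => println("No format resolved for this table")
}
```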
