
Commit 53b06da

Revert "chore: Remove use of DelegatingTable and bubble up exceptions properl…"
This reverts commit 6b482ea.
1 parent 0a4115a commit 53b06da

File tree

2 files changed: +77 additions, -40 deletions

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 65 additions & 27 deletions
@@ -9,21 +9,61 @@ import com.google.cloud.bigquery.{
   TableId
 }
 import com.google.cloud.spark.bigquery.BigQueryCatalog
-import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
 import org.apache.iceberg.spark.SparkCatalog
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog

 import java.util
 import scala.jdk.CollectionConverters._
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
+
+/** A table that delegates all operations to an internal table, but with additional properties.
+  * This is mostly for enriching SparkTables with metadata that cannot be accessed by spark directly.
+  * For example, we can use a bigquery client to fetch table metadata / properties and then hydrate the Spark table
+  * with that information, before we pass it back to the Spark compute engine.
+  *
+  * Down the line, we could also support custom partition management.
+  */
+class DelegatingTable(internalTable: Table,
+                      additionalProperties: Map[String, String],
+                      partitioning: Option[Array[Transform]] = None)
+    extends Table
+    with SupportsRead
+    with SupportsWrite {
+
+  override def name(): String = internalTable.name
+
+  override def schema(): StructType = internalTable.schema
+
+  override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+
+  override def properties(): util.Map[String, String] =
+    (internalTable.properties().asScala ++ additionalProperties).asJava
+
+  override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())
+
+}
+
+object DelegatingTable {
+  def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
+    new DelegatingTable(table, additionalProperties = additionalProperties)
+}

 /** Galactus catalog that allows us to interact with BigQuery metastore as a spark catalog. This allows for
   * querying of a variety of table types directly in spark sql or the dataframe api.
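
Note: the DelegatingTable restored above forwards almost everything to the wrapped table; the behavioural addition is the property merge in properties(). Below is a minimal, dependency-free sketch of that merge (the object name and sample keys are hypothetical), showing that entries in additionalProperties win over the wrapped table's own entries on key collisions.

import java.util
import scala.jdk.CollectionConverters._

// Sketch of the merge used by DelegatingTable.properties():
// additionalProperties overrides the wrapped table's entries on key collisions.
object PropertyMergeSketch {
  def merge(internal: util.Map[String, String],
            additional: Map[String, String]): util.Map[String, String] =
    (internal.asScala ++ additional).asJava

  def main(args: Array[String]): Unit = {
    val internal = Map("provider" -> "unknown", "format" -> "parquet").asJava
    val merged   = merge(internal, Map("provider" -> "ICEBERG", "external" -> "false"))
    println(merged) // e.g. {provider=ICEBERG, format=parquet, external=false}; ordering may vary
  }
}
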
@@ -78,10 +118,13 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames

   override def loadTable(identNoCatalog: Identifier): Table = {
     Try {
-      icebergCatalog.loadTable(identNoCatalog)
+      val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
+      DelegatingTable(icebergSparkTable,
+                      additionalProperties =
+                        Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
     }
       .recover {
-        case noIcebergTableEx: NoSuchTableException => {
+        case _ => {
           val project =
             catalogProps.getOrElse(BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_PROJECT, bqOptions.getProjectId)
           val tId = identNoCatalog.namespace().toList match {
@@ -91,9 +134,7 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
             throw new IllegalArgumentException(
               s"Table identifier namespace ${identNoCatalog} must have at least one part.")
           }
-          val table = scala
-            .Option(bigQueryClient.getTable(tId))
-            .getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
+          val table = bigQueryClient.getTable(tId)
           table.getDefinition.asInstanceOf[TableDefinition] match {
             case externalTable: ExternalTableDefinition => {
               val uris = externalTable.getSourceUris.asScala
@@ -105,36 +146,33 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
                 uris.head.replaceAll("/\\*\\.parquet$", "")
               }

-              val fileBasedTable = ParquetTable(
-                tId.toString,
-                SparkSession.active,
-                new CaseInsensitiveStringMap(
-                  Map(TableCatalog.PROP_EXTERNAL -> "true",
-                      TableCatalog.PROP_LOCATION -> uri,
-                      TableCatalog.PROP_PROVIDER -> "PARQUET").asJava),
-                List(uri),
-                None,
-                classOf[ParquetFileFormat]
-              )
-              fileBasedTable
+              val fileBasedTable = ParquetTable(tId.toString,
+                                                SparkSession.active,
+                                                CaseInsensitiveStringMap.empty(),
+                                                List(uri),
+                                                None,
+                                                classOf[ParquetFileFormat])
+              DelegatingTable(fileBasedTable,
+                              Map(TableCatalog.PROP_EXTERNAL -> "true",
+                                  TableCatalog.PROP_LOCATION -> uri,
+                                  TableCatalog.PROP_PROVIDER -> "PARQUET"))
             }
             case _: StandardTableDefinition => {
               //todo(tchow): Support partitioning

               // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
               // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
+              val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
               // ideally it should be the below:
               // val connectorTable = connectorCatalog.loadTable(ident)
-              connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
+              DelegatingTable(connectorTable,
+                              Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "BIGQUERY"))
             }
-            case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
+            case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
           }
         }
-        case other: Throwable => throw other
-      } match {
-      case Success(table) => table
-      case Failure(exception) => throw exception
-    }
+      }
+      .getOrElse(throw new NoSuchTableException(f"Table: ${identNoCatalog} not found in bigquery catalog."))
   }

   override def createTable(ident: Identifier,
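
Note: the loadTable diff above restores the Try / recover / getOrElse shape: the Iceberg catalog is attempted first, any failure falls back to a direct BigQuery lookup, and a NoSuchTableException is raised only when that fallback also fails. Below is a self-contained sketch of the same control flow; the loader functions and table names are hypothetical stand-ins, and a plain NoSuchElementException replaces Spark's NoSuchTableException to keep it dependency-free.

import scala.util.Try

object LoadTableFlowSketch {
  // Hypothetical stand-in for icebergCatalog.loadTable
  private def loadFromIceberg(name: String): String =
    if (name.startsWith("iceberg_")) s"iceberg:$name" else sys.error(s"$name is not an iceberg table")

  // Hypothetical stand-in for the BigQuery fallback lookup
  private def loadFromBigQuery(name: String): String =
    if (name.startsWith("bq_")) s"bigquery:$name" else sys.error(s"$name is not a bigquery table")

  def loadTable(name: String): String =
    Try {
      loadFromIceberg(name)
    }
      .recover { case _ => loadFromBigQuery(name) } // any failure above falls through to the BigQuery path
      .getOrElse(throw new NoSuchElementException(s"Table: $name not found in bigquery catalog."))

  def main(args: Array[String]): Unit = {
    println(loadTable("iceberg_events")) // iceberg:iceberg_events
    println(loadTable("bq_users"))       // bigquery:bq_users
    // loadTable("hive_orders") would throw NoSuchElementException
  }
}
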

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 12 additions & 13 deletions
@@ -1,15 +1,13 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg}
 import com.google.cloud.bigquery._
-import com.google.cloud.spark.bigquery.v2.Spark31BigQueryTable
 import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
 import org.apache.iceberg.spark.SparkCatalog
-import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
+import org.apache.spark.sql.connector.catalog.TableCatalog

 import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}
+import scala.util.Try

 class GcpFormatProvider(override val sparkSession: SparkSession) extends DefaultFormatProvider(sparkSession) {

@@ -28,17 +26,18 @@ class GcpFormatProvider(override val sparkSession: SparkSession) extends Default
     cat match {
       case delegating: DelegatingBigQueryMetastoreCatalog =>
         Try {
-          val tbl = delegating.loadTable(identifier)
-          tbl match {
-            case iceberg: SparkTable => Iceberg
-            case bigquery: Spark31BigQueryTable => BigQueryNative
-            case parquet: ParquetTable => BigQueryExternal
+          delegating
+            .loadTable(identifier)
+            .properties
+            .asScala
+            .getOrElse(TableCatalog.PROP_PROVIDER, "")
+            .toUpperCase match {
+            case "ICEBERG"  => Iceberg
+            case "BIGQUERY" => BigQueryNative
+            case "PARQUET"  => BigQueryExternal
             case unsupported => throw new IllegalStateException(s"Unsupported provider type: ${unsupported}")
           }
-        } match {
-          case s @ Success(_) => s.toOption
-          case Failure(exception) => throw exception
-        }
+        }.toOption
       case iceberg: SparkCatalog if (iceberg.icebergCatalog().isInstanceOf[BigQueryMetastoreCatalog]) =>
         scala.Option(Iceberg)
       case _ => super.readFormat(tableName)
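
Note: with the revert, readFormat no longer pattern matches on concrete table classes (SparkTable, Spark31BigQueryTable, ParquetTable); it reads the provider property that DelegatingTable attaches and maps it to a format, collapsing any failure into None via Try(...).toOption. Below is a rough, self-contained sketch of that dispatch; the plain "provider" key stands in for Spark's TableCatalog.PROP_PROVIDER, and the Format objects are stand-ins for Chronon's Iceberg, BigQueryNative, and BigQueryExternal.

import scala.util.Try

object ProviderDispatchSketch {
  sealed trait Format
  case object Iceberg          extends Format
  case object BigQueryNative   extends Format
  case object BigQueryExternal extends Format

  // Map the table's provider property to a format; unknown or missing providers become None.
  def readFormat(tableProperties: Map[String, String]): Option[Format] =
    Try {
      tableProperties.getOrElse("provider", "").toUpperCase match {
        case "ICEBERG"  => Iceberg
        case "BIGQUERY" => BigQueryNative
        case "PARQUET"  => BigQueryExternal
        case other      => throw new IllegalStateException(s"Unsupported provider type: $other")
      }
    }.toOption

  def main(args: Array[String]): Unit = {
    println(readFormat(Map("provider" -> "iceberg"))) // Some(Iceberg)
    println(readFormat(Map("provider" -> "delta")))   // None (unsupported provider)
    println(readFormat(Map.empty))                    // None
  }
}
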
