Revert "chore: Remove use of DelegatingTable and bubble up exceptions properly" #661
Changes from all commits
@@ -9,21 +9,61 @@ import com.google.cloud.bigquery.{
   TableId
 }
 import com.google.cloud.spark.bigquery.BigQueryCatalog
-import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
 import org.apache.iceberg.spark.SparkCatalog
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog

 import java.util
 import scala.jdk.CollectionConverters._
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import scala.util.{Failure, Success, Try}
+import scala.util.Try

+/** A table that delegates all operations to an internal table, but with additional properties.
+ * This is mostly for enriching SparkTables with metadata that cannot be accessed by spark directly.
+ * For example, we can use a bigquery client to fetch table metadata / properties and then hydrate the Spark table
+ * with that information, before we pass it back to the Spark compute engine.
+ *
+ * Down the line, we could also support custom partition management.
+ */
+class DelegatingTable(internalTable: Table,
+                      additionalProperties: Map[String, String],
+                      partitioning: Option[Array[Transform]] = None)
+    extends Table
+    with SupportsRead
+    with SupportsWrite {
+
+  override def name(): String = internalTable.name
+
+  override def schema(): StructType = internalTable.schema
+
+  override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+
+  override def properties(): util.Map[String, String] =
+    (internalTable.properties().asScala ++ additionalProperties).asJava
+
+  override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())
+
+}
+
+object DelegatingTable {
+  def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
+    new DelegatingTable(table, additionalProperties = additionalProperties)
+}

 /** Galactus catalog that allows us to interact with BigQuery metastore as a spark catalog. This allows for
  * querying of a variety of table types directly in spark sql or the dataframe api.
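For orientation, a quick usage sketch (not part of the PR) of how the restored DelegatingTable hydrates a table with extra properties. Here `connectorTable` stands for any Table loaded from an underlying catalog, and the property values are illustrative:

    // Wrap a connector table and overlay catalog-level metadata.
    val enriched: Table = DelegatingTable(
      connectorTable,
      additionalProperties = Map(
        TableCatalog.PROP_PROVIDER -> "BIGQUERY",
        TableCatalog.PROP_EXTERNAL -> "false"
      )
    )
    // properties() returns the union of the inner table's properties and the
    // overlay; because `++` keeps right-hand entries, the overlay wins on collisions.
    enriched.properties()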
@@ -78,10 +118,13 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames

   override def loadTable(identNoCatalog: Identifier): Table = {
     Try {
-      icebergCatalog.loadTable(identNoCatalog)
+      val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
+      DelegatingTable(icebergSparkTable,
+                      additionalProperties =
+                        Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
     }
       .recover {
Review comment on lines 120 to 126 (🛠️ Refactor suggestion): Too broad fallback. Recovering with a bare `case _` swallows every failure from the Iceberg catalog, including auth and connectivity errors, and silently falls through to the BigQuery lookup; recover only from the missing-table case.
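A minimal sketch of the narrower fallback the reviewer is suggesting (the shape mirrors the surrounding diff; `loadFromBigQuery` is a hypothetical helper wrapping the BigQuery branch below):

    override def loadTable(identNoCatalog: Identifier): Table =
      Try {
        val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
        DelegatingTable(icebergSparkTable,
                        additionalProperties =
                          Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
      }.recover {
        // Only "table not found in Iceberg" should fall through to BigQuery;
        // auth and network failures keep propagating to the caller.
        case _: NoSuchTableException => loadFromBigQuery(identNoCatalog) // hypothetical helper
      }.get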

-        case noIcebergTableEx: NoSuchTableException => {
+        case _ => {
           val project =
             catalogProps.getOrElse(BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_PROJECT, bqOptions.getProjectId)
           val tId = identNoCatalog.namespace().toList match {
@@ -91,9 +134,7 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
               throw new IllegalArgumentException(
                 s"Table identifier namespace ${identNoCatalog} must have at least one part.")
           }
-          val table = scala
-            .Option(bigQueryClient.getTable(tId))
-            .getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
+          val table = bigQueryClient.getTable(tId)
           table.getDefinition.asInstanceOf[TableDefinition] match {
Review comment on lines +137 to 138: Null-safe BigQuery lookup needed. `bigQueryClient.getTable` returns null when the table does not exist, so the revert reintroduces a potential NullPointerException here. Suggested change:

-          val table = bigQueryClient.getTable(tId)
+          val table = Option(bigQueryClient.getTable(tId))
+            .getOrElse(throw new NoSuchTableException(s"Table $tId not found in BigQuery"))
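The same idiom in the shape of the surrounding code: `Option(...)` turns the Java client's null result into None, so a missing table surfaces as a typed exception instead of a downstream NullPointerException (this mirrors the code the PR is reverting away from):

    // getTable returns null when the table does not exist; make that explicit.
    val table = Option(bigQueryClient.getTable(tId))
      .getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))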

            case externalTable: ExternalTableDefinition => {
              val uris = externalTable.getSourceUris.asScala
@@ -105,36 +146,33 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
                uris.head.replaceAll("/\\*\\.parquet$", "")
              }

-             val fileBasedTable = ParquetTable(
-               tId.toString,
-               SparkSession.active,
-               new CaseInsensitiveStringMap(
-                 Map(TableCatalog.PROP_EXTERNAL -> "true",
-                     TableCatalog.PROP_LOCATION -> uri,
-                     TableCatalog.PROP_PROVIDER -> "PARQUET").asJava),
-               List(uri),
-               None,
-               classOf[ParquetFileFormat]
-             )
-             fileBasedTable
+             val fileBasedTable = ParquetTable(tId.toString,
+                                               SparkSession.active,
+                                               CaseInsensitiveStringMap.empty(),
+                                               List(uri),
+                                               None,
+                                               classOf[ParquetFileFormat])
+             DelegatingTable(fileBasedTable,
+                             Map(TableCatalog.PROP_EXTERNAL -> "true",
+                                 TableCatalog.PROP_LOCATION -> uri,
+                                 TableCatalog.PROP_PROVIDER -> "PARQUET"))
            }
            case _: StandardTableDefinition => {
              //todo(tchow): Support partitioning

              // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
              // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
-             connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
+             val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
+             // ideally it should be the below:
+             // val connectorTable = connectorCatalog.loadTable(ident)
+             DelegatingTable(connectorTable,
+                             Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "BIGQUERY"))
            }
-           case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
+           case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
          }
        }
-        case other: Throwable => throw other
-      } match {
-        case Success(table) => table
-        case Failure(exception) => throw exception
-      }
+      }
+        .getOrElse(throw new NoSuchTableException(f"Table: ${identNoCatalog} not found in bigquery catalog."))
   }

   override def createTable(ident: Identifier,
Review comment: Guard the casts to SupportsRead/SupportsWrite. `asInstanceOf` will blow up if `internalTable` lacks `SupportsRead`/`SupportsWrite`. Prefer a pattern match or `collect { case t: SupportsRead => … }` to fail fast with a clearer message.
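A minimal sketch of the guarded delegation the reviewer suggests, using the same class shape as the diff (the error messages are illustrative):

    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
      internalTable match {
        case readable: SupportsRead => readable.newScanBuilder(options)
        case other =>
          // Fail fast with a clear message instead of a ClassCastException.
          throw new UnsupportedOperationException(s"Table ${other.name} does not support reads")
      }

    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
      internalTable match {
        case writable: SupportsWrite => writable.newWriteBuilder(info)
        case other =>
          throw new UnsupportedOperationException(s"Table ${other.name} does not support writes")
      }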