Commit ff7d6df

hack
Co-authored-by: Thomas Chow <[email protected]>
1 parent 4d01488

File tree: 1 file changed (+12 −11 lines)


cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 12 additions & 11 deletions
@@ -1,12 +1,12 @@
 package ai.chronon.integrations.cloud_gcp
 
-import com.google.cloud.bigquery.{
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{
   BigQuery,
   BigQueryOptions,
   ExternalTableDefinition,
   StandardTableDefinition,
   TableDefinition,
-  TableId
+  TableId,
 }
 import com.google.cloud.spark.bigquery.BigQueryCatalog
 import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
@@ -19,6 +19,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -122,22 +123,22 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
         fileBasedTable
       }
       case stTable: StandardTableDefinition => {
+        import com.google.cloud.spark.bigquery.repackaged.com.google.inject.Injector
+        import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTable
         //todo(tchow): Support partitioning
 
         // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
         // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
         // ideally it should be the below:
         // val connectorTable = connectorCatalog.loadTable(ident)
+        // So instead, we read the bigqueryTable ourselves to get access to the schema and pass it through.
         val nativeTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
-        logger.info(s"Table: ${nativeTable}")
-        logger.info(s"Table name: ${nativeTable.name()}")
-        logger.info(s"Table properties: ${nativeTable.properties()}")
-        logger.info(s"Table partitioning: ${nativeTable.partitioning()}")
-        logger.info("Table schema:")
-        logger.info(f"${nativeTable.schema()}")
-        logger.info("Table columns:")
-        logger.info(f"${nativeTable.columns()}")
-        nativeTable
+        val injector = nativeTable.getClass.getDeclaredField("injector")
+        injector.setAccessible(true)
+        val value = injector.get(nativeTable).asInstanceOf[Injector]
+        val sc = SchemaConverters.from(SchemaConvertersConfiguration.createDefault())
+        val sparkSchema = sc.toSpark(stTable.getSchema)
+        new Spark35BigQueryTable(value, () => sparkSchema)
       }
       case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
     }
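For readers following the workaround: the new branch digs the connector's private injector field out of the loaded table with plain Java reflection. Below is a minimal, self-contained Scala sketch of that private-field pattern; Holder and secret are made-up toy names standing in for the connector internals, not classes from the commit.

object ReflectionSketch {
  // Toy class with a private field, playing the role of the connector's table object.
  class Holder {
    private val secret: String = "hidden"
  }

  def main(args: Array[String]): Unit = {
    val holder = new Holder
    val field = holder.getClass.getDeclaredField("secret") // look up the private field by name
    field.setAccessible(true)                              // bypass access checks (modern JVMs may warn)
    val value = field.get(holder).asInstanceOf[String]     // read and cast, as the diff does for Injector
    println(value)                                         // prints "hidden"
  }
}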
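Since the replacement path also builds the Spark schema itself instead of trusting the connector's loadTable, here is a hedged sketch of that conversion step, assuming the repackaged BigQuery model classes (Schema, Field, StandardSQLTypeName) expose their usual builders; the two-column schema is illustrative, not from the commit.

import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{Field, Schema, StandardSQLTypeName}
import org.apache.spark.sql.types.StructType

object SchemaConversionSketch {
  def main(args: Array[String]): Unit = {
    // Build a small BigQuery schema using the repackaged client classes, per the new import.
    val bqSchema = Schema.of(
      Field.of("id", StandardSQLTypeName.INT64),
      Field.of("name", StandardSQLTypeName.STRING)
    )
    // Same converter construction and call shape as sc.toSpark(stTable.getSchema) in the diff.
    val sc = SchemaConverters.from(SchemaConvertersConfiguration.createDefault())
    val sparkSchema: StructType = sc.toSpark(bqSchema)
    println(sparkSchema.treeString)
  }
}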
