feat: bigquery catalog with iceberg support #393
@@ -1,6 +1,6 @@
name: branch_protection
on:
  pull_request:
  push:
jobs:
  enforce_triggered_workflows:
    runs-on: ubuntu-latest
@@ -0,0 +1,44 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.ChrononKryoRegistrator
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO

class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    super.registerClasses(kryo)

    // We have not been able to get Kryo serialization to work with the closure in the GCSFileIO class.
    // See: https://github.com/apache/iceberg/blob/cc4fe4cc50043ccba89700f7948090ff87a5baee/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java#L138-L173
    // There are unit tests for this in the iceberg project: https://github.com/apache/iceberg/blob/cc4fe4cc50043ccba89700f7948090ff87a5baee/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java#L201-L209
    // However, it still fails when we run for real, so we fall back to Java serialization for this class.
    // We should test this again once we bump iceberg versions: to test, remove this line and run any
    // integration job that writes iceberg to GCS.
    kryo.register(classOf[GCSFileIO], new JavaSerializer)

    val additionalClassNames = Seq(
      "org.apache.iceberg.DataFile",
      "org.apache.iceberg.FileContent",
      "org.apache.iceberg.FileFormat",
      "org.apache.iceberg.GenericDataFile",
      "org.apache.iceberg.PartitionData",
      "org.apache.iceberg.SerializableByteBufferMap",
      "org.apache.iceberg.SerializableTable$SerializableConfSupplier",
      "org.apache.iceberg.SnapshotRef",
      "org.apache.iceberg.SnapshotRefType",
      "org.apache.iceberg.encryption.PlaintextEncryptionManager",
      "org.apache.iceberg.gcp.GCPProperties",
      "org.apache.iceberg.hadoop.HadoopFileIO",
      "org.apache.iceberg.hadoop.HadoopMetricsContext",
      "org.apache.iceberg.MetadataTableType",
      "org.apache.iceberg.io.ResolvingFileIO",
      "org.apache.iceberg.spark.source.SerializableTableWithSize",
      "org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize",
      "org.apache.iceberg.spark.source.SparkWrite$TaskCommit",
      "org.apache.iceberg.types.Types$DateType",
      "org.apache.iceberg.types.Types$NestedField",
      "org.apache.iceberg.types.Types$StructType",
      "org.apache.iceberg.util.SerializableMap"
    )
    additionalClassNames.foreach(name => doRegister(name, kryo))
  }
}
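For context, a minimal sketch of how this registrator might be enabled on a Spark job. The `spark.serializer` and `spark.kryo.registrator` keys are standard Spark settings; the session shown here is purely illustrative and not part of this PR.

// Illustrative only: enable Kryo serialization and point Spark at the registrator above so that
// the Iceberg classes (and the JavaSerializer fallback for GCSFileIO) are registered on executors.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("iceberg-gcs-write-example") // hypothetical job name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "ai.chronon.integrations.cloud_gcp.ChrononIcebergKryoRegistrator")
  .getOrCreate()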
@@ -0,0 +1,199 @@
package ai.chronon.integrations.cloud_gcp

import com.google.cloud.bigquery.{
  BigQuery,
  BigQueryOptions,
  ExternalTableDefinition,
  StandardTableDefinition,
  TableDefinition,
  TableId
}
import com.google.cloud.spark.bigquery.BigQueryCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.jdk.CollectionConverters._
import scala.util.Try

/** A table that delegates all operations to an internal table, but with additional properties.
  * This is mostly for enriching Spark tables with metadata that Spark cannot access directly.
  * For example, we can use a BigQuery client to fetch table metadata / properties and then hydrate the Spark table
  * with that information before passing it back to the Spark compute engine.
  *
  * Down the line, we could also support custom partition management.
  */
class DelegatingTable(internalTable: Table,
                      additionalProperties: Map[String, String],
                      partitioning: Option[Array[Transform]] = None)
    extends Table
    with SupportsRead
    with SupportsWrite {

  override def name(): String = internalTable.name

  override def schema(): StructType = internalTable.schema

  override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)

  override def properties(): util.Map[String, String] =
    (internalTable.properties().asScala ++ additionalProperties).asJava

  override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())

}

object DelegatingTable {
  def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
    new DelegatingTable(table, additionalProperties = additionalProperties)
}

/** Galactus catalog that allows us to interact with the BigQuery metastore as a Spark catalog. This allows a
  * variety of table types to be queried directly in Spark SQL or the DataFrame API.
  * It is analogous to Iceberg's [[org.apache.iceberg.spark.SparkSessionCatalog]] in that it applies a fallback
  * when loading tables: it always attempts to load a table reference as an Iceberg table first, falling back
  * to BigQuery.
  *
  * To interact with Iceberg, we use Google's BigQuery metastore catalog library:
  * https://cloud.google.com/blog/products/data-analytics/introducing-bigquery-metastore-fully-managed-metadata-service
  * By default, all catalog operations delegate to this library, and this abstraction is meant to remain incredibly
  * thin. BE CAREFUL WHEN OVERRIDING THIS BEHAVIOR. You shouldn't need much additional functionality; before adding
  * any, consider upgrading the `iceberg_bigquery_catalog_lib` dependency and/or Iceberg first.
  *
  * NOTE that this abstraction currently only supports querying tables that all belong to the same GCP project.
  * Multi-project support will depend on the underlying libraries supporting it.
  */
class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {

  @transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance
  @transient private lazy val bigQueryClient: BigQuery = bqOptions.getService

  @transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
  @transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()

  // Some stupid spark settings.
Review comment: 😆 are these required?
Reply: We don't really use …
Reply: Actually, I think there might be a more correct way to map the GCP primitives to these values. I will do that in a followup and add unit tests.
  private var defaultSessionCatalog: CatalogPlugin = null

  // This corresponds to `spark_catalog` in `spark.sql.catalog.spark_catalog`. It is necessary for Spark to
  // correctly choose which implementation to use.
  private var catalogName: String = null

  override def listNamespaces: Array[Array[String]] = icebergCatalog.listNamespaces()

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = icebergCatalog.listNamespaces(namespace)

  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
    icebergCatalog.loadNamespaceMetadata(namespace)

  override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
    icebergCatalog.createNamespace(namespace, metadata)
  }

  override def purgeTable(ident: Identifier): Boolean = {
    icebergCatalog.purgeTable(ident)
  }

  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    icebergCatalog.alterNamespace(namespace, changes: _*)
  }

  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
    icebergCatalog.dropNamespace(namespace, cascade)

  override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
Review comment: for this method and the ones above it, do we eventually need to implement the bq catalog versions of them as fallbacks?
Reply: as far as I can tell, the iceberg catalog will do regular bq catalog stuff as intended, with the only difference being when it loads the table. It'll fail to …
  override def loadTable(rawIdent: Identifier): Table = {
    val ident = Identifier.of(rawIdent.namespace.flatMap(_.split("\\.")), rawIdent.name)
    Try { icebergCatalog.loadTable(ident) }
      .recover {
        case _ => {
          val tId = ident.namespace().toList match {
Review comment: pull this out into its own function / class?
Reply: will do in a followup: #461
            case database :: Nil            => TableId.of(database, ident.name())
            case project :: database :: Nil => TableId.of(project, database, ident.name())
            case Nil =>
              throw new IllegalArgumentException(s"Table identifier namespace ${rawIdent} must have at least one part.")
          }
          val table = bigQueryClient.getTable(tId)
          table.getDefinition.asInstanceOf[TableDefinition] match {
Review comment: Handle null table. Suggested change:
  require(table != null, s"Table with ID $tId not found in BigQuery.")
            case externalTable: ExternalTableDefinition => {
              val uris = externalTable.getSourceUris.asScala
              val uri = scala
                .Option(externalTable.getHivePartitioningOptions)
                .map(_.getSourceUriPrefix)
                .getOrElse {
                  require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
                  uris.head.replaceAll("/\\*\\.parquet$", "")
                }
Review comment (on lines +131 to +138): Enhance URI pattern validation: URI handling could fail silently if the pattern doesn't match; update the URI validation logic.
              val fileBasedTable = ParquetTable(tId.toString,
                                                SparkSession.active,
                                                CaseInsensitiveStringMap.empty(),
                                                List(uri),
                                                None,
                                                classOf[ParquetFileFormat])
              DelegatingTable(fileBasedTable,
                              Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
            }
            case _: StandardTableDefinition => {
              // todo(tchow): Support partitioning

              // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
              // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
              val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
              // Ideally it should be the below:
              // val connectorTable = connectorCatalog.loadTable(ident)
              DelegatingTable(connectorTable, Map(TableCatalog.PROP_EXTERNAL -> "false"))
            }
            case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
          }
        }
      }
      .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(rawIdent))
  }

  override def createTable(ident: Identifier,
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): Table = {
    icebergCatalog.createTable(ident, schema, partitions, properties)
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    icebergCatalog.alterTable(ident, changes: _*)
  }

  override def dropTable(ident: Identifier): Boolean = icebergCatalog.dropTable(ident)

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    icebergCatalog.renameTable(oldIdent, newIdent)
  }

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    icebergCatalog.initialize(name, options)
    connectorCatalog.initialize(name, options)
    catalogName = name
  }

  override def name(): String = catalogName

  override def setDelegateCatalog(delegate: CatalogPlugin): Unit = {
    defaultSessionCatalog = delegate
  }

  override def listFunctions(namespace: Array[String]): Array[Identifier] = icebergCatalog.listFunctions(namespace)

  override def loadFunction(ident: Identifier): UnboundFunction = icebergCatalog.loadFunction(ident)

}
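For reference, a minimal sketch of how this catalog might be wired into a Spark session. Registering it under `spark.sql.catalog.spark_catalog` matches the `catalogName` comment above; the per-catalog options shown (project, warehouse) are assumed, deployment-specific keys rather than anything defined in this PR.

// Hypothetical wiring, not part of this PR: register the delegating catalog as the session
// catalog so that table loads try Iceberg first and fall back to BigQuery / Parquet external tables.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("bigquery-catalog-example") // hypothetical job name
  .config("spark.sql.catalog.spark_catalog",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  // The options below are forwarded to the wrapped Iceberg SparkCatalog and BigQueryCatalog via
  // initialize(name, options); the keys are assumptions and depend on your deployment.
  .config("spark.sql.catalog.spark_catalog.gcp_project", "my-gcp-project")
  .config("spark.sql.catalog.spark_catalog.warehouse", "gs://my-bucket/warehouse")
  .getOrCreate()

// Both Iceberg and native BigQuery tables then resolve through the same identifier space, e.g.:
spark.sql("SELECT * FROM my_dataset.my_table LIMIT 10").show()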
Review comment: needed? or should we revert?
Reply: I think this is the right change; we don't want this to remain in a failed state after we continue to push to existing PRs.