feat: bigquery catalog with iceberg support #393

Merged: 5 commits, Mar 5, 2025
2 changes: 1 addition & 1 deletion .github/workflows/require_triggered_status_checks.yaml
@@ -1,6 +1,6 @@
name: branch_protection
on:
pull_request:
push:
Contributor

needed? or should we revert?

Collaborator Author

I think this is the right change; we don't want this check to remain in a failed state after we continue to push to existing PRs.

jobs:
enforce_triggered_workflows:
runs-on: ubuntu-latest
105 changes: 44 additions & 61 deletions cloud_gcp/BUILD.bazel
@@ -1,70 +1,58 @@
scala_library(
name = "cloud_gcp_lib",
srcs = glob(["src/main/**/*.scala"]),
format = select({
"//tools/config:scala_2_13": False, # Disable for 2.13
"//conditions:default": True, # Enable for other versions
}),
visibility = ["//visibility:public"],
deps = [
"//api:lib",
"//api:thrift_java",
"//online:lib",
"//spark:lib",
"//tools/build_rules/spark:spark-exec",
scala_artifact_with_suffix("org.scala-lang.modules:scala-java8-compat"),
scala_artifact_with_suffix("org.json4s:json4s-core"),
scala_artifact_with_suffix("org.json4s:json4s-jackson"),
scala_artifact_with_suffix("org.json4s:json4s-ast"),
scala_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
scala_artifact_with_suffix("org.rogach:scallop"),
maven_artifact("com.google.cloud:google-cloud-core"),
maven_artifact("com.google.cloud:google-cloud-bigquery"),
maven_artifact("com.google.cloud:google-cloud-bigtable"),
maven_artifact("com.google.cloud:google-cloud-pubsub"),
maven_artifact("com.google.cloud:google-cloud-dataproc"),
maven_artifact("com.google.cloud.bigdataoss:gcsio"),
maven_artifact("com.google.cloud.bigdataoss:util-hadoop"),
maven_artifact("com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler"),
scala_artifact_with_suffix("com.google.cloud.spark:spark-bigquery-with-dependencies"),
maven_artifact("com.google.api:api-common"),
maven_artifact("com.google.api.grpc:proto-google-cloud-dataproc-v1"),
maven_artifact("com.google.api:gax"),
maven_artifact("com.google.guava:guava"),
maven_artifact("com.google.protobuf:protobuf-java"),
maven_artifact("org.yaml:snakeyaml"),
maven_artifact("io.grpc:grpc-netty-shaded"),
maven_artifact("ch.qos.reload4j:reload4j"),
maven_artifact("org.slf4j:slf4j-api"),
maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
maven_artifact("org.threeten:threetenbp"),
maven_artifact("org.apache.kafka:kafka-clients"),
],
)

test_deps = [
":cloud_gcp_lib",
"//api:thrift_java",
shared_deps = [
":iceberg_bigquery_catalog_lib",
"//api:lib",
"//api:thrift_java",
"//online:lib",
"//spark:lib",
"//tools/build_rules/spark:spark-exec",
# Libraries
scala_artifact_with_suffix("org.scala-lang.modules:scala-java8-compat"),
scala_artifact_with_suffix("org.json4s:json4s-core"),
scala_artifact_with_suffix("org.json4s:json4s-jackson"),
scala_artifact_with_suffix("org.json4s:json4s-ast"),
scala_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
scala_artifact_with_suffix("org.rogach:scallop"),
maven_artifact("com.google.cloud:google-cloud-core"),
maven_artifact("com.google.cloud:google-cloud-bigquery"),
maven_artifact("com.google.cloud:google-cloud-bigtable"),
maven_artifact("com.google.cloud:google-cloud-pubsub"),
maven_artifact("com.google.cloud:google-cloud-dataproc"),
maven_artifact("com.google.cloud.bigdataoss:gcs-connector"),
maven_artifact("com.google.cloud.bigdataoss:gcsio"),
maven_artifact("com.google.cloud.bigdataoss:gcs-connector"),
maven_artifact("com.google.cloud.bigdataoss:util"),
maven_artifact("com.google.cloud.bigdataoss:util-hadoop"),
maven_artifact("com.google.cloud:google-cloud-bigtable-emulator"),
maven_artifact("org.apache.hadoop:hadoop-client-api"),
maven_artifact("com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler"),
maven_artifact("com.google.api:api-common"),
maven_artifact("com.google.api.grpc:proto-google-cloud-dataproc-v1"),
scala_artifact_with_suffix("com.google.cloud.spark:spark-bigquery-with-dependencies"),
maven_artifact("com.google.api:gax"),
maven_artifact("com.google.guava:guava"),
maven_artifact("com.google.protobuf:protobuf-java"),
maven_artifact("org.apache.hadoop:hadoop-client-api"),
maven_artifact("org.yaml:snakeyaml"),
maven_artifact("io.grpc:grpc-netty-shaded"),
maven_artifact("ch.qos.reload4j:reload4j"),
maven_artifact("org.slf4j:slf4j-api"),
maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
maven_artifact("org.threeten:threetenbp"),
maven_artifact("org.apache.kafka:kafka-clients"),
maven_artifact("com.google.cloud.spark:spark-3.5-bigquery"),
scala_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5"),
maven_artifact("org.objenesis:objenesis"),
]

scala_library(
name = "cloud_gcp_lib",
srcs = glob(["src/main/**/*.scala"]),
format = select({
"//tools/config:scala_2_13": False, # Disable for 2.13
"//conditions:default": True, # Enable for other versions
}),
visibility = ["//visibility:public"],
deps = shared_deps,
)

test_deps = [
maven_artifact("com.google.cloud:google-cloud-bigtable-emulator"),

# Testing
scala_artifact_with_suffix("org.scalatest:scalatest-matchers-core"),
scala_artifact_with_suffix("org.scalatest:scalatest-core"),
@@ -81,15 +69,10 @@ test_deps = [
maven_artifact("com.novocode:junit-interface"),
]

scala_library(
name = "test_lib",
srcs = glob(["src/test/**/*.scala"]),
format = select({
"//tools/config:scala_2_13": False, # Disable for 2.13
"//conditions:default": True, # Enable for other versions
}),
java_import(
name = "iceberg_bigquery_catalog_lib",
jars = ["iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar"],
visibility = ["//visibility:public"],
deps = test_deps,
)

scala_test_suite(
@@ -98,5 +81,5 @@ scala_test_suite(
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
deps = shared_deps + test_deps + [":cloud_gcp_lib"],
)
Binary file not shown.
@@ -0,0 +1,44 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.ChrononKryoRegistrator
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO

class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
super.registerClasses(kryo)

// Have not been able to get kryo serialization to work with the closure in the GCSFileIO class.
// See: https://github.com/apache/iceberg/blob/cc4fe4cc50043ccba89700f7948090ff87a5baee/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java#L138-L173
// There are unit tests for this in the iceberg project: https://github.com/apache/iceberg/blob/cc4fe4cc50043ccba89700f7948090ff87a5baee/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java#L201-L209
// However for some reason this still fails when we run for real. Should consider testing this again once we
// bump iceberg versions. To test, we simply remove this line and run any integration job that writes iceberg to GCS.
kryo.register(classOf[GCSFileIO], new JavaSerializer)

val additionalClassNames = Seq(
"org.apache.iceberg.DataFile",
"org.apache.iceberg.FileContent",
"org.apache.iceberg.FileFormat",
"org.apache.iceberg.GenericDataFile",
"org.apache.iceberg.PartitionData",
"org.apache.iceberg.SerializableByteBufferMap",
"org.apache.iceberg.SerializableTable$SerializableConfSupplier",
"org.apache.iceberg.SnapshotRef",
"org.apache.iceberg.SnapshotRefType",
"org.apache.iceberg.encryption.PlaintextEncryptionManager",
"org.apache.iceberg.gcp.GCPProperties",
"org.apache.iceberg.hadoop.HadoopFileIO",
"org.apache.iceberg.hadoop.HadoopMetricsContext",
"org.apache.iceberg.MetadataTableType",
"org.apache.iceberg.io.ResolvingFileIO",
"org.apache.iceberg.spark.source.SerializableTableWithSize",
"org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize",
"org.apache.iceberg.spark.source.SparkWrite$TaskCommit",
"org.apache.iceberg.types.Types$DateType",
"org.apache.iceberg.types.Types$NestedField",
"org.apache.iceberg.types.Types$StructType",
"org.apache.iceberg.util.SerializableMap"
)
additionalClassNames.foreach(name => doRegister(name, kryo))
}
}
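
For context, a minimal sketch of how this registrator might be wired into a Spark session; spark.serializer and spark.kryo.registrator are standard Spark settings, and the application name is a placeholder:

import org.apache.spark.sql.SparkSession

// Sketch: enable Kryo and point Spark at the registrator above so the iceberg/GCS
// classes it registers serialize correctly during shuffles and broadcasts.
val spark = SparkSession
  .builder()
  .appName("iceberg-gcs-write-example") // placeholder
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "ai.chronon.integrations.cloud_gcp.ChrononIcebergKryoRegistrator")
  .getOrCreate()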
@@ -0,0 +1,199 @@
package ai.chronon.integrations.cloud_gcp

import com.google.cloud.bigquery.{
BigQuery,
BigQueryOptions,
ExternalTableDefinition,
StandardTableDefinition,
TableDefinition,
TableId
}
import com.google.cloud.spark.bigquery.BigQueryCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.jdk.CollectionConverters._
import scala.util.Try

/** A table that delegates all operations to an internal table, but with additional properties.
* This is mostly for enriching SparkTables with metadata that cannot be accessed by spark directly.
* For example, we can use a bigquery client to fetch table metadata / properties and then hydrate the Spark table
* with that information, before we pass it back to the Spark compute engine.
*
* Down the line, we could also support custom partition management.
*/
class DelegatingTable(internalTable: Table,
additionalProperties: Map[String, String],
partitioning: Option[Array[Transform]] = None)
extends Table
with SupportsRead
with SupportsWrite {

override def name(): String = internalTable.name

override def schema(): StructType = internalTable.schema

override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)

override def properties(): util.Map[String, String] =
(internalTable.properties().asScala ++ additionalProperties).asJava

override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())

}

object DelegatingTable {
def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
new DelegatingTable(table, additionalProperties = additionalProperties)
}

/** Galactus catalog that allows us to interact with BigQuery metastore as a spark catalog. This allows for
* querying of a variety of table types directly in spark sql or the dataframe api.
* This is analogous to iceberg's [[org.apache.iceberg.spark.SparkSessionCatalog]] in that it will
* apply a fallback when querying for tables: it will always attempt to load a table reference
* as an iceberg table first, falling back to BigQuery.
*
* To interact with iceberg, we use Google's https://cloud.google.com/blog/products/data-analytics/introducing-bigquery-metastore-fully-managed-metadata-service
* metastore catalog library. By default, all catalog operations will delegate to this library, and this abstraction
* is meant to remain incredibly thin. BE CAREFUL WHEN OVERRIDING THIS BEHAVIOR. You shouldn't need much additional
* functionality. Before you do this, consider upgrading the `iceberg_bigquery_catalog_lib` dependency and/or iceberg first.
*
* NOTE that this abstraction currently only supports querying tables that all belong to the same GCP project. Multi-project
* support will depend on the underlying libraries supporting it.
*/
class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {

@transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance
@transient private lazy val bigQueryClient: BigQuery = bqOptions.getService

@transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
@transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()

// Some stupid spark settings.
Contributor

😆 are these required?

Collaborator Author

catalogName corresponds to `spark_catalog` in the config prop (i.e. `spark.sql.catalog.spark_catalog`), and it is required for Spark to accurately route to this catalog extension. Let me add a comment.

We don't really use defaultSessionCatalog at all in the GCP situation; it's basically Spark's default HiveCatalogImpl.
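
A minimal sketch of that wiring, assuming the standard spark.sql.catalog.* configuration mechanism (any additional catalog options are omitted since they depend on the BigQuery metastore catalog library):

import org.apache.spark.sql.SparkSession

// Sketch: register the extension as the built-in spark_catalog so default-catalog
// lookups route through it; initialize() then receives "spark_catalog" as `name`.
val spark = SparkSession
  .builder()
  .config("spark.sql.catalog.spark_catalog",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .getOrCreate()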

Collaborator Author

Actually, I think there might be a more correct way to map the GCP primitives to these values. I will do that in a followup and add unit tests.

private var defaultSessionCatalog: CatalogPlugin = null
private var catalogName: String =
null // This corresponds to `spark_catalog` in `spark.sql.catalog.spark_catalog`. This is necessary for Spark to correctly choose which implementation to use.

override def listNamespaces: Array[Array[String]] = icebergCatalog.listNamespaces()

override def listNamespaces(namespace: Array[String]): Array[Array[String]] = icebergCatalog.listNamespaces(namespace)

override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
icebergCatalog.loadNamespaceMetadata(namespace)

override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
icebergCatalog.createNamespace(namespace, metadata)
}

override def purgeTable(ident: Identifier): Boolean = {
icebergCatalog.purgeTable(ident)
}

override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
icebergCatalog.alterNamespace(namespace, changes: _*)
}

override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
icebergCatalog.dropNamespace(namespace, cascade)

override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
Contributor

for this method and the ones above it, do we eventually need to implement the bq catalog versions of them as fallbacks?

Collaborator Author (@tchow-zlai, Mar 5, 2025)

As far as I can tell, the iceberg catalog will do regular BQ catalog stuff as intended; the only difference is when it loads the table. It'll fail loadTable if it's not an iceberg table, so we try the other clients before finally failing.


override def loadTable(rawIdent: Identifier): Table = {
val ident = Identifier.of(rawIdent.namespace.flatMap(_.split("\\.")), rawIdent.name)
Try { icebergCatalog.loadTable(ident) }
.recover {
case _ => {
val tId = ident.namespace().toList match {
Contributor

pull this out into its own function / class?

Collaborator Author

will do in a followup: #461

case database :: Nil => TableId.of(database, ident.name())
case project :: database :: Nil => TableId.of(project, database, ident.name())
case Nil =>
throw new IllegalArgumentException(s"Table identifier namespace ${rawIdent} must have at least one part.")
}
val table = bigQueryClient.getTable(tId)
table.getDefinition.asInstanceOf[TableDefinition] match {

⚠️ Potential issue

Handle null table.
getTable may return null if the table doesn't exist, leading to NullPointerException at table.getDefinition.

+  require(table != null, s"Table with ID $tId not found in BigQuery.")

Suggested change
table.getDefinition.asInstanceOf[TableDefinition] match {
require(table != null, s"Table with ID $tId not found in BigQuery.")
table.getDefinition.asInstanceOf[TableDefinition] match {

case externalTable: ExternalTableDefinition => {
val uris = externalTable.getSourceUris.asScala
val uri = scala
.Option(externalTable.getHivePartitioningOptions)
.map(_.getSourceUriPrefix)
.getOrElse {
require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
uris.head.replaceAll("/\\*\\.parquet$", "")
}
Comment on lines +131 to +138

💡 Verification agent


Enhance URI pattern validation.

URI handling could fail silently if the pattern doesn't match.


🏁 Script executed:

#!/bin/bash
# Check for non-standard URI patterns in external tables
rg -A 2 'setSourceUris|setSourceUriPrefix' .

Length of output: 963


Update URI Validation Logic

  • In DelegatingBigQueryMetastoreCatalog.scala (lines 98–105), the call to replaceAll("/\\*\\.parquet$", "") may leave the URI unchanged if it doesn't match exactly, causing silent failures.
  • Please add explicit validation (or a warning/error) to detect when the regex doesn't affect the URI, ensuring that non-standard patterns don't slip through.
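
One possible shape for that validation, as a hedged sketch against the surrounding code (whether to fail hard with require or just log a warning is a design choice, not something taken from this PR):

// Sketch: make the wildcard-stripping explicit so an unexpected URI layout surfaces
// immediately instead of silently becoming a bogus prefix.
val original = uris.head
val trimmed = original.replaceAll("/\\*\\.parquet$", "")
require(trimmed != original,
        s"External table ${table} has an unexpected source URI pattern: ${original}")
trimmed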


val fileBasedTable = ParquetTable(tId.toString,
SparkSession.active,
CaseInsensitiveStringMap.empty(),
List(uri),
None,
classOf[ParquetFileFormat])
DelegatingTable(fileBasedTable,
Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
}
case _: StandardTableDefinition => {
//todo(tchow): Support partitioning

// Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
// ideally it should be the below:
// val connectorTable = connectorCatalog.loadTable(ident)
DelegatingTable(connectorTable, Map(TableCatalog.PROP_EXTERNAL -> "false"))
}
case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
}
}
}
.getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(rawIdent))
}

override def createTable(ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
icebergCatalog.createTable(ident, schema, partitions, properties)
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
icebergCatalog.alterTable(ident, changes: _*)
}

override def dropTable(ident: Identifier): Boolean = icebergCatalog.dropTable(ident)

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
icebergCatalog.renameTable(oldIdent, newIdent)
}

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
icebergCatalog.initialize(name, options)
connectorCatalog.initialize(name, options)
catalogName = name
}

override def name(): String = catalogName

override def setDelegateCatalog(delegate: CatalogPlugin): Unit = {
defaultSessionCatalog = delegate
}

override def listFunctions(namespace: Array[String]): Array[Identifier] = icebergCatalog.listFunctions(namespace)

override def loadFunction(ident: Identifier): UnboundFunction = icebergCatalog.loadFunction(ident)

}
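
Once the catalog is registered as spark_catalog (see the sketch earlier in this thread), usage from Spark SQL might look like the following; the project, dataset, and table names are placeholders:

// Sketch: an iceberg table resolves through icebergCatalog, while a native or
// external BigQuery table falls back to the BigQuery / Parquet paths above.
spark.sql("SELECT * FROM data_project.my_dataset.my_iceberg_table LIMIT 10").show()
spark.sql("SELECT COUNT(*) FROM data_project.my_dataset.my_external_parquet_table").show()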