
Commit 77fc198

feat: bigquery catalog with iceberg support (#393)
## Summary

Large PR here. This is the first part of supporting Iceberg using BigQuery as a metastore. There are a few components to this:

1. `DelegatingBigQueryMetastoreCatalog` is the main actor in all of this. This abstraction wraps the Iceberg BigQuery catalog that we've introduced through a local jar download. The reason for wrapping it instead of simply using it is so that we can allow it to handle non-Iceberg tables in both SQL and non-SQL Spark contexts. This is useful for reading Etsy's beacon datasets, which are simply Parquet external tables, as well as their CDC streams, which are BigQuery native tables.
2. `GCSFileOSerializer` is a simple wrapper that uses regular Java serialization instead of Kryo to handle the `GCSFileIO`, since Kryo doesn't handle closure serialization very well. I had added a few classes to the Kryo registrator for serializing closures, but it still doesn't seem to work in an actual Spark job. I ultimately had to fall back to regular Java serialization; since this is for just one class that's not on the hot path, it should be fine.
3. Some serialization unit tests.
4. Lots of jar wrangling to get things to work in the right way. We'll have to make sure this doesn't break the streaming / fetching side of things as well.

Things to note:

1. Had to submit a patch to the spark-bigquery connector (GoogleCloudDataproc/spark-bigquery-connector#1340) because the connector does not support three-part namespacing. Until that patch is in, you can only query tables that belong to the same project within a single SQL query.
2. You can only write Iceberg tables to the currently configured project. The project is configured using `additional-confs.yaml`, and I used the following config set to test this behavior (a programmatic sketch of the same catalog wiring follows these notes):

   ```yaml
   spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
   spark.chronon.partition.format: "yyyy-MM-dd"
   spark.chronon.table.gcs.temporary_gcs_bucket: "zipline-warehouse-canary"
   spark.chronon.partition.column: "ds"
   spark.chronon.table.gcs.connector_output_dataset: "data"
   spark.chronon.table.gcs.connector_output_project: "canary-443022"
   spark.chronon.coalesce.factor: "10"
   spark.default.parallelism: "10"
   spark.sql.shuffle.partitions: "10"
   spark.chronon.table_write.prefix: "gs://zipline-warehouse-canary/data/tables/"
   spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-canary/data/tables/"
   spark.sql.catalog.spark_catalog.gcp_location: "us-central1"
   spark.sql.catalog.spark_catalog.gcp_project: "canary-443022"
   spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
   spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
   spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO
   spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
   spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator"
   ```

3. I had to remove https://github.com/zipline-ai/infrastructure/blob/e30ae1470d4568e3ae2dab384c4d8971dac973c9/base-gcp/dataproc.tf#L121 from the cluster because it conflicts with the metastore and connector jars being brought in here as dependencies. We'll need to rebuild our clusters (and the ones on Etsy) _without_ the jar cc @chewy-zlai
4. Also made a change to the canary branch (zipline-ai/canary-confs@65fac34) to remove the project ID. This is not supported using the catalogs we have.
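For reference, here is a minimal sketch of the same `spark_catalog` wiring done programmatically on a `SparkSession`. It only mirrors the catalog and Kryo settings from the yaml above and reuses the canary bucket / project / location placeholders from that config; it is illustrative rather than the canonical launch path.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: mirrors the spark_catalog and Kryo settings from additional-confs.yaml above.
// Bucket, project, and location values are the canary placeholders from that config.
val spark = SparkSession
  .builder()
  .appName("delegating-bigquery-metastore-example")
  .config("spark.sql.catalog.spark_catalog", "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
  .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.io.ResolvingFileIO")
  .config("spark.sql.catalog.spark_catalog.warehouse", "gs://zipline-warehouse-canary/data/tables/")
  .config("spark.sql.catalog.spark_catalog.gcp_location", "us-central1")
  .config("spark.sql.catalog.spark_catalog.gcp_project", "canary-443022")
  .config("spark.kryo.registrator", "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator")
  .getOrCreate()
```

Because `spark_catalog` is the session catalog, plain `dataset.table` references in SQL resolve through `DelegatingBigQueryMetastoreCatalog` without any catalog prefix.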
The configured project `spark.sql.catalog.spark_catalog.gcp_project` is taken into account across the board.

## Checklist

- [x] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced enhanced catalog management for seamless integration between BigQuery and Iceberg.
  - Added custom serialization logic to improve stability and performance.
- **Refactor / Dependency Updates**
  - Streamlined dependency management and updated various library versions for smoother runtime behavior.
  - Refined Spark session configuration to ensure consistent application of settings.
  - Added new dependencies related to the Hadoop client API.
- **Tests**
  - Expanded integration tests for BigQuery functionality and Kryo serialization.
  - Removed obsolete test cases to focus on relevant validation.
- **CI/CD**
  - Updated workflow triggers to activate on push events for improved integration responsiveness.

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent 3081d0e · commit 77fc198

18 files changed: +7107 −3526 lines

.github/workflows/require_triggered_status_checks.yaml

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,6 +1,6 @@
 name: branch_protection
 on:
-  pull_request:
+  push:
 jobs:
   enforce_triggered_workflows:
     runs-on: ubuntu-latest
```

cloud_gcp/BUILD.bazel

Lines changed: 44 additions & 61 deletions

```diff
@@ -1,70 +1,58 @@
-scala_library(
-    name = "cloud_gcp_lib",
-    srcs = glob(["src/main/**/*.scala"]),
-    format = select({
-        "//tools/config:scala_2_13": False,  # Disable for 2.13
-        "//conditions:default": True,  # Enable for other versions
-    }),
-    visibility = ["//visibility:public"],
-    deps = [
-        "//api:lib",
-        "//api:thrift_java",
-        "//online:lib",
-        "//spark:lib",
-        "//tools/build_rules/spark:spark-exec",
-        scala_artifact_with_suffix("org.scala-lang.modules:scala-java8-compat"),
-        scala_artifact_with_suffix("org.json4s:json4s-core"),
-        scala_artifact_with_suffix("org.json4s:json4s-jackson"),
-        scala_artifact_with_suffix("org.json4s:json4s-ast"),
-        scala_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
-        scala_artifact_with_suffix("org.rogach:scallop"),
-        maven_artifact("com.google.cloud:google-cloud-core"),
-        maven_artifact("com.google.cloud:google-cloud-bigquery"),
-        maven_artifact("com.google.cloud:google-cloud-bigtable"),
-        maven_artifact("com.google.cloud:google-cloud-pubsub"),
-        maven_artifact("com.google.cloud:google-cloud-dataproc"),
-        maven_artifact("com.google.cloud.bigdataoss:gcsio"),
-        maven_artifact("com.google.cloud.bigdataoss:util-hadoop"),
-        maven_artifact("com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler"),
-        scala_artifact_with_suffix("com.google.cloud.spark:spark-bigquery-with-dependencies"),
-        maven_artifact("com.google.api:api-common"),
-        maven_artifact("com.google.api.grpc:proto-google-cloud-dataproc-v1"),
-        maven_artifact("com.google.api:gax"),
-        maven_artifact("com.google.guava:guava"),
-        maven_artifact("com.google.protobuf:protobuf-java"),
-        maven_artifact("org.yaml:snakeyaml"),
-        maven_artifact("io.grpc:grpc-netty-shaded"),
-        maven_artifact("ch.qos.reload4j:reload4j"),
-        maven_artifact("org.slf4j:slf4j-api"),
-        maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
-        maven_artifact("org.threeten:threetenbp"),
-        maven_artifact("org.apache.kafka:kafka-clients"),
-    ],
-)
-
-test_deps = [
-    ":cloud_gcp_lib",
-    "//api:thrift_java",
+shared_deps = [
+    ":iceberg_bigquery_catalog_lib",
     "//api:lib",
+    "//api:thrift_java",
     "//online:lib",
     "//spark:lib",
     "//tools/build_rules/spark:spark-exec",
-    # Libraries
     scala_artifact_with_suffix("org.scala-lang.modules:scala-java8-compat"),
+    scala_artifact_with_suffix("org.json4s:json4s-core"),
+    scala_artifact_with_suffix("org.json4s:json4s-jackson"),
+    scala_artifact_with_suffix("org.json4s:json4s-ast"),
     scala_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
+    scala_artifact_with_suffix("org.rogach:scallop"),
+    maven_artifact("com.google.cloud:google-cloud-core"),
     maven_artifact("com.google.cloud:google-cloud-bigquery"),
     maven_artifact("com.google.cloud:google-cloud-bigtable"),
+    maven_artifact("com.google.cloud:google-cloud-pubsub"),
     maven_artifact("com.google.cloud:google-cloud-dataproc"),
-    maven_artifact("com.google.cloud.bigdataoss:gcs-connector"),
     maven_artifact("com.google.cloud.bigdataoss:gcsio"),
+    maven_artifact("com.google.cloud.bigdataoss:gcs-connector"),
+    maven_artifact("com.google.cloud.bigdataoss:util"),
     maven_artifact("com.google.cloud.bigdataoss:util-hadoop"),
-    maven_artifact("com.google.cloud:google-cloud-bigtable-emulator"),
+    maven_artifact("org.apache.hadoop:hadoop-client-api"),
+    maven_artifact("com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler"),
     maven_artifact("com.google.api:api-common"),
     maven_artifact("com.google.api.grpc:proto-google-cloud-dataproc-v1"),
-    scala_artifact_with_suffix("com.google.cloud.spark:spark-bigquery-with-dependencies"),
     maven_artifact("com.google.api:gax"),
+    maven_artifact("com.google.guava:guava"),
     maven_artifact("com.google.protobuf:protobuf-java"),
-    maven_artifact("org.apache.hadoop:hadoop-client-api"),
+    maven_artifact("org.yaml:snakeyaml"),
+    maven_artifact("io.grpc:grpc-netty-shaded"),
+    maven_artifact("ch.qos.reload4j:reload4j"),
+    maven_artifact("org.slf4j:slf4j-api"),
+    maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
+    maven_artifact("org.threeten:threetenbp"),
+    maven_artifact("org.apache.kafka:kafka-clients"),
+    maven_artifact("com.google.cloud.spark:spark-3.5-bigquery"),
+    scala_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5"),
+    maven_artifact("org.objenesis:objenesis"),
+]
+
+scala_library(
+    name = "cloud_gcp_lib",
+    srcs = glob(["src/main/**/*.scala"]),
+    format = select({
+        "//tools/config:scala_2_13": False,  # Disable for 2.13
+        "//conditions:default": True,  # Enable for other versions
+    }),
+    visibility = ["//visibility:public"],
+    deps = shared_deps,
+)
+
+test_deps = [
+    maven_artifact("com.google.cloud:google-cloud-bigtable-emulator"),
+
     # Testing
     scala_artifact_with_suffix("org.scalatest:scalatest-matchers-core"),
     scala_artifact_with_suffix("org.scalatest:scalatest-core"),
@@ -81,15 +69,10 @@ test_deps = [
     maven_artifact("com.novocode:junit-interface"),
 ]
 
-scala_library(
-    name = "test_lib",
-    srcs = glob(["src/test/**/*.scala"]),
-    format = select({
-        "//tools/config:scala_2_13": False,  # Disable for 2.13
-        "//conditions:default": True,  # Enable for other versions
-    }),
+java_import(
+    name = "iceberg_bigquery_catalog_lib",
+    jars = ["iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar"],
     visibility = ["//visibility:public"],
-    deps = test_deps,
 )
 
 scala_test_suite(
@@ -98,5 +81,5 @@ scala_test_suite(
     # defined in prelude_bazel file
     jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
     visibility = ["//visibility:public"],
-    deps = test_deps + [":test_lib"],
+    deps = shared_deps + test_deps + [":cloud_gcp_lib"],
 )
```
cloud_gcp/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar

Binary file not shown.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala

Lines changed: 44 additions & 0 deletions

```scala
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.ChrononKryoRegistrator
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO

class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    super.registerClasses(kryo)

    // Have not been able to get kryo serialization to work with the closure in the GCSFileIO class.
    // See: https://github.com/apache/iceberg/blob/cc4fe4cc50043ccba89700f7948090ff87a5baee/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java#L138-L173
    // There are unit tests for this in the iceberg project: https://github.com/apache/iceberg/blob/cc4fe4cc50043ccba89700f7948090ff87a5baee/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java#L201-L209
    // However for some reason this still fails when we run for real. Should consider testing this again once we
    // bump iceberg versions. To test, we simply remove this line and run any integration job that writes iceberg to GCS.
    kryo.register(classOf[GCSFileIO], new JavaSerializer)

    val additionalClassNames = Seq(
      "org.apache.iceberg.DataFile",
      "org.apache.iceberg.FileContent",
      "org.apache.iceberg.FileFormat",
      "org.apache.iceberg.GenericDataFile",
      "org.apache.iceberg.PartitionData",
      "org.apache.iceberg.SerializableByteBufferMap",
      "org.apache.iceberg.SerializableTable$SerializableConfSupplier",
      "org.apache.iceberg.SnapshotRef",
      "org.apache.iceberg.SnapshotRefType",
      "org.apache.iceberg.encryption.PlaintextEncryptionManager",
      "org.apache.iceberg.gcp.GCPProperties",
      "org.apache.iceberg.hadoop.HadoopFileIO",
      "org.apache.iceberg.hadoop.HadoopMetricsContext",
      "org.apache.iceberg.MetadataTableType",
      "org.apache.iceberg.io.ResolvingFileIO",
      "org.apache.iceberg.spark.source.SerializableTableWithSize",
      "org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize",
      "org.apache.iceberg.spark.source.SparkWrite$TaskCommit",
      "org.apache.iceberg.types.Types$DateType",
      "org.apache.iceberg.types.Types$NestedField",
      "org.apache.iceberg.types.Types$StructType",
      "org.apache.iceberg.util.SerializableMap"
    )
    additionalClassNames.foreach(name => doRegister(name, kryo))
  }
}
```
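The serialization unit tests mentioned in the PR summary amount to a round trip through Spark's `KryoSerializer`. A minimal sketch of such a check follows; it assumes `GCSFileIO`'s no-arg constructor is sufficient for the round trip, which is an assumption of this sketch rather than something shown in the diff.

```scala
import org.apache.iceberg.gcp.gcs.GCSFileIO
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Round-trip a GCSFileIO through Spark's Kryo serializer; the JavaSerializer registration
// above is what keeps the closure inside GCSFileIO from tripping up Kryo.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "ai.chronon.integrations.cloud_gcp.ChrononIcebergKryoRegistrator")

val serde = new KryoSerializer(conf).newInstance()
val original = new GCSFileIO() // assumption: the no-arg constructor is enough for this check
val roundTripped = serde.deserialize[GCSFileIO](serde.serialize(original))
assert(roundTripped != null)
```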
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 199 additions & 0 deletions

```scala
package ai.chronon.integrations.cloud_gcp

import com.google.cloud.bigquery.{
  BigQuery,
  BigQueryOptions,
  ExternalTableDefinition,
  StandardTableDefinition,
  TableDefinition,
  TableId
}
import com.google.cloud.spark.bigquery.BigQueryCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.jdk.CollectionConverters._
import scala.util.Try

/** A table that delegates all operations to an internal table, but with additional properties.
  * This is mostly for enriching SparkTables with metadata that cannot be accessed by spark directly.
  * For example, we can use a bigquery client to fetch table metadata / properties and then hydrate the Spark table
  * with that information, before we pass it back to the Spark compute engine.
  *
  * Down the line, we could also support custom partition management.
  */
class DelegatingTable(internalTable: Table,
                      additionalProperties: Map[String, String],
                      partitioning: Option[Array[Transform]] = None)
    extends Table
    with SupportsRead
    with SupportsWrite {

  override def name(): String = internalTable.name

  override def schema(): StructType = internalTable.schema

  override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)

  override def properties(): util.Map[String, String] =
    (internalTable.properties().asScala ++ additionalProperties).asJava

  override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())

}

object DelegatingTable {
  def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
    new DelegatingTable(table, additionalProperties = additionalProperties)
}

/** Galactus catalog that allows us to interact with BigQuery metastore as a spark catalog. This allows for
  * querying of a variety of table types directly in spark sql or the dataframe api.
  * This is analogous to iceberg's [[org.apache.iceberg.spark.SparkSessionCatalog]] in that it will
  * apply a fallback when querying for tables. It will always attempt to load a table reference
  * as an iceberg table first, falling back to bigquery.
  *
  * To interact with iceberg, we use Google's https://cloud.google.com/blog/products/data-analytics/introducing-bigquery-metastore-fully-managed-metadata-service
  * metastore catalog library. By default, all catalog operations will delegate to this library, and this abstraction
  * is meant to remain incredibly thin. BE CAREFUL WHEN OVERRIDING THIS BEHAVIOR. You shouldn't need much additional
  * functionality. Before you do this, consider upgrading the `iceberg_bigquery_catalog_lib` dependency and/or iceberg first.
  *
  * NOTE that this abstraction currently only supports querying tables that all belong to the same GCP project. Multi-project
  * support will depend on the underlying libraries supporting it.
  */
class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {

  @transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance
  @transient private lazy val bigQueryClient: BigQuery = bqOptions.getService

  @transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
  @transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()

  // Spark session plumbing: both fields are set by Spark after construction,
  // via setDelegateCatalog and initialize respectively.
  private var defaultSessionCatalog: CatalogPlugin = null
  private var catalogName: String =
    null // This corresponds to the `spark_catalog` in `spark.sql.catalog.spark_catalog`. It is necessary for spark to correctly choose which implementation to use.

  override def listNamespaces: Array[Array[String]] = icebergCatalog.listNamespaces()

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = icebergCatalog.listNamespaces(namespace)

  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
    icebergCatalog.loadNamespaceMetadata(namespace)

  override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
    icebergCatalog.createNamespace(namespace, metadata)
  }

  override def purgeTable(ident: Identifier): Boolean = {
    icebergCatalog.purgeTable(ident)
  }

  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    icebergCatalog.alterNamespace(namespace, changes: _*)
  }

  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
    icebergCatalog.dropNamespace(namespace, cascade)

  override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)

  override def loadTable(rawIdent: Identifier): Table = {
    val ident = Identifier.of(rawIdent.namespace.flatMap(_.split("\\.")), rawIdent.name)
    Try { icebergCatalog.loadTable(ident) }
      .recover {
        case _ => {
          val tId = ident.namespace().toList match {
            case database :: Nil            => TableId.of(database, ident.name())
            case project :: database :: Nil => TableId.of(project, database, ident.name())
            case Nil =>
              throw new IllegalArgumentException(s"Table identifier namespace ${rawIdent} must have at least one part.")
          }
          val table = bigQueryClient.getTable(tId)
          table.getDefinition.asInstanceOf[TableDefinition] match {
            case externalTable: ExternalTableDefinition => {
              val uris = externalTable.getSourceUris.asScala
              val uri = scala
                .Option(externalTable.getHivePartitioningOptions)
                .map(_.getSourceUriPrefix)
                .getOrElse {
                  require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
                  uris.head.replaceAll("/\\*\\.parquet$", "")
                }

              val fileBasedTable = ParquetTable(tId.toString,
                                                SparkSession.active,
                                                CaseInsensitiveStringMap.empty(),
                                                List(uri),
                                                None,
                                                classOf[ParquetFileFormat])
              DelegatingTable(fileBasedTable,
                              Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
            }
            case _: StandardTableDefinition => {
              //todo(tchow): Support partitioning

              // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
              // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
              val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
              // ideally it should be the below:
              // val connectorTable = connectorCatalog.loadTable(ident)
              DelegatingTable(connectorTable, Map(TableCatalog.PROP_EXTERNAL -> "false"))
            }
            case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
          }
        }
      }
      .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(rawIdent))
  }

  override def createTable(ident: Identifier,
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): Table = {
    icebergCatalog.createTable(ident, schema, partitions, properties)
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    icebergCatalog.alterTable(ident, changes: _*)
  }

  override def dropTable(ident: Identifier): Boolean = icebergCatalog.dropTable(ident)

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    icebergCatalog.renameTable(oldIdent, newIdent)
  }

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    icebergCatalog.initialize(name, options)
    connectorCatalog.initialize(name, options)
    catalogName = name
  }

  override def name(): String = catalogName

  override def setDelegateCatalog(delegate: CatalogPlugin): Unit = {
    defaultSessionCatalog = delegate
  }

  override def listFunctions(namespace: Array[String]): Array[Identifier] = icebergCatalog.listFunctions(namespace)

  override def loadFunction(ident: Identifier): UnboundFunction = icebergCatalog.loadFunction(ident)

}
```
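Taken together with the configuration in the PR description, the `loadTable` fallback means one catalog name serves all three table shapes. A hypothetical usage sketch, assuming a `SparkSession` configured as in the sketch near the top of the PR description and with table names invented purely for illustration:

```scala
// Assumes a SparkSession (`spark`) configured as in the PR description; table names are hypothetical.

// Iceberg table registered in the BigQuery metastore: resolved by the wrapped SparkCatalog.
spark.sql("SELECT * FROM data.iceberg_events WHERE ds = '2025-01-01'").show()

// BigQuery external table over Parquet in GCS: loadTable falls through to the BigQuery client
// and surfaces it as a ParquetTable.
spark.sql("SELECT COUNT(*) FROM data.beacon_external").show()

// Native BigQuery table (e.g. a CDC stream): served through the spark-bigquery connector catalog.
spark.sql("SELECT * FROM data.cdc_native LIMIT 10").show()
```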
