
Commit a91c224

rebase
Co-authored-by: Thomas Chow <[email protected]>
1 parent: 45bae2f

14 files changed: +6522 −2771 lines

.github/workflows/require_triggered_status_checks.yaml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 name: branch_protection
 on:
-  pull_request:
+  push:
 jobs:
   enforce_triggered_workflows:
     runs-on: ubuntu-latest

cloud_gcp/BUILD.bazel

Lines changed: 20 additions & 41 deletions
@@ -1,12 +1,5 @@
-scala_library(
-    name = "cloud_gcp_lib",
-    srcs = glob(["src/main/**/*.scala"]),
-    format = select({
-        "//tools/config:scala_2_13": False,  # Disable for 2.13
-        "//conditions:default": True,  # Enable for other versions
-    }),
-    visibility = ["//visibility:public"],
-    deps = [
+shared_deps = [
+    ":iceberg_bigquery_catalog_lib",
     "//api:lib",
     "//api:thrift_java",
     "//online:lib",
@@ -24,9 +17,11 @@ scala_library(
     maven_artifact("com.google.cloud:google-cloud-pubsub"),
     maven_artifact("com.google.cloud:google-cloud-dataproc"),
     maven_artifact("com.google.cloud.bigdataoss:gcsio"),
+    maven_artifact("com.google.cloud.bigdataoss:gcs-connector"),
+    maven_artifact("com.google.cloud.bigdataoss:util"),
     maven_artifact("com.google.cloud.bigdataoss:util-hadoop"),
+    maven_artifact("org.apache.hadoop:hadoop-client-api"),
     maven_artifact("com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler"),
-    scala_artifact_with_suffix("com.google.cloud.spark:spark-bigquery-with-dependencies"),
     maven_artifact("com.google.api:api-common"),
     maven_artifact("com.google.api.grpc:proto-google-cloud-dataproc-v1"),
     maven_artifact("com.google.api:gax"),
@@ -39,7 +34,16 @@ scala_library(
     maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
     maven_artifact("org.threeten:threetenbp"),
     maven_artifact("org.apache.kafka:kafka-clients"),
-    ],
+    maven_artifact("com.google.cloud.spark:spark-3.5-bigquery"),
+    scala_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5"),
+]
+
+scala_library(
+    name = "cloud_gcp_lib",
+    srcs = glob(["src/main/**/*.scala"]),
+    format = True,
+    visibility = ["//visibility:public"],
+    deps = shared_deps,
 )
 
 jvm_binary(
@@ -52,28 +56,8 @@ jvm_binary(
 )
 
 test_deps = [
-    ":cloud_gcp_lib",
-    "//api:thrift_java",
-    "//api:lib",
-    "//online:lib",
-    "//spark:lib",
-    "//tools/build_rules/spark:spark-exec",
-    # Libraries
-    scala_artifact_with_suffix("org.scala-lang.modules:scala-java8-compat"),
-    scala_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
-    maven_artifact("com.google.cloud:google-cloud-bigquery"),
-    maven_artifact("com.google.cloud:google-cloud-bigtable"),
-    maven_artifact("com.google.cloud:google-cloud-dataproc"),
-    maven_artifact("com.google.cloud.bigdataoss:gcs-connector"),
-    maven_artifact("com.google.cloud.bigdataoss:gcsio"),
-    maven_artifact("com.google.cloud.bigdataoss:util-hadoop"),
     maven_artifact("com.google.cloud:google-cloud-bigtable-emulator"),
-    maven_artifact("com.google.api:api-common"),
-    maven_artifact("com.google.api.grpc:proto-google-cloud-dataproc-v1"),
-    scala_artifact_with_suffix("com.google.cloud.spark:spark-bigquery-with-dependencies"),
-    maven_artifact("com.google.api:gax"),
-    maven_artifact("com.google.protobuf:protobuf-java"),
-    maven_artifact("org.apache.hadoop:hadoop-client-api"),
+
     # Testing
     scala_artifact_with_suffix("org.scalatest:scalatest-matchers-core"),
     scala_artifact_with_suffix("org.scalatest:scalatest-core"),
@@ -90,15 +74,10 @@ test_deps = [
     maven_artifact("com.novocode:junit-interface"),
 ]
 
-scala_library(
-    name = "test_lib",
-    srcs = glob(["src/test/**/*.scala"]),
-    format = select({
-        "//tools/config:scala_2_13": False,  # Disable for 2.13
-        "//conditions:default": True,  # Enable for other versions
-    }),
+java_import(
+    name = "iceberg_bigquery_catalog_lib",
+    jars = ["iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar"],
     visibility = ["//visibility:public"],
-    deps = test_deps,
 )
 
 scala_test_suite(
@@ -107,5 +86,5 @@ scala_test_suite(
     # defined in prelude_bazel file
     jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
     visibility = ["//visibility:public"],
-    deps = test_deps + [":test_lib"],
+    deps = shared_deps + test_deps + [":cloud_gcp_lib"],
 )
Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
+package ai.chronon.integrations.cloud_gcp
+
+import com.google.cloud.bigquery.{
+  BigQuery,
+  BigQueryOptions,
+  ExternalTableDefinition,
+  StandardTableDefinition,
+  TableDefinition,
+  TableId
+}
+import com.google.cloud.spark.bigquery.BigQueryCatalog
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import scala.jdk.CollectionConverters._
+import scala.util.Try
+
+/** For now, just delegate to the iceberg catalog.
+  * todo(tchow): Given a chronon catalog, delegate to the correct catalog.
+  *
+  * In order of resolution:
+  *   1. BigQuery native
+  *   2. Iceberg
+  *   3. External tables
+  *
+  * @param chrononCat
+  * @param icebergCatalog
+  */
+class DelegatingTable(internalTable: Table, additionalProperties: Map[String, String])
+    extends Table
+    with SupportsRead
+    with SupportsWrite {
+
+  override def name(): String = internalTable.name
+
+  override def schema(): StructType = internalTable.schema
+
+  override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+
+  override def properties(): util.Map[String, String] =
+    (internalTable.properties().asScala ++ additionalProperties).asJava
+}
+
+object DelegatingTable {
+  def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
+    new DelegatingTable(table, additionalProperties = additionalProperties)
+}
+
+class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
+
+  @transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance
+  @transient private lazy val bigQueryClient: BigQuery = bqOptions.getService
+
+  @transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
+  @transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()
+  private var defaultSessionCatalog: CatalogPlugin = null
+
+  override def listNamespaces: Array[Array[String]] = icebergCatalog.listNamespaces()
+
+  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = icebergCatalog.listNamespaces(namespace)
+
+  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
+    icebergCatalog.loadNamespaceMetadata(namespace)
+
+  override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
+    icebergCatalog.createNamespace(namespace, metadata)
+  }
+
+  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
+    icebergCatalog.alterNamespace(namespace, changes: _*)
+  }
+
+  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
+    icebergCatalog.dropNamespace(namespace, cascade)
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
+
+  override def loadTable(ident: Identifier): Table = {
+    Try { icebergCatalog.loadTable(ident) }
+      .recover {
+        case _ => {
+          val connectorTable = connectorCatalog.loadTable(ident)
+          val tId = ident.namespace().toList match {
+            case database :: Nil            => TableId.of(database, ident.name())
+            case project :: database :: Nil => TableId.of(project, database, ident.name())
+          }
+          val table = bigQueryClient.getTable(tId)
+          table.getDefinition.asInstanceOf[TableDefinition] match {
+            case externalTable: ExternalTableDefinition => {
+              val uris = externalTable.getSourceUris.asScala
+              val uri = scala
+                .Option(externalTable.getHivePartitioningOptions)
+                .map(_.getSourceUriPrefix)
+                .getOrElse {
+                  require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
+                  uris.head.replaceAll("/\\*\\.parquet$", "")
+                }
+
+              val fileBasedTable = ParquetTable(tId.toString,
+                                                SparkSession.active,
+                                                CaseInsensitiveStringMap.empty(),
+                                                List(uri),
+                                                None,
+                                                classOf[ParquetFileFormat])
+              DelegatingTable(fileBasedTable,
+                              Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
+            }
+            case _: StandardTableDefinition => {
+              DelegatingTable(connectorTable, Map(TableCatalog.PROP_EXTERNAL -> "false"))
+            }
+            case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
+          }
+        }
+      }
+      .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(ident))
+  }
+
+  override def createTable(ident: Identifier,
+                           schema: StructType,
+                           partitions: Array[Transform],
+                           properties: util.Map[String, String]): Table = {
+    icebergCatalog.createTable(ident, schema, partitions, properties)
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    icebergCatalog.alterTable(ident, changes: _*)
+  }
+
+  override def dropTable(ident: Identifier): Boolean = icebergCatalog.dropTable(ident)
+
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
+    icebergCatalog.renameTable(oldIdent, newIdent)
+  }
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+    icebergCatalog.initialize(name, options)
+    connectorCatalog.initialize(name, options)
+  }
+
+  override def name() = "bigquery-delegate"
+
+  override def setDelegateCatalog(delegate: CatalogPlugin): Unit = {
+    defaultSessionCatalog = delegate
+  }
+
+  override def listFunctions(namespace: Array[String]): Array[Identifier] = icebergCatalog.listFunctions(namespace)
+
+  override def loadFunction(ident: Identifier): UnboundFunction = icebergCatalog.loadFunction(ident)
+
+}
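
The new DelegatingBigQueryMetastoreCatalog only takes effect once Spark is configured to use it as the session catalog; Spark then hands it the built-in catalog via setDelegateCatalog, and loadTable falls through Iceberg, then the BigQuery connector (native or external Parquet), then the delegate. A minimal wiring sketch follows, assuming the standard spark.sql.catalog.* configuration keys; the catalog-impl class name and the table name are assumptions for illustration, not values taken from this commit.

// Hypothetical wiring sketch, not part of this commit.
import org.apache.spark.sql.SparkSession

object DelegatingCatalogExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("delegating-bigquery-metastore-example")
      // Standard Spark key for overriding the session catalog (assumed setup).
      .config("spark.sql.catalog.spark_catalog",
              "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
      // Options passed to initialize() reach both the embedded Iceberg SparkCatalog
      // and the BigQueryCatalog; this catalog-impl value is an assumption about the
      // class shipped in the bundled iceberg-bigquery-catalog jar.
      .config("spark.sql.catalog.spark_catalog.catalog-impl",
              "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
      .getOrCreate()

    // Resolution order as loadTable implements it: Iceberg first, then the
    // BigQuery connector / external tables, then the delegate session catalog.
    spark.sql("SELECT COUNT(*) FROM some_dataset.some_table").show()
  }
}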

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala

Lines changed: 36 additions & 37 deletions
@@ -1,24 +1,15 @@
 package ai.chronon.integrations.cloud_gcp
 
 import ai.chronon.api.Extensions.StringOps
-import ai.chronon.api.ScalaJavaConversions.JListOps
 import ai.chronon.spark.TableUtils
-import ai.chronon.spark.TableUtils.{TableCreatedWithInitialData, TableCreationStatus}
+import ai.chronon.spark.TableUtils.{TableCreatedWithoutInitialData, TableCreationStatus}
 import ai.chronon.spark.format.Format
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{
-  BigQuery,
-  BigQueryOptions,
-  ExternalTableDefinition,
-  FormatOptions,
-  HivePartitioningOptions,
-  TableInfo
-}
-import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
-import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.slf4j.LoggerFactory
 
 case class GCS(sourceUri: String, fileFormat: String) extends Format {
@@ -116,31 +107,39 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
           |partition uri: $path
           |""".stripMargin)
 
-      df.write
-        .partitionBy(partitionColumns: _*)
-        .mode("overwrite") // or "append" based on your needs
-        .parquet(path)
-
-      val baseTableDef = ExternalTableDefinition
-        .newBuilder(dataGlob, FormatOptions.parquet())
-        .setAutodetect(true)
-
-      if (partitionColumns.nonEmpty) {
-        val timePartitioning = HivePartitioningOptions
-          .newBuilder()
-          .setFields(partitionColumns.toJava)
-          .setSourceUriPrefix(path)
-          .setMode("STRINGS")
-          .build()
-        baseTableDef.setHivePartitioningOptions(timePartitioning)
-      }
-
-      val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
-      val createdTable = bigQueryClient.create(tableInfo)
-
-      println(s"Created external table ${createdTable.getTableId}")
-
-      TableCreatedWithInitialData
+      val partCols = partitionColumns.map(df.col)
+
+      val noProjectTableName = f"${shadedTableId.getDataset}.${shadedTableId.getTable}"
+
+      val tableWriter =
+        df.writeTo(noProjectTableName)
+          .tableProperty("write.format.default", "parquet")
+
+      partCols.headOption
+        .map((c) => tableWriter.partitionedBy(c, partCols.tail: _*))
+        .getOrElse(tableWriter)
+        .using("iceberg")
+        .create()
+      // val baseTableDef = ExternalTableDefinition
+      //   .newBuilder(dataGlob, FormatOptions.parquet())
+      //   .setAutodetect(true)
+      //
+      // if (partitionColumns.nonEmpty) {
+      //   val timePartitioning = HivePartitioningOptions
+      //     .newBuilder()
+      //     .setFields(partitionColumns.toJava)
+      //     .setSourceUriPrefix(path)
+      //     .setMode("STRINGS")
+      //     .build()
+      //   baseTableDef.setHivePartitioningOptions(timePartitioning)
+      // }
+      //
+      // val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
+      // val createdTable = bigQueryClient.create(tableInfo)
+      //
+      // println(s"Created external table ${createdTable.getTableId}")
+
+      TableCreatedWithoutInitialData
     }
 
     inner(df, tableName, partitionColumns)