
Commit ec8cec4

feat: Support iceberg reads and writes (#424)
## Summary
- Modifying TableUtils to use Spark's built-in catalog wherever applicable. This will both improve performance and allow us to access Iceberg tables through the BigQuery catalog, since it's built on #393.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Enhanced external checkout queries now deliver additional purchase details for richer data insights.

- **Refactor**
  - Updated cloud integrations to enforce supported table formats and provide clearer error messaging.
  - Streamlined table creation and partitioning processes in Spark for improved performance and maintainability.
  - Standardized collection types for partitioning and sorting parameters in DataFrame operations.

- **Tests**
  - Refined test cases to validate dynamic provider handling and verify consistent table operations.
  - Removed outdated test case for field name retrieval, reflecting changes in validation focus.
  - Updated assertions to utilize the new format provider access method.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent 7668985 commit ec8cec4
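
The summary's central change, routing table access through Spark's catalog rather than bespoke BigQuery client calls, corresponds to roughly the following usage pattern. This is a minimal sketch, assuming the `DelegatingBigQueryMetastoreCatalog` from this PR is registered under a catalog name of our choosing; the catalog name `bq` and the table names are placeholders, not taken from this PR.

```scala
// Sketch: once the delegating catalog is registered, Iceberg-backed BigQuery
// tables read and write like any other Spark catalog table.
// The catalog name "bq" and table names are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("iceberg-read-write-sketch")
  .config("spark.sql.catalog.bq",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .getOrCreate()

val purchases = spark.table("bq.data.purchases")               // read through the catalog
purchases.writeTo("bq.data.purchases_copy").createOrReplace()  // Iceberg write path
```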

File tree

21 files changed, +204 -330 lines changed

.bazelrc

Lines changed: 1 addition & 1 deletion
```diff
@@ -16,4 +16,4 @@ build --java_language_version=11
 build --java_runtime_version=11
 build --remote_cache=https://storage.googleapis.com/zipline-bazel-cache
 test --test_output=errors
-test --test_timeout=900
+test --test_timeout=1200
```

api/py/test/sample/staging_queries/quickstart/checkouts_external.py

Lines changed: 13 additions & 12 deletions
```diff
@@ -17,15 +17,19 @@


 query = """
-SELECT
-    ts,
-    ds,
-    return_id,
-    user_id,
-    product_id,
-    refund_amt
-FROM checkouts_external
-WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
+SELECT
+    purchases.ds,
+    purchases.ts as purchase_ts,
+    purchases.user_id,
+    purchases.purchase_price,
+    checkouts.return_id,
+    checkouts.refund_amt,
+    checkouts.product_id,
+    checkouts.ts as checkout_ts
+FROM data.purchases AS purchases
+LEFT OUTER JOIN data.checkouts_external AS checkouts
+USING (user_id)
+WHERE purchases.ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
 """

 staging_query = StagingQuery(
@@ -35,7 +39,4 @@
         name='checkouts_staging_query',
         outputNamespace="data"
     ),
-    setups=[
-        "CREATE OR REPLACE TEMPORARY VIEW checkouts_external USING parquet OPTIONS (path 'gs://zl-warehouse/data/checkouts_ds_not_in_parquet/')",
-    ],
 )
```

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 13 additions & 50 deletions
```diff
@@ -1,73 +1,36 @@
 package ai.chronon.integrations.cloud_gcp

 import ai.chronon.spark.TableUtils
-import ai.chronon.spark.TableUtils.TableCreationStatus
 import ai.chronon.spark.format.Format
+import com.google.cloud.bigquery.BigQuery
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
-import com.google.cloud.spark.bigquery.SchemaConverters
-import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQuery
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.StandardTableDefinition
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableInfo
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TimePartitioning
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.date_format
-import org.apache.spark.sql.functions.to_date
+import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTableProvider
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{col, date_format, to_date}

 case class BigQueryFormat(project: String, bqClient: BigQuery, override val options: Map[String, String])
   extends Format {
   override def name: String = "bigquery"

+  private val bqFormat = classOf[Spark35BigQueryTableProvider].getName
+
   override def alterTableProperties(tableName: String,
                                     tableProperties: Map[String, String]): (String => Unit) => Unit = {
     throw new NotImplementedError("alterTableProperties not yet supported for BigQuery")
   }

   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
-      implicit sparkSession: SparkSession): Seq[String] =
+      implicit sparkSession: SparkSession): List[String] =
     super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)
   override def generateTableBuilder(df: DataFrame,
                                     tableName: String,
-                                    partitionColumns: Seq[String],
+                                    partitionColumns: List[String],
                                     tableProperties: Map[String, String],
-                                    fileFormat: String): (String => Unit) => TableCreationStatus = {
-
-    def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(
-        sqlEvaluator: String => Unit): TableCreationStatus = {
-
-      // See: https://cloud.google.com/bigquery/docs/partitioned-tables#limitations
-      // "BigQuery does not support partitioning by multiple columns. Only one column can be used to partition a table."
-      assert(partitionColumns.size < 2,
-             s"BigQuery only supports at most one partition column, incoming spec: ${partitionColumns}")
-      val shadedTableId = BigQueryUtil.parseTableId(tableName)
-
-      val shadedBqSchema =
-        SchemaConverters.from(SchemaConvertersConfiguration.createDefault()).toBigQuerySchema(df.schema)
-
-      val baseTableDef = StandardTableDefinition.newBuilder
-        .setSchema(shadedBqSchema)
-
-      val tableDefinition = partitionColumns.headOption
-        .map((col) => {
-          val timePartitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField(col)
-          baseTableDef
-            .setTimePartitioning(timePartitioning.build())
-        })
-        .getOrElse(baseTableDef)
-
-      val tableInfoBuilder = TableInfo.newBuilder(shadedTableId, tableDefinition.build)
-
-      val tableInfo = tableInfoBuilder.build
-      bqClient.create(tableInfo)
-      TableUtils.TableCreatedWithoutInitialData
-    }
-
-    inner(df, tableName, partitionColumns)
+                                    fileFormat: String): (String => Unit) => Unit = {
+    throw new UnsupportedOperationException("generateTableBuilder not supported for BigQuery")
   }

-  override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = {
+  override def partitions(tableName: String)(implicit sparkSession: SparkSession): List[Map[String, String]] = {
     import sparkSession.implicits._
     val tableIdentifier = BigQueryUtil.parseTableId(tableName)
     val table = tableIdentifier.getTable
@@ -83,7 +46,7 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
       |""".stripMargin

     val partitionCol = sparkSession.read
-      .format("bigquery")
+      .format(bqFormat)
       .option("project", project)
       // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
       // and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
@@ -109,7 +72,7 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
     val partitionFormat = TableUtils(sparkSession).partitionFormat

     val partitionInfoDf = sparkSession.read
-      .format("bigquery")
+      .format(bqFormat)
       .option("project", project)
       // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
       // and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
```
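
For context on the `.format(bqFormat)` change above: pinning the provider to `classOf[Spark35BigQueryTableProvider].getName` selects the connector class explicitly instead of relying on the `"bigquery"` short name resolving through the session's registered data sources. A standalone sketch of the same read path, assuming the spark-bigquery-connector is on the classpath; the project, dataset, and table names are placeholders, and `viewsEnabled`/`materializationDataset` are the connector's standard options for SQL reads:

```scala
// Sketch: read BigQuery INFORMATION_SCHEMA.PARTITIONS through the pinned
// provider class, the way BigQueryFormat.partitions does after this change.
import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTableProvider
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bq-partitions-sketch").getOrCreate()
val bqFormat = classOf[Spark35BigQueryTableProvider].getName

val partitionIds = spark.read
  .format(bqFormat)
  .option("project", "my-project")                // placeholder project
  .option("viewsEnabled", "true")                 // required for SQL/view reads
  .option("materializationDataset", "my_dataset") // scratch dataset for query results
  .load("SELECT partition_id FROM `my_dataset.INFORMATION_SCHEMA.PARTITIONS` WHERE table_name = 'my_table'")
  .collect()
  .map(_.getString(0))
```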

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -167,6 +167,10 @@ class DelegatingBigQueryMetastoreCatalog extends CatalogExtension {
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): Table = {
+    val provider = properties.get(TableCatalog.PROP_PROVIDER)
+    if (provider.toUpperCase != "ICEBERG") {
+      throw new UnsupportedOperationException("Only creating iceberg tables supported.")
+    }
     icebergCatalog.createTable(ident, schema, partitions, properties)
   }
```
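
The effect of the new provider guard, sketched with Spark SQL; the catalog name `bq` is hypothetical and stands for wherever `DelegatingBigQueryMetastoreCatalog` is registered:

```scala
// Only CREATE TABLE ... USING iceberg reaches icebergCatalog.createTable;
// any other provider is rejected up front. Catalog/table names are placeholders.
spark.sql("CREATE TABLE bq.data.events (id BIGINT, ds STRING) USING iceberg PARTITIONED BY (ds)") // ok: delegated
spark.sql("CREATE TABLE bq.data.events_raw (id BIGINT) USING parquet") // throws UnsupportedOperationException
```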

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala

Lines changed: 8 additions & 68 deletions
```diff
@@ -1,24 +1,11 @@
 package ai.chronon.integrations.cloud_gcp

-import ai.chronon.api.Extensions.StringOps
-import ai.chronon.api.ScalaJavaConversions.JListOps
-import ai.chronon.spark.TableUtils
-import ai.chronon.spark.TableUtils.{TableCreatedWithInitialData, TableCreationStatus}
 import ai.chronon.spark.format.Format
-import com.google.cloud.bigquery.connector.common.BigQueryUtil
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{
-  BigQuery,
-  BigQueryOptions,
-  ExternalTableDefinition,
-  FormatOptions,
-  HivePartitioningOptions,
-  TableInfo
-}
-import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
-import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
+import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.slf4j.LoggerFactory

 case class GCS(sourceUri: String, fileFormat: String) extends Format {
@@ -31,10 +18,10 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
   override def name: String = fileFormat

   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
-      implicit sparkSession: SparkSession): Seq[String] =
+      implicit sparkSession: SparkSession): List[String] =
     super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)

-  override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = {
+  override def partitions(tableName: String)(implicit sparkSession: SparkSession): List[Map[String, String]] = {

     /** Given:
       * hdfs://<host>:<port>/ path/ to/ partition/ a=1/ b=hello/ c=3.14
@@ -88,62 +75,15 @@ case class GCS(sourceUri: String, fileFormat: String) extends Format {
         fieldName -> fieldValue.toString // Just going to cast this as a string.

       }.toMap)
+      .toList
   }

   override def generateTableBuilder(df: DataFrame,
                                     tableName: String,
-                                    partitionColumns: Seq[String],
+                                    partitionColumns: List[String],
                                     tableProperties: Map[String, String],
-                                    fileFormat: String): (String => Unit) => TableCreationStatus = {
-
-    def inner(df: DataFrame, tableName: String, partitionColumns: Seq[String])(sqlEvaluator: String => Unit) = {
-
-      // See: https://cloud.google.com/bigquery/docs/partitioned-tables#limitations
-      // "BigQuery does not support partitioning by multiple columns. Only one column can be used to partition a table."
-      require(partitionColumns.size < 2,
-              s"BigQuery only supports at most one partition column, incoming spec: ${partitionColumns}")
-
-      val shadedTableId = BigQueryUtil.parseTableId(tableName)
-
-      val writePrefix = TableUtils(df.sparkSession).writePrefix
-      require(writePrefix.nonEmpty, "Please set conf 'spark.chronon.table_write.prefix' pointing to a data bucket.")
-
-      val path = writePrefix.get + tableName.sanitize + "/" //split("/").map(_.sanitize).mkString("/")
-      val dataGlob = path + "*"
-
-      logger.info(s"""
-                     |table source uri: $dataGlob
-                     |partition uri: $path
-                     |""".stripMargin)
-
-      df.write
-        .partitionBy(partitionColumns: _*)
-        .mode("overwrite") // or "append" based on your needs
-        .parquet(path)
-
-      val baseTableDef = ExternalTableDefinition
-        .newBuilder(dataGlob, FormatOptions.parquet())
-        .setAutodetect(true)
-
-      if (partitionColumns.nonEmpty) {
-        val timePartitioning = HivePartitioningOptions
-          .newBuilder()
-          .setFields(partitionColumns.toJava)
-          .setSourceUriPrefix(path)
-          .setMode("STRINGS")
-          .build()
-        baseTableDef.setHivePartitioningOptions(timePartitioning)
-      }
-
-      val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
-      val createdTable = bigQueryClient.create(tableInfo)
-
-      println(s"Created external table ${createdTable.getTableId}")
-
-      TableCreatedWithInitialData
-    }
-
-    inner(df, tableName, partitionColumns)
+                                    fileFormat: String): (String => Unit) => Unit = {
+    throw new UnsupportedOperationException("generateTableBuilder not supported for GCS")
   }

   def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
```
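
The `partitions` doc comment above describes recovering a partition spec from Hive-style paths such as `a=1/b=hello/c=3.14`. A self-contained sketch of that parsing step; the helper name is mine, not from this diff:

```scala
// Turn one Hive-style partition path into a key -> value map, keeping every
// value as a String, as the GCS format does.
def partitionSpec(path: String): Map[String, String] =
  path
    .split("/")
    .filter(_.contains("="))
    .map { segment =>
      val Array(key, value) = segment.split("=", 2)
      key -> value
    }
    .toMap

// partitionSpec("hdfs://host:8020/path/to/partition/a=1/b=hello/c=3.14")
// => Map("a" -> "1", "b" -> "hello", "c" -> "3.14")
```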

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala

Lines changed: 41 additions & 20 deletions
```diff
@@ -1,11 +1,15 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.api.Extensions.StringOps
 import ai.chronon.spark.TableUtils
-import ai.chronon.spark.format.{Format, FormatProvider}
+import ai.chronon.spark.format.{Format, FormatProvider, Iceberg}
+import com.google.cloud.bigquery._
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
-import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery._
+import com.google.cloud.iceberg.bigquery.relocated.com.google.api.services.bigquery.model.TableReference
+import org.apache.iceberg.exceptions.NoSuchIcebergTableException
+import org.apache.iceberg.gcp.bigquery.{BigQueryClient, BigQueryClientImpl}
 import org.apache.spark.sql.SparkSession

+import scala.util.Try
 import scala.jdk.CollectionConverters._

 case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider {
@@ -18,7 +22,8 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
    * - No default project: An error will occur if no project ID is available.
    */
   private lazy val bqOptions = BigQueryOptions.getDefaultInstance
-  lazy val bigQueryClient: BigQuery = bqOptions.getService
+  private lazy val bigQueryClient: BigQuery = bqOptions.getService
+  private lazy val icebergClient: BigQueryClient = new BigQueryClientImpl()

   override def resolveTableName(tableName: String): String =
     format(tableName)
@@ -39,33 +44,49 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
       GCS(path, "PARQUET")
   }

-  private[cloud_gcp] def getFormat(table: Table): Format =
+  private[cloud_gcp] def getFormat(table: Table): Format = {
     table.getDefinition.asInstanceOf[TableDefinition] match {
-
       case definition: ExternalTableDefinition =>
-        val formatOptions = definition.getFormatOptions
-          .asInstanceOf[FormatOptions]
-        val externalTable = table.getDefinition.asInstanceOf[ExternalTableDefinition]
-        val uri = scala
-          .Option(externalTable.getHivePartitioningOptions)
-          .map(_.getSourceUriPrefix)
-          .getOrElse {
-            val uris = externalTable.getSourceUris.asScala
-            require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
-            uris.head.replaceAll("/\\*\\.parquet$", "")
-          }
+        Try {
+          val tableRef = new TableReference()
+            .setProjectId(table.getTableId.getProject)
+            .setDatasetId(table.getTableId.getDataset)
+            .setTableId(table.getTableId.getTable)
+
+          icebergClient.getTable(tableRef) // Just try to load it. It'll fail if it's not an iceberg table.
+          Iceberg
+        }.recover {
+          case canHandle: NoSuchIcebergTableException =>
+            val formatOptions = definition.getFormatOptions.asInstanceOf[FormatOptions]
+            val externalTable = table.getDefinition.asInstanceOf[ExternalTableDefinition]
+
+            val uri = scala
+              .Option(externalTable.getHivePartitioningOptions)
+              .map(_.getSourceUriPrefix)
+              .getOrElse {
+                val uris = externalTable.getSourceUris.asScala
+                require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
+                uris.head.replaceAll("/\\*\\.parquet$", "")
+              }

-        GCS(uri, formatOptions.getType)
+            GCS(uri, formatOptions.getType)
+          case e: Exception => throw e
+        }.get

       case _: StandardTableDefinition =>
         BigQueryFormat(table.getTableId.getProject, bigQueryClient, Map.empty)

-      case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
+      case _ =>
+        throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
     }
+  }

   private def format(tableName: String): scala.Option[Format] = {
-
-    val btTableIdentifier: TableId = BigQueryUtil.parseTableId(tableName)
+    val shadedTid = BigQueryUtil.parseTableId(tableName)
+    val btTableIdentifier: TableId = scala
+      .Option(shadedTid.getProject)
+      .map(TableId.of(_, shadedTid.getDataset, shadedTid.getTable))
+      .getOrElse(TableId.of(shadedTid.getDataset, shadedTid.getTable))
     val table = scala.Option(bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
     table
       .map(getFormat)
```