feat: TableUtils to be compatible with DataPointer (part 1) #158
Changes from 8 commits
49c38d5, 3835c7d, 74f6946, ce3448d, c1a355c, d2b05fc, 4ec33b7, b2ff399, 6f228f2
@@ -1,53 +1,22 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.Format
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.url_decode

-case class GCS(project: String) extends Format {
+case class GCS(project: String, sourceUri: String, format: String) extends Format {
💡 Codebase verification: All GCS usages need updates

Found commented-out legacy code and active usages that need to be updated with the new parameters.

🔗 Analysis chain: Verify all GCS instantiations are updated — this is a breaking change with new required parameters. Let me check the context of these usages to ensure they're properly updated.

🏁 Scripts executed for the analysis:

#!/bin/bash
# Find all GCS instantiations
rg "new GCS|GCS\(" --type scala

Length of output: 576

#!/bin/bash
# Get context around GCS usages in BigQueryFormat
rg "GCS\(" -A 5 -B 5 cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Length of output: 1457
tchow-zlai marked this conversation as resolved.
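For illustration, here is how a call site might change under the new constructor. This is a hypothetical sketch — the value name, project, and bucket path are assumptions, not code from this PR:

// Hypothetical call site (names are illustrative):
// before — the format resolved its own URI via BigQuery:
//   val gcsFormat = GCS(project)
// after — callers supply the backing URI and file format directly:
val gcsFormat = GCS(
  project = "my-gcp-project",
  sourceUri = "gs://my-bucket/path/to/table/",
  format = "parquet"
)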
-  override def name: String = ""
+  override def name: String = format
  override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
      implicit sparkSession: SparkSession): Seq[String] =
    super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)

  override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = {
    import sparkSession.implicits._

    val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
    val table = tableIdentifier.table
    val database = tableIdentifier.database.getOrElse(throw new IllegalArgumentException("database required!"))

    // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
    // and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
    sparkSession.conf.set("viewsEnabled", "true")
    sparkSession.conf.set("materializationDataset", database)

    // First, grab the URI location from BQ
    val uriSQL =
      s"""
         |select JSON_EXTRACT_STRING_ARRAY(option_value) as option_values from `${project}.${database}.INFORMATION_SCHEMA.TABLE_OPTIONS`
         |WHERE table_name = '${table}' and option_name = 'uris'
         |
         |""".stripMargin

    val uris = sparkSession.read
      .format("bigquery")
      .option("project", project)
      .option("query", uriSQL)
      .load()
      .select(explode(col("option_values")).as("option_value"))
      .select(url_decode(col("option_value")))
      .as[String]
      .collect
      .toList

    assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")

    /**
      * Given:
@@ -70,7 +39,7 @@ case class GCS(project: String) extends Format {
      *
      */
    val partitionSpec = sparkSession.read
-      .parquet(uris: _*)
+      .parquet(sourceUri)
tchow-zlai marked this conversation as resolved.
      .queryExecution
      .sparkPlan
      .asInstanceOf[FileSourceScanExec]
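The unchanged lines between this hunk and the next are elided by the diff. Judging from the PartitioningAwareFileIndex import and the partitionSpec.partitionColumns / partitionSpec.partitions usage below, the chain presumably continues by unwrapping the scan's file index — a sketch of that assumed step, not code shown in this PR:

      // Assumed continuation (hypothetical): FileSourceScanExec exposes the
      // HadoopFsRelation it scans; its location is the file index, whose
      // partitionSpec() carries the inferred partition columns and values.
      .relation
      .location
      .asInstanceOf[PartitioningAwareFileIndex]
      .partitionSpec()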
@@ -82,16 +51,23 @@ case class GCS(project: String) extends Format {
    val partitionColumns = partitionSpec.partitionColumns
    val partitions = partitionSpec.partitions.map(_.values)

-    partitions
+    val deserializer =
+      Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
Add error handling for schema deserialization

The deserializer creation could fail if the partition columns schema is invalid.

- val deserializer =
-   Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+ val deserializer = try {
+   Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+ } catch {
+   case e: Exception =>
+     throw new RuntimeException(s"Failed to create deserializer for partition columns: ${e.getMessage}", e)
+ }

david-zlai: seems like it could be helpful?

Reply: @david-zlai, indeed, wrapping the deserializer creation in a try-catch block can be helpful. It ensures that if the partition columns schema is invalid or causes an exception, you'll have a clear message indicating the failure. For example:

val deserializer = try {
  Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
} catch {
  case e: Exception =>
    throw new RuntimeException(s"Failed to create deserializer for partition columns: ${e.getMessage}", e)
}

This way, any schema-related issues will be properly surfaced and handled. Let me know if you need anything else!

done
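For context on what this deserializer does: it converts the Catalyst-internal rows held by the partition spec back into external Rows. A minimal standalone sketch, assuming Spark 3.x — the one-column ds schema and the sample value are made up for illustration:

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical single-column partition schema, e.g. a ds=2024-01-01 directory.
val partitionColumns = StructType(Seq(StructField("ds", StringType)))

// Encoders.row gives an encoder for external Rows of this schema; its
// deserializer maps Catalyst InternalRows back to external Rows.
val deserializer =
  Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()

// Partition values from the partition spec arrive as InternalRows, with
// strings stored internally as UTF8String.
val internal: InternalRow = InternalRow(UTF8String.fromString("2024-01-01"))
val external: Row = deserializer(internal) // Row("2024-01-01")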
    val roundTripped = sparkSession
      .createDataFrame(sparkSession.sparkContext.parallelize(partitions.map(deserializer)), partitionColumns)
      .collect
      .toList

    roundTripped
      .map((part) =>
        partitionColumns.fields.toList.zipWithIndex.map {
          case (field, idx) => {
            val fieldName = field.name
-            val fieldValue = part.get(idx, field.dataType)
+            val fieldValue = part.get(idx)
            fieldName -> fieldValue.toString // Just going to cast this as a string.
Comment on lines +72 to 73:

Handle null values in partition values

Missing null check for fieldValue.

- val fieldValue = part.get(idx)
- fieldName -> fieldValue.toString
+ val fieldValue = Option(part.get(idx))
+ fieldName -> fieldValue.map(_.toString).getOrElse("")
          }
        }.toMap)
      .toList
  }

  def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")