feat: TableUtils to be compatible with DataPointer (part 1) #158
Changes from all commits: 49c38d5, 3835c7d, 74f6946, ce3448d, c1a355c, d2b05fc, 4ec33b7, b2ff399, 6f228f2
@@ -1,53 +1,21 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.Format
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.url_decode

case class GCS(project: String, sourceUri: String, fileFormat: String) extends Format {

case class GCS(project: String) extends Format {

  override def name: String = ""
  override def name: String = fileFormat

  override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
      implicit sparkSession: SparkSession): Seq[String] =
    super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)

  override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = {
    import sparkSession.implicits._

    val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
    val table = tableIdentifier.table
    val database = tableIdentifier.database.getOrElse(throw new IllegalArgumentException("database required!"))

    // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
    // and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
    sparkSession.conf.set("viewsEnabled", "true")
    sparkSession.conf.set("materializationDataset", database)

    // First, grab the URI location from BQ
    val uriSQL =
      s"""
         |select JSON_EXTRACT_STRING_ARRAY(option_value) as option_values from `${project}.${database}.INFORMATION_SCHEMA.TABLE_OPTIONS`
         |WHERE table_name = '${table}' and option_name = 'uris'
         |
         |""".stripMargin

    val uris = sparkSession.read
      .format("bigquery")
      .option("project", project)
      .option("query", uriSQL)
      .load()
      .select(explode(col("option_values")).as("option_value"))
      .select(url_decode(col("option_value")))
      .as[String]
      .collect
      .toList

    assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")

    /**
      * Given:
@@ -70,7 +38,8 @@ case class GCS(project: String) extends Format {
      *
      */
    val partitionSpec = sparkSession.read
      .parquet(uris: _*)
      .format(fileFormat)
      .load(sourceUri)
      .queryExecution
      .sparkPlan
      .asInstanceOf[FileSourceScanExec]
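A minimal sketch of the same technique on a local dataset, for readers unfamiliar with how partition values are recovered from the physical plan: read the source URI with the configured file format, then reach the PartitionSpec through the scan node's file index. The temp path, the toy DataFrame, and the PartitioningAwareFileIndex cast are illustrative assumptions, not lines from this PR.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex

object PartitionSpecSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("partition-spec-sketch").getOrCreate()
  import spark.implicits._

  // Write a tiny partitioned parquet dataset to stand in for the GCS sourceUri.
  val path = java.nio.file.Files.createTempDirectory("gcs_sketch").toString
  Seq((1, "2024-01-01"), (2, "2024-01-02")).toDF("id", "ds").write.partitionBy("ds").parquet(path)

  // Mirrors the new code path: sparkSession.read.format(fileFormat).load(sourceUri)
  val scan = spark.read
    .format("parquet")
    .load(path)
    .queryExecution
    .sparkPlan
    .asInstanceOf[FileSourceScanExec]

  // The file index behind the scan knows the partition columns and their values.
  val partitionSpec = scan.relation.location.asInstanceOf[PartitioningAwareFileIndex].partitionSpec()
  println(partitionSpec.partitionColumns)                   // StructType containing "ds"
  partitionSpec.partitions.map(_.values).foreach(println)   // one InternalRow per partition

  spark.stop()
}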
@@ -82,16 +51,28 @@ case class GCS(project: String) extends Format {
    val partitionColumns = partitionSpec.partitionColumns
    val partitions = partitionSpec.partitions.map(_.values)

    partitions
    val deserializer =
      try {
        Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
      } catch {
        case e: Exception =>
          throw new RuntimeException(s"Failed to create deserializer for partition columns: ${e.getMessage}", e)
      }

    val roundTripped = sparkSession
      .createDataFrame(sparkSession.sparkContext.parallelize(partitions.map(deserializer)), partitionColumns)
      .collect
      .toList

    roundTripped
      .map((part) =>
        partitionColumns.fields.toList.zipWithIndex.map {
          case (field, idx) => {
            val fieldName = field.name
            val fieldValue = part.get(idx, field.dataType)
            val fieldValue = part.get(idx)
            fieldName -> fieldValue.toString // Just going to cast this as a string.
Comment on lines +72 to 73:

Handle null values in partition values. fieldValue is missing a null check.

Suggested change:
-            val fieldValue = part.get(idx)
-            fieldName -> fieldValue.toString
+            val fieldValue = Option(part.get(idx))
+            fieldName -> fieldValue.map(_.toString).getOrElse("")
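As a side note on the suggestion above: wrapping the raw value in Option is what turns a null partition value into an empty string instead of a NullPointerException. A tiny standalone illustration (values are made up):

  val nullSafe = Option(null: Any).map(_.toString).getOrElse("")            // ""
  val present  = Option("2024-01-01": Any).map(_.toString).getOrElse("")    // "2024-01-01"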
          }
        }.toMap)
      .toList
  }

  def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
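The less familiar piece of the new partitions() implementation is the InternalRow round trip: partition values come out of the FileSourceScanExec as Catalyst InternalRows, and an ExpressionEncoder built from the partition schema deserializes them into external Rows that can be read positionally with part.get(idx). Below is a standalone sketch of that mechanism, with a made-up schema and values; it demonstrates only the encoder step, not the full DataFrame round trip the PR performs.

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object RowRoundTripSketch extends App {
  // Stand-in for partitionSpec.partitionColumns.
  val partitionColumns = StructType(Seq(StructField("ds", StringType), StructField("hr", IntegerType)))

  // Same construction as the diff: a row encoder resolved and bound to the schema.
  val deserializer = Encoders
    .row(partitionColumns)
    .asInstanceOf[ExpressionEncoder[Row]]
    .resolveAndBind()
    .createDeserializer()

  // Catalyst stores strings as UTF8String inside an InternalRow.
  val internal = InternalRow(UTF8String.fromString("2024-01-01"), 7)

  val external: Row = deserializer(internal)
  println(external.get(0)) // 2024-01-01
  println(external.get(1)) // 7
}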