
feat: TableUtils to be compatible with DataPointer (part 1) #158


Merged (9 commits) on Jan 4, 2025
Changes from 8 commits
15 changes: 6 additions & 9 deletions build.sbt
@@ -215,6 +215,7 @@ lazy val cloud_gcp = project
libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "3.0.3", // it's what's on the cluster
libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.26",
libraryDependencies += "com.google.cloud.bigdataoss" % "gcsio" % "3.0.3", // need it for https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java
libraryDependencies += "com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.0", // need it for https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/util-hadoop/src/main/java/com/google/cloud/hadoop/util/HadoopConfigurationProperty.java
libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
libraryDependencies += "com.google.cloud.spark" %% s"spark-bigquery-with-dependencies" % "0.41.0",
libraryDependencies += "com.google.cloud.bigtable" % "bigtable-hbase-2.x" % "2.14.2",
@@ -389,7 +390,6 @@ lazy val hub = (project in file("hub"))
}
)


val scala_test = "org.scalatest" %% "scalatest" % "3.2.19" % "test"
val sl4j = "org.slf4j" % "slf4j-api" % slf4jApiVersion
val logback = "ch.qos.logback" % "logback-classic" % logbackClassicVersion
@@ -403,25 +403,22 @@ val commonDependencies = Seq(
lazy val orchestration = project
.dependsOn(online.%("compile->compile;test->test"))
.settings(

assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"),
Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"),

assembly / assemblyMergeStrategy := {
case "log4j2.properties" => MergeStrategy.first
case "log4j2.properties" => MergeStrategy.first
case "META-INF/log4j-provider.properties" => MergeStrategy.first
case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") => MergeStrategy.first
case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") =>
MergeStrategy.first
case x => (assembly / assemblyMergeStrategy).value(x)
},

libraryDependencies ++= commonDependencies ++ Seq(
"org.apache.logging.log4j" % "log4j-api" % log4j2_version,
"org.apache.logging.log4j" % "log4j-core" % log4j2_version,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version,
),
"org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version
)
)
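Note on the merge-strategy block above: the final `case x => (assembly / assemblyMergeStrategy).value(x)` line hands any path not matched by the project-level cases to the previously defined strategy, i.e. the `ThisBuild`-scoped one declared just below. A minimal sketch of that layering (hypothetical project name and simplified rules, not taken from this PR):

```scala
// build.sbt sketch: project-scoped merge rules that fall back to the build-wide strategy.
lazy val service = project
  .settings(
    assembly / assemblyMergeStrategy := {
      // Keep exactly one copy of the log4j2 plugin registry in the shaded jar.
      case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") =>
        MergeStrategy.first
      case x =>
        // Self-reference resolves to the previous definition, so unmatched paths
        // are handled by the ThisBuild-scoped strategy below.
        val buildWide = (assembly / assemblyMergeStrategy).value
        buildWide(x)
    }
  )

ThisBuild / assemblyMergeStrategy := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _                                   => MergeStrategy.first // simplified catch-all for the sketch
}
```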


ThisBuild / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines
@@ -5,63 +5,94 @@ import ai.chronon.spark.FormatProvider
import ai.chronon.spark.Hive
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.ExternalTableDefinition
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.StandardTableDefinition
import com.google.cloud.bigquery.Table
import com.google.cloud.bigquery.connector.common.BigQueryUtil
import com.google.cloud.bigquery.{TableId => BTableId}
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider {
// Order of Precedence for Default Project
// Explicitly configured project in code (e.g., setProjectId()).
// GOOGLE_CLOUD_PROJECT environment variable.
// project_id from the ADC service account JSON file.
// Active project in the gcloud CLI configuration.
// No default project: An error will occur if no project ID is available.
lazy val bqOptions = BigQueryOptions.getDefaultInstance
lazy val bigQueryClient = bqOptions.getService

override def resolveTableName(tableName: String): String = {
format(tableName: String) match {
case GCS(_, uri, _) => uri
case _ => tableName
}
}

override def readFormat(tableName: String): Format = format(tableName)

// Fixed to BigQuery for now.
override def writeFormat(tableName: String): Format = BQuery(bqOptions.getProjectId)

lazy val bigQueryClient = BigQueryOptions.getDefaultInstance.getService
def readFormat(tableName: String): Format = {
private def format(tableName: String): Format = {

val btTableIdentifier: TableId = BigQueryUtil.parseTableId(tableName)
val unshadedTI: BTableId =
BTableId.of(btTableIdentifier.getProject, btTableIdentifier.getDataset, btTableIdentifier.getTable)

val tableOpt = Option(bigQueryClient.getTable(unshadedTI))

tableOpt match {
case Some(table) => {
table.getDefinition match {
case _: ExternalTableDefinition => BQuery(unshadedTI.getProject)
case _: StandardTableDefinition => GCS(unshadedTI.getProject)
}
}
case None => Hive
}

val tableOpt: Option[Table] = Option(
bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
tableOpt
.map((table) => {

if (table.getDefinition.isInstanceOf[ExternalTableDefinition]) {
val uris = table.getDefinition
.asInstanceOf[ExternalTableDefinition]
.getSourceUris
.asScala
.toList
.map((uri) => uri.stripSuffix("/*") + "/")

assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")

val formatStr = table.getDefinition
.asInstanceOf[ExternalTableDefinition]
.getFormatOptions
.asInstanceOf[FormatOptions]
.getType

GCS(table.getTableId.getProject, uris.head, formatStr)
} else if (table.getDefinition.isInstanceOf[StandardTableDefinition]) BQuery(table.getTableId.getProject)
else throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
})
.getOrElse(Hive)

/**
Using federation
val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
val tableMeta = sparkSession.sessionState.catalog.getTableRawMetadata(tableIdentifier)
val storageProvider = tableMeta.provider
storageProvider match {
case Some("com.google.cloud.spark.bigquery") => {
val tableProperties = tableMeta.properties
val project = tableProperties
.get("FEDERATION_BIGQUERY_TABLE_PROPERTY")
.map(BigQueryUtil.parseTableId)
.map(_.getProject)
.getOrElse(throw new IllegalStateException("bigquery project required!"))
val bigQueryTableType = tableProperties.get("federation.bigquery.table.type")
bigQueryTableType.map(_.toUpperCase) match {
case Some("EXTERNAL") => GCS(project)
case Some("MANAGED") => BQuery(project)
case None => throw new IllegalStateException("Dataproc federation service must be available.")

}
}

case Some("hive") | None => Hive
}
* Using federation
* val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
* val tableMeta = sparkSession.sessionState.catalog.getTableRawMetadata(tableIdentifier)
* val storageProvider = tableMeta.provider
* storageProvider match {
* case Some("com.google.cloud.spark.bigquery") => {
* val tableProperties = tableMeta.properties
* val project = tableProperties
* .get("FEDERATION_BIGQUERY_TABLE_PROPERTY")
* .map(BigQueryUtil.parseTableId)
* .map(_.getProject)
* .getOrElse(throw new IllegalStateException("bigquery project required!"))
* val bigQueryTableType = tableProperties.get("federation.bigquery.table.type")
* bigQueryTableType.map(_.toUpperCase) match {
* case Some("EXTERNAL") => GCS(project)
* case Some("MANAGED") => BQuery(project)
* case None => throw new IllegalStateException("Dataproc federation service must be available.")
*
* }
* }
*
* case Some("hive") | None => Hive
* }
* */

}

// For now, fix to BigQuery. We'll clean this up.
def writeFormat(tableName: String): Format = ???
}
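For orientation, a minimal usage sketch of the provider introduced above (hypothetical, not part of the diff). It assumes a local `SparkSession` and the `Format` hierarchy from this module (`BQuery`, `GCS`, `Hive`); the table name comes from the integration test further down:

```scala
// Hypothetical usage sketch of GcpFormatProvider; not code from this PR.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("format-check").getOrCreate()
val provider = GcpFormatProvider(spark)

// External BigQuery tables resolve to GCS (carrying the backing URI and file format),
// native tables resolve to BQuery, and tables BigQuery does not know about fall back to Hive.
provider.readFormat("data.checkouts_parquet") match {
  case GCS(project, sourceUri, format) => println(s"external table in $project at $sourceUri ($format)")
  case BQuery(project)                 => println(s"native BigQuery table in project $project")
  case Hive                            => println("not visible to BigQuery; treating it as Hive")
}

// resolveTableName swaps an external table's name for its GCS URI so callers can read the files directly.
println(provider.resolveTableName("data.checkouts_parquet"))
```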

case class BQuery(project: String) extends Format {
@@ -1,53 +1,22 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark.Format
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.url_decode

case class GCS(project: String) extends Format {
case class GCS(project: String, sourceUri: String, format: String) extends Format {
Review comment on this line: 💡 Codebase verification

All GCS usages need updates

Found commented-out legacy code and active usages that need to be updated with the new parameters:

  • BigQueryFormat.scala: Pattern match GCS(_, uri, _) is updated
  • BigQueryFormat.scala: Creation GCS(table.getTableId.getProject, uris.head, formatStr) is updated
  • BigQueryFormat.scala: Commented code GCS(project) is outdated and needs removal
🔗 Analysis chain

Verify all GCS instantiations are updated

Breaking change: new required parameters.


Let me check the context of these usages to ensure they're properly updated.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find all GCS instantiations
rg "new GCS|GCS\(" --type scala

Length of output: 576


Script:

#!/bin/bash
# Get context around GCS usages in BigQueryFormat
rg "GCS\(" -A 5 -B 5 cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Length of output: 1457


override def name: String = ""
override def name: String = format

override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
implicit sparkSession: SparkSession): Seq[String] =
super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)

override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = {
import sparkSession.implicits._

val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
val table = tableIdentifier.table
val database = tableIdentifier.database.getOrElse(throw new IllegalArgumentException("database required!"))

// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
// and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
sparkSession.conf.set("viewsEnabled", "true")
sparkSession.conf.set("materializationDataset", database)

// First, grab the URI location from BQ
val uriSQL =
s"""
|select JSON_EXTRACT_STRING_ARRAY(option_value) as option_values from `${project}.${database}.INFORMATION_SCHEMA.TABLE_OPTIONS`
|WHERE table_name = '${table}' and option_name = 'uris'
|
|""".stripMargin

val uris = sparkSession.read
.format("bigquery")
.option("project", project)
.option("query", uriSQL)
.load()
.select(explode(col("option_values")).as("option_value"))
.select(url_decode(col("option_value")))
.as[String]
.collect
.toList

assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")

/**
* Given:
@@ -70,7 +39,7 @@ case class GCS(project: String) extends Format {
*
*/
val partitionSpec = sparkSession.read
.parquet(uris: _*)
.parquet(sourceUri)
.queryExecution
.sparkPlan
.asInstanceOf[FileSourceScanExec]
@@ -82,16 +51,23 @@
val partitionColumns = partitionSpec.partitionColumns
val partitions = partitionSpec.partitions.map(_.values)

partitions
val deserializer =
Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()

coderabbitai bot commented on Jan 3, 2025 (⚠️ Potential issue):

Add error handling for schema deserialization

The deserializer creation could fail if the partition columns schema is invalid.

-    val deserializer =
-      Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+    val deserializer = try {
+      Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+    } catch {
+      case e: Exception => 
+        throw new RuntimeException(s"Failed to create deserializer for partition columns: ${e.getMessage}", e)
+    }

A contributor replied:
seems like it could be helpful?

coderabbitai bot replied:
@david-zlai, indeed, wrapping the deserializer creation in a try-catch block can be helpful. It ensures that if the partition columns schema is invalid or causes an exception, you’ll have a clear message indicating the failure. For example:

val deserializer = try {
  Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
} catch {
  case e: Exception =>
    throw new RuntimeException(s"Failed to create deserializer for partition columns: ${e.getMessage}", e)
}

This way, any schema-related issues will be properly surfaced and handled. Let me know if you need anything else!

The PR author (Collaborator) replied:
done

coderabbitai bot replied:
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

val roundTripped = sparkSession
.createDataFrame(sparkSession.sparkContext.parallelize(partitions.map(deserializer)), partitionColumns)
.collect
.toList

roundTripped
.map((part) =>
partitionColumns.fields.toList.zipWithIndex.map {
case (field, idx) => {
val fieldName = field.name
val fieldValue = part.get(idx, field.dataType)
val fieldValue = part.get(idx)
fieldName -> fieldValue.toString // Just going to cast this as a string.
Review comment on lines +72 to 73 (⚠️ Potential issue):

Handle null values in partition values

Missing null check for fieldValue.

-            val fieldValue = part.get(idx)
-            fieldName -> fieldValue.toString
+            val fieldValue = Option(part.get(idx))
+            fieldName -> fieldValue.map(_.toString).getOrElse("")

}
}.toMap)
.toList
}

def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
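A side note on the partition extraction above: `FileSourceScanExec` reports partition values as Catalyst `InternalRow`s, so the encoder round-trip converts them back into external `Row`s before individual fields are read. A standalone sketch of that conversion (schema and values here are invented for illustration):

```scala
// Standalone sketch of the InternalRow -> Row round-trip; illustrative, not code from this PR.
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Pretend this is the partition schema reported by FileSourceScanExec.
val partitionColumns = StructType(Seq(StructField("ds", StringType)))

// Resolve an encoder against that schema and build a deserializer from it.
val deserializer =
  Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()

// Partition values arrive as InternalRows (strings are UTF8String internally).
val internal = InternalRow(UTF8String.fromString("2025-01-03"))
val external: Row = deserializer(internal)

println(external.getString(0)) // "2025-01-03" as a plain String, no Catalyst internals exposed
```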
@@ -6,6 +6,8 @@ import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration
import com.google.cloud.hadoop.util.HadoopConfigurationProperty
import org.apache.spark.sql.SparkSession
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
@@ -24,8 +26,7 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
"spark.chronon.partition.column" -> "c",
"spark.hadoop.fs.gs.impl" -> classOf[GoogleHadoopFileSystem].getName,
"spark.hadoop.fs.AbstractFileSystem.gs.impl" -> classOf[GoogleHadoopFS].getName,
"spark.hadoop.google.cloud.auth.service.account.enable" -> true.toString,
"spark.hadoop.fs.gs.impl" -> classOf[GoogleHadoopFileSystem].getName
"spark.hadoop.google.cloud.auth.service.account.enable" -> true.toString
))
)
lazy val tableUtils: TableUtils = TableUtils(spark)
@@ -34,20 +35,43 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
assertEquals("thrift://localhost:9083", spark.sqlContext.getConf("hive.metastore.uris"))
}

test("google runtime classes are available") {
assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[Long]])
assertCompiles("classOf[GoogleHadoopFileSystem]")
assertCompiles("classOf[GoogleHadoopFS]")

}

test("verify dynamic classloading of GCP providers") {
assertTrue(tableUtils.tableReadFormat("data.sample_native") match {
case BQuery(_) => true
case _ => false
})
}

ignore("integration testing bigquery load table") {
ignore("integration testing bigquery native table") {
val nativeTable = "data.sample_native"
val table = tableUtils.loadTable(nativeTable)
table.show
val partitioned = tableUtils.isPartitioned(nativeTable)
println(partitioned)
// val database = tableUtils.createDatabase("test_database")
val allParts = tableUtils.allPartitions(nativeTable)
println(allParts)
}

ignore("integration testing bigquery external table") {
val externalTable = "data.checkouts_parquet"

val bs = GoogleHadoopFileSystemConfiguration.BLOCK_SIZE
println(bs)
val table = tableUtils.loadTable(externalTable)
tableUtils.isPartitioned(externalTable)
tableUtils.createDatabase("test_database")
tableUtils.allPartitions(externalTable)
table.show
val partitioned = tableUtils.isPartitioned(externalTable)
println(partitioned)
// val database = tableUtils.createDatabase("test_database")
val allParts = tableUtils.allPartitions(externalTable)
println(allParts)
}

ignore("integration testing bigquery partitions") {