Commit 492c4d0

feat: TableUtils to be compatible with DataPointer (part 1) (#158)
## Summary

- Fixing some earlier bugs: case matching on Java classes doesn't behave the same way as matching on Scala case classes (see the sketch below the commit header).
- Adding a few more params to the GCS format. We need the source URI and the `format` string (which is basically the file format).
- Deleting some queries against INFORMATION_SCHEMA in the GCS format; they are no longer needed since we are using the BQ client.
- Adding some code to handle Spark InternalRows. We are using a low-level implementation to get at the InMemoryFileIndex, which contains the file partitions. It gives us internal rows, so we need to translate those into rows, which involves the correct serialization based on the column types.
- Add a couple of tests to BigQueryCatalogTest.
- Adding a `name` field to `Format`.
- Begin to migrate some TableUtils methods to delegate to DataPointer.
- https://app.asana.com/0/1208949807589885/1208960391734329/f
- https://app.asana.com/0/1208949807589885/1208960391734331/f

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Enhanced BigQuery and GCS format handling with improved table name resolution and data source support.
  - Updated Spark table utilities with more robust data loading and management capabilities.
  - Introduced new methods for resolving table names and handling data formats.
  - Added support for new dependencies related to Google Cloud Dataproc.
  - Introduced unit tests for GCS format functionality.
- **Bug Fixes**
  - Improved error handling for data source formats and table operations.
  - Streamlined data pointer operations for better format compatibility.
- **Refactor**
  - Simplified data loading and schema retrieval methods.
  - Consolidated format handling logic in data source operations.
  - Enhanced organization and clarity in data pointer handling.
  - Cleaned up dependency declarations and project settings in build configuration.
  - Improved error handling and control flow in join computation processes.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent a41fdb4 commit 492c4d0
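
The first Summary bullet points at the difference between matching on Java classes and on Scala case classes, which is presumably why the new `GcpFormatProvider` switches to `isInstanceOf` checks on BigQuery's `TableDefinition` subclasses. A minimal illustrative sketch of that distinction; the `GCS` case class here only mirrors the shape introduced in this PR, and the helper names are hypothetical:

```scala
import com.google.cloud.bigquery.{ExternalTableDefinition, StandardTableDefinition, TableDefinition}

// Scala case classes get a compiler-generated unapply, so a match can destructure them.
case class GCS(project: String, sourceUri: String, fileFormat: String)

def uriOf(format: Any): Option[String] = format match {
  case GCS(_, uri, _) => Some(uri) // fields bound positionally via unapply
  case _              => None
}

// Java classes, such as BigQuery's TableDefinition hierarchy, expose no extractors:
// a match can only perform type tests, equivalent to the isInstanceOf checks in the diff below.
def kindOf(definition: TableDefinition): String = definition match {
  case _: ExternalTableDefinition => "external (GCS-backed)"
  case _: StandardTableDefinition => "native BigQuery"
  case other                      => throw new IllegalStateException(s"Cannot support table of type: $other")
}
```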

File tree

7 files changed: +231 -174 lines changed


build.sbt

Lines changed: 6 additions & 9 deletions

@@ -215,6 +215,7 @@ lazy val cloud_gcp = project
     libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "3.0.3", // it's what's on the cluster
     libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.26",
     libraryDependencies += "com.google.cloud.bigdataoss" % "gcsio" % "3.0.3", // need it for https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java
+    libraryDependencies += "com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.0", // need it for https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/util-hadoop/src/main/java/com/google/cloud/hadoop/util/HadoopConfigurationProperty.java
     libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
     libraryDependencies += "com.google.cloud.spark" %% s"spark-bigquery-with-dependencies" % "0.41.0",
     libraryDependencies += "com.google.cloud.bigtable" % "bigtable-hbase-2.x" % "2.14.2",
@@ -389,7 +390,6 @@ lazy val hub = (project in file("hub"))
   }
 )

-
 val scala_test = "org.scalatest" %% "scalatest" % "3.2.19" % "test"
 val sl4j = "org.slf4j" % "slf4j-api" % slf4jApiVersion
 val logback = "ch.qos.logback" % "logback-classic" % logbackClassicVersion
@@ -403,25 +403,22 @@ val commonDependencies = Seq(
 lazy val orchestration = project
   .dependsOn(online.%("compile->compile;test->test"))
   .settings(
-
     assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"),
     Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"),
-
     assembly / assemblyMergeStrategy := {
-      case "log4j2.properties" => MergeStrategy.first
+      case "log4j2.properties"                  => MergeStrategy.first
       case "META-INF/log4j-provider.properties" => MergeStrategy.first
-      case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") => MergeStrategy.first
+      case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") =>
+        MergeStrategy.first
       case x => (assembly / assemblyMergeStrategy).value(x)
     },
-
     libraryDependencies ++= commonDependencies ++ Seq(
       "org.apache.logging.log4j" % "log4j-api" % log4j2_version,
       "org.apache.logging.log4j" % "log4j-core" % log4j2_version,
-      "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version,
-    ),
+      "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version
+    )
   )

-
 ThisBuild / assemblyMergeStrategy := {
   case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
   case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 82 additions & 44 deletions

@@ -5,63 +5,94 @@ import ai.chronon.spark.FormatProvider
 import ai.chronon.spark.Hive
 import com.google.cloud.bigquery.BigQueryOptions
 import com.google.cloud.bigquery.ExternalTableDefinition
+import com.google.cloud.bigquery.FormatOptions
 import com.google.cloud.bigquery.StandardTableDefinition
+import com.google.cloud.bigquery.Table
 import com.google.cloud.bigquery.connector.common.BigQueryUtil
-import com.google.cloud.bigquery.{TableId => BTableId}
 import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, to_date}
+
+import scala.collection.JavaConverters._

 case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider {
+  // Order of Precedence for Default Project
+  // Explicitly configured project in code (e.g., setProjectId()).
+  // GOOGLE_CLOUD_PROJECT environment variable.
+  // project_id from the ADC service account JSON file.
+  // Active project in the gcloud CLI configuration.
+  // No default project: An error will occur if no project ID is available.
+  lazy val bqOptions = BigQueryOptions.getDefaultInstance
+  lazy val bigQueryClient = bqOptions.getService
+
+  override def resolveTableName(tableName: String): String = {
+    format(tableName: String) match {
+      case GCS(_, uri, _) => uri
+      case _ => tableName
+    }
+  }
+
+  override def readFormat(tableName: String): Format = format(tableName)
+
+  // Fixed to BigQuery for now.
+  override def writeFormat(tableName: String): Format = BQuery(bqOptions.getProjectId)

-  lazy val bigQueryClient = BigQueryOptions.getDefaultInstance.getService
-  def readFormat(tableName: String): Format = {
+  private def format(tableName: String): Format = {

     val btTableIdentifier: TableId = BigQueryUtil.parseTableId(tableName)
-    val unshadedTI: BTableId =
-      BTableId.of(btTableIdentifier.getProject, btTableIdentifier.getDataset, btTableIdentifier.getTable)
-
-    val tableOpt = Option(bigQueryClient.getTable(unshadedTI))
-
-    tableOpt match {
-      case Some(table) => {
-        table.getDefinition match {
-          case _: ExternalTableDefinition => BQuery(unshadedTI.getProject)
-          case _: StandardTableDefinition => GCS(unshadedTI.getProject)
-        }
-      }
-      case None => Hive
-    }
+
+    val tableOpt: Option[Table] = Option(
+      bigQueryClient.getTable(btTableIdentifier.getDataset, btTableIdentifier.getTable))
+    tableOpt
+      .map((table) => {
+
+        if (table.getDefinition.isInstanceOf[ExternalTableDefinition]) {
+          val uris = table.getDefinition
+            .asInstanceOf[ExternalTableDefinition]
+            .getSourceUris
+            .asScala
+            .toList
+            .map((uri) => uri.stripSuffix("/*") + "/")
+
+          assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")
+
+          val formatStr = table.getDefinition
+            .asInstanceOf[ExternalTableDefinition]
+            .getFormatOptions
+            .asInstanceOf[FormatOptions]
+            .getType
+
+          GCS(table.getTableId.getProject, uris.head, formatStr)
+        } else if (table.getDefinition.isInstanceOf[StandardTableDefinition]) BQuery(table.getTableId.getProject)
+        else throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
+      })
+      .getOrElse(Hive)

     /**
-    Using federation
-    val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val tableMeta = sparkSession.sessionState.catalog.getTableRawMetadata(tableIdentifier)
-    val storageProvider = tableMeta.provider
-    storageProvider match {
-      case Some("com.google.cloud.spark.bigquery") => {
-        val tableProperties = tableMeta.properties
-        val project = tableProperties
-          .get("FEDERATION_BIGQUERY_TABLE_PROPERTY")
-          .map(BigQueryUtil.parseTableId)
-          .map(_.getProject)
-          .getOrElse(throw new IllegalStateException("bigquery project required!"))
-        val bigQueryTableType = tableProperties.get("federation.bigquery.table.type")
-        bigQueryTableType.map(_.toUpperCase) match {
-          case Some("EXTERNAL") => GCS(project)
-          case Some("MANAGED") => BQuery(project)
-          case None => throw new IllegalStateException("Dataproc federation service must be available.")
-
-        }
-      }
-
-      case Some("hive") | None => Hive
-    }
+      * Using federation
+      * val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+      * val tableMeta = sparkSession.sessionState.catalog.getTableRawMetadata(tableIdentifier)
+      * val storageProvider = tableMeta.provider
+      * storageProvider match {
+      *   case Some("com.google.cloud.spark.bigquery") => {
+      *     val tableProperties = tableMeta.properties
+      *     val project = tableProperties
+      *       .get("FEDERATION_BIGQUERY_TABLE_PROPERTY")
+      *       .map(BigQueryUtil.parseTableId)
+      *       .map(_.getProject)
+      *       .getOrElse(throw new IllegalStateException("bigquery project required!"))
+      *     val bigQueryTableType = tableProperties.get("federation.bigquery.table.type")
+      *     bigQueryTableType.map(_.toUpperCase) match {
+      *       case Some("EXTERNAL") => GCS(project)
+      *       case Some("MANAGED") => BQuery(project)
+      *       case None => throw new IllegalStateException("Dataproc federation service must be available.")
+      *
+      *     }
+      *
+      *   case Some("hive") | None => Hive
+      * }
      * */
-
   }
-
-  // For now, fix to BigQuery. We'll clean this up.
-  def writeFormat(tableName: String): Format = ???
 }

 case class BQuery(project: String) extends Format {
@@ -120,6 +151,13 @@ case class BQuery(project: String) extends Format {
       .option("project", project)
       .option("query", partValsSql)
       .load()
+      .select(
+        to_date(col("partition_id"),
+                "yyyyMMdd"
+        ) // Note: this "yyyyMMdd" format is hardcoded but we need to change it to be something else.
+          .as("partition_id"))
+      .na // Should filter out '__NULL__' and '__UNPARTITIONED__'. See: https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables
+      .drop()
       .as[String]
       .collect
      .toList
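
The new `to_date`/`na.drop()` chain in the BQuery hunk above works because partition ids that are not real dates (the `__NULL__` and `__UNPARTITIONED__` sentinels referenced in the inline comment) parse to null under `to_date`, and `na.drop()` then removes those rows. A small, self-contained sketch of that behaviour under default (non-ANSI) Spark settings; the local session and sample values are illustrative, not taken from the PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date}

object PartitionIdFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("partition-id-filter").getOrCreate()
    import spark.implicits._

    // A mix of real yyyyMMdd partition ids and the sentinel values a partition listing can report.
    val ids = Seq("20240101", "20240102", "__NULL__", "__UNPARTITIONED__").toDF("partition_id")

    // to_date returns null for values that don't parse with the given pattern,
    // so na.drop() leaves only the genuine date partitions.
    ids
      .select(to_date(col("partition_id"), "yyyyMMdd").as("partition_id"))
      .na
      .drop()
      .show()

    spark.stop()
  }
}
```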

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala

Lines changed: 22 additions & 41 deletions

@@ -1,53 +1,21 @@
 package ai.chronon.integrations.cloud_gcp

 import ai.chronon.spark.Format
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.explode
-import org.apache.spark.sql.functions.url_decode
+case class GCS(project: String, sourceUri: String, fileFormat: String) extends Format {

-case class GCS(project: String) extends Format {
-
-  override def name: String = ""
+  override def name: String = fileFormat

   override def primaryPartitions(tableName: String, partitionColumn: String, subPartitionsFilter: Map[String, String])(
      implicit sparkSession: SparkSession): Seq[String] =
    super.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)

   override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = {
-    import sparkSession.implicits._
-
-    val tableIdentifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val table = tableIdentifier.table
-    val database = tableIdentifier.database.getOrElse(throw new IllegalArgumentException("database required!"))
-
-    // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
-    // and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
-    sparkSession.conf.set("viewsEnabled", "true")
-    sparkSession.conf.set("materializationDataset", database)
-
-    // First, grab the URI location from BQ
-    val uriSQL =
-      s"""
-         |select JSON_EXTRACT_STRING_ARRAY(option_value) as option_values from `${project}.${database}.INFORMATION_SCHEMA.TABLE_OPTIONS`
-         |WHERE table_name = '${table}' and option_name = 'uris'
-         |
-         |""".stripMargin
-
-    val uris = sparkSession.read
-      .format("bigquery")
-      .option("project", project)
-      .option("query", uriSQL)
-      .load()
-      .select(explode(col("option_values")).as("option_value"))
-      .select(url_decode(col("option_value")))
-      .as[String]
-      .collect
-      .toList
-
-    assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")

     /**
      * Given:
@@ -70,7 +38,8 @@ case class GCS(project: String) extends Format {
      *
      */
     val partitionSpec = sparkSession.read
-      .parquet(uris: _*)
+      .format(fileFormat)
+      .load(sourceUri)
      .queryExecution
      .sparkPlan
      .asInstanceOf[FileSourceScanExec]
@@ -82,16 +51,28 @@ case class GCS(project: String) extends Format {
     val partitionColumns = partitionSpec.partitionColumns
     val partitions = partitionSpec.partitions.map(_.values)

-    partitions
+    val deserializer =
+      try {
+        Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+      } catch {
+        case e: Exception =>
+          throw new RuntimeException(s"Failed to create deserializer for partition columns: ${e.getMessage}", e)
+      }
+
+    val roundTripped = sparkSession
+      .createDataFrame(sparkSession.sparkContext.parallelize(partitions.map(deserializer)), partitionColumns)
+      .collect
+      .toList
+
+    roundTripped
      .map((part) =>
        partitionColumns.fields.toList.zipWithIndex.map {
          case (field, idx) => {
            val fieldName = field.name
-            val fieldValue = part.get(idx, field.dataType)
+            val fieldValue = part.get(idx)
            fieldName -> fieldValue.toString // Just going to cast this as a string.
          }
        }.toMap)
-      .toList
   }

   def createTableTypeString: String = throw new UnsupportedOperationException("GCS does not support create table")
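
The GCS `partitions` rewrite above reads partition values out of Spark's file index, which hands back Catalyst `InternalRow`s; the PR round-trips them through an `ExpressionEncoder` deserializer so the values can be read as ordinary `Row`s with the correct column types. A self-contained sketch of that conversion, assuming the same `Encoders.row` call the diff itself uses (available in recent Spark releases) and a made-up single-column partition schema:

```scala
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object InternalRowRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the partition schema recovered from the FileSourceScanExec.
    val partitionColumns = StructType(Seq(StructField("ds", StringType)))

    // Catalyst stores strings as UTF8String inside an InternalRow.
    val internal: InternalRow = InternalRow.fromSeq(Seq(UTF8String.fromString("2024-01-01")))

    // Same construction as the diff: build a resolved deserializer for the schema...
    val deserializer =
      Encoders.row(partitionColumns).asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()

    // ...and apply it to turn the InternalRow back into an external Row.
    val row: Row = deserializer(internal)
    println(row.getString(0)) // 2024-01-01
  }
}
```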

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala

Lines changed: 30 additions & 6 deletions

@@ -6,6 +6,8 @@ import ai.chronon.spark.SparkSessionBuilder
 import ai.chronon.spark.TableUtils
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration
+import com.google.cloud.hadoop.util.HadoopConfigurationProperty
 import org.apache.spark.sql.SparkSession
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertTrue
@@ -24,8 +26,7 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
       "spark.chronon.partition.column" -> "c",
       "spark.hadoop.fs.gs.impl" -> classOf[GoogleHadoopFileSystem].getName,
       "spark.hadoop.fs.AbstractFileSystem.gs.impl" -> classOf[GoogleHadoopFS].getName,
-      "spark.hadoop.google.cloud.auth.service.account.enable" -> true.toString,
-      "spark.hadoop.fs.gs.impl" -> classOf[GoogleHadoopFileSystem].getName
+      "spark.hadoop.google.cloud.auth.service.account.enable" -> true.toString
     ))
   )
   lazy val tableUtils: TableUtils = TableUtils(spark)
@@ -34,20 +35,43 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
     assertEquals("thrift://localhost:9083", spark.sqlContext.getConf("hive.metastore.uris"))
   }

+  test("google runtime classes are available") {
+    assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[Long]])
+    assertCompiles("classOf[GoogleHadoopFileSystem]")
+    assertCompiles("classOf[GoogleHadoopFS]")
+
+  }
+
   test("verify dynamic classloading of GCP providers") {
     assertTrue(tableUtils.tableReadFormat("data.sample_native") match {
       case BQuery(_) => true
       case _ => false
     })
   }

-  ignore("integration testing bigquery load table") {
+  ignore("integration testing bigquery native table") {
+    val nativeTable = "data.sample_native"
+    val table = tableUtils.loadTable(nativeTable)
+    table.show
+    val partitioned = tableUtils.isPartitioned(nativeTable)
+    println(partitioned)
+    // val database = tableUtils.createDatabase("test_database")
+    val allParts = tableUtils.allPartitions(nativeTable)
+    println(allParts)
+  }
+
+  ignore("integration testing bigquery external table") {
     val externalTable = "data.checkouts_parquet"
+
+    val bs = GoogleHadoopFileSystemConfiguration.BLOCK_SIZE
+    println(bs)
     val table = tableUtils.loadTable(externalTable)
-    tableUtils.isPartitioned(externalTable)
-    tableUtils.createDatabase("test_database")
-    tableUtils.allPartitions(externalTable)
     table.show
+    val partitioned = tableUtils.isPartitioned(externalTable)
+    println(partitioned)
+    // val database = tableUtils.createDatabase("test_database")
+    val allParts = tableUtils.allPartitions(externalTable)
+    println(allParts)
   }

   ignore("integration testing bigquery partitions") {

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 1 addition & 1 deletion

@@ -278,7 +278,7 @@ object Driver {
       val join = new Join(
         args.joinConf,
         args.endDate(),
-        args.buildTableUtils(),
+        tableUtils,
         !args.runFirstHole(),
         selectedJoinParts = args.selectedJoinParts.toOption
       )
