
Commit b0c7cd0

feat: StagingQuery param (#406)
## Summary

Supporting StagingQueries for configurable compute engines. To support BigQuery, the simplest way is to write BigQuery SQL and run it on BigQuery to create the final table. Let's first make the API change.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added an option for users to specify the compute engine when processing queries, offering choices such as Spark and BigQuery.
  - Introduced validation to ensure that queries run only with the designated engine.
- **Style**
  - Streamlined code organization for enhanced readability.
  - Consolidated and reordered import statements for improved clarity.

Co-authored-by: Thomas Chow <[email protected]>
1 parent 33bfc75 commit b0c7cd0

4 files changed: +72 −53 lines


api/src/main/scala/ai/chronon/api/Builders.scala

Lines changed: 3 additions & 1 deletion
@@ -308,13 +308,15 @@ object Builders {
       metaData: MetaData = null,
       startPartition: String = null,
       setups: Seq[String] = null,
-      partitionColumn: String = null
+      partitionColumn: String = null,
+      engineType: EngineType = EngineType.SPARK
   ): StagingQuery = {
     val stagingQuery = new StagingQuery()
     stagingQuery.setQuery(query)
     stagingQuery.setMetaData(metaData)
     stagingQuery.setStartPartition(startPartition)
     stagingQuery.setPartitionColumn(partitionColumn)
+    stagingQuery.setEngineType(engineType)
     if (setups != null) stagingQuery.setSetups(setups.toJava)
     stagingQuery
   }
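With the new parameter, a config can opt into a non-Spark engine when it is defined; omitting `engineType` keeps the Spark default. A minimal usage sketch of the updated builder (the table name, metadata, and SQL below are hypothetical):

```scala
import ai.chronon.api.{Builders, EngineType}

// Hypothetical staging query that targets BigQuery instead of the default Spark engine.
val bigQueryStagingQuery = Builders.StagingQuery(
  query = "SELECT user_id, ds FROM demo.events WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'",
  metaData = Builders.MetaData(name = "demo.events_staging"),
  startPartition = "2024-01-01",
  engineType = EngineType.BIGQUERY // defaults to EngineType.SPARK when omitted
)
```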

api/thrift/api.thrift

Lines changed: 11 additions & 0 deletions
@@ -54,6 +54,11 @@ struct StagingQuery {
      * Only needed for `max_date` template
      **/
     5: optional string partitionColumn
+
+    /**
+     * By default, spark is the compute engine. You can specify an override (eg. bigquery, etc.)
+     **/
+    6: optional EngineType engineType
 }
 
 struct EventSource {
@@ -220,6 +225,12 @@ enum Accuracy {
     SNAPSHOT = 1
 }
 
+enum EngineType {
+    SPARK = 0,
+    BIGQUERY = 1
+
+}
+
 struct MetaData {
     1: optional string name
     // marking this as true means that the conf can be served online
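Note that `engineType` is optional in the struct, so a conf assembled without the builder default may leave the field unset. A small defensive sketch for reading the effective engine (the `effectiveEngine` helper is hypothetical, and it assumes the generated getter returns `null` for an unset optional enum):

```scala
import ai.chronon.api.{EngineType, StagingQuery}

// Hypothetical helper: fall back to SPARK when the optional thrift field was never set.
def effectiveEngine(conf: StagingQuery): EngineType =
  Option(conf.getEngineType).getOrElse(EngineType.SPARK)
```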

spark/src/main/scala/ai/chronon/spark/StagingQuery.scala

Lines changed: 55 additions & 49 deletions
@@ -18,12 +18,11 @@ package ai.chronon.spark
 
 import ai.chronon.api
 import ai.chronon.api.Extensions._
-import ai.chronon.api.ParametricMacro
+import ai.chronon.api.{EngineType, ParametricMacro}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.online.PartitionRange
 import ai.chronon.spark.Extensions._
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.mutable
 
@@ -46,57 +45,64 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
               enableAutoExpand: Option[Boolean] = Some(true),
               overrideStartPartition: Option[String] = None,
               skipFirstHole: Boolean = true): Unit = {
-    Option(stagingQueryConf.setups).foreach(_.toScala.foreach(tableUtils.sql))
+    if (stagingQueryConf.getEngineType != EngineType.SPARK) {
+      throw new UnsupportedOperationException(
+        s"Engine type ${stagingQueryConf.getEngineType} is not supported for Staging Query")
+    }
     // the input table is not partitioned, usually for data testing or for kaggle demos
     if (stagingQueryConf.startPartition == null) {
-      tableUtils.sql(stagingQueryConf.query).save(outputTable)
-      return
-    }
-    val overrideStart = overrideStartPartition.getOrElse(stagingQueryConf.startPartition)
-    val unfilledRanges =
-      tableUtils.unfilledRanges(outputTable,
-                                PartitionRange(overrideStart, endPartition)(tableUtils.partitionSpec),
-                                skipFirstHole = skipFirstHole)
+      tableUtils.sql(stagingQueryConf.query).saveUnPartitioned(outputTable)
+    } else {
+      val overrideStart = overrideStartPartition.getOrElse(stagingQueryConf.startPartition)
+      val unfilledRanges =
+        tableUtils.unfilledRanges(outputTable,
+                                  PartitionRange(overrideStart, endPartition)(tableUtils.partitionSpec),
+                                  skipFirstHole = skipFirstHole)
 
-    if (unfilledRanges.isEmpty) {
-      logger.info(s"""No unfilled range for $outputTable given
-                     |start partition of ${stagingQueryConf.startPartition}
-                     |override start partition of $overrideStart
-                     |end partition of $endPartition
-                     |""".stripMargin)
-      return
-    }
-    val stagingQueryUnfilledRanges = unfilledRanges.get
-    logger.info(s"Staging Query unfilled ranges: $stagingQueryUnfilledRanges")
-    val exceptions = mutable.Buffer.empty[String]
-    stagingQueryUnfilledRanges.foreach { stagingQueryUnfilledRange =>
-      try {
-        val stepRanges = stepDays.map(stagingQueryUnfilledRange.steps).getOrElse(Seq(stagingQueryUnfilledRange))
-        logger.info(s"Staging query ranges to compute: ${stepRanges.map { _.toString }.pretty}")
-        stepRanges.zipWithIndex.foreach { case (range, index) =>
-          val progress = s"| [${index + 1}/${stepRanges.size}]"
-          logger.info(s"Computing staging query for range: $range $progress")
-          val renderedQuery =
-            StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
-          logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
-          val df = tableUtils.sql(renderedQuery)
-          df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
-          logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
-        }
-        logger.info(s"Finished writing Staging Query data to $outputTable")
-      } catch {
-        case err: Throwable =>
-          exceptions.append(s"Error handling range $stagingQueryUnfilledRange : ${err.getMessage}\n${err.traceString}")
+      if (unfilledRanges.isEmpty) {
+        logger.info(s"""No unfilled range for $outputTable given
+                       |start partition of ${stagingQueryConf.startPartition}
+                       |override start partition of $overrideStart
+                       |end partition of $endPartition
+                       |""".stripMargin)
+        return
       }
-    }
-    if (exceptions.nonEmpty) {
-      val length = exceptions.length
-      val fullMessage = exceptions.zipWithIndex
-        .map { case (message, index) =>
-          s"[${index + 1}/${length} exceptions]\n${message}"
+      val stagingQueryUnfilledRanges = unfilledRanges.get
+      logger.info(s"Staging Query unfilled ranges: $stagingQueryUnfilledRanges")
+      Option(stagingQueryConf.setups).foreach(_.toScala.foreach(tableUtils.sql))
+      val exceptions = mutable.Buffer.empty[String]
+      stagingQueryUnfilledRanges.foreach { stagingQueryUnfilledRange =>
+        try {
+          val stepRanges = stepDays.map(stagingQueryUnfilledRange.steps).getOrElse(Seq(stagingQueryUnfilledRange))
+          logger.info(s"Staging query ranges to compute: ${stepRanges.map {
+            _.toString
+          }.pretty}")
+          stepRanges.zipWithIndex.foreach { case (range, index) =>
+            val progress = s"| [${index + 1}/${stepRanges.size}]"
+            logger.info(s"Computing staging query for range: $range $progress")
+            val renderedQuery =
+              StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
+            logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
+            val df = tableUtils.sql(renderedQuery)
+            df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
+            logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
+          }
+          logger.info(s"Finished writing Staging Query data to $outputTable")
+        } catch {
+          case err: Throwable =>
+            exceptions.append(
+              s"Error handling range $stagingQueryUnfilledRange : ${err.getMessage}\n${err.traceString}")
         }
-        .mkString("\n")
-      throw new Exception(fullMessage)
+      }
+      if (exceptions.nonEmpty) {
+        val length = exceptions.length
+        val fullMessage = exceptions.zipWithIndex
+          .map { case (message, index) =>
+            s"[${index + 1}/${length} exceptions]\n${message}"
+          }
+          .mkString("\n")
+        throw new Exception(fullMessage)
+      }
     }
   }
 }
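The guard at the top of `compute` makes the Spark job fail fast for any non-Spark engine, before setups or SQL run. A sketch of the expected behavior (the conf construction mirrors the builder change above; names are illustrative):

```scala
import ai.chronon.api.{Builders, EngineType}

// A conf that targets BigQuery: the Spark-side StagingQuery job rejects it up front.
val conf = Builders.StagingQuery(
  query = "SELECT 1 AS one",
  engineType = EngineType.BIGQUERY
)

// Sketch, not a real test: running the Spark job with this conf should throw
//   UnsupportedOperationException: Engine type BIGQUERY is not supported for Staging Query
// new StagingQuery(conf, endPartition, tableUtils).compute()
```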

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 3 additions & 3 deletions
@@ -19,8 +19,8 @@ package ai.chronon.spark
 import ai.chronon.aggregator.windowing.TsUtils
 import ai.chronon.api.ColorPrinter.ColorString
 import ai.chronon.api.Extensions._
-import ai.chronon.api.{Constants, PartitionSpec, Query, QueryUtils}
 import ai.chronon.api.ScalaJavaConversions._
+import ai.chronon.api.{Constants, PartitionSpec, Query, QueryUtils}
 import ai.chronon.online.PartitionRange
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.TableUtils.{
@@ -32,17 +32,17 @@ import ai.chronon.spark.TableUtils.{
 import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
 import ai.chronon.spark.format.{DefaultFormatProvider, Format, FormatProvider}
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.{PrintWriter, StringWriter}
-import java.time.{Instant, ZoneId}
 import java.time.format.DateTimeFormatter
+import java.time.{Instant, ZoneId}
 import scala.collection.{Seq, immutable, mutable}
 import scala.util.{Failure, Try}
 

0 commit comments
