feat: StagingQuery param #406

Merged · 5 commits · Feb 25, 2025
4 changes: 3 additions & 1 deletion api/src/main/scala/ai/chronon/api/Builders.scala
@@ -308,13 +308,15 @@ object Builders {
       metaData: MetaData = null,
       startPartition: String = null,
       setups: Seq[String] = null,
-      partitionColumn: String = null
+      partitionColumn: String = null,
+      engineType: EngineType = EngineType.SPARK
   ): StagingQuery = {
     val stagingQuery = new StagingQuery()
     stagingQuery.setQuery(query)
     stagingQuery.setMetaData(metaData)
     stagingQuery.setStartPartition(startPartition)
     stagingQuery.setPartitionColumn(partitionColumn)
+    stagingQuery.setEngineType(engineType)
     if (setups != null) stagingQuery.setSetups(setups.toJava)
     stagingQuery
   }
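
The new parameter defaults to `EngineType.SPARK`, so existing callers are unaffected. A minimal usage sketch, assuming the factory above is invoked as `Builders.stagingQuery` (the query text and partition values are illustrative, not from this PR):

    import ai.chronon.api.{Builders, EngineType}

    // Hypothetical caller: only `engineType` is new; the other arguments
    // follow the existing Builders.stagingQuery signature shown above.
    val stagingQuery = Builders.stagingQuery(
      query = "SELECT * FROM db.events WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'",
      startPartition = "2025-01-01",
      engineType = EngineType.BIGQUERY // override the SPARK default
    )
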
11 changes: 11 additions & 0 deletions api/thrift/api.thrift
@@ -54,6 +54,11 @@ struct StagingQuery {
      * Only needed for `max_date` template
      **/
     5: optional string partitionColumn
+
+    /**
+     * By default, spark is the compute engine. You can specify an override (eg. bigquery, etc.)
+     **/
+    6: optional EngineType engineType
 }
 
 struct EventSource {
@@ -220,6 +225,12 @@ enum Accuracy {
     SNAPSHOT = 1
 }
 
+enum EngineType {
+    SPARK = 0,
+    BIGQUERY = 1
+
+}
+
 struct MetaData {
     1: optional string name
     // marking this as true means that the conf can be served online
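
Because `engineType` is optional, configs written before this change carry no value at all for the field. A reader can normalize that case to the documented SPARK default; a minimal sketch, assuming standard Thrift Java codegen where an unset optional enum reads back as null:

    import ai.chronon.api.{EngineType, StagingQuery}

    // Treat an unset optional enum as the documented SPARK default.
    // Assumes Thrift codegen returns null for unset optional enum fields.
    def effectiveEngine(conf: StagingQuery): EngineType =
      Option(conf.getEngineType).getOrElse(EngineType.SPARK)
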
104 changes: 55 additions & 49 deletions spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
@@ -18,12 +18,11 @@ package ai.chronon.spark

 import ai.chronon.api
 import ai.chronon.api.Extensions._
-import ai.chronon.api.ParametricMacro
+import ai.chronon.api.{EngineType, ParametricMacro}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.online.PartitionRange
 import ai.chronon.spark.Extensions._
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.mutable
 
@@ -46,57 +45,64 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
       enableAutoExpand: Option[Boolean] = Some(true),
       overrideStartPartition: Option[String] = None,
       skipFirstHole: Boolean = true): Unit = {
-    Option(stagingQueryConf.setups).foreach(_.toScala.foreach(tableUtils.sql))
+    if (stagingQueryConf.getEngineType != EngineType.SPARK) {
+      throw new UnsupportedOperationException(
+        s"Engine type ${stagingQueryConf.getEngineType} is not supported for Staging Query")
+    }
     // the input table is not partitioned, usually for data testing or for kaggle demos
     if (stagingQueryConf.startPartition == null) {
-      tableUtils.sql(stagingQueryConf.query).save(outputTable)
-      return
-    }
-    val overrideStart = overrideStartPartition.getOrElse(stagingQueryConf.startPartition)
-    val unfilledRanges =
-      tableUtils.unfilledRanges(outputTable,
-                                PartitionRange(overrideStart, endPartition)(tableUtils.partitionSpec),
-                                skipFirstHole = skipFirstHole)
+      tableUtils.sql(stagingQueryConf.query).saveUnPartitioned(outputTable)
+    } else {
+      val overrideStart = overrideStartPartition.getOrElse(stagingQueryConf.startPartition)
+      val unfilledRanges =
+        tableUtils.unfilledRanges(outputTable,
+                                  PartitionRange(overrideStart, endPartition)(tableUtils.partitionSpec),
+                                  skipFirstHole = skipFirstHole)
 
-    if (unfilledRanges.isEmpty) {
-      logger.info(s"""No unfilled range for $outputTable given
-                     |start partition of ${stagingQueryConf.startPartition}
-                     |override start partition of $overrideStart
-                     |end partition of $endPartition
-                     |""".stripMargin)
-      return
-    }
-    val stagingQueryUnfilledRanges = unfilledRanges.get
-    logger.info(s"Staging Query unfilled ranges: $stagingQueryUnfilledRanges")
-    val exceptions = mutable.Buffer.empty[String]
-    stagingQueryUnfilledRanges.foreach { stagingQueryUnfilledRange =>
-      try {
-        val stepRanges = stepDays.map(stagingQueryUnfilledRange.steps).getOrElse(Seq(stagingQueryUnfilledRange))
-        logger.info(s"Staging query ranges to compute: ${stepRanges.map { _.toString }.pretty}")
-        stepRanges.zipWithIndex.foreach { case (range, index) =>
-          val progress = s"| [${index + 1}/${stepRanges.size}]"
-          logger.info(s"Computing staging query for range: $range $progress")
-          val renderedQuery =
-            StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
-          logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
-          val df = tableUtils.sql(renderedQuery)
-          df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
-          logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
-        }
-        logger.info(s"Finished writing Staging Query data to $outputTable")
-      } catch {
-        case err: Throwable =>
-          exceptions.append(s"Error handling range $stagingQueryUnfilledRange : ${err.getMessage}\n${err.traceString}")
-      }
-    }
-    if (exceptions.nonEmpty) {
-      val length = exceptions.length
-      val fullMessage = exceptions.zipWithIndex
-        .map { case (message, index) =>
-          s"[${index + 1}/${length} exceptions]\n${message}"
-        }
-        .mkString("\n")
-      throw new Exception(fullMessage)
-    }
+      if (unfilledRanges.isEmpty) {
+        logger.info(s"""No unfilled range for $outputTable given
+                       |start partition of ${stagingQueryConf.startPartition}
+                       |override start partition of $overrideStart
+                       |end partition of $endPartition
+                       |""".stripMargin)
+        return
+      }
+      val stagingQueryUnfilledRanges = unfilledRanges.get
+      logger.info(s"Staging Query unfilled ranges: $stagingQueryUnfilledRanges")
+      Option(stagingQueryConf.setups).foreach(_.toScala.foreach(tableUtils.sql))
+      val exceptions = mutable.Buffer.empty[String]
+      stagingQueryUnfilledRanges.foreach { stagingQueryUnfilledRange =>
+        try {
+          val stepRanges = stepDays.map(stagingQueryUnfilledRange.steps).getOrElse(Seq(stagingQueryUnfilledRange))
+          logger.info(s"Staging query ranges to compute: ${stepRanges.map {
+            _.toString
+          }.pretty}")
+          stepRanges.zipWithIndex.foreach { case (range, index) =>
+            val progress = s"| [${index + 1}/${stepRanges.size}]"
+            logger.info(s"Computing staging query for range: $range $progress")
+            val renderedQuery =
+              StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
+            logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
+            val df = tableUtils.sql(renderedQuery)
+            df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
+            logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
+          }
+          logger.info(s"Finished writing Staging Query data to $outputTable")
+        } catch {
+          case err: Throwable =>
+            exceptions.append(
+              s"Error handling range $stagingQueryUnfilledRange : ${err.getMessage}\n${err.traceString}")
+        }
+      }
+      if (exceptions.nonEmpty) {
+        val length = exceptions.length
+        val fullMessage = exceptions.zipWithIndex
+          .map { case (message, index) =>
+            s"[${index + 1}/${length} exceptions]\n${message}"
+          }
+          .mkString("\n")
+        throw new Exception(fullMessage)
+      }
+    }
   }
 }
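
The net behavioral changes in this file: the engine guard fails fast before anything runs, setup SQL now executes only when there are unfilled ranges, and the unpartitioned path uses `saveUnPartitioned` instead of `save`. A minimal sketch of the guard, assuming the entry point is named `computeStagingQuery` and a `tableUtils` is in scope (names and values are illustrative):

    import ai.chronon.api.{Builders, EngineType}
    import ai.chronon.spark.StagingQuery

    val conf = Builders.stagingQuery(query = "SELECT 1", engineType = EngineType.BIGQUERY)
    val job = new StagingQuery(conf, "2025-02-25", tableUtils)
    // Expected to throw before any setups or SQL run:
    //   java.lang.UnsupportedOperationException:
    //   Engine type BIGQUERY is not supported for Staging Query
    job.computeStagingQuery()
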
6 changes: 3 additions & 3 deletions spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -19,8 +19,8 @@ package ai.chronon.spark
 import ai.chronon.aggregator.windowing.TsUtils
 import ai.chronon.api.ColorPrinter.ColorString
 import ai.chronon.api.Extensions._
-import ai.chronon.api.{Constants, PartitionSpec, Query, QueryUtils}
 import ai.chronon.api.ScalaJavaConversions._
+import ai.chronon.api.{Constants, PartitionSpec, Query, QueryUtils}
 import ai.chronon.online.PartitionRange
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.TableUtils.{
@@ -32,17 +32,17 @@ import ai.chronon.spark.TableUtils.{
 import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
 import ai.chronon.spark.format.{DefaultFormatProvider, Format, FormatProvider}
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.{PrintWriter, StringWriter}
-import java.time.{Instant, ZoneId}
 import java.time.format.DateTimeFormatter
+import java.time.{Instant, ZoneId}
 import scala.collection.{Seq, immutable, mutable}
 import scala.util.{Failure, Try}
 