Skip to content

Commit 22fb501

Browse files
Add BigQueryStagingQuery subclass and extract computeRenderedQuery/prepare helpers from StagingQuery; consolidate TableUtils imports
Co-authored-by: Thomas Chow <[email protected]>
1 parent e5b6f6f commit 22fb501

File tree

3 files changed

+22
-10
lines changed

3 files changed

+22
-10
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package ai.chronon.integrations.cloud_gcp
2+
3+
import ai.chronon.api
4+
import ai.chronon.spark.{StagingQuery, TableUtils}
5+
6+
/** Staging-query runner for the BigQuery integration.
  *
  * Currently a pure pass-through to [[StagingQuery]]; it exists as the
  * extension point where BigQuery-specific behavior can be added later.
  */
class BigQueryStagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tableUtils: TableUtils)
    extends StagingQuery(stagingQueryConf, endPartition, tableUtils) {

  /** Delegates directly to [[StagingQuery.computeStagingQuery]] with the same arguments. */
  override def computeStagingQuery(stepDays: Option[Int] = None,
                                   enableAutoExpand: Option[Boolean] = Some(true),
                                   overrideStartPartition: Option[String] = None,
                                   skipFirstHole: Boolean = true): Unit =
    super.computeStagingQuery(stepDays, enableAutoExpand, overrideStartPartition, skipFirstHole)
}

spark/src/main/scala/ai/chronon/spark/StagingQuery.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
4141
.asInstanceOf[java.util.ArrayList[String]]
4242
.toScala
4343

44+
/** Executes an already-rendered staging SQL query and persists the result
  * to the output table.
  *
  * @param renderedQuery    fully-rendered SQL for one step of the staging query
  * @param enableAutoExpand whether to auto-expand the output table's schema;
  *                         falls back to `true` when unset, mirroring the
  *                         `Some(true)` default of `computeStagingQuery`
  */
def computeRenderedQuery(renderedQuery: String, enableAutoExpand: Option[Boolean]): Unit = {
  val df = tableUtils.sql(renderedQuery)
  // getOrElse instead of .get: avoid a NoSuchElementException when a caller
  // passes None; default matches computeStagingQuery's Some(true).
  df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.getOrElse(true))
}
48+
49+
/** Runs each configured setup SQL statement through `tableUtils`, if any.
  * A null `setups` field on the staging-query config is treated as "no setups".
  */
def prepare() = {
  for {
    setupStatements <- Option(stagingQueryConf.setups)
    statement <- setupStatements.toScala
  } tableUtils.sql(statement)
}
52+
4453
def computeStagingQuery(stepDays: Option[Int] = None,
4554
enableAutoExpand: Option[Boolean] = Some(true),
4655
overrideStartPartition: Option[String] = None,

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,12 @@ import ai.chronon.api.ScalaJavaConversions._
2323
import ai.chronon.api.{Constants, PartitionSpec, Query, QueryUtils}
2424
import ai.chronon.online.PartitionRange
2525
import ai.chronon.spark.Extensions._
26-
import ai.chronon.spark.TableUtils.{
27-
TableAlreadyExists,
28-
TableCreatedWithInitialData,
29-
TableCreatedWithoutInitialData,
30-
TableCreationStatus
31-
}
26+
import ai.chronon.spark.TableUtils.{TableAlreadyExists, TableCreatedWithInitialData, TableCreatedWithoutInitialData, TableCreationStatus}
3227
import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql
3328
import ai.chronon.spark.format.{DefaultFormatProvider, Format, FormatProvider}
3429
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
3530
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
36-
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
37-
import org.apache.spark.sql.catalyst.plans.logical.Filter
38-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
39-
import org.apache.spark.sql.catalyst.plans.logical.Project
31+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}
4032
import org.apache.spark.sql.functions._
4133
import org.apache.spark.sql.types._
4234
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}

0 commit comments

Comments
 (0)