Skip to content

Commit fb9a931

Browse files
fix: properly set partition column (#407)
## Summary - #381 introduced the ability to configure a partition column at the node-level. This PR simply fixes a missed spot on the plumbing of the new StagingQuery attribute. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced the query builder to support specifying a partition column, providing greater customization for query formation and partitioning. - **Improvements** - Improved handling of partition columns by introducing a fallback mechanism to ensure valid values are used when necessary. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
1 parent e1be518 commit fb9a931

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

api/src/main/scala/ai/chronon/api/Builders.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,14 @@ object Builders {
307307
query: String = null,
308308
metaData: MetaData = null,
309309
startPartition: String = null,
310-
setups: Seq[String] = null
310+
setups: Seq[String] = null,
311+
partitionColumn: String = null
311312
): StagingQuery = {
312313
val stagingQuery = new StagingQuery()
313314
stagingQuery.setQuery(query)
314315
stagingQuery.setMetaData(metaData)
315316
stagingQuery.setStartPartition(startPartition)
317+
stagingQuery.setPartitionColumn(partitionColumn)
316318
if (setups != null) stagingQuery.setSetups(setups.toJava)
317319
stagingQuery
318320
}

spark/src/main/scala/ai/chronon/spark/StagingQuery.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
3535
.map(_.toScala.toMap)
3636
.orNull
3737

38-
private val partitionCols: Seq[String] = Seq(tableUtils.partitionColumn) ++
39-
Option(stagingQueryConf.metaData.customJsonLookUp(key = "additional_partition_cols"))
40-
.getOrElse(new java.util.ArrayList[String]())
41-
.asInstanceOf[java.util.ArrayList[String]]
42-
.toScala
38+
private val partitionCols: Seq[String] =
39+
Seq(Option(stagingQueryConf.getPartitionColumn).getOrElse(tableUtils.partitionColumn)) ++
40+
Option(stagingQueryConf.metaData.customJsonLookUp(key = "additional_partition_cols"))
41+
.getOrElse(new java.util.ArrayList[String]())
42+
.asInstanceOf[java.util.ArrayList[String]]
43+
.toScala
4344

4445
def computeStagingQuery(stepDays: Option[Int] = None,
4546
enableAutoExpand: Option[Boolean] = Some(true),

0 commit comments

Comments
 (0)