Skip to content

Commit 8fdbd10

Browse files
authored
WIP - planner concepts + join backfill impl (#739)
## Summary ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced advanced planning and orchestration capabilities for offline data processing, including new planners for join and group-by operations. - Added utilities for metadata layering and enriched partition specification handling. - Introduced a structured approach to offline join planning with detailed metadata and node composition. - Added new traits and classes to support batch run contexts and node execution. - Added comprehensive table dependency generation based on joins, group-bys, and sources. - **Improvements** - Expanded partitioning metadata in API definitions for richer temporal semantics. - Updated orchestration schemas with new node types and renamed entities for clarity. - Improved naming conventions by replacing "Keyword" suffixes with "Folder" across configurations. - Streamlined internal logic for table and job naming, dependency resolution, and window operations. - Enhanced error handling and logging in table utilities. - Adjusted snapshot accuracy logic in merge operations for event data models. - Modified tile drift calculation to use a fixed timestamp offset. - **Bug Fixes** - Corrected logic for snapshot accuracy handling in merge operations. - **Refactor** - Centralized utility methods for window arithmetic and partition specification. - Consolidated job context parameters in join part jobs. - Restricted visibility of label join methods for better encapsulation. - Replaced generic bootstrap job classes with join-specific implementations. - Simplified import statements and method signatures for improved clarity. - Delegated left source table name computation to join offline planner. - **Chores** - Updated `.gitignore` to exclude additional directories. - Removed legacy configuration-to-node conversion code and associated dependency resolver tests. - **Documentation** - Improved code comments and formatting for new and existing classes and methods. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent a151d84 commit 8fdbd10

32 files changed

+909
-761
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ api/python/ai/chronon/fetcher/
2626
api/python/ai/chronon/hub/
2727
api/python/ai/chronon/lineage/
2828
api/python/ai/chronon/orchestration/
29+
api/python/ai/chronon/agent/
2930
api/python/.coverage
3031
api/python/htmlcov/
3132
**/derby.log

api/src/main/scala/ai/chronon/api/Constants.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ object Constants {
8181
val magicNullLong: java.lang.Long = -1234567890L
8282
val magicNullDouble: java.lang.Double = -1234567890.0
8383

84-
val JoinKeyword = "joins"
85-
val GroupByKeyword = "group_bys"
86-
val StagingQueryKeyword = "staging_queries"
87-
val ModelKeyword = "models"
84+
val JoinFolder = "joins"
85+
val GroupByFolder = "group_bys"
86+
val StagingQueryFolder = "staging_queries"
87+
val ModelFolder = "models"
8888

8989
// KV store related constants
9090
// continuation key to help with list pagination

api/src/main/scala/ai/chronon/api/Extensions.scala

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,29 @@ object Extensions {
8080
if (unbounded) "" else s"_${window.length}${window.timeUnit.str}"
8181

8282
def millis: Long = window.length.toLong * window.timeUnit.millis
83+
84+
def inverse: Window = {
85+
if (window == null) return null
86+
window.deepCopy().setLength(0 - window.getLength)
87+
}
88+
8389
}
8490

8591
object WindowUtils {
8692
val Unbounded: Window = new Window(Int.MaxValue, TimeUnit.DAYS)
93+
8794
val Hour: Window = new Window(1, TimeUnit.HOURS)
8895
val Day: Window = new Window(1, TimeUnit.DAYS)
96+
val Null: Window = null
97+
8998
private val SecondMillis: Long = 1000
9099
private val Minute: Long = 60 * SecondMillis
91100
val FiveMinutes: Long = 5 * Minute
92101
private val defaultPartitionSize: api.TimeUnit = api.TimeUnit.DAYS
93102
val onePartition: api.Window = new api.Window(1, defaultPartitionSize)
94103

104+
def hours(millis: Long): Window = new Window((millis / Hour.millis).toInt, TimeUnit.HOURS)
105+
95106
def millisToString(millis: Long): String = {
96107
if (millis % Day.millis == 0) {
97108
new Window((millis / Day.millis).toInt, TimeUnit.DAYS).str
@@ -113,13 +124,32 @@ object Extensions {
113124
timestampMs - (timestampMs % windowSizeMs)
114125
}
115126

116-
def convertUnits(window: Window, offsetUnit: api.TimeUnit): Window = {
127+
def convertUnits(window: Window, outputUnit: api.TimeUnit): Window = {
117128
if (window == null) return null
118-
if (window.timeUnit == offsetUnit) return window
129+
if (window.timeUnit == outputUnit) return window
119130

120-
val offsetSpanMillis = new Window(1, offsetUnit).millis
131+
val offsetSpanMillis = new Window(1, outputUnit).millis
121132
val windowLength = math.ceil(window.millis.toDouble / offsetSpanMillis.toDouble).toInt
122-
new Window(windowLength, offsetUnit)
133+
new Window(windowLength, outputUnit)
134+
}
135+
136+
def plus(a: Window, b: Window): Window = {
137+
if (a == null) return b
138+
if (b == null) return a
139+
140+
require(a.timeUnit == b.timeUnit, s"Cannot add windows with different time units ${a.timeUnit} vs. ${b.timeUnit}")
141+
142+
new Window(a.length + b.length, a.timeUnit)
143+
}
144+
145+
def minus(a: Window, b: Window): Window = {
146+
if (a == null) return null
147+
if (b == null) return a
148+
149+
require(a.timeUnit == b.timeUnit,
150+
s"Cannot subtract windows with different time units ${a.timeUnit} vs. ${b.timeUnit}")
151+
152+
new Window(a.length - b.length, a.timeUnit)
123153
}
124154

125155
def zero(timeUnits: api.TimeUnit = api.TimeUnit.DAYS): Window = new Window(0, timeUnits)
@@ -129,8 +159,12 @@ object Extensions {
129159
def cleanName: String = metaData.name.sanitize
130160

131161
def outputTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}"
162+
163+
// legacy way of generating label info - we might end-up doing views again, but probably with better names
132164
def outputLabelTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
133165
def outputFinalView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
166+
def outputLatestLabelView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled_latest"
167+
134168
def outputLabelTableV2: String =
135169
s"${metaData.outputNamespace}.${metaData.cleanName}_with_labels" // Used for the LabelJoinV2 flow
136170
def loggedTable: String = s"${outputTable}_logged"
@@ -372,6 +406,13 @@ object Extensions {
372406
else { source.getJoinSource.getJoin.metaData.outputTable }
373407
}
374408

409+
def mutationsTable: Option[String] = for (
410+
entities <- Option(source.getEntities);
411+
mutationsTable <- Option(entities.getMutationTable)
412+
) yield {
413+
mutationsTable
414+
}
415+
375416
def overwriteTable(table: String): Unit = {
376417
if (source.isSetEntities) { source.getEntities.setSnapshotTable(table) }
377418
else if (source.isSetEvents) { source.getEvents.setTable(table) }
@@ -441,12 +482,26 @@ object Extensions {
441482
*/
442483
def cleanTopic: String = source.topic.cleanSpec
443484

485+
def partitionInterval: Window = Option(source.query.partitionInterval).getOrElse(WindowUtils.onePartition)
444486
}
445487

446488
implicit class GroupByOps(groupBy: GroupBy) extends GroupBy(groupBy) {
447489

448490
def keyNameForKvStore: String = {
449-
_keyNameForKvStore(groupBy.metaData, GroupByKeyword)
491+
_keyNameForKvStore(groupBy.metaData, GroupByFolder)
492+
}
493+
494+
def allWindows: Array[Window] = {
495+
groupBy.aggregations
496+
.iterator()
497+
.toScala
498+
.flatMap { agg =>
499+
Option(agg.windows)
500+
.map(_.iterator().toScala)
501+
.getOrElse(Array(WindowUtils.Null).iterator)
502+
}
503+
.toArray
504+
.distinct
450505
}
451506

452507
def maxWindow: Option[Window] = {
@@ -841,7 +896,7 @@ object Extensions {
841896

842897
implicit class JoinOps(val join: Join) extends Serializable {
843898
def keyNameForKvStore: String = {
844-
_keyNameForKvStore(join.metaData, JoinKeyword)
899+
_keyNameForKvStore(join.metaData, JoinFolder)
845900
}
846901

847902
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -1242,13 +1297,13 @@ object Extensions {
12421297

12431298
implicit class StagingQueryOps(stagingQuery: StagingQuery) {
12441299
def keyNameForKvStore: String = {
1245-
_keyNameForKvStore(stagingQuery.metaData, StagingQueryKeyword)
1300+
_keyNameForKvStore(stagingQuery.metaData, StagingQueryFolder)
12461301
}
12471302
}
12481303

12491304
implicit class ModelOps(model: Model) {
12501305
def keyNameForKvStore: String = {
1251-
_keyNameForKvStore(model.metaData, ModelKeyword)
1306+
_keyNameForKvStore(model.metaData, ModelFolder)
12521307
}
12531308
}
12541309

0 commit comments

Comments
 (0)