Commit bcc7d65
Branch versioning logic (#163)
## Summary

Writing down the cases & logic behind branching. Will continue writing about how these are internally represented.

## Checklist

- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [x] Documentation update

## Summary by CodeRabbit

### Release Notes

- **Bug Fixes**
  - Corrected a typo in the `TableDependency` struct, renaming `forceComputae` to `forceCompute`.
- **New Features**
  - Added support for branch-based workflows in data orchestration.
  - Introduced `RepoIndex` for managing repository data and versions.
  - Created `SequenceMap` utility for managing unique value sequences.
  - Added `TablePrinter` for formatted data output.
  - Introduced `VersionUpdate` class for tracking version changes.
  - Added `StringExtensions` for MD5 hashing of strings.
- **Refactoring**
  - Removed `LineageIndex` and `LogicalSet` classes.
  - Updated naming conventions for logical nodes.
  - Restructured repository parsing and version tracking mechanisms.
- **Documentation**
  - Added a comprehensive README for branch workflow management.
  - Expanded documentation for global planning and batch workload processing.
- **Chores**
  - Updated logging dependencies.
  - Modified build configuration.
  - Added new test specifications.
1 parent 588627c commit bcc7d65

24 files changed: +1296 -243 lines

api/py/ai/chronon/utils.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -262,6 +262,7 @@ def join_part_output_table_name(join, jp, full_name: bool = False):
     def partOutputTable(jp: JoinPart): String = (Seq(join.metaData.outputTable) ++ Option(jp.prefix) :+
       jp.groupBy.metaData.cleanName).mkString("_")
     """
+    print(join)
     if not join.metaData.name and isinstance(join, api.Join):
         __set_name(join, api.Join, "joins")
     return "_".join(
```

api/thrift/orchestration.thrift

Lines changed: 1 addition & 13 deletions

```diff
@@ -82,18 +82,6 @@ struct NodeInfo {
     30: optional LogicalNode conf
 }
 
-
-/** First Pass
- * NodeInstance::(name, type, conf_hash) -> #[parent_nodes]
- * Node::(name, type) -> #[conf_hash]
-
- * Second Pass
- * Node::(name, type, compute_hash) -> #[parent_nodes]
-
- * different file_hashes but same lineage_hash should all go into the same orchestrator workflow
- * Node::(name, type, lineage_hash)
- **/
-
 struct NodeConnections {
     1: optional list<NodeKey> parents
     2: optional list<NodeKey> children
@@ -272,7 +260,7 @@ struct TableDependency {
      * JoinParts could use data from batch backfills or upload tables when available
      * When not available they shouldn't force computation of the backfills and upload tables.
      **/
-    21: optional bool forceComputae
+    21: optional bool forceCompute
 }
 
 union Dependency {
```

build.sbt

Lines changed: 12 additions & 21 deletions

```diff
@@ -136,7 +136,9 @@ lazy val api = project
       "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
       "com.novocode" % "junit-interface" % "0.11" % "test",
       "org.scalatest" %% "scalatest" % "3.2.19" % "test",
-      "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test"
+      "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
+      // needed by thrift
+      "org.slf4j" % "slf4j-api" % slf4jApiVersion,
     )
   )
@@ -410,33 +412,22 @@ lazy val hub = (project in file("hub"))
   }
 )
 
-val scala_test = "org.scalatest" %% "scalatest" % "3.2.19" % "test"
-val sl4j = "org.slf4j" % "slf4j-api" % slf4jApiVersion
-val logback = "ch.qos.logback" % "logback-classic" % logbackClassicVersion
-val commonDependencies = Seq(
-  scala_test,
-  sl4j,
-  logback
-)
 
 // orchestrator
 lazy val orchestration = project
   .dependsOn(online.%("compile->compile;test->test"))
   .settings(
     assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"),
+
     Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"),
-    assembly / assemblyMergeStrategy := {
-      case "log4j2.properties" => MergeStrategy.first
-      case "META-INF/log4j-provider.properties" => MergeStrategy.first
-      case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") =>
-        MergeStrategy.first
-      case x => (assembly / assemblyMergeStrategy).value(x)
-    },
-    libraryDependencies ++= commonDependencies ++ Seq(
-      "org.apache.logging.log4j" % "log4j-api" % log4j2_version,
-      "org.apache.logging.log4j" % "log4j-core" % log4j2_version,
-      "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version
-    )
+    Compile / unmanagedResourceDirectories += baseDirectory.value / "src" / "main" / "resources",
+
+    libraryDependencies ++= Seq(
+      "org.apache.logging.log4j" %% "log4j-api-scala" % "13.1.0",
+      "org.apache.logging.log4j" % "log4j-core" % "2.20.0",
+      // "org.slf4j" % "slf4j-api" % slf4jApiVersion,
+      "org.scalatest" %% "scalatest" % "3.2.19" % "test",
+    ),
   )
 
 ThisBuild / assemblyMergeStrategy := {
```

orchestration/README.md

Lines changed: 290 additions & 0 deletions (new file)

# Branch support
We want to support "branches" that allow users to run pipelines and services with two
critical goals:

- don't pollute production datasets and end-points
- be cheap, by re-using as much of the existing datasets as possible

## Scenarios within experimentation

While developing on a branch, users could:

- make semantic updates - that change output data - e.g., logic within where-s, selects, aggregations etc.
- make non-semantic updates - that don't change output data - e.g., spark executor memory, # of pods / workers etc.
  - *note that everything stored in the metadata field within our APIs is non-semantic*
- add new nodes
- delete existing nodes
- make changes to several compute nodes at once
- decide to merge the branch into master

With this context, the goal of this document is to develop and describe a representation that handles the above user workflows.
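To make the semantic / non-semantic split concrete, here is a minimal sketch of one way to hash a compiled conf, assuming confs are JSON-like dicts whose `metaData` field carries only non-semantic settings (the field names and helpers here are illustrative, not the actual Chronon API):

```python
import hashlib
import json


def _md5(payload: str) -> str:
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


def file_hash(conf: dict) -> str:
    """Changes whenever anything in the conf is edited."""
    return _md5(json.dumps(conf, sort_keys=True))


def semantic_hash(conf: dict) -> str:
    """Ignores the non-semantic metadata - only edits that affect
    output data should change this hash."""
    semantic_part = {k: v for k, v in conf.items() if k != "metaData"}
    return _md5(json.dumps(semantic_part, sort_keys=True))


# Bumping executor memory is a non-semantic update: the file hash moves,
# but the semantic hash - and hence all downstream data - stays put.
old = {"metaData": {"name": "gb1", "executorMemory": "4G"}, "selects": ["price"]}
new = {"metaData": {"name": "gb1", "executorMemory": "8G"}, "selects": ["price"]}
assert file_hash(old) != file_hash(new)
assert semantic_hash(old) == semantic_hash(new)
```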
## Motivating Example

Legend:

```
"sq" stands for StagingQuery    "j" for Join
"t" stands for table            "m" for Model
"gb" for GroupBy
```

Nodes will be numbered - `gb4`, `m2` etc.

Semantic changes to a node are notated with a plus "+".
E.g., join `j3` becomes `j3+`.

Non-semantic changes with an asterisk "*" - `j3*`.

```mermaid
---
title: Initial state of the example
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
```
### Semantic updates

Say that `sq1` changes semantically to `sq1+`. This is going to change the output of all
nodes downstream of it.

```mermaid
---
title: sq1 is updated semantically
---

graph TD;
    sq1+-->t1+;
    t1+-->gb1+;
    gb1+-->j1+;
    t2-->gb2;
    gb2-->j1+;
    j1+-->m1+;
    gb1+-->j2+;

    style sq1+ fill:wheat,color:black,stroke:#333
    style t1+ fill:wheat,color:black,stroke:#333
    style gb1+ fill:wheat,color:black,stroke:#333
    style j1+ fill:wheat,color:black,stroke:#333
    style j2+ fill:wheat,color:black,stroke:#333
    style m1+ fill:wheat,color:black,stroke:#333
```

> A major concern here is that, if the local repository of the user is behind remote,
> we will pick up a lot more changes than the user intends.

One approach to mitigate this is to make the CLI pick up only changes to files listed as
edited by commits to the git branch.

Another approach is to force the user to rebase on any change to the repo. However, this
does not guarantee that changes made while a job is running are accounted for.
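A semantic change therefore has to be propagated through the DAG before we know what to recompute. A minimal sketch of that propagation, assuming the graph is available as a child-adjacency dict (illustrative only - not the actual `RepoIndex` representation):

```python
from collections import deque

# child-adjacency for the motivating example above
children = {
    "sq1": ["t1"],
    "t1": ["gb1"],
    "gb1": ["j1", "j2"],
    "t2": ["gb2"],
    "gb2": ["j1"],
    "j1": ["m1"],
}


def dirty_set(changed: set) -> set:
    """BFS from the semantically changed nodes: every reachable
    descendant needs a new version of its output as well."""
    dirty = set(changed)
    queue = deque(changed)
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in dirty:
                dirty.add(child)
                queue.append(child)
    return dirty


# sq1+ dirties everything downstream of it, matching the diagram above
assert dirty_set({"sq1"}) == {"sq1", "t1", "gb1", "j1", "j2", "m1"}
```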
### Non-semantic updates

Instead, if `sq1` changes non-semantically to `sq1*`, none of the downstream nodes would change.

```mermaid
---
title: sq1 is updated non-semantically
---

graph TD;
    sq1*-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;

    style sq1* fill:lavender,color:black,stroke:#333
```

Depending on who is running the job, we need to decide which version of the node to use (a sketch follows this list):

- if the branch author is causing the node to be computed, we need to use `sq1*` instead of `sq1`
- if the prod flow, or other authors who haven't updated `sq1`, are causing the compute, we should use `sq1`
- if another branch is also updating `sq1` non-semantically to `sq1**`, we need to use that version on that branch instead
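A minimal sketch of that resolution rule, assuming each branch keeps a map of the nodes it has overridden (the map shapes here are hypothetical):

```python
from typing import Optional

# per-branch overrides: branch -> node -> the conf version to run
overrides = {
    "branch-a": {"sq1": "sq1*"},
    "branch-b": {"sq1": "sq1**"},
}


def resolve(node: str, requesting_branch: Optional[str]) -> str:
    """Pick the version of `node` to compute with. Prod runs
    (requesting_branch=None) and branches that never touched the node
    fall through to the production version."""
    if requesting_branch is not None:
        override = overrides.get(requesting_branch, {}).get(node)
        if override is not None:
            return override
    return node  # production version


assert resolve("sq1", "branch-a") == "sq1*"   # the branch author's copy
assert resolve("sq1", "branch-b") == "sq1**"  # the other branch's copy
assert resolve("sq1", None) == "sq1"          # prod stays on prod
assert resolve("sq1", "branch-c") == "sq1"    # untouched branches too
```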
### Adding new nodes

Adding new leaf nodes will not impact any of the existing nodes.

```mermaid
---
title: m2 is added
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    j2-->m2;

    style m2 fill:lightgreen,color:black,stroke:#333
```
But adding a non-leaf node - as a parent of an existing node - would almost always cause
semantic updates to the nodes downstream of it.

```mermaid
---
title: gb3 is added
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1+;
    t2-->gb2;
    t3-->gb3;
    gb2-->j1+;
    gb3-->j1+;
    j1+-->m1+;
    gb1-->j2;

    style t3 fill:lightgreen,color:black,stroke:#333
    style gb3 fill:lightgreen,color:black,stroke:#333
    style j1+ fill:wheat,color:black,stroke:#333
    style m1+ fill:wheat,color:black,stroke:#333
```
One interesting case here is migrating SQL from an external system into a StagingQuery that
produces an already-used table. Even though this is not a leaf node, absorbing it the same
way as a leaf-node change would be the right thing to do.

```mermaid
---
title: sq2 is added
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    sq2-->t2;

    style sq2 fill:lightgreen,color:black,stroke:#333
```
### Deleting existing nodes

Deleting leaf nodes is straightforward. We just need to program a cleanup mechanism
to remove the data and pipelines generated by that node.

```mermaid
---
title: m1 is deleted
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    sq2-->t2;

    style m1 fill:coral,color:black,stroke:#333
```
Indirectly connected components - connected via table references - shouldn't be allowed to
be deleted as long as there are nodes that depend on the table. We will fail this during
the sync step, as sketched below.

```mermaid
---
title: sq1 is deleted (not allowed)
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    sq2-->t2;

    style sq1 fill:coral,color:black,stroke:#333
```
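A minimal sketch of that sync-step check, assuming we can look up the table a node produces and the nodes reading from each table (both lookups are hypothetical helpers, not the actual repo index):

```python
# node -> table it produces, and table -> nodes that read from it
produces = {"sq1": "t1", "sq2": "t2"}
readers = {"t1": ["gb1"], "t2": ["gb2"]}


def validate_delete(node: str, also_deleted: set) -> None:
    """Fail the sync when the node's output table still has readers
    that are not themselves being deleted."""
    table = produces.get(node)
    if table is None:
        return  # the node produces no table; nothing to protect
    live = [r for r in readers.get(table, []) if r not in also_deleted]
    if live:
        raise ValueError(f"cannot delete {node}: {table} is still read by {live}")


try:
    validate_delete("sq1", also_deleted=set())  # rejected, as in the diagram
except ValueError as err:
    print(err)
```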
When a directly connected parent is deleted, the child node must be updated as well - or
the compilation would fail. In these cases it would be ideal to garbage-collect the
upstream chain of the deleted node.

```mermaid
---
title: gb2 is deleted, j1 is updated
---

graph TD;
    sq2-->t2;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1+;
    t2-->gb2;
    gb2-->j1+;
    j1+-->m1+;
    gb1-->j2;

    style sq2 fill:coral,color:black,stroke:#333
    style t2 fill:coral,color:black,stroke:#333
    style gb2 fill:coral,color:black,stroke:#333
    style j1+ fill:wheat,color:black,stroke:#333
    style m1+ fill:wheat,color:black,stroke:#333
```
### Isolating the changed assets

While development on a branch is in progress, we need to create temporary data assets for
the semantically changed nodes - shown in yellow above. Adds, deletes & semantic updates
could trigger this flow.

#### Logic to achieve isolation

Make a new copy of the conf & update the name (file name & metadata.name) -

`new_name = old_name + '_' + branch_name`

This needs to be followed by changing references in the downstream nodes -
all tables and nodes downstream will have the branch suffix.
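A minimal sketch of that rewrite, assuming each conf carries its upstream references under a hypothetical `inputs` field:

```python
def isolate(confs: dict, changed: set, branch: str) -> dict:
    """Copy every changed conf under a branch-suffixed name and rewrite
    references so downstream nodes point at the suffixed versions."""
    renames = {name: f"{name}_{branch}" for name in changed}
    isolated = {}
    for name, conf in confs.items():
        copy = dict(conf)
        copy["inputs"] = [renames.get(i, i) for i in conf.get("inputs", [])]
        isolated[renames.get(name, name)] = copy
    return isolated


# everything dirtied by the change gets the suffix, per the rule above
confs = {"sq1": {"inputs": []}, "t1": {"inputs": ["sq1"]}, "gb1": {"inputs": ["t1"]}}
branched = isolate(confs, changed={"sq1", "t1", "gb1"}, branch="my_branch")
assert branched["gb1_my_branch"]["inputs"] == ["t1_my_branch"]
```

The sync between the CLI and the remote that triggers this isolation looks roughly like: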
```
# 1. cli sends file_hash_map to remote

local_file_map = repo.compiled.file_hash_map
remote_file_map = remote.file_map

deleted = remote_file_map - local_file_map
added = local_file_map - remote_file_map
updated = [k for k in intersect(local_file_map, remote_file_map)
           if local_file_map[k] != remote_file_map[k]]

# 2. remote marks the changed files it needs

(node, lineage_hash) =>
```
### Merging changes into `main`

- Deletes should actually trigger asset and pipeline clean-up.
- Updates should trigger asset renaming.
- Adds can work as-is - we are not suffixing adds. (One possible dispatch of these rules is sketched below.)
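One plausible reading of those three rules as a merge-time dispatch, where "renaming" means promoting the branch-suffixed asset back to its production name (the handler names here are hypothetical stubs):

```python
def cleanup_assets_and_pipelines(node: str) -> None:
    print(f"dropping data + pipelines for {node}")


def rename_asset(old: str, new: str) -> None:
    print(f"renaming {old} -> {new}")


def merge_branch(branch: str, deletes: set, updates: set, adds: set) -> None:
    """Apply the three merge rules above."""
    for node in deletes:
        cleanup_assets_and_pipelines(node)      # deletes: actual clean-up
    for node in updates:
        rename_asset(f"{node}_{branch}", node)  # updates: asset renaming
    # adds were never suffixed, so they need no work at merge time


merge_branch("my_branch", deletes={"m1"}, updates={"gb1"}, adds={"m2"})
```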

orchestration/src/main/resources/log4j2.properties

Lines changed: 1 addition & 1 deletion

```diff
@@ -7,7 +7,7 @@ appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_OUT
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %yellow{%d{yyyy/MM/dd HH:mm:ss}} %highlight{%-5level} %green{%file:%line} - %message%n
+appender.console.layout.pattern = %cyan{%d{yyyy/MM/dd HH:mm:ss}} %highlight{%-5level} %magenta{%file:%line} - %message%n
 
 # Configure specific logger
 logger.chronon.name = ai.chronon
```
