Skip to content

Commit 6e4eba1

Browse files
committed
WIP - introducing confNode
1 parent 0c7e5d2 commit 6e4eba1

File tree

11 files changed

+198
-198
lines changed

11 files changed

+198
-198
lines changed

api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala

Lines changed: 0 additions & 43 deletions
This file was deleted.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package ai.chronon.api.planner
2+
import ai.chronon.api.Extensions.GroupByOps
3+
import ai.chronon.api.{DataModel, GroupBy, MetaData, PartitionSpec, TableDependency, ThriftJsonCodec}
4+
import ai.chronon.planner.{ConfPlan, GroupByBackfillNode, Node, NodeContent}
5+
6+
import scala.util.Try
7+
8+
class GroupByPlanner(groupBy: GroupBy)(implicit outputPartitionSpec: PartitionSpec)
9+
extends Planner[GroupBy](groupBy)(outputPartitionSpec) {
10+
11+
private def tableDeps: Seq[TableDependency] = TableDependencies.fromGroupBy(groupBy)
12+
13+
private def effectiveStepDays: Int = {
14+
val defaultStepDays = if (groupBy.dataModel == DataModel.EVENTS) 15 else 1
15+
val configuredStepDaysOpt = Option(groupBy.metaData.executionInfo).flatMap(e => Option(e.stepDays))
16+
configuredStepDaysOpt.getOrElse(defaultStepDays)
17+
}
18+
19+
// execInfo can be heavy - and we don't want to duplicate it
20+
private def eraseExecutionInfo: GroupBy = {
21+
val result = groupBy.deepCopy()
22+
result.metaData.unsetExecutionInfo()
23+
result
24+
}
25+
26+
def backfillNode: Node = {
27+
28+
val metaData = MetaDataUtils.layer(groupBy.metaData,
29+
"backfill",
30+
groupBy.metaData.name + "/backfill",
31+
tableDeps,
32+
Some(effectiveStepDays))
33+
34+
val node = new GroupByBackfillNode().setGroupBy(eraseExecutionInfo)
35+
36+
toNode(metaData, _.setGroupByBackfill(node), eraseExecutionInfo)
37+
}
38+
39+
def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
40+
41+
}
42+
43+
override def buildPlan: ConfPlan = ???
44+
}
45+

api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala renamed to api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala

Lines changed: 87 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,33 @@ package ai.chronon.api.planner
33
import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps}
44
import ai.chronon.api.ScalaJavaConversions.{IterableOps, IteratorOps}
55
import ai.chronon.api._
6-
import ai.chronon.api.planner.JoinOfflinePlanner._
7-
import ai.chronon.api.planner.GroupByOfflinePlanner._
86
import ai.chronon.planner._
97

10-
import scala.collection.mutable
11-
import scala.language.{implicitConversions, reflectiveCalls}
128
import scala.collection.Seq
9+
import scala.language.{implicitConversions, reflectiveCalls}
1310

14-
class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec)
11+
class JoinPlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec)
1512
extends Planner[Join](join)(outputPartitionSpec) {
1613

17-
val leftSourceNode: SourceWithFilterNode = {
14+
// will mutate the join in place - use on deepCopy-ied objects only
15+
private def unsetNestedMetadata(join: Join): Unit = {
16+
join.unsetMetaData()
17+
Option(join.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
18+
Option(join.labelParts).foreach(_.labels.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
19+
join.unsetOnlineExternalParts()
20+
}
21+
22+
private def joinWithoutExecutionInfo: Join = {
23+
val copied = join.deepCopy()
24+
copied.metaData.unsetExecutionInfo()
25+
Option(copied.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.metaData.unsetExecutionInfo()))
26+
Option(copied.labelParts).foreach(_.labels.iterator().toScala.foreach(_.groupBy.metaData.unsetExecutionInfo()))
27+
copied.unsetOnlineExternalParts()
28+
copied
29+
}
30+
31+
val leftSourceNode: Node = {
32+
1833
val left = join.left
1934
val result = new SourceWithFilterNode()
2035
.setSource(left)
@@ -34,12 +49,12 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
3449
stepDays = Some(1)
3550
)
3651

37-
result.setMetaData(metaData)
52+
toNode(metaData, _.setSourceWithFilter(result), result)
3853
}
3954

40-
val bootstrapNodeOpt: Option[JoinBootstrapNode] = Option(join.bootstrapParts).map { bootstrapParts =>
55+
private val bootstrapNodeOpt: Option[Node] = Option(join.bootstrapParts).map { bootstrapParts =>
4156
val result = new JoinBootstrapNode()
42-
.setJoin(join)
57+
.setJoin(joinWithoutExecutionInfo)
4358

4459
// bootstrap tables are unfortunately unique to the join - can't be re-used if a new join part is added
4560
val bootstrapNodeName = join.metaData.name + "/boostrap"
@@ -56,12 +71,25 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
5671
stepDays = Some(1)
5772
)
5873

59-
result.setMetaData(metaData)
74+
val content = new NodeContent()
75+
content.setJoinBootstrap(result)
76+
77+
val copy = result.deepCopy()
78+
unsetNestedMetadata(copy.join)
79+
80+
toNode(metaData, _.setJoinBootstrap(result), copy)
81+
}
82+
83+
private def copyAndEraseExecutionInfo(joinPart: JoinPart): JoinPart = {
84+
val copy = joinPart.deepCopy()
85+
copy.groupBy.metaData.unsetExecutionInfo()
86+
copy
6087
}
6188

62-
private def buildJoinPartNode(joinPart: JoinPart): JoinPartNode = {
89+
private def buildJoinPartNode(joinPart: JoinPart): Node = {
90+
6391
val result = new JoinPartNode()
64-
.setJoinPart(joinPart)
92+
.setJoinPart(copyAndEraseExecutionInfo(joinPart))
6593
.setLeftDataModel(join.left.dataModel)
6694
.setLeftSourceTable(leftSourceNode.metaData.outputTable)
6795

@@ -76,7 +104,7 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
76104
.map(_.stepDays)
77105
.getOrElse(joinPart.groupBy.dataModel match {
78106
case DataModel.ENTITIES => 1
79-
case DataModel.EVENTS => 15
107+
case DataModel.EVENTS => 15
80108
})
81109

82110
// pull conf params from the groupBy metadata, but use the join namespace to write to.
@@ -90,12 +118,15 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
90118
)
91119
.setOutputNamespace(join.metaData.outputNamespace)
92120

93-
result.setMetaData(metaData)
121+
val copy = result.deepCopy()
122+
copy.joinPart.groupBy.unsetMetaData()
123+
124+
toNode(metaData, _.setJoinPart(result), copy)
94125
}
95126

96-
private val joinPartNodes: Seq[JoinPartNode] = join.joinParts.toScala.map { buildJoinPartNode }.toSeq
127+
private val joinPartNodes: Seq[Node] = join.joinParts.toScala.map { buildJoinPartNode }.toSeq
97128

98-
val mergeNode: JoinMergeNode = {
129+
val mergeNode: Node = {
99130
val result = new JoinMergeNode()
100131
.setJoin(join)
101132

@@ -124,10 +155,16 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
124155
stepDays = Some(1)
125156
)
126157

127-
result.setMetaData(metaData)
158+
val copy = result.deepCopy()
159+
unsetNestedMetadata(copy.join)
160+
copy.join.unsetDerivations()
161+
copy.join.unsetLabelParts()
162+
163+
toNode(metaData, _.setJoinMerge(result), copy)
128164
}
129165

130-
private val derivationNodeOpt: Option[JoinDerivationNode] = Option(join.derivations).map { _ =>
166+
private val derivationNodeOpt: Option[Node] = Option(join.derivations).map { _ =>
167+
131168
val result = new JoinDerivationNode()
132169
.setJoin(join)
133170

@@ -142,7 +179,11 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
142179
stepDays = Some(1)
143180
)
144181

145-
result.setMetaData(metaData)
182+
val copy = result.deepCopy()
183+
unsetNestedMetadata(copy.join)
184+
copy.join.unsetLabelParts()
185+
186+
toNode(metaData, _.setJoinDerivation(result), copy)
146187
}
147188

148189
// these need us to additionally (groupBy backfill) generate the snapshot tables
@@ -156,7 +197,7 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
156197
)
157198
.getOrElse(Array.empty)
158199

159-
private val labelJoinNodeOpt: Option[LabelJoinNode] = Option(join.labelParts).map { labelParts =>
200+
private val labelJoinNodeOpt: Option[Node] = Option(join.labelParts).map { labelParts =>
160201
val result = new LabelJoinNode()
161202
.setJoin(join)
162203

@@ -177,112 +218,46 @@ class JoinOfflinePlanner(join: Join)(implicit outputPartitionSpec: PartitionSpec
177218
stepDays = Some(1)
178219
)
179220

180-
result.setMetaData(metaData)
181-
}
221+
val copy = result.deepCopy()
222+
unsetNestedMetadata(result.join)
182223

183-
override def offlineNodes: Seq[PlanNode] = {
184-
val result: mutable.ArrayBuffer[PlanNode] = mutable.ArrayBuffer.empty[PlanNode]
224+
toNode(metaData, _.setLabelJoin(result), copy)
225+
}
185226

186-
result.append(leftSourceNode)
187-
bootstrapNodeOpt.foreach(bn => result.append(bn))
188-
joinPartNodes.foreach(jpn => result.append(jpn))
227+
def offlineNodes: Seq[Node] = {
228+
229+
Seq(leftSourceNode) ++
230+
bootstrapNodeOpt ++
231+
joinPartNodes ++
232+
Seq(mergeNode) ++
233+
derivationNodeOpt ++
234+
snapshotLabelParts.map(_.groupBy).map{
235+
gb => new GroupByPlanner(gb).backfillNode
236+
} ++
237+
labelJoinNodeOpt
238+
}
189239

190-
result.append(mergeNode)
191-
derivationNodeOpt.foreach(dn => result.append(dn))
192-
snapshotLabelParts.foreach(lp => result.append(lp.groupBy))
193-
labelJoinNodeOpt.foreach(ljn => result.append(ljn))
194240

195-
result
241+
override def onlineNodes: Seq[Node] = {
242+
// depends on all
196243
}
197244

198-
override def onlineNodes: Seq[PlanNode] = ???
245+
246+
override def buildPlan: ConfPlan = {
247+
248+
val result = new ConfPlan()
249+
result.setNodes()
250+
}
199251
}
200252

201-
object JoinOfflinePlanner {
253+
object JoinPlanner {
202254

255+
// will mutate the join in place - use on deepCopy-ied objects only
203256
private def unsetNestedMetadata(join: Join): Unit = {
204257
join.unsetMetaData()
205258
Option(join.joinParts).foreach(_.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
206259
Option(join.labelParts).foreach(_.labels.iterator().toScala.foreach(_.groupBy.unsetMetaData()))
207260
join.unsetOnlineExternalParts()
208261
}
209262

210-
implicit class LabelJoinNodeIsPlanNode(node: LabelJoinNode) extends PlanNode {
211-
override def metaData: MetaData = node.metaData
212-
override def contents: Any = node
213-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
214-
val result = node.deepCopy()
215-
result.unsetMetaData()
216-
unsetNestedMetadata(result.join)
217-
result
218-
})
219-
}
220-
221-
implicit class JoinDerivationNodeIsPlanNode(node: JoinDerivationNode) extends PlanNode {
222-
override def metaData: MetaData = node.metaData
223-
override def contents: Any = node
224-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
225-
val result = node.deepCopy()
226-
result.unsetMetaData()
227-
unsetNestedMetadata(result.join)
228-
result.join.unsetLabelParts()
229-
result
230-
})
231-
}
232-
233-
implicit class JoinMergeNodeIsPlanNode(node: JoinMergeNode) extends PlanNode {
234-
override def metaData: MetaData = node.metaData
235-
override def contents: Any = node
236-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
237-
val result = node.deepCopy()
238-
result.unsetMetaData()
239-
unsetNestedMetadata(result.join)
240-
result.join.unsetDerivations()
241-
result.join.unsetLabelParts()
242-
result
243-
})
244-
}
245-
246-
implicit class JoinPartNodeIsPlanNode(node: JoinPartNode) extends PlanNode {
247-
override def metaData: MetaData = node.metaData
248-
override def contents: Any = node
249-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
250-
val result = node.deepCopy()
251-
result.unsetMetaData()
252-
result.joinPart.groupBy.unsetMetaData()
253-
result
254-
})
255-
}
256-
257-
implicit class JoinBootstrapNodeIsPlanNode(node: JoinBootstrapNode) extends PlanNode {
258-
override def metaData: MetaData = node.metaData
259-
override def contents: Any = node
260-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
261-
val result = node.deepCopy()
262-
result.unsetMetaData()
263-
unsetNestedMetadata(result.join)
264-
result
265-
})
266-
}
267-
268-
implicit class SourceWithFilterNodeIsPlanNode(node: SourceWithFilterNode) extends PlanNode {
269-
override def metaData: MetaData = node.metaData
270-
override def contents: Any = node
271-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
272-
val result = node.deepCopy()
273-
result.unsetMetaData()
274-
result
275-
})
276-
}
277-
278-
implicit class JoinIsPlanNode(node: Join) extends PlanNode {
279-
override def metaData: MetaData = node.metaData
280-
override def contents: Any = node
281-
override def semanticHash: String = ThriftJsonCodec.hexDigest({
282-
val result = node.deepCopy()
283-
unsetNestedMetadata(node)
284-
result
285-
})
286-
}
287-
288263
}

0 commit comments

Comments
 (0)