Skip to content

refactor: create planner.thrift + move planner entities #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

45 changes: 45 additions & 0 deletions api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ai.chronon.api.planner
import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.{DataModel, GroupBy, MetaData, PartitionSpec, TableDependency, ThriftJsonCodec}
import ai.chronon.planner.{ConfPlan, GroupByBackfillNode, Node, NodeContent}

import scala.util.Try

class GroupByPlanner(groupBy: GroupBy)(implicit outputPartitionSpec: PartitionSpec)
extends Planner[GroupBy](groupBy)(outputPartitionSpec) {

private def tableDeps: Seq[TableDependency] = TableDependencies.fromGroupBy(groupBy)

private def effectiveStepDays: Int = {
val defaultStepDays = if (groupBy.dataModel == DataModel.EVENTS) 15 else 1
val configuredStepDaysOpt = Option(groupBy.metaData.executionInfo).flatMap(e => Option(e.stepDays))
configuredStepDaysOpt.getOrElse(defaultStepDays)
}

// execInfo can be heavy - and we don't want to duplicate it
private def eraseExecutionInfo: GroupBy = {
val result = groupBy.deepCopy()
result.metaData.unsetExecutionInfo()
result
}

def backfillNode: Node = {

val metaData = MetaDataUtils.layer(groupBy.metaData,
"backfill",
groupBy.metaData.name + "/backfill",
tableDeps,
Some(effectiveStepDays))

val node = new GroupByBackfillNode().setGroupBy(eraseExecutionInfo)

toNode(metaData, _.setGroupByBackfill(node), eraseExecutionInfo)
}

def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>

}
Comment on lines +39 to +41
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

streamingNode is empty – code will not compile.

The map body is missing; the method currently returns Unit, violating its declared type Option[Node].

-  def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
-
-  }
+  // TODO: flesh out streaming node generation
+  def streamingNode: Option[Node] = None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
}
// TODO: flesh out streaming node generation
def streamingNode: Option[Node] = None
🤖 Prompt for AI Agents
In api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala at lines 39 to
41, the streamingNode method uses map on streamingSource but the map body is
empty, causing it to return Unit instead of Option[Node]. To fix this, implement
the map function to return a valid Node instance based on the streamingSource
element, ensuring the method returns an Option[Node] as declared.


override def buildPlan: ConfPlan = ???
}

Loading
Loading