-
Notifications
You must be signed in to change notification settings - Fork 0
refactor: create planner.thrift + move planner entities #784
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis change refactors the planning system by removing legacy planner classes and node types, introducing new unified planner abstractions and Thrift schemas. It migrates all node handling to a new Changes
Sequence Diagram(s)sequenceDiagram
participant Planner
participant Node
participant ThriftObj
Planner->>ThriftObj: Deep copy & clear metadata/execution info
Planner->>Node: toNode(metaData, contentSetter, ThriftObj)
Node-->>Planner: Node with semanticHash, content, metaData
Note over Planner,Node: All planning flows now use unified Node interface
Possibly related PRs
Suggested reviewers
Poem
Note ⚡️ AI Code Reviews for VS Code, Cursor, WindsurfCodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback. Note ⚡️ Faster reviews with cachingCodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
api/thrift/planner.thrift (1)
57-75
:NodeContent
union covers all node types.
Consider resolving the TODO for metrics nodes.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (13)
api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala
(1 hunks)api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala
(7 hunks)api/thrift/orchestration.thrift
(0 hunks)api/thrift/planner.thrift
(1 hunks)spark/src/main/scala/ai/chronon/spark/Driver.scala
(2 hunks)spark/src/main/scala/ai/chronon/spark/Join.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/JoinBase.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
(1 hunks)spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
(1 hunks)
💤 Files with no reviewable changes (1)
- api/thrift/orchestration.thrift
⏰ Context from checks skipped due to timeout of 90000ms (30)
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: service_commons_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: online_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: join_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: fetcher_tests
- GitHub Check: flink_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: api_tests
- GitHub Check: api_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: aggregator_tests
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (28)
spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (1)
7-7
: Import moved to planner package
Replaces orchestration import withai.chronon.planner.JoinMergeNode
, matching the new Thrift schema.api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala (1)
4-4
: Import moved to planner package
Updated toai.chronon.planner.GroupByBackfillNode
in line with the planner refactoring.spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (1)
6-6
: Import moved to planner package
Switched toai.chronon.planner.SourceWithFilterNode
to reflect new node definitions.spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala (1)
7-7
: Import moved to planner package
Now importsai.chronon.planner.JoinBootstrapNode
per Thrift migration.spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1)
7-7
: Import moved to planner package
Changed toai.chronon.planner.JoinDerivationNode
aligning with the planner.thrift definitions.spark/src/main/scala/ai/chronon/spark/Join.scala (1)
26-26
: Import updated correctly. Theplanner
package import forJoinBootstrapNode
andJoinPartNode
aligns with the relocated Thrift definitions.spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
8-8
: Import updated correctly. The switch toai.chronon.planner.JoinPartNode
matches the new package.spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)
24-24
: Import updated correctly. TheJoinBootstrapNode
import now points to theplanner
package as expected.spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (1)
9-13
: Imports updated correctly. All join node classes are now imported fromai.chronon.planner
, matching the refactoring.api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala (7)
8-8
: Import updated correctly. The Thrift node classes import from the newplanner
package is in place.
31-31
: Node type literal updated. The metadata layering call now uses the hardcoded"left_source"
label.
53-53
: Node type literal updated. The metadata layering call now uses the hardcoded"bootstrap"
label.
86-86
: Node type literal updated. The metadata layering call now uses the hardcoded"right_part"
label.
120-120
: Node type literal updated. The metadata layering call now uses the hardcoded"merge"
label.
138-138
: Node type literal updated. The metadata layering call now uses the hardcoded"derive"
label.
174-174
: Node type literal updated. The metadata layering call now uses the hardcoded"label_join"
label.spark/src/main/scala/ai/chronon/spark/Driver.scala (2)
27-27
: Imports updated correctly to planner package.
Matches new Thrift definitions.
833-833
: Instantiates newSourceWithFilterNode
from planner.
Follows refactored package.api/thrift/planner.thrift (10)
1-6
: Namespace and includes look correct.
Definesai.chronon.planner
for Python/Java and includes dependencies.
7-12
:SourceWithFilterNode
struct is well defined.
Fields match code usage.
14-17
:JoinBootstrapNode
struct is straightforward.
No concerns.
19-22
:JoinMergeNode
struct is correct.
Aligns with MergeJobRun.
24-27
:JoinDerivationNode
struct is fine.
Fields consistent.
29-35
:JoinPartNode
struct fields look accurate.
skewKeys
type matches Scala use.
37-40
:LabelJoinNode
struct is valid.
Matches LabelJoin code.
42-45
:GroupByBackfillNode
struct is defined correctly.
47-50
:GroupByUploadNode
struct is correct.
52-55
:GroupByStreamingNode
struct is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (5)
api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala (1)
5-5
: Remove unused imports.
Node
,NodeContent
, andSourceWithFilterNode
are never referenced in this file; they will trigger compiler warnings.-import ai.chronon.planner.{Node, NodeContent, SourceWithFilterNode} +// no planner-side classes referenced hereapi/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1)
6-6
: Drop the unusedTry
import.It isn’t referenced anywhere in this class.
api/src/main/scala/ai/chronon/api/planner/Planner.scala (1)
15-25
: Shadowed type parameter can confuse readers.
toNode
introduces a newT
that hides the outer one. Consider renaming, e.g.N <: TBase[_, _]
, to avoid mental friction.api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (2)
74-77
: Dead localcontent
.
val content = new NodeContent()
is created but never used; delete to reduce noise.
255-261
: DuplicateunsetNestedMetadata
logic.The same helper exists inside the class; keep one definition to avoid divergence.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (11)
api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala
(0 hunks)api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala
(1 hunks)api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala
(8 hunks)api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala
(1 hunks)api/src/main/scala/ai/chronon/api/planner/Planner.scala
(1 hunks)api/thrift/planner.thrift
(1 hunks)spark/src/main/scala/ai/chronon/spark/Driver.scala
(3 hunks)spark/src/main/scala/ai/chronon/spark/Join.scala
(2 hunks)spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
(2 hunks)spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
(4 hunks)spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
(3 hunks)
💤 Files with no reviewable changes (1)
- api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala
✅ Files skipped from review due to trivial changes (1)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
🚧 Files skipped from review as they are similar to previous changes (5)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
- spark/src/main/scala/ai/chronon/spark/Driver.scala
- spark/src/main/scala/ai/chronon/spark/Join.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
- api/thrift/planner.thrift
🧰 Additional context used
🧬 Code Graph Analysis (1)
api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (5)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1)
foreach
(13-21)api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (2)
TableDependencies
(7-180)fromSource
(101-148)api/src/main/scala/ai/chronon/api/planner/Planner.scala (1)
toNode
(15-25)api/src/main/scala/ai/chronon/api/planner/MetaDataUtils.scala (2)
MetaDataUtils
(28-105)layer
(30-67)api/src/main/scala/ai/chronon/api/Extensions.scala (1)
outputTable
(160-160)
⏰ Context from checks skipped due to timeout of 90000ms (29)
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: online_tests
- GitHub Check: groupby_tests
- GitHub Check: online_tests
- GitHub Check: spark_tests
- GitHub Check: service_commons_tests
- GitHub Check: analyzer_tests
- GitHub Check: flink_tests
- GitHub Check: join_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: fetcher_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: flink_tests
- GitHub Check: batch_tests
- GitHub Check: aggregator_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: aggregator_tests
🔇 Additional comments (2)
api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1)
43-44
:buildPlan
still a stub.Leaving
???
will blow up at runtime. Either implement or throw a clearUnsupportedOperationException
with context.api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (1)
246-250
:buildPlan
incomplete.
result.setNodes()
without arguments is invalid; flesh this out or throw a clear exception.
def streamingNode: Option[Node] = groupBy.streamingSource.map { _ => | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamingNode
is empty – code will not compile.
The map
body is missing; the method currently returns Unit
, violating its declared type Option[Node]
.
- def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
-
- }
+ // TODO: flesh out streaming node generation
+ def streamingNode: Option[Node] = None
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def streamingNode: Option[Node] = groupBy.streamingSource.map { _ => | |
} | |
// TODO: flesh out streaming node generation | |
def streamingNode: Option[Node] = None |
🤖 Prompt for AI Agents
In api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala at lines 39 to
41, the streamingNode method uses map on streamingSource but the map body is
empty, causing it to return Unit instead of Option[Node]. To fix this, implement
the map function to return a valid Node instance based on the streamingSource
element, ensuring the method returns an Option[Node] as declared.
override def onlineNodes: Seq[Node] = { | ||
// depends on all | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onlineNodes
has no implementation – breaks compilation.
It currently returns Unit
, not Seq[Node]
.
- override def onlineNodes: Seq[Node] = {
- // depends on all
- }
+ override def onlineNodes: Seq[Node] = Seq.empty // TODO implement
🤖 Prompt for AI Agents
In api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala around lines 241
to 244, the method onlineNodes is declared to return Seq[Node] but currently has
no implementation and returns Unit, causing a compilation error. Implement the
method to return a valid Seq[Node] as expected, either by providing the actual
logic to retrieve online nodes or returning an appropriate placeholder such as
an empty sequence to satisfy the return type.
Summary
Checklist
Summary by CodeRabbit
New Features
Refactor
Chores