Skip to content

refactor: create planner.thrift + move planner entities #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

nikhil-zlai
Copy link
Contributor

@nikhil-zlai nikhil-zlai commented May 16, 2025

Summary

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced new planning capabilities for group-by and join operations, with dedicated planners for each.
    • Added a new schema definition for planner nodes, supporting enhanced configuration and orchestration.
  • Refactor

    • Standardized node creation and metadata handling for join and group-by planners.
    • Unified planner node types and their management across the system.
  • Chores

    • Updated imports and internal references to align with the new planner structure.
    • Removed outdated planner and node definitions to streamline codebase and schemas.

Copy link

coderabbitai bot commented May 16, 2025

Walkthrough

This change refactors the planning system by removing legacy planner classes and node types, introducing new unified planner abstractions and Thrift schemas. It migrates all node handling to a new Node interface, updates metadata management, and standardizes node construction and hashing. Imports and method signatures are updated accordingly.

Changes

File(s) Change Summary
api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala Removed the GroupByOfflinePlanner class and its companion object, including all related logic and implicit conversions.
api/thrift/orchestration.thrift Removed all modular join and group-by node struct/union definitions (SourceWithFilterNode, JoinBootstrapNode, etc.) and the NodeUnion union.
api/thrift/planner.thrift Added new Thrift file defining unified planner node types, a NodeContent union, Node struct, Mode enum, and ConfPlan struct.
api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala Introduced new GroupByPlanner class for planning GroupBy objects, with methods for backfill and streaming nodes, and plan building.
api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala Renamed and refactored JoinOfflinePlanner to JoinPlanner; standardized node creation, metadata handling, and node typing; removed implicit PlanNode wrappers.
api/src/main/scala/ai/chronon/api/planner/Planner.scala Refactored Planner abstraction: removed old abstract node methods, added buildPlan, new toNode helper, and Mode case class.
api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala Removed PlanNode trait and related methods; renamed companion object to LocalRunner; updated imports.
spark/src/main/scala/ai/chronon/spark/Driver.scala
spark/src/main/scala/ai/chronon/spark/Join.scala
spark/src/main/scala/ai/chronon/spark/JoinBase.scala
spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala
spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
Updated imports to use new ai.chronon.planner node types instead of deprecated ai.chronon.orchestration types; updated node instantiations and metadata passing as needed.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
Changed JoinPartJob constructor to require explicit metaData parameter; removed .setMetaData() usage; updated all usages accordingly.
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala Replaced JoinOfflinePlanner with JoinPlanner in utility methods; updated usage to match new class and method signatures.

Sequence Diagram(s)

sequenceDiagram
    participant Planner
    participant Node
    participant ThriftObj

    Planner->>ThriftObj: Deep copy & clear metadata/execution info
    Planner->>Node: toNode(metaData, contentSetter, ThriftObj)
    Node-->>Planner: Node with semanticHash, content, metaData
    Note over Planner,Node: All planning flows now use unified Node interface
Loading

Possibly related PRs

  • zipline-ai/chronon#739: Introduced the original GroupByOfflinePlanner and JoinOfflinePlanner classes now being removed and refactored here.

Suggested reviewers

  • varant-zlai

Poem

Out with the old, in with the Node,
Planners refactored, a lighter load!
Metadata cleaned, hashes anew,
Thrift files gleam with structs brand new.
🎩✨
Onward we plan, with logic so grand—
Unified, simplified, just as we planned!

Note

⚡️ AI Code Reviews for VS Code, Cursor, Windsurf

CodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback.
Learn more here.


Note

⚡️ Faster reviews with caching

CodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure Review - Disable Cache at either the organization or repository level. If you prefer to disable all data retention across your organization, simply turn off the Data Retention setting under your Organization Settings.
Enjoy the performance boost—your workflow just got faster.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
api/thrift/planner.thrift (1)

57-75: NodeContent union covers all node types.
Consider resolving the TODO for metrics nodes.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 76f83d5 and 0c7e5d2.

📒 Files selected for processing (13)
  • api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala (7 hunks)
  • api/thrift/orchestration.thrift (0 hunks)
  • api/thrift/planner.thrift (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • api/thrift/orchestration.thrift
⏰ Context from checks skipped due to timeout of 90000ms (30)
  • GitHub Check: service_tests
  • GitHub Check: streaming_tests
  • GitHub Check: service_tests
  • GitHub Check: streaming_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: spark_tests
  • GitHub Check: online_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: flink_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: online_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: join_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: flink_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: batch_tests
  • GitHub Check: api_tests
  • GitHub Check: api_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: batch_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (28)
spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (1)

7-7: Import moved to planner package
Replaces orchestration import with ai.chronon.planner.JoinMergeNode, matching the new Thrift schema.

api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala (1)

4-4: Import moved to planner package
Updated to ai.chronon.planner.GroupByBackfillNode in line with the planner refactoring.

spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (1)

6-6: Import moved to planner package
Switched to ai.chronon.planner.SourceWithFilterNode to reflect new node definitions.

spark/src/main/scala/ai/chronon/spark/batch/JoinBootstrapJob.scala (1)

7-7: Import moved to planner package
Now imports ai.chronon.planner.JoinBootstrapNode per Thrift migration.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1)

7-7: Import moved to planner package
Changed to ai.chronon.planner.JoinDerivationNode aligning with the planner.thrift definitions.

spark/src/main/scala/ai/chronon/spark/Join.scala (1)

26-26: Import updated correctly. The planner package import for JoinBootstrapNode and JoinPartNode aligns with the relocated Thrift definitions.

spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)

8-8: Import updated correctly. The switch to ai.chronon.planner.JoinPartNode matches the new package.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)

24-24: Import updated correctly. The JoinBootstrapNode import now points to the planner package as expected.

spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (1)

9-13: Imports updated correctly. All join node classes are now imported from ai.chronon.planner, matching the refactoring.

api/src/main/scala/ai/chronon/api/planner/JoinOfflinePlanner.scala (7)

8-8: Import updated correctly. The Thrift node classes import from the new planner package is in place.


31-31: Node type literal updated. The metadata layering call now uses the hardcoded "left_source" label.


53-53: Node type literal updated. The metadata layering call now uses the hardcoded "bootstrap" label.


86-86: Node type literal updated. The metadata layering call now uses the hardcoded "right_part" label.


120-120: Node type literal updated. The metadata layering call now uses the hardcoded "merge" label.


138-138: Node type literal updated. The metadata layering call now uses the hardcoded "derive" label.


174-174: Node type literal updated. The metadata layering call now uses the hardcoded "label_join" label.

spark/src/main/scala/ai/chronon/spark/Driver.scala (2)

27-27: Imports updated correctly to planner package.
Matches new Thrift definitions.


833-833: Instantiates new SourceWithFilterNode from planner.
Follows refactored package.

api/thrift/planner.thrift (10)

1-6: Namespace and includes look correct.
Defines ai.chronon.planner for Python/Java and includes dependencies.


7-12: SourceWithFilterNode struct is well defined.
Fields match code usage.


14-17: JoinBootstrapNode struct is straightforward.
No concerns.


19-22: JoinMergeNode struct is correct.
Aligns with MergeJobRun.


24-27: JoinDerivationNode struct is fine.
Fields consistent.


29-35: JoinPartNode struct fields look accurate.
skewKeys type matches Scala use.


37-40: LabelJoinNode struct is valid.
Matches LabelJoin code.


42-45: GroupByBackfillNode struct is defined correctly.


47-50: GroupByUploadNode struct is correct.


52-55: GroupByStreamingNode struct is fine.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (5)
api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala (1)

5-5: Remove unused imports.

Node, NodeContent, and SourceWithFilterNode are never referenced in this file; they will trigger compiler warnings.

-import ai.chronon.planner.{Node, NodeContent, SourceWithFilterNode}
+// no planner-side classes referenced here
api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1)

6-6: Drop the unused Try import.

It isn’t referenced anywhere in this class.

api/src/main/scala/ai/chronon/api/planner/Planner.scala (1)

15-25: Shadowed type parameter can confuse readers.

toNode introduces a new T that hides the outer one. Consider renaming, e.g. N <: TBase[_, _], to avoid mental friction.

api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (2)

74-77: Dead local content.

val content = new NodeContent() is created but never used; delete to reduce noise.


255-261: Duplicate unsetNestedMetadata logic.

The same helper exists inside the class; keep one definition to avoid divergence.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 0c7e5d2 and 6e4eba1.

📒 Files selected for processing (11)
  • api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala (0 hunks)
  • api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (8 hunks)
  • api/src/main/scala/ai/chronon/api/planner/LocalRunner.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/planner/Planner.scala (1 hunks)
  • api/thrift/planner.thrift (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (3 hunks)
💤 Files with no reviewable changes (1)
  • api/src/main/scala/ai/chronon/api/planner/GroupByOfflinePlanner.scala
✅ Files skipped from review due to trivial changes (1)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
🚧 Files skipped from review as they are similar to previous changes (5)
  • spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • spark/src/main/scala/ai/chronon/spark/Join.scala
  • spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
  • api/thrift/planner.thrift
🧰 Additional context used
🧬 Code Graph Analysis (1)
api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (5)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1)
  • foreach (13-21)
api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (2)
  • TableDependencies (7-180)
  • fromSource (101-148)
api/src/main/scala/ai/chronon/api/planner/Planner.scala (1)
  • toNode (15-25)
api/src/main/scala/ai/chronon/api/planner/MetaDataUtils.scala (2)
  • MetaDataUtils (28-105)
  • layer (30-67)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
  • outputTable (160-160)
⏰ Context from checks skipped due to timeout of 90000ms (29)
  • GitHub Check: service_tests
  • GitHub Check: streaming_tests
  • GitHub Check: service_tests
  • GitHub Check: streaming_tests
  • GitHub Check: online_tests
  • GitHub Check: groupby_tests
  • GitHub Check: online_tests
  • GitHub Check: spark_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: flink_tests
  • GitHub Check: join_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: spark_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: api_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: api_tests
  • GitHub Check: batch_tests
  • GitHub Check: flink_tests
  • GitHub Check: batch_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: aggregator_tests
🔇 Additional comments (2)
api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1)

43-44: buildPlan still a stub.

Leaving ??? will blow up at runtime. Either implement or throw a clear UnsupportedOperationException with context.

api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala (1)

246-250: buildPlan incomplete.

result.setNodes() without arguments is invalid; flesh this out or throw a clear exception.

Comment on lines +39 to +41
def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>

}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

streamingNode is empty – code will not compile.

The map body is missing; the method currently returns Unit, violating its declared type Option[Node].

-  def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
-
-  }
+  // TODO: flesh out streaming node generation
+  def streamingNode: Option[Node] = None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def streamingNode: Option[Node] = groupBy.streamingSource.map { _ =>
}
// TODO: flesh out streaming node generation
def streamingNode: Option[Node] = None
🤖 Prompt for AI Agents
In api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala at lines 39 to
41, the streamingNode method uses map on streamingSource but the map body is
empty, causing it to return Unit instead of Option[Node]. To fix this, implement
the map function to return a valid Node instance based on the streamingSource
element, ensuring the method returns an Option[Node] as declared.

Comment on lines +241 to 244
override def onlineNodes: Seq[Node] = {
// depends on all
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

onlineNodes has no implementation – breaks compilation.

It currently returns Unit, not Seq[Node].

-  override def onlineNodes: Seq[Node] = {
-    // depends on all
-  }
+  override def onlineNodes: Seq[Node] = Seq.empty // TODO implement
🤖 Prompt for AI Agents
In api/src/main/scala/ai/chronon/api/planner/JoinPlanner.scala around lines 241
to 244, the method onlineNodes is declared to return Seq[Node] but currently has
no implementation and returns Unit, causing a compilation error. Implement the
method to return a valid Seq[Node] as expected, either by providing the actual
logic to retrieve online nodes or returning an appropriate placeholder such as
an empty sequence to satisfy the return type.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants