
Simple LabelJoin flow #546

Merged

merged 14 commits into from Apr 2, 2025

Conversation

varant-zlai
Collaborator

@varant-zlai varant-zlai commented Mar 27, 2025

Summary

TODO: Unit tests

Implements the simple label join logic to create a materialized table by joining forward-looking partitions from the snapshot tables of the labelJoinParts back to the join output.
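For reference, a minimal usage sketch of the new flow, mirroring the test added in this PR (the joinConf and tableUtils values are assumed in scope, and the date string is a placeholder):

// Compute labels for a single join output partition
val labelJoin = new LabelJoinV2(joinConf, tableUtils, joinOutputDsToLabel = "2023-01-01")
val result: DataFrame = labelJoin.compute() // writes to the _with_labels table and returns the result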

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced an updated mechanism for formatting and standardizing label outputs with the addition of outputLabelTableV2.
    • Added a new distributed label join operation with the LabelJoinV2 class, featuring robust validations, comprehensive error handling, and detailed logging for improved data integration.
    • Implemented a comprehensive test suite for the LabelJoinV2 functionality to ensure accuracy and reliability of label joins.
  • Updates

    • Replaced the existing LabelJoin class with the new LabelJoinV2 class, enhancing the label join process.

coderabbitai bot commented Mar 27, 2025

Walkthrough

This pull request introduces a new method outputLabelTableV2 in the MetadataOps implicit class within the Extensions object, designed to format output table names. Additionally, a new file LabelJoinV2.scala is added, containing the LabelJoinV2 class, which implements the logic for executing label joins in a Spark environment, including configuration validation and DataFrame operations. A corresponding test suite is also created to ensure the functionality works as intended.

Changes

File | Summary
api/.../Extensions.scala | Added outputLabelTableV2 in MetadataOps to construct output table names by combining outputNamespace and cleanName with the _with_labels suffix.
spark/.../LabelJoinV2.scala | Added LabelJoinV2 class for label join operations in Spark. Implements runAssertions(), compute(), and joinWithLeft() for validation, logging, and DataFrame joins.
spark/.../test/join/LabelJoinV2Test.scala | Added LabelJoinV2Test class for testing label join functionality. Includes tests for join operations and a method createTestLabelJoin for setup.
spark/.../Driver.scala | Updated LabelJoin to LabelJoinV2 and changed method from computeLabelJoin() to compute().
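For context, the naming helper itself is a one-liner; this sketch matches the definition quoted later in the review:

// In MetadataOps (api/src/main/scala/ai/chronon/api/Extensions.scala)
def outputLabelTableV2: String =
  s"${metaData.outputNamespace}.${metaData.cleanName}_with_labels" // Used for the LabelJoinV2 flow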

Suggested reviewers

  • nikhil-zlai
  • piyush-zlai

Poem

In code we join with spark and flair,
New methods cast magic in the air.
A label here, a join so tight,
Our DataFrames dance through day and night.
Cheers to code that’s light and fair!
🚀✨

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)

52-60: Suggest skipping final table scan.
You might reuse joinedDf instead of scanning the table again, improving performance.

-    def finalResult = tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange))
+    val finalResult = joinedDf

174-208: Add unit tests.
Tests for this join and renaming logic are missing. Comprehensive coverage would boost confidence.

Would you like help writing these tests?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 4687bc4 and 1f22176.

📒 Files selected for processing (2)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (16)
  • Extensions (43-1254)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • dataModel (322-327)
  • dataModel (458-465)
  • setups (479-487)
  • setups (781-785)
  • setups (1047-1051)
  • outputColumnName (194-196)
  • outputTable (121-121)
  • keys (537-547)
  • keys (810-823)
  • rightToLeft (748-758)
  • valueColumns (517-537)
  • valueColumns (746-746)
  • pretty (1092-1097)
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)

125-127: New method aligns well with existing naming.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)

113-172: Logic is clear and robust.
No immediate issues, though multiple windows are not yet supported.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)

50-50: Use a config or constant.
Hardcoding stepDays=1 may hamper flexibility if requirements change.
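One possible shape, reading the value from the join's table properties (the step_days key is hypothetical, not part of this PR):

// Fall back to 1 when no override is configured
private val stepDays: Int = confTableProps.get("step_days").map(_.toInt).getOrElse(1)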


135-150: Enhance error message.
Improve guidance on how to produce missing partitions for clarity.


167-201: Watch join performance.
Multiple left outer joins may degrade performance with large datasets. Consider optimizing or limiting concurrency.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 1f22176 and 0172246.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (16)
  • Extensions (43-1254)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • dataModel (322-327)
  • dataModel (458-465)
  • setups (479-487)
  • setups (781-785)
  • setups (1047-1051)
  • outputColumnName (194-196)
  • outputTable (121-121)
  • keys (537-547)
  • keys (810-823)
  • rightToLeft (748-758)
  • valueColumns (517-537)
  • valueColumns (746-746)
  • pretty (1092-1097)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)

89-89: Verify potential injection risk.
tableUtils.sql is executed with dynamic input. Confirm it's sanitized or safe from undesired queries.
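A hedged sketch of one possible guard, assuming setups are limited to DDL-style statements (the allow-list is illustrative, not part of this PR):

val allowedPrefixes = Seq("CREATE TEMPORARY FUNCTION", "ADD JAR", "SET")
Option(labelJoinConf.setups).foreach(_.foreach { stmt =>
  val normalized = stmt.trim.toUpperCase
  require(allowedPrefixes.exists(normalized.startsWith), s"Unexpected setup statement: $stmt")
  tableUtils.sql(stmt)
})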

s"Multiple aggregations not yet supported for label join ${jp.groupBy.metaData.name}")

assert(
Option(jp.groupBy.aggregations.get(0).windows).get.size() == 1,
Contributor

kill this?

@@ -0,0 +1,202 @@
/*
* Copyright (C) 2023 The Chronon Authors.
Collaborator
Not Apache 2.0 licensed anymore right?

.map(_.asScala.toMap)
.getOrElse(Map.empty[String, String])

private val stepDays = 1 // Always use 1 for step days to avoid shuffle
Collaborator
maybe we just let this flow through from config for now; we can decide whether to promote it to a source default later. Or are you using this to test?

Collaborator
also, we actually want to set the following as a table property in order to actually not shuffle: https://github.com/zipline-ai/chronon/pull/531/files#diff-041194356b02fb92c7ef95d7c0a83513a8639f7c9b6987b551a9d080ca7dd662L378-L379

val joinedDf = computeLabelJoin(leftDf, leftDsToRange)
joinedDf.save(outputLabelTable,
confTableProps,
Seq(Constants.LabelPartitionColumn, tableUtils.partitionColumn),
Collaborator
The ordering determines the partition hierarchy here - wouldn't we want to partition on the LHS partition column first?
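Under that assumption, the save call would simply lead with the LHS partition column; a sketch, not the merged code:

joinedDf.save(outputLabelTable,
              confTableProps,
              Seq(tableUtils.partitionColumn, Constants.LabelPartitionColumn), // LHS ds first
              true)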

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)

59-83: 🛠️ Refactor suggestion

Add error handling for compute operation.

The method should handle exceptions when scanning dataframes or saving results.

 def compute(): DataFrame = {
+    try {
       runAssertions()
 
       Option(labelJoinConf.setups).foreach(_.foreach(tableUtils.sql))
 
       logger.info(s"Label join to fill $joinOutputDsToLabel.")
 
       val leftDsToRange = PartitionRange(joinOutputDsToLabel, joinOutputDsToLabel)
 
       def finalResult = tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange))
 
       val startMillis = System.currentTimeMillis()
 
       val joinBaseDf = tableUtils.scanDf(null, joinConf.metaData.outputTable, range = Some(leftDsToRange))
 
       val joinedDf = computeLabelJoin(joinBaseDf, leftDsToRange)
       joinedDf.save(outputLabelTable, confTableProps, Seq(tableUtils.partitionColumn), true)
 
       val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000)
       metrics.gauge(Metrics.Name.LatencyMinutes, elapsedMins)
 
       logger.info(s"Wrote to table $outputLabelTable, into partitions: $leftDsToRange in $elapsedMins mins")
       finalResult
+    } catch {
+      case e: Exception =>
+        logger.error(s"Error computing label join: ${e.getMessage}", e)
+        metrics.counter(Metrics.Name.Errors, 1)
+        throw e
+    }
 }
🧹 Nitpick comments (10)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (5)

22-33: Implementation looks good but missing proper documentation.

Class could benefit from scaladoc to explain purpose and parameters.

+/**
+ * Implements a simple label join by joining forward-looking partitions from snapshot tables
+ * back to the join output of labelJoinParts.
+ *
+ * @param joinConf Join configuration containing left source and join parts
+ * @param tableUtils Utility for table operations
+ * @param joinOutputDsToLabel Output dataset label to join
+ */
 class LabelJoinV2(joinConf: api.Join, tableUtils: TableUtils, joinOutputDsToLabel: String) {

25-25: Strengthen assertion for namespace validation.

Current check only catches null/empty, not whitespace-only strings.

-  assert(Option(joinConf.metaData.outputNamespace).nonEmpty, "output namespace could not be empty or null")
+  assert(Option(joinConf.metaData.outputNamespace).exists(_.trim.nonEmpty), "output namespace could not be empty, null or whitespace")

78-78: Extract time conversion constant.

Replace magic number with named constant for readability.

+  private val MillisToMinutes = 60 * 1000
   val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000)

109-126: Improve error message for missing partitions.

Error message is detailed but could be more concise.

     if (missingPartitions.nonEmpty) {
       throw new RuntimeException(
-        s"""Missing following partitions from $snapshotTable: $missingPartitions
-           |
-           |When computing for $leftDsAsRange
-           |
-           |Label table contains the following windows: ${windowToOutputColumnName.keys.mkString(", ")} (days)
-           |
-           |So the required partitions to compute labels for $leftDsAsRange are: ${requiredPartitions.mkString(", ")}
-           |
-           |Found existing partitions in snapshot table: ${existingSnapshotPartitions.mkString(", ")}
-           |
-           |(Looking ahead in the snapshot table by the corresponding window length).
-           |
-           |you may need to run the snapshot job for the missing days.
-           |""".stripMargin
+        s"""Missing partitions in $snapshotTable: $missingPartitions
+           |Required for: $leftDsAsRange with windows: ${windowToOutputColumnName.keys.mkString(", ")} (days)
+           |Required partitions: ${requiredPartitions.mkString(", ")}
+           |Existing partitions: ${existingSnapshotPartitions.mkString(", ")}
+           |Run snapshot job for missing days.""".stripMargin
       )
     }

136-138: Add handling for empty DataFrames.

Consider validating that rightDfs isn't empty before folding.

+    if (rightDfs.isEmpty) {
+      logger.warn("No right DataFrames to join. Returning base DataFrame.")
+      return joinBaseDf
+    }
     val joined = rightDfs.foldLeft(joinBaseDf) { (left, right) =>
       joinWithLeft(left, right, joinPart)
     }
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (5)

17-39: Well-structured test class setup, but could use documentation.

Add class-level scaladoc to explain test purpose.

+/**
+ * Test suite for LabelJoinV2 functionality.
+ * Tests label join operation with various data configurations and validates output.
+ */
 class LabelJoinV2Test extends AnyFlatSpec {

42-49: Extract schema definition to constants.

Move schema definitions to class-level constants for reusability.

+  // Schema definitions
+  private val ViewsSchema = List(
+    Column("user", api.StringType, 10000),
+    Column("item", api.StringType, 100),
+    Column("time_spent_ms", api.LongType, 5000)
+  )

   it should "label join v2" in {
-
-    val viewsSchema = List(
-      Column("user", api.StringType, 10000),
-      Column("item", api.StringType, 100),
-      Column("time_spent_ms", api.LongType, 5000)
-    )

     val viewsTable = s"$namespace.view_events"
-    DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
+    DataFrameGen.events(spark, ViewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)

104-110: Implementation looks good, but add comments.

Add comments explaining the label join computation steps.

     // Now compute the snapshots for the label join
     GroupBy.computeBackfill(labelsGroupBy, today, tableUtils)
     tableUtils.sql(s"SELECT * FROM ${labelsGroupBy.metaData.outputTable}").show()
 
+    // Initialize LabelJoinV2 with the join configuration and compute the label join
     // Now compute the label join for forty days ago
     val labelJoin = new LabelJoinV2(joinConf, tableUtils, fortyDaysAgo)
     val result = labelJoin.compute()

113-130: Refactor SQL string construction for maintainability.

Long SQL strings are hard to maintain. Extract parts to variables.

+    // SQL query parts
+    val joinTableQuery = s"SELECT * FROM $joinOutputTable WHERE ds = \"$fortyDaysAgo\""
+    val aggregationQuery = s"""
+      SELECT
+        item, SUM(time_spent_ms) as time_spent_ms_sum_7d
+      FROM
+        $viewsTable
+      WHERE
+        ds BETWEEN $thirtyThreeDaysAgo AND $thirtyNineDaysAgo
+      GROUP BY
+        item
+      """
+
     val expected =
       s"""
-         | SELECT j.*, gb.time_spent_ms_sum_7d FROM
-         | (SELECT * FROM $joinOutputTable WHERE ds = "$fortyDaysAgo") as j
-         | JOIN
-         | (
-         |  SELECT
-         |    item, SUM(time_spent_ms) as time_spent_ms_sum_7d
-         |  FROM
-         |    $viewsTable
-         |  WHERE
-         |    ds BETWEEN $thirtyThreeDaysAgo AND $thirtyNineDaysAgo
-         |  GROUP BY
-         |    item
-         | ) as gb
-         | on j.item = gb.item
+         | SELECT j.*, gb.time_spent_ms_sum_7d FROM
+         | ($joinTableQuery) as j
+         | JOIN
+         | ($aggregationQuery) as gb
+         | on j.item = gb.item
          |""".stripMargin

144-149: Method looks good, but could benefit from documentation.

Add method documentation to explain purpose and parameters.

+  /**
+   * Creates a test label join configuration from a sequence of GroupBy configurations.
+   * @param groupBys Sequence of GroupBy configurations to include in the label join
+   * @return LabelParts configuration for testing
+   */
   def createTestLabelJoin(groupBys: Seq[ai.chronon.api.GroupBy]): ai.chronon.api.LabelParts = {
     val labelJoinParts = groupBys.map(gb => Builders.JoinPart(groupBy = gb)).toList
     Builders.LabelPart(
       labels = labelJoinParts
     )
   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 0172246 and 90ac6a6.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (59-83)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (15)
  • Extensions (43-1254)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • dataModel (322-327)
  • dataModel (458-465)
  • setups (479-487)
  • setups (781-785)
  • setups (1047-1051)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (810-823)
  • rightToLeft (748-758)
  • valueColumns (517-537)
  • valueColumns (746-746)
  • pretty (1092-1097)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: fetcher_tests
  • GitHub Check: enforce_triggered_workflows

Comment on lines 85 to 181
def computeLabelJoin(joinBaseDf: DataFrame, leftDsAsRange: PartitionRange): DataFrame = {
// TODO -- support multiple (going to change API first)
val labelJoinPart = labelJoinConf.labels.asScala.head

// First get a map of window to output column name for the labels within that window
val windowToOutputColumnName = labelJoinPart.groupBy.aggregations.asScala
.flatMap { agg =>
agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
// TODO -- support buckets
val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
(w.length, aggPart.outputColumnName)
}
}
.groupBy(_._1)
.mapValues(_.map(_._2))
.toMap


💡 Verification agent

❓ Verification inconclusive

Address TODO comments before finalizing.

Multiple TODO comments indicate incomplete functionality.


🏁 Script executed:

#!/bin/bash
# Check for other TODOs in the codebase related to LabelJoin
rg -A 2 "TODO.*(?i)labeljoin" --type scala

Length of output: 44


Action Required: Resolve TODOs in LabelJoinV2.scala

The file still has two TODO comments (for supporting multiple labels and buckets) that need to be addressed before finalizing the PR.

  • Verify if these are still pending features.
  • Remove or implement the TODO sections as appropriate.

@varant-zlai varant-zlai changed the title from "WIP -- Simple LabelJoin flow" to "Simple LabelJoin flow" on Mar 27, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)

17-39: Consider organizing date variables for clarity.

The date variables are many and scattered. Consider grouping them into a DateFixtures object.


24-24: Unused variable.

labelDS is defined but never used.


40-105: Test cleanup missing.

Test creates tables but doesn't drop them after completion.

Add cleanup code after assertions to drop created tables.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 90ac6a6 and e2e61ee.

📒 Files selected for processing (1)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (59-83)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (6)

1-16: Imports look good.

All necessary imports are present.


40-134: Missing test coverage for edge cases.

Only testing happy path. Add tests for error conditions.


109-111: LGTM - Core test logic.

This correctly tests the LabelJoinV2.compute() functionality.


114-122: SQL query verification approach is solid.

Using SQL for expected results is a good approach.


127-133: Good debugging output.

Helpful diagnostic information when tests fail.


136-141: Utility method isn't used.

createTestLabelJoin is defined but not used in this test.

Is this intended for future tests? If not, consider removing it.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)

24-24: Parameterize date to improve test reusability.

You could avoid a hard-coded date for flexible test scenarios:

- private val labelDS = "2022-10-30"
+ private def labelDS = java.time.LocalDate.now().toString
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)

41-42: Consider supporting multiple label joins.


85-87: Implement or remove TODO for multiple label joins.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between e2e61ee and f4e45ef.

📒 Files selected for processing (3)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/src/main/scala/ai/chronon/api/Extensions.scala
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (15)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • dataModel (322-327)
  • dataModel (458-465)
  • setups (479-487)
  • setups (779-783)
  • setups (1045-1049)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (59-83)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

40-42: Add tests for edge cases.


136-141: Helper function looks good.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)

25-25: Good defensive check for the output namespace.


73-76: Add error handling for scanning and saving steps.


val leftDsToRange = PartitionRange(joinOutputDsToLabel, joinOutputDsToLabel)

def finalResult = tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange))
Collaborator
nit: maybe just get rid of the function here and call scanDf at the end? For readability.
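A sketch of that readability tweak, inlining the scan at the return site using the same calls as the surrounding compute():

logger.info(s"Wrote to table $outputLabelTable, into partitions: $leftDsToRange in $elapsedMins mins")
tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange)) // returned directly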

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)

38-49: Optimize schema retrieval.
Fetching schemas repeatedly can be costly. Consider collecting them once per table for efficiency.
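A minimal memoization sketch, assuming tableUtils exposes a per-table schema lookup (the getSchemaFromTable name is illustrative):

import scala.collection.mutable
import org.apache.spark.sql.types.StructType

private val schemaCache = mutable.Map.empty[String, StructType]
private def cachedSchema(table: String): StructType =
  schemaCache.getOrElseUpdate(table, tableUtils.getSchemaFromTable(table))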


81-81: Address the TODO.
Complete or remove the "TODO -- support buckets" if no longer needed.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 961b1b2 and f5a1d0f.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (14)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • team (993-993)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (14)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (4)

1-25: Looks fine.


50-69: Assertion checks are good.


101-186: Wrap save logic with fallback.
Consider error handling if .save() fails, preventing partial data writes.


188-221: Looks correct.
Verify column type alignment if sources differ.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)

38-48: Add method documentation.

Add Scaladoc explaining the purpose and return values.


71-99: Complex transformation chain needs refactoring.

Break this method into smaller steps for better readability.

def getWindowToLabelOutputInfos() = {
  // Create a map of window to LabelOutputInfo
  // Each window could be shared across multiple labelJoinParts
  val labelJoinParts = labelJoinConf.labels.asScala
  val labelDsAsRange = PartitionRange(labelDs, labelDs)
  
+  // Step 1: Extract window, output column, and join date information for each aggregation
+  val windowInfoByLabelJoinPart = labelJoinParts.map { labelJoinPart =>
+    val windowInfo = labelJoinPart.groupBy.aggregations.asScala
+      .flatMap { agg =>
+        agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
+          // TODO -- support buckets
+          val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
+          val joinDs = labelDsAsRange.shift(w.length * -1)
+          (w.length, aggPart.outputColumnName, joinDs)
+        }
+      }
+      .groupBy(_._1)
+      .mapValues(_.map(_._2))
+      .mapValues((labelJoinPart, _))
+    windowInfo
+  }
+  
+  // Step 2: Combine all window info across label join parts
+  val flattenedWindowInfo = windowInfoByLabelJoinPart.flatMap(_.toList)
+  
+  // Step 3: Group by window length and create final output info
+  flattenedWindowInfo
+    .groupBy(_._1)
+    .mapValues(_.map(_._2))
+    .map { case (window, labelJoinPartAndOutputCols) =>
+      val labelInfos = labelJoinPartAndOutputCols.map(info => LabelPartOutputInfo(info._1, info._2))
+      val joinPartitionDsAsRange = labelDsAsRange.shift(window * -1)
+      window -> LabelOutputInfo(joinPartitionDsAsRange, labelInfos)
+    }
-  labelJoinParts
-    .map { labelJoinPart =>
-      labelJoinPart.groupBy.aggregations.asScala
-        .flatMap { agg =>
-          agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
-            // TODO -- support buckets
-            val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
-            val joinDs = labelDsAsRange.shift(w.length * -1)
-            (w.length, aggPart.outputColumnName, joinDs)
-          }
-        }
-        .groupBy(_._1)
-        .mapValues(_.map(_._2)) // Drop the window (it's the key)
-        .mapValues((labelJoinPart, _)) // add the labelJoinPart
-    }
-    .flatMap(_.toList)
-    .groupBy(_._1) // Flatten map and combine into one map with window as key and all labelJoinParts as value
-    .mapValues(_.map(_._2)) // Drop the duplicate window
-    .map { case (window, labelJoinPartAndOutputCols) =>
-      val labelInfos = labelJoinPartAndOutputCols.map(info => LabelPartOutputInfo(info._1, info._2))
-      val joinPartitionDsAsRange = labelDsAsRange.shift(window * -1)
-      window -> LabelOutputInfo(joinPartitionDsAsRange, labelInfos)
-    }
}

101-209: Refactor long compute method.

Extract logical sections into separate methods for better maintainability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between f5a1d0f and 8e3e437.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (101-209)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: join_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

39-138: Missing test coverage for edge cases.

Current test only covers happy path. Add tests for error scenarios.


140-145: LGTM! Helper method implementation looks good.

Clean implementation for creating test label joins from GroupBy configurations.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (4)

26-37: Initialization looks good.

Constructor properly validates namespace and sets up necessary components.


219-223: Use Set for efficient filtering of non-value columns.

Replace array contains check with Set membership for better performance.

-val nonValueColumns = joinPart.rightToLeft.keys.toArray ++ Array(Constants.TimeColumn,
-                                                               tableUtils.partitionColumn,
-                                                               Constants.TimePartitionColumn,
-                                                               Constants.LabelPartitionColumn)
-val valueColumns = rightDf.schema.names.filterNot(nonValueColumns.contains)
+val nonValueColumns = (joinPart.rightToLeft.keys ++ Set(
+  Constants.TimeColumn,
+  tableUtils.partitionColumn,
+  Constants.TimePartitionColumn,
+  Constants.LabelPartitionColumn
+)).toSet
+val valueColumns = rightDf.schema.names.filterNot(nonValueColumns)

112-142: Good error handling for missing partitions.

Excellent logging and error handling for missing partition scenarios.


187-193: Good metrics tracking implementation.

Proper performance monitoring with detailed logging.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)

45-49: Consolidate repeated schema definitions.
You define similar schemas multiple times. Factor them out to a helper method for cleaner code.

Also applies to: 145-149


95-101: Avoid duplicating joinConf setup.
Use a shared method to build the join configuration to reduce repetition.

Also applies to: 208-213


102-116: Consider empty result set test.
Try verifying behavior when the snapshot or item queries table is empty.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)

37-37: Fix variable name typo.
Rename lableColumnPrefix to labelColumnPrefix for clarity.

-  private val lableColumnPrefix = "label_"
+  private val labelColumnPrefix = "label_"

82-82: Resolve remaining TODO.
"Support buckets" is still pending. Clarify whether to implement or remove the placeholder.

Do you want me to open an issue to track adding bucket support?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 8e3e437 and 60f99da.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (102-224)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

43-69: Add negative test paths.
Coverage is limited to success scenarios. Missing partition handling isn't tested.


143-179: Well done handling multiple label parts.
Consider adding edge-case tests for zero or null partitions to ensure resilience.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)

234-244: Validate overwriting columns carefully.
Dropping and re-adding columns may cause unexpected issues if columns were previously renamed or typed differently.

@@ -122,6 +122,8 @@ object Extensions {
def outputLabelTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
def outputFinalView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
def outputLatestLabelView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled_latest"
def outputLabelTableV2: String =
s"${metaData.outputNamespace}.${metaData.cleanName}_with_labels" // Used for the LabelJoinV2 flow
Contributor
lets call this "_labels" and the other one "_labels_legacy"
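A sketch of the proposed renaming against the definitions quoted above (names are the reviewer's proposal, not merged code):

def outputLabelTableV2: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labels" // new flow
def outputLabelTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labels_legacy" // old flow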

args.joinConf,
tableUtils,
args.endDate()
)
labelJoin.computeLabelJoin(args.stepDays.toOption)
Contributor
cuz we need step days only on the joinpart and on the actual label join job - we always step days = 1?

import scala.collection.Seq

case class LabelPartOutputInfo(labelPart: JoinPart, outputColumnNames: Seq[String])
case class LabelOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
Contributor
Indicate that this is all the label columns / plural

Suggested change
case class LabelOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
case class AllLabelsOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])

Comment on lines 85 to 95
(w.length, aggPart.outputColumnName, joinDs)
}
}
.groupBy(_._1)
.mapValues(_.map(_._2)) // Drop the window (it's the key)
.mapValues((labelJoinPart, _)) // add the labelJoinPart
}
.flatMap(_.toList)
.groupBy(_._1) // Flatten map and combine into one map with window as key
.mapValues(_.map(_._2)) // Drop the duplicate window
.map { case (window, labelJoinPartAndOutputCols) =>
Contributor
can we simplify this actually? the inner grouping is probably not necessary, and you could probably directly compute LabelPartOutputInfo right here in the inner loop.
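One way the flattening could look, computing LabelPartOutputInfo directly in the inner loop (a sketch reusing the surrounding names, not the merged code):

val windowToLabelParts: Map[Int, Seq[LabelPartOutputInfo]] = labelJoinParts
  .flatMap { part =>
    part.groupBy.aggregations.asScala.flatMap { agg =>
      agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
        val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
        w.length -> LabelPartOutputInfo(part, Seq(aggPart.outputColumnName))
      }
    }
  }
  .groupBy(_._1)
  .mapValues(_.map(_._2))
  .toMap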


// Each unique window is an output partition in the joined table
// Each window may contain a subset of the joinParts and their columns
validWindowToOutputs.foreach { case (windowLength, joinOutputInfo) =>
Contributor
nit: split the inner blocks into separate functions

}
}

def compute(): DataFrame = {
Contributor
this function is pretty large - can we split this actually?

@nikhil-zlai nikhil-zlai (Contributor) left a comment

let's make the logic easy to follow - added a few readability-related comments.

import scala.collection.Seq

case class LabelPartOutputInfo(labelPart: JoinPart, outputColumnNames: Seq[String])
case class LabelOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
Collaborator
camelCase on the LabelpartOutputInfos

labelJoinPart.groupBy.aggregations.asScala
.flatMap { agg =>
agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
// TODO -- support buckets
Collaborator
can we do an assertion here in the meantime?
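The assertion that later lands in this PR (quoted in a subsequent review pass) does exactly that:

assert(Option(agg.buckets).isEmpty, "Buckets as labels are not yet supported in LabelJoinV2")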

private val tableName = "test_label_join"
private val labelDS = "2022-10-30"
private val tableUtils = TableTestUtils(spark)
tableUtils.createDatabase(namespace)
Collaborator
nit: move this to the end of the variable declarations to separate the different steps.

private val confTableProps = Option(joinConf.metaData.tableProperties)
.map(_.asScala.toMap)
.getOrElse(Map.empty[String, String])
private val lableColumnPrefix = "label_"
Collaborator
How about double underscore to make clear this is engine-generated? If this is convention already elsewhere no worries.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (5)

15-15: Remove unused import.
BloomFilter is unused. Consider removing.


37-37: Rename for clarity.
Replace lableColumnPrefix with labelColumnPrefix.


83-83: Implement or remove TODO.
Bucket support is incomplete.

Should I open a new issue to track it?


146-152: Handle partial partitions.
Consider fallback logic for partially missing partitions.
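A hedged sketch of one fallback shape, assuming a window-to-output map and the list of existing snapshot partitions from the surrounding code (names mirror the review context, not the merged code):

val (computable, skipped) = windowToOutputInfos.partition { case (_, info) =>
  existingSnapshotPartitions.contains(info.joinDsAsRange.start)
}
skipped.keys.foreach(w => logger.warn(s"Skipping $w-day window: snapshot partition missing"))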


186-201: Consolidate partition logic.
Combining branches for base join may simplify maintenance.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 60f99da and 8859268.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)

52-71: Add test coverage.
Ensure runAssertions() is validated by unit tests.


230-263: Re-check column collisions.
Confirm columns don’t conflict after key renaming and prefixing.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)

37-37: Typo in prefix variable
Rename lableColumnPrefix to labelColumnPrefix for clarity.

-  private val lableColumnPrefix = "label_"
+  private val labelColumnPrefix = "label_"

83-84: TODO bucket support
Would you like help to implement or remove the placeholder?


218-220: Consider broadcast or filter
Chaining left-outer joins on large data can be costly. Look into Spark broadcast or bloom filters for efficiency.
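A sketch of the broadcast variant, assuming each right-side label DataFrame is small and that keyColumns holds the shared join keys (both are assumptions here):

import org.apache.spark.sql.functions.broadcast

val joined = rightDfs.foldLeft(joinBaseDf) { (left, right) =>
  left.join(broadcast(right), keyColumns, "left_outer") // hint Spark to broadcast the small side
}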

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 8859268 and 1e4fba4.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)

1-266: Add a dedicated test suite
No unit tests found here. Kindly add a suite to validate each method’s logic.


213-214: Confirm partition range for snapshots
You're scanning the right DF with labelDsAsRange but left DF with a shifted range. Verify this alignment.


239-243: Use Set for non-value columns
Optimize membership checks as previously suggested.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

20-20: Move Spark init after variable declarations.
Keeps initialization steps grouped together.


299-371: Consider a partition gap test.
Verifies progression when some windows are missing.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 1e4fba4 and bb05257.

📒 Files selected for processing (1)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (103-170)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

37-99: Add edge-case tests for missing partitions & empty dataframes.
Current coverage focuses on standard flows; negative scenarios remain untested.


137-194: Nice coverage of multiple label parts.
This addresses the prior request for multi-part testing.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (7)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (7)

37-37: Fix typo in variable name.

lableColumnPrefix should be labelColumnPrefix.

-  private val lableColumnPrefix = "label_"
+  private val labelColumnPrefix = "label_"

98-99: Document the shift operation purpose.

Add comment explaining why backward shift by window length is necessary.

-        // The labelDs is a lookback from the labelSnapshot partition back to the join output table
+        // We shift backward by window length because we need to find the join output partition 
+        // that corresponds to the start of the window period relative to our label date

104-171: Decompose large compute method.

Break down into smaller, focused functions for improved readability.

Extract validation, partition checking, and dataframe union into separate methods.


240-244: Improve non-value columns filtering.

Use Set for more efficient membership testing.

-    val nonValueColumns = joinPart.rightToLeft.keys.toArray ++ Array(Constants.TimeColumn,
-                                                                     tableUtils.partitionColumn,
-                                                                     Constants.TimePartitionColumn,
-                                                                     Constants.LabelPartitionColumn)
-    val valueColumns = rightDf.schema.names.filterNot(nonValueColumns.contains)
+    val nonValueColumns = (joinPart.rightToLeft.keys ++ Set(
+      Constants.TimeColumn,
+      tableUtils.partitionColumn,
+      Constants.TimePartitionColumn,
+      Constants.LabelPartitionColumn
+    )).toSet
+    val valueColumns = rightDf.schema.names.filterNot(nonValueColumns)

248-248: Simplify column dropping.

Use more concise approach for dropping multiple columns.

-    val cleanLeftDf = valueColumns.foldLeft(leftDf)((df, colName) => df.drop(s"${lableColumnPrefix}_$colName"))
+    val columnsToRemove = valueColumns.map(colName => s"${labelColumnPrefix}_$colName")
+    val cleanLeftDf = leftDf.drop(columnsToRemove:_*)

167-170: Handle empty dataframe sequences.

Add check before union operation to avoid potential issues.

     if (allOutputDfs.length == 1) {
       allOutputDfs.head
+    } else if (allOutputDfs.isEmpty) {
+      throw new IllegalStateException("No output dataframes were generated")
     } else {
       allOutputDfs.reduce(_ union _)
     }

263-264: Add error handling for join operation.

Consider adding try-catch block for join errors.

Wrap join with error handling to provide better diagnostics when join fails.
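
A sketch of what that wrapper could look like; the helper name and message format are ours, not the PR's:

import org.apache.spark.sql.DataFrame
import scala.util.{Failure, Success, Try}

// Wrap the left-outer join so a failure surfaces the keys and both schemas.
def safeLeftJoin(leftDf: DataFrame, rightDf: DataFrame, keys: Seq[String]): DataFrame =
  Try(leftDf.join(rightDf, keys, "left_outer")) match {
    case Success(joined) => joined
    case Failure(e) =>
      throw new RuntimeException(
        s"Label join failed on keys [${keys.mkString(", ")}]. " +
          s"Left schema: ${leftDf.schema.simpleString}; right schema: ${rightDf.schema.simpleString}",
        e
      )
  }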

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between bb05257 and bec5f70.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows

// TODO -- support buckets
assert(Option(agg.buckets).isEmpty, "Buckets as labels are not yet supported in LabelJoinV2")

💡 Verification agent

🧩 Analysis chain

Add implementation plan for bucket support.

The TODO comment lacks context about implementation timeline.


🏁 Script executed:

#!/bin/bash
# Check other TODOs for bucket implementation
rg "TODO.*bucket" --type scala

Length of output: 461


Bucket support: Implementation plan required
The TODO in spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (lines 83-84) is too terse. Update it with a brief plan and timeline for bucket support.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

17-36: Class initialization is correct.

Consistent use of SparkSession and initialization logic. Consider adding test-level teardown if needed for cleanup.


37-135: Consider edge-case tests.

Tests appear robust. Add negative scenarios (e.g., no matching partitions or empty input data) to ensure failure paths are covered.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)

17-39: Check for bucket support implementation.

A TODO indicates missing bucket support. If it's planned soon, consider clarifying the timeline or removing the TODO.


40-70: Schema extraction might be optimized.

Repeatedly scanning schemas can be costlier on large tables. Consider caching schemas if performance becomes an issue.
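
If it does, a memoized lookup along these lines would avoid repeated catalog hits (a sketch; spark.table is the generic accessor, and the codebase may already have its own schema helper):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import scala.collection.concurrent.TrieMap

// Memoize schema lookups so each table's schema is fetched from the catalog once.
private val schemaCache = TrieMap.empty[String, StructType]

def cachedSchema(spark: SparkSession, tableName: String): StructType =
  schemaCache.getOrElseUpdate(tableName, spark.table(tableName).schema)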


72-102: Window mapping logic is clear.

Mapping windows to output info is straightforward. Possibly split large functions for easier maintainability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between bec5f70 and d6a35b9.

📒 Files selected for processing (4)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • api/src/main/scala/ai/chronon/api/Extensions.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (103-169)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
  • Extensions (43-1252)
  • outputLabelTable (122-122)
  • outputLabelTableV2 (125-127)
  • outputTable (121-121)
  • dataModel (322-327)
  • dataModel (458-465)
  • outputColumnName (194-196)
  • keys (537-547)
  • keys (808-821)
  • rightToLeft (746-756)
  • valueColumns (517-537)
  • valueColumns (744-744)
  • pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (4)

1-16: Imports look fine.

All relevant dependencies are properly referenced and there's no redundancy.


137-196: Good handling of multiple label parts.

Ensures coverage for multi-window scenarios. Possibly add a test for single aggregator with multiple columns.


197-294: Union logic is clear.

Combining partial results into the final DataFrame is well-structured. No immediate issues found.


295-374: Final checks on label join consistency.

Verifies updated columns for second run. This ensures incremental labeling. Nice coverage.

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)

1-16: Imports and definitions look correct.

Context classes and logger usage are consistent with project style.


51-70: Assertion checks are good.

Ensures correct data model usage and daily windows. All critical validations are present.


170-265: Overall flow is coherent.

Method names and logging clarify the label join steps. Implementation aligns with best practices.

Comment on lines +103 to +169
def compute(): DataFrame = {
  logger.info(s"Running LabelJoinV2 for $labelDs")

  runAssertions()

  // First get a map of window to LabelOutputInfo
  val windowToLabelOutputInfos = getWindowToLabelOutputInfos

  // Find existing partitions in the join table
  val joinTable = joinConf.metaData.outputTable
  val existingJoinPartitions = tableUtils.partitions(joinTable)

  // Split the windows into two groups: one that has a corresponding partition in the join table and one that doesn't.
  // If a partition is missing, we can't compute the labels for that window, but the job will proceed with the rest.
  val (computableWindowToOutputs, missingWindowToOutputs) = windowToLabelOutputInfos.partition {
    case (_, labelOutputInfo) =>
      existingJoinPartitions.contains(labelOutputInfo.joinDsAsRange.start)
  }

  if (missingWindowToOutputs.nonEmpty) {
    logger.info(
      s"""Missing following partitions from $joinTable: ${missingWindowToOutputs.values
           .map(_.joinDsAsRange.start)
           .mkString(", ")}
         |
         |Found existing partitions: ${existingJoinPartitions.mkString(", ")}
         |
         |Therefore unable to compute the labels for ${missingWindowToOutputs.keys.mkString(", ")}
         |
         |For requested ds: $labelDs
         |
         |Proceeding with valid windows: ${computableWindowToOutputs.keys.mkString(", ")}
         |
         |""".stripMargin
    )

    require(
      computableWindowToOutputs.nonEmpty,
      "No valid windows to compute labels for given the existing join output range. " +
        s"Consider backfilling the join output table for the following days: ${missingWindowToOutputs.values.map(_.joinDsAsRange.start)}."
    )
  }

  // Find existing partitions in the outputLabelTable (different from the join output table used above).
  // This is used below in computing baseJoinDf.
  val existingLabelTableOutputPartitions = tableUtils.partitions(outputLabelTable)
  logger.info(s"Found existing partitions in Label Table: ${existingLabelTableOutputPartitions.mkString(", ")}")

  // Each unique window is an output partition in the joined table.
  // Each window may contain a subset of the joinParts and their columns.
  computableWindowToOutputs.foreach { case (windowLength, joinOutputInfo) =>
    computeOutputForWindow(windowLength, joinOutputInfo, existingLabelTableOutputPartitions, windowToLabelOutputInfos)
  }

  val allOutputDfs = computableWindowToOutputs.values
    .map(_.joinDsAsRange)
    .map { range =>
      tableUtils.scanDf(null, outputLabelTable, range = Some(range))
    }
    .toSeq

  if (allOutputDfs.length == 1) {
    allOutputDfs.head
  } else {
    allOutputDfs.reduce(_ union _)
  }
}

🛠️ Refactor suggestion

compute() is quite long.

Splitting into smaller methods would improve readability. Consider factoring out partition checks and DataFrame merges.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)

37-74: Consider edge-case tests.
These lines only test normal usage. Add more tests for missing partitions, empty DataFrames, or incomplete data to improve reliability.


137-294: Good coverage of multiple windows.
This validates various window aggregations. Consider verifying performance with large data.


299-303: Extend incremental scenario checks.
The suite already exercises a second run; adding checks for partial partitions, or for a no-op re-run with no new partitions, would strengthen coverage (see the sketch below).
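
A hedged sketch of the no-op case in the suite's ScalaTest style (joinConf, tableUtils, and today are assumed to be in scope as in the other tests):

it should "produce identical output when re-run with no new partitions" in {
  val labelJoin = new LabelJoinV2(joinConf, tableUtils, today)
  val first = labelJoin.compute()
  val second = labelJoin.compute() // same labelDs; nothing new to pick up
  // A pure no-op re-run should leave an empty symmetric difference.
  assert(first.except(second).isEmpty && second.except(first).isEmpty)
}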

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between d6a35b9 and eba4664.

📒 Files selected for processing (1)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (103-169)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)

76-78: Keep labelParts flexible.
Multiple label parts could also be exercised in this spec for clarity, though the second test already covers them.


371-372: Overall nicely structured tests!
All checks and comparisons look consistent.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)

1-375: 🛠️ Refactor suggestion

Add test for error handling when no valid windows exist.

Missing tests for error conditions, such as when no valid windows are available for computation.

it should "throw an error when no valid windows exist" in {
  // Setup similar to other tests, but with partitions that don't exist in the join table
  
  // Use a partition that doesn't exist in the join table
  val nonExistentPartition = tableUtils.partitionSpec.minus(today, new Window(200, TimeUnit.DAYS))
  
  val labelJoin = new LabelJoinV2(joinConf, tableUtils, nonExistentPartition)
  
  // Expect an IllegalArgumentException when no valid windows are available
  val exception = intercept[IllegalArgumentException] {
    labelJoin.compute()
  }
  
  assert(exception.getMessage.contains("No valid windows to compute labels for"))
}
🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)

37-371: Consider extracting repeated test setup code.

Both tests contain significant duplicate setup code that could be extracted into helper methods.

// Add these helper methods to reduce duplication

+  private def createTestDataFrames(viewsTable: String, viewsSchema: List[Column], itemQueriesTable: String): Unit = {
+    // Create and save views data
+    DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
+    
+    // Create and save item queries data
+    val itemQueries = List(Column("item", api.StringType, 100))
+    DataFrameGen.events(spark, itemQueries, 2000, partitions = 100).save(itemQueriesTable)
+  }
+
+  private def createTestSources(viewsTable: String): Source = {
+    Builders.Source.events(
+      query = Builders.Query(selects = Builders.Selects("time_spent_ms"), startPartition = yearAgo),
+      table = viewsTable
+    )
+  }
+
+  private def createTestJoinConfig(viewsSource: Source, labelParts: LabelPart, itemQueriesTable: String, 
+                                 viewsGroupBy: GroupBy, testName: String): Join = {
+    val start = tableUtils.partitionSpec.minus(today, new Window(100, TimeUnit.DAYS))
+    Builders.Join(
+      left = Builders.Source.events(Builders.Query(startPartition = start), table = itemQueriesTable),
+      joinParts = Seq(Builders.JoinPart(groupBy = viewsGroupBy, prefix = "user")),
+      labelParts = labelParts,
+      metaData = Builders.MetaData(name = s"test.item_snapshot_features_$testName", namespace = namespace, team = "chronon")
+    )
+  }

231-279: Consider breaking down complex SQL into more readable parts.

The SQL query is quite complex with multiple UNIONs. Extract common parts or use string interpolation for readability.

+  private def createExpectedSql(
+    joinOutputTable: String, 
+    labelGbOutputTable: String, 
+    labelGbOutputTable2: String, 
+    fortyDaysAgo: String,
+    fortyThreeDaysAgo: String,
+    fortySevenDaysAgo: String,
+    thirtyThreeDaysAgo: String
+  ): String = {
+    val part1 = s"""
+      SELECT
+          j.*,
+          gb.time_spent_ms_sum_7d as label__time_spent_ms_sum_7d,
+          null as label__time_spent_ms_sum_10d,
+          gb2.time_spent_ms_max_7d as label__time_spent_ms_max_7d,
+          null as label__time_spent_ms_max_14d
+      FROM
+      (SELECT * FROM $joinOutputTable WHERE ds = "$fortyDaysAgo") as j
+      LEFT OUTER JOIN
+      (SELECT * FROM $labelGbOutputTable WHERE ds = "$thirtyThreeDaysAgo") as gb
+      on j.item = gb.item
+      LEFT OUTER JOIN
+      (SELECT * FROM $labelGbOutputTable2 WHERE ds = "$thirtyThreeDaysAgo") as gb2
+      on j.item = gb2.item
+    """
+    
+    // Similar for part2 and part3
+    
+    s"$part1 UNION $part2 UNION $part3"
+  }

105-108: Add assertions to verify specific column values.

Beyond checking for zero difference, add assertions that verify specific column values.

  val labelComputed = labelJoin.compute()
  println("Label computed::")
  labelComputed.show()
+
+ // Add assertions for specific column values
+ val firstRow = labelComputed.limit(1).collect().headOption
+ assert(firstRow.isDefined, "Expected at least one row in the result")
+ firstRow.foreach { row =>
+   assert(row.getAs[String]("item") != null, "Item column should not be null")
+   // Add more specific assertions for your expected label column values
+ }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between eba4664 and 2406274.

📒 Files selected for processing (4)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • api/src/main/scala/ai/chronon/api/Extensions.scala
  • spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
  • compute (103-169)
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
spark/src/main/scala/ai/chronon/spark/Driver.scala (2)

420-421: Updated instantiation to use LabelJoinV2.

The code now uses the new LabelJoinV2 class instead of LabelJoin.


425-425: Simplified method call with no parameters.

The compute() method no longer takes a stepDays parameter, which simplifies the interface; a sketch of the resulting call site follows.
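
For reference, the call site now reduces to something like this sketch (TableUtils and the ds argument follow the test suite's usage; the wrapper function is invented here):

import org.apache.spark.sql.DataFrame

// Hypothetical helper mirroring how the Driver invokes the new class.
def runLabelJoin(joinConf: ai.chronon.api.Join, tableUtils: TableUtils, labelDs: String): DataFrame =
  new LabelJoinV2(joinConf, tableUtils, labelDs).compute()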

spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)

16-35: Setup looks good and covers needed test scenarios.

Test class initialization with necessary variables for date ranges.


37-135: First test case validates basic functionality.

The test for single label part and window correctly verifies base functionality through SQL comparison.


137-371: Comprehensive testing of advanced use case.

The second test effectively validates multiple label parts and windows scenario, including the handling of newer windows without losing previous data.

@varant-zlai varant-zlai merged commit c41466d into main Apr 2, 2025
19 checks passed
@varant-zlai varant-zlai deleted the vz--label_join_v2 branch April 2, 2025 20:04
@coderabbitai coderabbitai bot mentioned this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025