Simple LabelJoin flow #546
Conversation
Walkthrough: This pull request introduces a new outputLabelTableV2 method in Extensions.scala and a new LabelJoinV2 flow in Spark.
Warning: Review ran into problems.
🔥 Problems: GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.
Actionable comments posted: 0
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)
52-60: Suggest skipping the final table scan.
You might reuse joinedDf instead of scanning the table again, improving performance:

```diff
-def finalResult = tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange))
+val finalResult = joinedDf
```
174-208: Add unit tests.
Tests for this join and renaming logic are missing. Comprehensive coverage would boost confidence. Would you like help writing these tests?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (16)
Extensions (43-1254), outputLabelTable (122-122), outputLabelTableV2 (125-127), dataModel (322-327), dataModel (458-465), setups (479-487), setups (781-785), setups (1047-1051), outputColumnName (194-196), outputTable (121-121), keys (537-547), keys (810-823), rightToLeft (748-758), valueColumns (517-537), valueColumns (746-746), pretty (1092-1097)
⏰ Context from checks skipped due to timeout of 90000ms (15)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
125-127: New method aligns well with existing naming.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
113-172: Logic is clear and robust.
No immediate issues, though multiple windows are not yet supported.
Actionable comments posted: 1
🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)
50-50: Use a config or constant.
Hardcoding stepDays = 1 may hamper flexibility if requirements change.
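A minimal sketch of letting stepDays flow through from config instead of the hardcoded value; the "step_days" property key is an assumption, while the tableProperties access mirrors the confTableProps pattern already in this file:

```scala
import scala.collection.JavaConverters._

// Read stepDays from the join's table properties, defaulting to 1 (the
// current hardcoded value) when the property is absent.
private val stepDays: Int =
  Option(joinConf.metaData.tableProperties)
    .map(_.asScala.toMap)
    .flatMap(_.get("step_days"))
    .map(_.toInt)
    .getOrElse(1)
```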
135-150: Enhance the error message.
Improve guidance on how to produce missing partitions, for clarity.
167-201: Watch join performance.
Multiple left outer joins may degrade performance with large datasets. Consider optimizing or limiting concurrency.
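One hedged mitigation, assuming each right-hand side is a small snapshot slice: hint Spark to broadcast the right DataFrame so every left outer join becomes a map-side join instead of a full shuffle. The names and fold structure here are illustrative:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// Fold the right DataFrames onto the left, broadcasting each right side.
// Only appropriate when each right side comfortably fits in executor memory.
def joinAllBroadcast(left: DataFrame, rights: Seq[DataFrame], keys: Seq[String]): DataFrame =
  rights.foldLeft(left) { (acc, right) =>
    acc.join(broadcast(right), keys, "left_outer")
  }
```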
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (16)
Extensions (43-1254), outputLabelTable (122-122), outputLabelTableV2 (125-127), dataModel (322-327), dataModel (458-465), setups (479-487), setups (781-785), setups (1047-1051), outputColumnName (194-196), outputTable (121-121), keys (537-547), keys (810-823), rightToLeft (748-758), valueColumns (517-537), valueColumns (746-746), pretty (1092-1097)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
89-89: Verify potential injection risk.
tableUtils.sql is executed with dynamic input. Confirm it's sanitized or safe from undesired queries.
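A defensive sketch in the spirit of this comment; the allowlist and its prefixes are assumptions, not the project's API. The idea is to validate each setup statement before handing it to tableUtils.sql, so only expected statement shapes ever run:

```scala
// Prefixes considered safe for setup statements (assumed list, adjust as needed).
val allowedSetupPrefixes = Seq("SET ", "ADD JAR ", "CREATE TEMPORARY FUNCTION ")

// Run each setup statement only if it matches an allowed prefix.
def runSetups(setups: Seq[String], run: String => Unit): Unit =
  setups.foreach { stmt =>
    val normalized = stmt.trim.toUpperCase
    require(allowedSetupPrefixes.exists(p => normalized.startsWith(p)),
            s"Refusing to run unexpected setup statement: $stmt")
    run(stmt)
  }
```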
s"Multiple aggregations not yet supported for label join ${jp.groupBy.metaData.name}") | ||
|
||
assert( | ||
Option(jp.groupBy.aggregations.get(0).windows).get.size() == 1, |
kill this?
```diff
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 2023 The Chronon Authors.
```
Not Apache 2.0 licensed anymore right?
```scala
    .map(_.asScala.toMap)
    .getOrElse(Map.empty[String, String])

  private val stepDays = 1 // Always use 1 for step days to avoid shuffle
```
maybe we just let this flow through from config for now, we can decide whether to promote it to source default later. Or are you using this to test?
also, we actually want to set the following as a table property in order to actually not shuffle: https://github.com/zipline-ai/chronon/pull/531/files#diff-041194356b02fb92c7ef95d7c0a83513a8639f7c9b6987b551a9d080ca7dd662L378-L379
```scala
val joinedDf = computeLabelJoin(leftDf, leftDsToRange)
joinedDf.save(outputLabelTable,
              confTableProps,
              Seq(Constants.LabelPartitionColumn, tableUtils.partitionColumn),
```
The ordering determines the partition hierarchy here - wouldn't we want to partition on the LHS partition column first?
Actionable comments posted: 3
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
59-83: 🛠️ Refactor suggestion: Add error handling for the compute operation.
The method should handle exceptions when scanning dataframes or saving results:

```diff
 def compute(): DataFrame = {
+  try {
     runAssertions()
     Option(labelJoinConf.setups).foreach(_.foreach(tableUtils.sql))
     logger.info(s"Label join to fill $joinOutputDsToLabel.")
     val leftDsToRange = PartitionRange(joinOutputDsToLabel, joinOutputDsToLabel)
     def finalResult = tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange))
     val startMillis = System.currentTimeMillis()
     val joinBaseDf = tableUtils.scanDf(null, joinConf.metaData.outputTable, range = Some(leftDsToRange))
     val joinedDf = computeLabelJoin(joinBaseDf, leftDsToRange)
     joinedDf.save(outputLabelTable, confTableProps, Seq(tableUtils.partitionColumn), true)
     val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000)
     metrics.gauge(Metrics.Name.LatencyMinutes, elapsedMins)
     logger.info(s"Wrote to table $outputLabelTable, into partitions: $leftDsToRange in $elapsedMins mins")
     finalResult
+  } catch {
+    case e: Exception =>
+      logger.error(s"Error computing label join: ${e.getMessage}", e)
+      metrics.counter(Metrics.Name.Errors, 1)
+      throw e
+  }
 }
```
🧹 Nitpick comments (10)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (5)
22-33: Implementation looks good but missing proper documentation.
The class could benefit from Scaladoc explaining its purpose and parameters:

```diff
+/**
+ * Implements a simple label join by joining forward-looking partitions from snapshot tables
+ * back to the join output of labelJoinParts.
+ *
+ * @param joinConf Join configuration containing left source and join parts
+ * @param tableUtils Utility for table operations
+ * @param joinOutputDsToLabel Output dataset label to join
+ */
 class LabelJoinV2(joinConf: api.Join, tableUtils: TableUtils, joinOutputDsToLabel: String) {
```
25-25: Strengthen the assertion for namespace validation.
The current check only catches null/empty, not whitespace-only strings:

```diff
-assert(Option(joinConf.metaData.outputNamespace).nonEmpty, "output namespace could not be empty or null")
+assert(Option(joinConf.metaData.outputNamespace).exists(_.trim.nonEmpty), "output namespace could not be empty, null or whitespace")
```
78-78: Extract a time-conversion constant.
Replace the magic number with a named constant for readability:

```diff
+private val MillisToMinutes = 60 * 1000
 val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000)
```
109-126: Improve the error message for missing partitions.
The error message is detailed but could be more concise:

```diff
 if (missingPartitions.nonEmpty) {
   throw new RuntimeException(
-    s"""Missing following partitions from $snapshotTable: $missingPartitions
-       |
-       |When computing for $leftDsAsRange
-       |
-       |Label table contains the following windows: ${windowToOutputColumnName.keys.mkString(", ")} (days)
-       |
-       |So the required partitions to compute labels for $leftDsAsRange are: ${requiredPartitions.mkString(", ")}
-       |
-       |Found existing partitions in snapshot table: ${existingSnapshotPartitions.mkString(", ")}
-       |
-       |(Looking ahead in the snapshot table by the corresponding window length).
-       |
-       |you may need to run the snapshot job for the missing days.
-       |""".stripMargin
+    s"""Missing partitions in $snapshotTable: $missingPartitions
+       |Required for: $leftDsAsRange with windows: ${windowToOutputColumnName.keys.mkString(", ")} (days)
+       |Required partitions: ${requiredPartitions.mkString(", ")}
+       |Existing partitions: ${existingSnapshotPartitions.mkString(", ")}
+       |Run snapshot job for missing days.""".stripMargin
   )
 }
```
136-138: Add handling for empty DataFrames.
Consider validating that rightDfs isn't empty before folding:

```diff
+if (rightDfs.isEmpty) {
+  logger.warn("No right DataFrames to join. Returning base DataFrame.")
+  return joinBaseDf
+}
 val joined = rightDfs.foldLeft(joinBaseDf) { (left, right) =>
   joinWithLeft(left, right, joinPart)
 }
```

spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (5)
17-39: Well-structured test class setup, but could use documentation.
Add class-level Scaladoc to explain the test purpose:

```diff
+/**
+ * Test suite for LabelJoinV2 functionality.
+ * Tests label join operation with various data configurations and validates output.
+ */
 class LabelJoinV2Test extends AnyFlatSpec {
```
42-49: Extract schema definitions to constants.
Move schema definitions to class-level constants for reusability:

```diff
+// Schema definitions
+private val ViewsSchema = List(
+  Column("user", api.StringType, 10000),
+  Column("item", api.StringType, 100),
+  Column("time_spent_ms", api.LongType, 5000)
+)

 it should "label join v2" in {
-
-  val viewsSchema = List(
-    Column("user", api.StringType, 10000),
-    Column("item", api.StringType, 100),
-    Column("time_spent_ms", api.LongType, 5000)
-  )
   val viewsTable = s"$namespace.view_events"
-  DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
+  DataFrameGen.events(spark, ViewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
```
104-110: Implementation looks good, but add comments.
Add comments explaining the label join computation steps:

```diff
 // Now compute the snapshots for the label join
 GroupBy.computeBackfill(labelsGroupBy, today, tableUtils)
 tableUtils.sql(s"SELECT * FROM ${labelsGroupBy.metaData.outputTable}").show()

+// Initialize LabelJoinV2 with the join configuration and compute the label join
 // Now compute the label join for forty days ago
 val labelJoin = new LabelJoinV2(joinConf, tableUtils, fortyDaysAgo)
 val result = labelJoin.compute()
```
113-130: Refactor SQL string construction for maintainability.
Long SQL strings are hard to maintain. Extract parts to variables:

```diff
+// SQL query parts
+val joinTableQuery = s"SELECT * FROM $joinOutputTable WHERE ds = \"$fortyDaysAgo\""
+val aggregationQuery = s"""
+  SELECT
+    item, SUM(time_spent_ms) as time_spent_ms_sum_7d
+  FROM
+    $viewsTable
+  WHERE
+    ds BETWEEN $thirtyThreeDaysAgo AND $thirtyNineDaysAgo
+  GROUP BY
+    item
+"""
+
 val expected = s"""
-  | SELECT j.*, gb.time_spent_ms_sum_7d FROM
-  | (SELECT * FROM $joinOutputTable WHERE ds = "$fortyDaysAgo") as j
-  | JOIN
-  | (
-  |   SELECT
-  |     item, SUM(time_spent_ms) as time_spent_ms_sum_7d
-  |   FROM
-  |     $viewsTable
-  |   WHERE
-  |     ds BETWEEN $thirtyThreeDaysAgo AND $thirtyNineDaysAgo
-  |   GROUP BY
-  |     item
-  | ) as gb
-  | on j.item = gb.item
+  | SELECT j.*, gb.time_spent_ms_sum_7d FROM
+  | ($joinTableQuery) as j
+  | JOIN
+  | ($aggregationQuery) as gb
+  | on j.item = gb.item
   |""".stripMargin
```
144-149: Method looks good, but could benefit from documentation.
Add method documentation to explain purpose and parameters:

```diff
+/**
+ * Creates a test label join configuration from a sequence of GroupBy configurations.
+ * @param groupBys Sequence of GroupBy configurations to include in the label join
+ * @return LabelParts configuration for testing
+ */
 def createTestLabelJoin(groupBys: Seq[ai.chronon.api.GroupBy]): ai.chronon.api.LabelParts = {
   val labelJoinParts = groupBys.map(gb => Builders.JoinPart(groupBy = gb)).toList
   Builders.LabelPart(
     labels = labelJoinParts
   )
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (59-83)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (15)
Extensions (43-1254), outputLabelTable (122-122), outputLabelTableV2 (125-127), dataModel (322-327), dataModel (458-465), setups (479-487), setups (781-785), setups (1047-1051), outputColumnName (194-196), keys (537-547), keys (810-823), rightToLeft (748-758), valueColumns (517-537), valueColumns (746-746), pretty (1092-1097)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_tests
- GitHub Check: enforce_triggered_workflows
```scala
def computeLabelJoin(joinBaseDf: DataFrame, leftDsAsRange: PartitionRange): DataFrame = {
  // TODO -- support multiple (going to change API first)
  val labelJoinPart = labelJoinConf.labels.asScala.head

  // First get a map of window to output column name for the labels within that window
  val windowToOutputColumnName = labelJoinPart.groupBy.aggregations.asScala
    .flatMap { agg =>
      agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
        // TODO -- support buckets
        val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
        (w.length, aggPart.outputColumnName)
      }
    }
    .groupBy(_._1)
    .mapValues(_.map(_._2))
    .toMap
```
💡 Verification agent
❓ Verification inconclusive
Address TODO comments before finalizing.
Multiple TODO comments indicate incomplete functionality.
🏁 Script executed:
```bash
#!/bin/bash
# Check for other TODOs in the codebase related to LabelJoin
rg -A 2 "TODO.*(?i)labeljoin" --type scala
```
Length of output: 44
Action Required: Resolve TODOs in LabelJoinV2.scala
The file still has two TODO comments (for supporting multiple labels and buckets) that need to be addressed before finalizing the PR.
- Verify if these are still pending features.
- Remove or implement the TODO sections as appropriate.
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala: outdated review thread (resolved)
Actionable comments posted: 0
🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)
17-39: Consider organizing date variables for clarity.
The date variables are many and scattered. Consider grouping them into a DateFixtures object.
24-24: Unused variable.
labelDS is defined but never used.
40-105: Test cleanup missing.
The test creates tables but doesn't drop them after completion. Add cleanup code after the assertions to drop created tables.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (59-83)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (6)
1-16: Imports look good.
All necessary imports are present.
40-134: Missing test coverage for edge cases.
Only the happy path is tested. Add tests for error conditions.
109-111: LGTM - Core test logic.
This correctly tests the LabelJoinV2.compute() functionality.
114-122: SQL query verification approach is solid.
Using SQL for expected results is a good approach.
127-133: Good debugging output.
Helpful diagnostic information when tests fail.
136-141: Utility method isn't used.
createTestLabelJoin is defined but not used in this test. Is this intended for future tests? If not, consider removing it.
Force-pushed: 705f0d7 → f4e45ef
Actionable comments posted: 1
🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
24-24: Parameterize the date to improve test reusability.
You could avoid a hard-coded date for flexible test scenarios:

```diff
-private val labelDS = "2022-10-30"
+private def labelDS = java.time.LocalDate.now().toString
```

spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)
41-42: Consider supporting multiple label joins.
85-87: Implement or remove the TODO for multiple label joins.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- api/src/main/scala/ai/chronon/api/Extensions.scala
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (15)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), dataModel (322-327), dataModel (458-465), setups (479-487), setups (779-783), setups (1045-1049), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (59-83)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
40-42: Add tests for edge cases.
136-141: Helper function looks good.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)
25-25: Good defensive check for the output namespace.
73-76: Add error handling for scanning and saving steps.
```scala
val leftDsToRange = PartitionRange(joinOutputDsToLabel, joinOutputDsToLabel)

def finalResult = tableUtils.scanDf(null, outputLabelTable, range = Some(leftDsToRange))
```
nit: maybe just get rid of the function here and call scanDf at the end? For readability.
Actionable comments posted: 0
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)
38-49: Optimize schema retrieval.
Fetching schemas repeatedly can be costly. Consider collecting them once per table for efficiency.
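A sketch of the suggested memoization; the loader function stands in for whatever schema lookup tableUtils provides (deliberately not naming its API here):

```scala
import org.apache.spark.sql.types.StructType
import scala.collection.mutable

// Cache schemas per table name so repeated lookups hit the map, not the metastore.
private val schemaCache = mutable.Map.empty[String, StructType]

def cachedSchema(table: String)(load: String => StructType): StructType =
  schemaCache.getOrElseUpdate(table, load(table))
```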
81-81: Address the TODO.
Complete or remove the "TODO -- support buckets" if it's no longer needed.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (14)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), team (993-993), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (14)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (4)
1-25: Looks fine.
50-69: Assertion checks are good.
101-186: Wrap save logic with a fallback.
Consider error handling if .save() fails, preventing partial data writes.
188-221: Looks correct.
Verify column type alignment if sources differ.
Actionable comments posted: 1
🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)
38-48: Add method documentation.
Add Scaladoc explaining the purpose and return values.
71-99: Complex transformation chain needs refactoring.
Break this method into smaller steps for better readability:

```diff
 def getWindowToLabelOutputInfos() = {
   // Create a map of window to LabelOutputInfo
   // Each window could be shared across multiple labelJoinParts
   val labelJoinParts = labelJoinConf.labels.asScala
   val labelDsAsRange = PartitionRange(labelDs, labelDs)
+  // Step 1: Extract window, output column, and join date information for each aggregation
+  val windowInfoByLabelJoinPart = labelJoinParts.map { labelJoinPart =>
+    val windowInfo = labelJoinPart.groupBy.aggregations.asScala
+      .flatMap { agg =>
+        agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
+          // TODO -- support buckets
+          val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
+          val joinDs = labelDsAsRange.shift(w.length * -1)
+          (w.length, aggPart.outputColumnName, joinDs)
+        }
+      }
+      .groupBy(_._1)
+      .mapValues(_.map(_._2))
+      .mapValues((labelJoinPart, _))
+    windowInfo
+  }
+
+  // Step 2: Combine all window info across label join parts
+  val flattenedWindowInfo = windowInfoByLabelJoinPart.flatMap(_.toList)
+
+  // Step 3: Group by window length and create final output info
+  flattenedWindowInfo
+    .groupBy(_._1)
+    .mapValues(_.map(_._2))
+    .map { case (window, labelJoinPartAndOutputCols) =>
+      val labelInfos = labelJoinPartAndOutputCols.map(info => LabelPartOutputInfo(info._1, info._2))
+      val joinPartitionDsAsRange = labelDsAsRange.shift(window * -1)
+      window -> LabelOutputInfo(joinPartitionDsAsRange, labelInfos)
+    }
-  labelJoinParts
-    .map { labelJoinPart =>
-      labelJoinPart.groupBy.aggregations.asScala
-        .flatMap { agg =>
-          agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
-            // TODO -- support buckets
-            val aggPart = AggregationPart(agg.operation, agg.inputColumn, w)
-            val joinDs = labelDsAsRange.shift(w.length * -1)
-            (w.length, aggPart.outputColumnName, joinDs)
-          }
-        }
-        .groupBy(_._1)
-        .mapValues(_.map(_._2)) // Drop the window (it's the key)
-        .mapValues((labelJoinPart, _)) // add the labelJoinPart
-    }
-    .flatMap(_.toList)
-    .groupBy(_._1) // Flatten map and combine into one map with window as key and all labelJoinParts as value
-    .mapValues(_.map(_._2)) // Drop the duplicate window
-    .map { case (window, labelJoinPartAndOutputCols) =>
-      val labelInfos = labelJoinPartAndOutputCols.map(info => LabelPartOutputInfo(info._1, info._2))
-      val joinPartitionDsAsRange = labelDsAsRange.shift(window * -1)
-      window -> LabelOutputInfo(joinPartitionDsAsRange, labelInfos)
-    }
 }
```

101-209: Refactor the long compute method.
Extract logical sections into separate methods for better maintainability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (101-209)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: join_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
39-138: Missing test coverage for edge cases.
The current test only covers the happy path. Add tests for error scenarios.
140-145: LGTM! Helper method implementation looks good.
Clean implementation for creating test label joins from GroupBy configurations.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (4)
26-37: Initialization looks good.
Constructor properly validates the namespace and sets up necessary components.
219-223: Use a Set for efficient filtering of non-value columns.
Replace the array contains check with Set membership for better performance:

```diff
-val nonValueColumns = joinPart.rightToLeft.keys.toArray ++ Array(Constants.TimeColumn,
-  tableUtils.partitionColumn,
-  Constants.TimePartitionColumn,
-  Constants.LabelPartitionColumn)
-val valueColumns = rightDf.schema.names.filterNot(nonValueColumns.contains)
+val nonValueColumns = (joinPart.rightToLeft.keys ++ Set(
+  Constants.TimeColumn,
+  tableUtils.partitionColumn,
+  Constants.TimePartitionColumn,
+  Constants.LabelPartitionColumn
+)).toSet
+val valueColumns = rightDf.schema.names.filterNot(nonValueColumns)
```

112-142: Good error handling for missing partitions.
Excellent logging and error handling for missing partition scenarios.
187-193: Good metrics tracking implementation.
Proper performance monitoring with detailed logging.
Actionable comments posted: 0
🧹 Nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)
45-49: Consolidate repeated schema definitions.
You define similar schemas multiple times. Factor them out to a helper method for cleaner code, as sketched below. Also applies to: 145-149.
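A sketch of the suggested factoring, reusing the Column(name, dataType, cardinality) shape this test file already uses so both tests share one definition:

```scala
// Shared event schema for both label-join tests.
private val viewsSchema: List[Column] = List(
  Column("user", api.StringType, 10000),
  Column("item", api.StringType, 100),
  Column("time_spent_ms", api.LongType, 5000)
)
```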
95-101: Avoid duplicating joinConf setup.
Use a shared method to build the join configuration to reduce repetition. Also applies to: 208-213.
102-116: Consider an empty result set test.
Try verifying behavior when the snapshot or item queries table is empty.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)
37-37: Fix variable name typo.
Rename lableColumnPrefix to labelColumnPrefix for clarity:

```diff
-private val lableColumnPrefix = "label_"
+private val labelColumnPrefix = "label_"
```
82-82: Resolve the remaining TODO.
"Support buckets" is still pending. Clarify whether to implement or remove the placeholder. Do you want me to open an issue to track adding bucket support?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala
(1 hunks)spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala
(1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (102-224)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
43-69: Add negative test paths.
Coverage is limited to success scenarios; missing-partition handling isn't tested.
143-179: Well done handling multiple label parts.
Consider adding edge-case tests for zero or null partitions to ensure resilience.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
234-244: Validate overwriting columns carefully.
Dropping and re-adding columns may cause unexpected issues if columns were previously renamed or typed differently.
```diff
@@ -122,6 +122,8 @@ object Extensions {
   def outputLabelTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
   def outputFinalView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
   def outputLatestLabelView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled_latest"
+  def outputLabelTableV2: String =
+    s"${metaData.outputNamespace}.${metaData.cleanName}_with_labels" // Used for the LabelJoinV2 flow
```
lets call this "_labels" and the other one "_labels_legacy"
```scala
  args.joinConf,
  tableUtils,
  args.endDate()
)
labelJoin.computeLabelJoin(args.stepDays.toOption)
```
cuz we need step days only on the joinpart and on the actual label join job - we always step days = 1?
```scala
import scala.collection.Seq

case class LabelPartOutputInfo(labelPart: JoinPart, outputColumnNames: Seq[String])
case class LabelOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
```
Indicate that this is all the label columns / plural
```diff
-case class LabelOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
+case class AllLabelsOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
```
```scala
          (w.length, aggPart.outputColumnName, joinDs)
        }
      }
      .groupBy(_._1)
      .mapValues(_.map(_._2)) // Drop the window (it's the key)
      .mapValues((labelJoinPart, _)) // add the labelJoinPart
  }
  .flatMap(_.toList)
  .groupBy(_._1) // Flatten map and combine into one map with window as key
  .mapValues(_.map(_._2)) // Drop the duplicate window
  .map { case (window, labelJoinPartAndOutputCols) =>
```
can we simplify this actually? the inner grouping is probably not necessary, and you could probably directly compute LabelPartOutputInfo right here in the inner loop.
```scala
// Each unique window is an output partition in the joined table
// Each window may contain a subset of the joinParts and their columns
validWindowToOutputs.foreach { case (windowLength, joinOutputInfo) =>
```
nit: split the inner blocks into separate functions
```scala
  }
}

def compute(): DataFrame = {
```
this function is pretty large - can we split this actually?
lets make the logic easy to follow - added few readability related comments.
```scala
import scala.collection.Seq

case class LabelPartOutputInfo(labelPart: JoinPart, outputColumnNames: Seq[String])
case class LabelOutputInfo(joinDsAsRange: PartitionRange, LabelPartOutputInfos: Seq[LabelPartOutputInfo])
```
camelCase on the LabelpartOutputInfos
```scala
labelJoinPart.groupBy.aggregations.asScala
  .flatMap { agg =>
    agg.windows.asScala.filter(_.timeUnit == TimeUnit.DAYS).map { w =>
      // TODO -- support buckets
```
can we do an assertion here in the meantime?
```scala
private val tableName = "test_label_join"
private val labelDS = "2022-10-30"
private val tableUtils = TableTestUtils(spark)
tableUtils.createDatabase(namespace)
```
nit: move this to the end of the variable declarations to separate the different steps.
```scala
private val confTableProps = Option(joinConf.metaData.tableProperties)
  .map(_.asScala.toMap)
  .getOrElse(Map.empty[String, String])
private val lableColumnPrefix = "label_"
```
How about double underscore to make clear this is engine-generated? If this is convention already elsewhere no worries.
Actionable comments posted: 0
🧹 Nitpick comments (5)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (5)
15-15: Remove unused import.
BloomFilter is unused. Consider removing it.
37-37: Rename for clarity.
Replace lableColumnPrefix with labelColumnPrefix.
83-83: Implement or remove the TODO.
Bucket support is incomplete. Should I open a new issue to track it?
146-152: Handle partial partitions.
Consider fallback logic for partially missing partitions.
186-201: Consolidate partition logic.
Combining branches for the base join may simplify maintenance.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (2)
52-71: Add test coverage.
Ensure runAssertions() is validated by unit tests.
230-263: Re-check column collisions.
Confirm columns don't conflict after key renaming and prefixing.
Actionable comments posted: 0
🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)
37-37: Typo in prefix variable.
Rename lableColumnPrefix to labelColumnPrefix for clarity:

```diff
-private val lableColumnPrefix = "label_"
+private val labelColumnPrefix = "label_"
```

83-84: TODO: bucket support.
Would you like help implementing or removing the placeholder?
218-220: Consider broadcast or a filter.
Chaining left-outer joins on large data can be costly. Look into Spark broadcast or bloom filters for efficiency.
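A sketch of the bloom-filter option using Spark's built-in DataFrameStatFunctions: build a filter over the left side's join key and prune the right side before the left outer join. The key name, expected item count, and false-positive rate are illustrative assumptions:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Pre-filter the right side to rows whose key might appear on the left,
// shrinking the shuffle for the subsequent left outer join.
def prefilterRight(left: DataFrame, right: DataFrame, key: String): DataFrame = {
  val bloom = left.stat.bloomFilter(key, 1000000L, 0.01)
  val mightContain = udf((k: String) => k != null && bloom.mightContain(k))
  right.filter(mightContain(col(key)))
}
```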
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)
1-266: Add a dedicated test suite.
No unit tests found here. Kindly add a suite to validate each method's logic.
213-214: Confirm the partition range for snapshots.
You're scanning the right DF with labelDsAsRange but the left DF with a shifted range. Verify this alignment.
239-243: Use a Set for non-value columns.
Optimize membership checks as previously suggested.
Actionable comments posted: 0
🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
20-20: Move Spark init after variable declarations.
Keeps initialization steps grouped together.
299-371: Consider a partition-gap test.
It would verify progression when some windows are missing.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (103-170)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
37-99: Add edge-case tests for missing partitions and empty dataframes.
Current coverage focuses on standard flows; negative scenarios remain untested.
137-194: Nice coverage of multiple label parts.
This addresses the prior request for multi-part testing.
Actionable comments posted: 2
🧹 Nitpick comments (7)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (7)
37-37: Fix typo in variable name.
lableColumnPrefix should be labelColumnPrefix:

```diff
-private val lableColumnPrefix = "label_"
+private val labelColumnPrefix = "label_"
```

98-99: Document the shift operation's purpose.
Add a comment explaining why the backward shift by window length is necessary (for example, for labelDs = 2022-10-30 and a 7-day window, the corresponding join output partition is 2022-10-23):

```diff
-// The labelDs is a lookback from the labelSnapshot partition back to the join output table
+// We shift backward by window length because we need to find the join output partition
+// that corresponds to the start of the window period relative to our label date
```

104-171: Decompose the large compute method.
Break it down into smaller, focused functions for improved readability. Extract validation, partition checking, and the dataframe union into separate methods.
240-244: Improve non-value columns filtering.
Use a Set for more efficient membership testing:

```diff
-val nonValueColumns = joinPart.rightToLeft.keys.toArray ++ Array(Constants.TimeColumn,
-  tableUtils.partitionColumn,
-  Constants.TimePartitionColumn,
-  Constants.LabelPartitionColumn)
-val valueColumns = rightDf.schema.names.filterNot(nonValueColumns.contains)
+val nonValueColumns = (joinPart.rightToLeft.keys ++ Set(
+  Constants.TimeColumn,
+  tableUtils.partitionColumn,
+  Constants.TimePartitionColumn,
+  Constants.LabelPartitionColumn
+)).toSet
+val valueColumns = rightDf.schema.names.filterNot(nonValueColumns)
```

248-248: Simplify column dropping.
Use a more concise approach for dropping multiple columns:

```diff
-val cleanLeftDf = valueColumns.foldLeft(leftDf)((df, colName) => df.drop(s"${lableColumnPrefix}_$colName"))
+val columnsToRemove = valueColumns.map(colName => s"${labelColumnPrefix}_$colName")
+val cleanLeftDf = leftDf.drop(columnsToRemove:_*)
```

167-170: Handle empty dataframe sequences.
Add a check before the union operation to avoid potential issues:

```diff
 if (allOutputDfs.length == 1) {
   allOutputDfs.head
+} else if (allOutputDfs.isEmpty) {
+  throw new IllegalStateException("No output dataframes were generated")
 } else {
   allOutputDfs.reduce(_ union _)
 }
```

263-264: Add error handling for the join operation.
Consider adding a try-catch block for join errors, to provide better diagnostics when a join fails.
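A sketch of the suggested wrapper (names are illustrative): surface which keys were involved when a join fails, instead of a bare Spark stack trace:

```scala
import org.apache.spark.sql.DataFrame
import scala.util.{Failure, Success, Try}

// Wrap the left outer join so failures carry the join keys in the message.
def safeLeftOuterJoin(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame =
  Try(left.join(right, keys, "left_outer")) match {
    case Success(df) => df
    case Failure(e) =>
      throw new RuntimeException(s"Left outer join on [${keys.mkString(", ")}] failed", e)
  }
```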
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
```scala
// TODO -- support buckets
assert(Option(agg.buckets).isEmpty, "Buckets as labels are not yet supported in LabelJoinV2")
```
💡 Verification agent
🧩 Analysis chain
Add implementation plan for bucket support.
The TODO comment lacks context about implementation timeline.
🏁 Script executed:
```bash
#!/bin/bash
# Check other TODOs for bucket implementation
rg "TODO.*bucket" --type scala
```
Length of output: 461
Bucket support: Implementation plan required
The TODO in spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (lines 83-84) is too terse. Update it with a brief plan and timeline for bucket support.
Force-pushed: e4bfd24 → d6a35b9
Actionable comments posted: 1
🧹 Nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
17-36: Class initialization is correct.
Consistent use of SparkSession and initialization logic. Consider adding test-level teardown if needed for cleanup.
37-135: Consider edge-case tests.
Tests appear robust. Add negative scenarios (e.g., no matching partitions or empty input data) to ensure failure paths are covered.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)
17-39: Check the bucket support implementation.
A TODO indicates missing bucket support. If it's planned soon, consider clarifying the timeline or removing the TODO.
40-70: Schema extraction might be optimized.
Repeatedly scanning schemas can be costly on large tables. Consider caching schemas if performance becomes an issue.
72-102: Window mapping logic is clear.
Mapping windows to output info is straightforward. Possibly split large functions for easier maintainability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- api/src/main/scala/ai/chronon/api/Extensions.scala
- spark/src/main/scala/ai/chronon/spark/Driver.scala
🧰 Additional context used
🧬 Code Definitions (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute (103-169)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (13)
Extensions (43-1252), outputLabelTable (122-122), outputLabelTableV2 (125-127), outputTable (121-121), dataModel (322-327), dataModel (458-465), outputColumnName (194-196), keys (537-547), keys (808-821), rightToLeft (746-756), valueColumns (517-537), valueColumns (744-744), pretty (1090-1095)
⏰ Context from checks skipped due to timeout of 90000ms (15)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (4)
1-16: Imports look fine.
All relevant dependencies are properly referenced and there's no redundancy.
137-196: Good handling of multiple label parts.
Ensures coverage for multi-window scenarios. Possibly add a test for a single aggregator with multiple columns.
197-294: Union logic is clear.
Combining partial results into the final DataFrame is well-structured. No immediate issues found.
295-374: Final checks on label join consistency.
Verifies updated columns for the second run. This ensures incremental labeling. Nice coverage.
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (3)
1-16: Imports and definitions look correct.
Context classes and logger usage are consistent with project style.
51-70: Assertion checks are good.
Ensures correct data model usage and daily windows. All critical validations are present.
170-265: Overall flow is coherent.
Method names and logging clarify the label join steps. Implementation aligns with best practices.
```scala
def compute(): DataFrame = {
  logger.info(s"Running LabelJoinV2 for $labelDs")

  runAssertions()

  // First get a map of window to LabelOutputInfo
  val windowToLabelOutputInfos = getWindowToLabelOutputInfos

  // Find existing partition in the join table
  val joinTable = joinConf.metaData.outputTable
  val existingJoinPartitions = tableUtils.partitions(joinTable)

  // Split the windows into two groups, one that has a corresponding partition in the join table and one that doesn't
  // If a partition is missing, we can't compute the labels for that window, but the job will proceed with the rest
  val (computableWindowToOutputs, missingWindowToOutputs) = windowToLabelOutputInfos.partition {
    case (_, labelOutputInfo) =>
      existingJoinPartitions.contains(labelOutputInfo.joinDsAsRange.start)
  }

  if (missingWindowToOutputs.nonEmpty) {
    logger.info(
      s"""Missing following partitions from $joinTable: ${missingWindowToOutputs.values
        .map(_.joinDsAsRange.start)
        .mkString(", ")}
         |
         |Found existing partitions: ${existingJoinPartitions.mkString(", ")}
         |
         |Therefore unable to compute the labels for ${missingWindowToOutputs.keys.mkString(", ")}
         |
         |For requested ds: $labelDs
         |
         |Proceeding with valid windows: ${computableWindowToOutputs.keys.mkString(", ")}
         |
         |""".stripMargin
    )

    require(
      computableWindowToOutputs.isEmpty,
      "No valid windows to compute labels for given the existing join output range." +
        s"Consider backfilling the join output table for the following days: ${missingWindowToOutputs.values.map(_.joinDsAsRange.start)}."
    )
  }

  // Find existing partition in the outputLabelTable (different from the join output table used above)
  // This is used below in computing baseJoinDf
  val existingLabelTableOutputPartitions = tableUtils.partitions(outputLabelTable)
  logger.info(s"Found existing partitions in Label Table: ${existingLabelTableOutputPartitions.mkString(", ")}")

  // Each unique window is an output partition in the joined table
  // Each window may contain a subset of the joinParts and their columns
  computableWindowToOutputs.foreach { case (windowLength, joinOutputInfo) =>
    computeOutputForWindow(windowLength, joinOutputInfo, existingLabelTableOutputPartitions, windowToLabelOutputInfos)
  }

  val allOutputDfs = computableWindowToOutputs.values
    .map(_.joinDsAsRange)
    .map { range =>
      tableUtils.scanDf(null, outputLabelTable, range = Some(range))
    }
    .toSeq

  if (allOutputDfs.length == 1) {
    allOutputDfs.head
  } else {
    allOutputDfs.reduce(_ union _)
  }
}
```
🛠️ Refactor suggestion
compute() is quite long.
Splitting into smaller methods would improve readability. Consider factoring out partition checks and DataFrame merges.
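As a rough sketch of one possible factoring (helper names below are invented for illustration, not part of the PR):

// Hypothetical decomposition of compute(); only runAssertions, getWindowToLabelOutputInfos,
// logger, and labelDs come from the original code.
def compute(): DataFrame = {
  logger.info(s"Running LabelJoinV2 for $labelDs")
  runAssertions()
  val windowToLabelOutputInfos = getWindowToLabelOutputInfos
  val computable = partitionComputableWindows(windowToLabelOutputInfos) // partition checks extracted
  computeAllWindows(computable, windowToLabelOutputInfos)               // per-window loop extracted
  mergeWindowOutputs(computable)                                        // scan + union extracted
}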
Actionable comments posted: 0
🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)
37-74
: Consider edge-case tests.
These lines only test normal usage. Add more tests for missing partitions, empty DataFrames, or incomplete data to improve reliability.
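For instance, a missing-partition case might look like this (a sketch reusing the surrounding fixtures; joinConf, tableUtils, and today are assumed from the existing setup):

it should "fail fast when no join partitions exist for any window" in {
  // Pick a label ds so far back that no join output partition can match any window
  val staleDs = tableUtils.partitionSpec.minus(today, new Window(500, TimeUnit.DAYS))
  val labelJoin = new LabelJoinV2(joinConf, tableUtils, staleDs)
  // compute() is expected to throw once it finds no computable windows
  intercept[IllegalArgumentException] { labelJoin.compute() }
}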
137-294
: Good coverage of multiple windows.
This validates various window aggregations. Consider verifying performance with large data.
299-303
: Extend incremental scenario checks.
We see a second run. Testing for partial partitions or no new partitions would be helpful.
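A no-new-partition check could be as small as this (sketch; firstRun and the fixtures are assumed from the existing test):

// Re-running for the same labelDs should be idempotent: same rows, no duplicates
val secondRun = new LabelJoinV2(joinConf, tableUtils, labelDs).compute()
assert(secondRun.count() == firstRun.count(), "second run should not duplicate rows")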
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala
(1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute
(103-169)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (2)
76-78
: Keep labelParts flexible.
Multiple label parts can be tested in the same spec for clarity, but the second test covers it too.
371-372
: Overall nicely structured tests!
All checks and comparisons look consistent.
Force-pushed eba4664 to 2406274 (Compare)
Actionable comments posted: 0
🔭 Outside diff range comments (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
1-375
: 🛠️ Refactor suggestion
Add test for error handling when no valid windows exist.
Missing tests for error conditions, such as when no valid windows are available for computation.
it should "throw an error when no valid windows exist" in {
  // Setup similar to other tests, but with partitions that don't exist in the join table
  // Use a partition that doesn't exist in the join table
  val nonExistentPartition = tableUtils.partitionSpec.minus(today, new Window(200, TimeUnit.DAYS))
  val labelJoin = new LabelJoinV2(joinConf, tableUtils, nonExistentPartition)
  // Expect an IllegalArgumentException when no valid windows are available
  val exception = intercept[IllegalArgumentException] {
    labelJoin.compute()
  }
  assert(exception.getMessage.contains("No valid windows to compute labels for"))
}
🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)
37-371
: Consider extracting repeated test setup code.
Both tests contain significant duplicate setup code that could be extracted into helper methods.
// Add these helper methods to reduce duplication
+ private def createTestDataFrames(viewsTable: String, viewsSchema: List[Column], itemQueriesTable: String): Unit = {
+   // Create and save views data
+   DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200).drop("ts").save(viewsTable)
+
+   // Create and save item queries data
+   val itemQueries = List(Column("item", api.StringType, 100))
+   DataFrameGen.events(spark, itemQueries, 2000, partitions = 100).save(itemQueriesTable)
+ }
+
+ private def createTestSources(viewsTable: String): Source = {
+   Builders.Source.events(
+     query = Builders.Query(selects = Builders.Selects("time_spent_ms"), startPartition = yearAgo),
+     table = viewsTable
+   )
+ }
+
+ private def createTestJoinConfig(viewsSource: Source, labelParts: LabelPart, itemQueriesTable: String,
+                                  viewsGroupBy: GroupBy, testName: String): Join = {
+   val start = tableUtils.partitionSpec.minus(today, new Window(100, TimeUnit.DAYS))
+   Builders.Join(
+     left = Builders.Source.events(Builders.Query(startPartition = start), table = itemQueriesTable),
+     joinParts = Seq(Builders.JoinPart(groupBy = viewsGroupBy, prefix = "user")),
+     labelParts = labelParts,
+     metaData = Builders.MetaData(name = s"test.item_snapshot_features_$testName", namespace = namespace, team = "chronon")
+   )
+ }
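If the helpers above were adopted, a test body might shrink to something like this (illustrative only; viewsSchema, viewsGroupBy, and labelParts are assumed from the existing tests):

it should "compute labels using the shared setup helpers" in {
  val viewsTable = s"$namespace.view_events_helper"
  val itemQueriesTable = s"$namespace.item_queries_helper"
  createTestDataFrames(viewsTable, viewsSchema, itemQueriesTable)
  val viewsSource = createTestSources(viewsTable)
  val joinConf = createTestJoinConfig(viewsSource, labelParts, itemQueriesTable, viewsGroupBy, "helper_usage")
  new LabelJoinV2(joinConf, tableUtils, today).compute()
}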
231-279
: Consider breaking down complex SQL into more readable parts.
The SQL query is quite complex with multiple UNIONs. Extract common parts or use string interpolation for readability.
+ private def createExpectedSql(
+     joinOutputTable: String,
+     labelGbOutputTable: String,
+     labelGbOutputTable2: String,
+     fortyDaysAgo: String,
+     fortyThreeDaysAgo: String,
+     fortySevenDaysAgo: String,
+     thirtyThreeDaysAgo: String
+ ): String = {
+   val part1 = s"""
+     SELECT
+       j.*,
+       gb.time_spent_ms_sum_7d as label__time_spent_ms_sum_7d,
+       null as label__time_spent_ms_sum_10d,
+       gb2.time_spent_ms_max_7d as label__time_spent_ms_max_7d,
+       null as label__time_spent_ms_max_14d
+     FROM
+       (SELECT * FROM $joinOutputTable WHERE ds = "$fortyDaysAgo") as j
+     LEFT OUTER JOIN
+       (SELECT * FROM $labelGbOutputTable WHERE ds = "$thirtyThreeDaysAgo") as gb
+       on j.item = gb.item
+     LEFT OUTER JOIN
+       (SELECT * FROM $labelGbOutputTable2 WHERE ds = "$thirtyThreeDaysAgo") as gb2
+       on j.item = gb2.item
+   """
+
+   // Similar for part2 and part3
+
+   s"$part1 UNION $part2 UNION $part3"
+ }
105-108
: Add assertions to verify specific column values.
Beyond checking for zero difference, add assertions that verify specific column values.
  val labelComputed = labelJoin.compute()
  println("Label computed::")
  labelComputed.show()
+
+ // Add assertions for specific column values
+ val firstRow = labelComputed.limit(1).collect().headOption
+ assert(firstRow.isDefined, "Expected at least one row in the result")
+ firstRow.foreach { row =>
+   assert(row.getAs[String]("item") != null, "Item column should not be null")
+   // Add more specific assertions for your expected label column values
+ }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
api/src/main/scala/ai/chronon/api/Extensions.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/Driver.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala
(1 hunks)spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- api/src/main/scala/ai/chronon/api/Extensions.scala
- spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala
🧰 Additional context used
🧬 Code Definitions (1)
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (1)
spark/src/main/scala/ai/chronon/spark/LabelJoinV2.scala (1)
compute
(103-169)
⏰ Context from checks skipped due to timeout of 90000ms (15)
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
spark/src/main/scala/ai/chronon/spark/Driver.scala (2)
420-421
: Updated instantiation to use LabelJoinV2.
The code now uses the new LabelJoinV2 class instead of LabelJoin.
425-425
: Simplified method call with no parameters.
The compute() method no longer requires the stepDays parameter, simplifying the interface.
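The call-site change presumably looks something like this (hypothetical before/after; the argument names are assumptions, not taken from the PR):

// before: stepDays had to be threaded through from the CLI args
// new LabelJoin(joinConf, tableUtils, args.endDate()).compute(args.stepDays())
// after: the v2 entry point takes no arguments
new LabelJoinV2(joinConf, tableUtils, args.endDate()).compute()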
spark/src/test/scala/ai/chronon/spark/test/join/LabelJoinV2Test.scala (3)
16-35
: Setup looks good and covers needed test scenarios.
Test class initialization with necessary variables for date ranges.
37-135
: First test case validates basic functionality.
The test for single label part and window correctly verifies base functionality through SQL comparison.
137-371
: Comprehensive testing of advanced use case.
The second test effectively validates the multiple label parts and windows scenario, including the handling of newer windows without losing previous data.
Summary
TODO: Unit tests
Implements the simple label join logic to create a materialized table by joining forward-looking partitions from the snapshot tables of the labelJoinParts back to the join output.
Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
Summary by CodeRabbit
New Features
- Introduced an updated mechanism for formatting and standardizing label outputs with the addition of outputLabelTableV2.
- Added a new distributed label join operation with the LabelJoinV2 class, featuring robust validations, comprehensive error handling, and detailed logging for improved data integration.
- Implemented a comprehensive test suite for the LabelJoinV2 functionality to ensure accuracy and reliability of label joins.
Updates
- Replaced the existing LabelJoin class with the new LabelJoinV2 class, enhancing the label join process.
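At its core, the mechanism described above pairs each join output partition with the label snapshot partition one window-length later. A minimal sketch of that pairing, with table and key names invented for illustration:

import org.apache.spark.sql.{DataFrame, SparkSession}

// For a label window of w days, the join output at partition joinDs is
// left-joined against the label GroupBy snapshot at labelDs = joinDs + w.
def labelJoinForWindow(spark: SparkSession,
                       joinOutputTable: String,    // illustrative: the materialized join output
                       labelSnapshotTable: String, // illustrative: the label GroupBy snapshot table
                       joinDs: String,
                       labelDs: String,
                       keys: Seq[String]): DataFrame = {
  val left   = spark.table(joinOutputTable).where(s"ds = '$joinDs'")
  val labels = spark.table(labelSnapshotTable).where(s"ds = '$labelDs'").drop("ds")
  left.join(labels, keys, "left_outer") // keep every join row, even those without labels yet
}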