fix: remove references to custom json, float essential apis to top #492


Merged

@nikhil-zlai merged 14 commits into main from nikhil/kill_custom_json on Mar 13, 2025

Conversation

@nikhil-zlai (Contributor) commented on Mar 10, 2025

Summary

Custom JSON is not supposed to be relied on in Scala code at all; it is purely meant for users to attach THEIR metadata to objects.

Checklist

  • [x] Added Unit Tests
  • [x] Covered by existing CI
  • [ ] Integration tested
  • [ ] Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced new configuration flags that improve control over tiling behavior and streaming error handling.
    • Expanded metadata support to include additional output partition columns, enhancing data staging and query processing.
  • Documentation

    • Updated instructions for enabling tiled reads to reflect the new configuration parameter.
  • Tests

    • Enhanced test coverage to validate the new serving and tiling configurations.

@coderabbitai bot commented on Mar 10, 2025

Walkthrough

The PR removes legacy JSON parsing and team extraction methods from metadata extensions and online endpoints. It replaces obsolete tiling checks with new flag-based methods (e.g., servingFlagValue, tilingFlag, dontThrowOnDecodeFailFlag). Updates occur across API, online, testing, Spark, Flink, Python, Thrift definitions, and documentation. Additionally, the MetaData builder now accepts new parameters for partition columns and execution information.
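
As a concrete read-side illustration, here is a hedged sketch of the new flag helpers; `groupBy` is assumed to be an `api.GroupBy` with the `GroupByOps` implicit in scope, and the names follow the code quoted later in this review:

```scala
import ai.chronon.api.Extensions._ // brings the GroupByOps implicit into scope

// Walks metaData.executionInfo.conf.serving, returning None if any layer is unset
val tilingValue: Option[String] = groupBy.servingFlagValue("tiling")

// Convenience helpers layered on top of servingFlagValue
val tiled: Boolean        = groupBy.tilingFlag            // serving("tiling") == "true"
val strictDecode: Boolean = groupBy.throwOnDecodeFailFlag // serving("decode.throw_on_fail") == "true"
```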

Changes

| File(s) | Change Summary |
| --- | --- |
| api/.../Extensions.scala, api/.../Builders.scala | Removed legacy JSON/team extraction methods; added flag methods to GroupByOps; updated the MetaData constructor (removed dependencies, added additionalOutputPartitionColumns and executionInfo). |
| api/.../test/ExtensionsTest.scala, spark/.../FetcherTest.scala, spark/.../StagingQueryTest.scala | Removed outdated tests; replaced custom JSON tests with serving-flag validations; updated Spark session handling and assertions for partition columns and the tiling flag. |
| api/thrift/api.thrift, api/thrift/common.thrift | Modified the MetaData structure: replaced/renumbered fields and added serving-related fields; introduced new structs for environment and configuration properties with a serving map. |
| online/.../MetadataEndPoint.scala, online/.../Metrics.scala, online/.../GroupByServingInfoParsed.scala | Removed methods for JSON-based team extraction and tiling checks; updated references from owningTeam to the direct team property. |
| online/.../fetcher/{FetchContext.scala, GroupByFetcher.scala, GroupByResponseHandler.scala} | Replaced context-based tiling and error-flag checks with operation-specific flag methods; updated conditions in streaming decode error handling. |
| spark/.../StagingQuery.scala | Simplified partition-column initialization using the new additional output partition columns from metadata. |
| flink/.../FlinkRowAggregators.scala | Added mutation detection logic with isMutation and reversalIndex to support deletion handling. |
| api/py/.../parse_teams.py | Extended merging logic to include the new serving property during team parsing. |
| docs/source/Tiled_Architecture.md | Updated configuration instructions from enable_tiling=true to tiling=true within metaData.executionInfo.conf.serving. |
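
On the write side, the new serving configuration replaces the old customJson toggle. A minimal sketch, assuming the Thrift-generated fluent setters (`setExecutionInfo`, `setConf`, `setServing`) and the `ai.chronon.api` namespace; not verbatim from the PR:

```scala
import scala.collection.JavaConverters._
import ai.chronon.api.{ConfigProperties, ExecutionInfo}

// Opt a GroupBy into tiling via structured metadata instead of user-owned JSON
val servingConf = new ConfigProperties().setServing(Map("tiling" -> "true").asJava)
groupBy.metaData.setExecutionInfo(new ExecutionInfo().setConf(servingConf))

// Replaces the old user-facing toggle that lived in customJson ("enable_tiling")
```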

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant GroupByOps
    participant MetadataEndpoint
    participant ResponseHandler

    Client->>GroupByOps: Request flag values
    GroupByOps-->>Client: Return servingFlagValue, tilingFlag, dontThrowOnDecodeFailFlag
    Client->>MetadataEndpoint: Get team info (direct access)
    MetadataEndpoint-->>Client: Return team value
    Client->>ResponseHandler: Process streaming decode using tilingFlag
    ResponseHandler-->>Client: Handle errors based on dontThrowOnDecodeFailFlag
```

Suggested reviewers

  • tchow-zlai
  • piyush-zlai

Poem

Code refines in flag and tile,
Old paths fade with gentle style.
Tests and docs now sing the tune,
New parameters rise like moon.
Cheers to changes, crisp and bright!
🚀✨



📜 Recent review details


📥 Commits

Reviewing files that changed from the base of the PR and between 93f183a and 45b8964.

📒 Files selected for processing (4)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (4 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala
🔇 Additional comments (8)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)

582-592: New flag-based configuration access method.

Clean implementation for retrieving serving flag values from execution metadata.


594-594: Convenient helper for tiling configuration.

Good use of the new servingFlagValue method to check if tiling is enabled.


596-596: Helper for decoding failure behavior.

Provides a clean way to check the decode failure flag setting.

flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (3)

45-50: Added mutation detection logic.

Properly identifies if we're dealing with entity mutation sources.


52-60: Added reversal column index tracking.

Ensures proper validation when using mutation sources.


100-106: Enhanced update logic to handle deletions.

Now correctly differentiates between updates and deletions based on reversal flag.
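
Pieced together from the three comments above, the mutation-handling shape looks roughly like this; the exact field and constant names (`streamingSource`, `Constants.ReversalColumn`, `inputSchema`) are assumptions, not verbatim code:

```scala
// Detect whether the streaming source emits entity mutations (insert/delete pairs)
val isMutation: Boolean = groupBy.streamingSource.exists(_.isSetEntities)

// Locate the reversal column; required whenever we consume mutations
val reversalIndex: Int = {
  val idx = inputSchema.indexWhere(_.name == Constants.ReversalColumn)
  if (isMutation) require(idx >= 0, s"mutation sources must carry a ${Constants.ReversalColumn} column")
  idx
}

// Per element: a set reversal flag means "retract", otherwise a normal update
val isDelete = isMutation && row.getAs[Boolean](reversalIndex)
if (isDelete) rowAggregator.delete(aggIr, chrononRow)
else rowAggregator.update(aggIr, chrononRow)
```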

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (2)

558-565: Updated metadata configuration to use new flag approach.

Replaces customJson with executionInfo and serving flag configuration.


567-567: Added verification for tiling flag.

Good test assertion to ensure the flag is properly set.



From spark/.../StagingQuery.scala, the simplified partition-column initialization (rendered here as the diff hunk):

```diff
-      .getOrElse(new java.util.ArrayList[String]())
-      .asInstanceOf[java.util.ArrayList[String]]
-      .toScala
+    Seq(tableUtils.partitionColumn) ++ stagingQueryConf.metaData.additionalOutputPartitionColumns.toScala
```
@nikhil-zlai (author): this is the main change that Ben needs.
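
A hedged sketch of what that enables, using the builder parameter named in the walkthrough (argument names assumed, values illustrative):

```scala
import ai.chronon.api.Builders

// Declare extra partition columns on the staging query's metadata
val meta = Builders.MetaData(
  name = "my_team.my_staging_query",
  additionalOutputPartitionColumns = Seq("region")
)

// StagingQuery then writes partitioned by the default column plus the extras,
// as in the snippet above: Seq(tableUtils.partitionColumn) ++ additionalOutputPartitionColumns
```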

Comment on lines +256 to +258 (the thrift MetaData struct):

```thrift
5: optional list<string> additionalOutputPartitionColumns

6: optional map<string, string> tableProperties
```
Reviewer (Contributor): Should we keep tableProperties the same field number as before (5)?

@nikhil-zlai (author): thought outputPartitionCols are more important than table props. Safe to change these for now, actually.

Reviewer (Contributor): I guess we should call out that folks need to recompile their existing configs, right?

@nikhil-zlai (author): good call. We need to release a wheel and cut them over; I was working on it separately (basically I am doing the compile for them).


In api/thrift/api.thrift, the tags field is renumbered (rendered as a diff):

```diff
 // tag_key -> tag_value - tags allow for repository wide querying, deprecations etc
 // this is object level tag - applies to all columns produced by the object - GroupBy, Join, Model etc
-6: optional map<string, string> tags
+20: optional map<string, string> tags
```
Reviewer (Contributor): Field number here too?

@nikhil-zlai (author): the spacing in field numbers allows for adding new fields in the right order. So if we find some other thing later, we can add it in the right place instead of at the end.

@piyush-zlai (Contributor) left a comment: Q on removing some of the fetcher-events-based code (if tests pass we should probably be OK).

```diff
@@ -462,9 +448,9 @@ object Extensions {

   // Check if tiling is enabled for a given GroupBy. Defaults to false if the 'enable_tiling' flag isn't set.
   def isTilingEnabled: Boolean =
```
Reviewer (Contributor): can delete, I think, as this was moved to flagStore.

@nikhil-zlai (author): done


```scala
  }
}

private def mergeRawEventsFromStreaming(queryTimeMs: Long,
```
Reviewer (Contributor): don't we need this for the CDC side?

@nikhil-zlai (author): I don't think so; we can still tile. We can't deal with undeletable aggs in either case.

@nikhil-zlai (author): there is a chaining test that I want to keep on, which currently can only correctly respond to untiled entries in the in-memory KV store. :-/

@coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 1bb7c0c and 4604e61.

📒 Files selected for processing (5)
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (4 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (1 hunks)
🔇 Additional comments (26)
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (2)

263-271: Added tilingEnabled parameter to match updated API.

The parameter replaces the custom JSON-based tiling check with an explicit flag.


273-279: Updated API call with explicit tiling parameter.

Explicit tilingEnabled parameter replaces the previous JSON-based check, aligning with PR objectives.

spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (5)

65-65: Formatting improvement.

Added blank line enhances readability between code blocks.


72-72: Formatting improvement.

Added blank line separates logical sections of code.


54-65: Removed assertion that enforced tiling restriction.

Based on the AI summary, an assertion preventing tiling for Entity groupBys was removed. This aligns with the PR objective to remove custom JSON dependencies.


163-163: Added tilingEnabled parameter with default false.

New parameter allows explicit tiling control, replacing previous JSON-based checks.


186-186: Using tilingEnabled parameter.

Parameter is properly passed to the putStreaming method, completing the implementation.
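
In miniature, the pattern these changes apply (signatures illustrative, not the real OnlineUtils API): an explicit, default-off parameter replaces string-sniffing a user-owned JSON blob.

```scala
// Self-contained illustration of the opt-in flag pattern
def putStreaming(groupByName: String, tilingEnabled: Boolean = false): Unit =
  println(s"serving $groupByName, tiled = $tilingEnabled")

putStreaming("purchases.v1")                       // default path: untiled
putStreaming("purchases.v1", tilingEnabled = true) // tests opt in explicitly
```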

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (3)

115-115: Simplified control flow.

Direct call to mergeTiledIrsFromStreaming removes conditional branching based on tiling status.


123-146: Improved null safety using Option.

Wrapping streamingResponses in Option prevents NPEs and enables functional transformation.


158-158: Compatible interface adaptation.

Updated to handle the new Option-based approach with streamingIrs.map(_.iterator).orNull.
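
A sketch of that null-safety shape, with types assumed from the quoted fragment (`streamingResponses` can be null when no streaming data exists; `decodeTileIr` and the merge signature are illustrative):

```scala
// Wrap the possibly-null input once, then transform functionally
val streamingIrs: Option[Seq[TiledIr]] =
  Option(streamingResponses).map(_.flatMap(decodeTileIr))

// Downstream code still accepts an Iterator, or null when absent
mergeTiledIrsFromStreaming(queryTimeMs, batchIr, streamingIrs.map(_.iterator).orNull)
```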

spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (10)

21-21: No issues with new import.


26-26: Imports look fine.


129-133: Mutation detection logic is concise.


134-139: Dynamic time column logic looks good.


140-142: Indexing appears correct.


143-146: Key builder usage is fine.


154-155: Key extraction for grouping is fine.


159-159: Key and timestamp unpacking is efficient.


167-169: isDelete logic is correct, but see aggregator calls below.


183-183: Final tuple creation is fine.

flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (6)

8-8: Import addition is okay.


45-50: isMutation calculation is clear.


52-60: Reversal column requirement is valid.


80-80: Timestamp retrieval is straightforward.


100-100: Accurate isDelete condition.


102-107: Delete/Update calls are correctly ordered.

Comment on lines 171 to 177:

```scala
// TODO: refactor the prod code and use it in test instead
if (isDelete) {
  rowAggregator.update(aggIr, chrononRow)
} else {
  rowAggregator.delete(aggIr, chrononRow)
}
```


⚠️ Potential issue

Aggregator deletion logic is reversed.
If isDelete is true, call delete; otherwise call update:

```diff
-if (isDelete) {
-  rowAggregator.update(aggIr, chrononRow)
-} else {
-  rowAggregator.delete(aggIr, chrononRow)
-}
+if (isDelete) {
+  rowAggregator.delete(aggIr, chrononRow)
+} else {
+  rowAggregator.update(aggIr, chrononRow)
+}
```

@coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 08e3315 and 4c75619.

📒 Files selected for processing (8)
  • api/py/ai/chronon/cli/compile/parse_teams.py (2 hunks)
  • api/src/main/scala/ai/chronon/api/Builders.scala (3 hunks)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
  • api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala (2 hunks)
  • api/thrift/common.thrift (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala
🔇 Additional comments (19)
api/py/ai/chronon/cli/compile/parse_teams.py (2)

126-126: Added serving property support.

Properly extends the merge functionality to include the new serving property.


139-139: Consistent serving property handling.

Matches the pattern used for other properties (backfill, upload, streaming).

api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala (3)

21-23: Updated imports to support new flag-based approach.

Added necessary imports for ConfigProperties and ExecutionInfo.


121-134: Replaced JSON-based method with flag-based approach.

The implementation now correctly uses ExecutionInfo with ConfigProperties instead of customJson.


136-144: Updated assertions to use tilingFlag instead of isTilingEnabled.

Test cases appropriately check flag values directly instead of using the removed method.

spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (5)

28-28: Added TileCodec import.

Required for the updated method signature.


58-65: Removed assertion restriction for tiling with entity groupings.

Simplifies code and removes unnecessary constraint.


88-90: Updated memoryStream type to use TileCodec.

Changed from tuple array to a tuple containing an array of TileUpdate and a TileCodec.


98-101: Updated putRequests mapping to use tileIrBytes.

Replaced tileBytes with tileIrBytes for consistency with new approach.


108-112: Used tileIrBytes in PutRequest.

Ensures consistent usage of the new bytes format.

api/src/main/scala/ai/chronon/api/Builders.scala (2)

273-275: Updated MetaData parameters.

Removed dependencies parameter and added support for additionalOutputPartitionColumns and executionInfo.


301-312: Added executionInfo handling logic.

Properly sets executionInfo with historicalBackfill and handles additionalOutputPartitionColumns.

api/thrift/common.thrift (4)

28-38: Well-documented struct explanation.

Good documentation explaining the environment variables layering and priority logic.


44-44: Added new serving configuration support.

New serving field enables structured configuration for serving rather than relying on custom JSON.


47-57: Good documentation for ConfigProperties.

Clear explanation of configuration properties and their layering hierarchy.


63-63: Consistent implementation of serving field.

Correctly added same configuration pattern to both structs.

api/src/main/scala/ai/chronon/api/Extensions.scala (3)

582-592: Good implementation of serving flag retrieval.

New method properly navigates the configuration hierarchy to retrieve serving flags.


594-594: Clean helper for tiling flag.

Simple helper that replaces previous custom JSON implementation with structured configuration.


596-596: Added helper for decode failure flag.

New clean helper method consistent with the flag-based approach.

Comment on lines 582 to 596 of api/src/main/scala/ai/chronon/api/Extensions.scala:

```scala
def servingFlagValue(flag: String): Option[String] = {
  for (
    execInfo <- Option(groupBy.metaData.executionInfo);
    conf <- Option(execInfo.conf);
    servingConf <- Option(conf.serving);
    value <- Option(servingConf.get(flag))
  ) {
    return Some(value)
  }
  None
}

def tilingFlag: Boolean = servingFlagValue("tiling").exists(_.toLowerCase() == "true")

def throwOnDecodeFailFlag: Boolean = servingFlagValue("decode.throw_on_fail").exists(_.toLowerCase() == "true")
```

💡 Verification agent

🧩 Analysis chain

Verify flag-based approach is used consistently.

Ensure all previous custom JSON lookups are replaced with the new flag-based approach.


🏁 Script executed:

```bash
#!/bin/bash
# Check for any remaining references to customJson in the codebase
rg -i "customJson" --type scala
```

Length of output: 1105


Action required: Replace remaining custom JSON lookups.
The flag-based access in Extensions.scala is correct, but customJson is still used elsewhere. Update the following:

  • api/src/main/scala/ai/chronon/api/Builders.scala: Remove the customJson parameter and its usage.
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala & StagingQueryTest.scala: Refactor tests to use the new flag-based approach instead of supplying custom JSON.
  • api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala: Adjust test expectations/comments if needed.

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (2)

178-180: Remove test-related TODO.

These comments suggest test code in production.

```diff
-        // This is not really testing the flink aggregation function
-        // TODO: refactor the prod code and use it in test instead
```

164-164: Remove commented code.

Dead code should be removed.

```diff
-    // val preAgg: Array[Byte] = tileCodec.makeTileIr(aggIr, isComplete = false)
```
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 4c75619 and e8983b5.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (3 hunks)
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (3)

43-45: Good case class addition.

TileUpdate encapsulates data well.


134-143: Clean mutation detection.

Good extraction of mutation logic.


127-127: Better return type.

Structured return type improves code clarity.
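
Putting the three comments together, a hedged reconstruction of the new shape (field names assumed):

```scala
// One tile write: grouping keys, the tile's timestamp, and the serialized IR bytes
case class TileUpdate(keys: Array[Any], tileTimestamp: Long, tileIrBytes: Array[Byte])

// The builder now returns updates together with the TileCodec needed to decode them,
// rather than a bare Array[(Array[Any], Long, Array[Byte])]:
//   def buildTileUpdates(...): (Array[TileUpdate], TileCodec)
```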

Comment on lines 174 to 184 (diff context as rendered):

```scala
        rowAggregator.update(aggIr, chrononRow)
      }

      val tileCodec = new TileCodec(groupBy, schema)
      val preAgg: Array[Byte] = tileCodec.makeTileIr(aggIr, true)

      (rowsKeys, tileTimestamp, preAgg)
      val isDelete = isMutation && row.getAs[Boolean](reversalIndex)

      // This is not really testing the flink aggregation function
      // TODO: refactor the prod code and use it in test instead
      if (isDelete) {
        rowAggregator.delete(aggIr, chrononRow)
      } else {
        rowAggregator.update(aggIr, chrononRow)
      }
```

⚠️ Potential issue

Redundant update call.

Update call on line 174 always happens before the deletion/update check, duplicating work.

```diff
-        rowAggregator.update(aggIr, chrononRow)
-
-        val isDelete = isMutation && row.getAs[Boolean](reversalIndex)
-
-        // This is not really testing the flink aggregation function
-        // TODO: refactor the prod code and use it in test instead
-        if (isDelete) {
-          rowAggregator.delete(aggIr, chrononRow)
-        } else {
-          rowAggregator.update(aggIr, chrononRow)
-        }
+        val isDelete = isMutation && row.getAs[Boolean](reversalIndex)
+
+        if (isDelete) {
+          rowAggregator.delete(aggIr, chrononRow)
+        } else {
+          rowAggregator.update(aggIr, chrononRow)
+        }
```

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (2)

127-127: Update function documentation to match new return type.

Return type changed from Array[(Array[Any], Long, Array[Byte])] to (Array[TileUpdate], TileCodec).


176-177: Consider addressing TODO comment.

Could refactor the test code to use production code as suggested in the TODO.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between e8983b5 and 93f183a.

📒 Files selected for processing (2)
  • docs/source/Tiled_Architecture.md (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (3 hunks)
🔇 Additional comments (6)
docs/source/Tiled_Architecture.md (1)

82-83: Config update check.
The instruction now uses tiling=true on metaData.executionInfo.conf.serving, which aligns with the new config and removes custom JSON references.

spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (5)

43-45: Good addition of TileUpdate case class.

Clean encapsulation of update data.


134-143: Clean mutation detection logic.

Properly determines time column based on mutation status.


148-151: Good use of key builder.

Leverages FastHashing for efficient key generation.


175-183: Fixed aggregation logic issues.

Correctly fixed the deletion/update logic as noted in previous reviews.


191-191: Clean return value.

Return tuple provides both updates and codec for downstream consumers.

@nikhil-zlai merged commit 20359a2 into main on Mar 13, 2025
20 checks passed
@nikhil-zlai deleted the nikhil/kill_custom_json branch on March 13, 2025 at 23:18
kumar-zlai pushed a commit that referenced this pull request on Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request on Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request on May 15, 2025
chewy-zlai pushed a commit that referenced this pull request on May 15, 2025
chewy-zlai pushed a commit that referenced this pull request on May 16, 2025