Add Flink validation job + expose verb in streaming #495

Merged
merged 32 commits into main from piyush/flink_validation_job on Mar 12, 2025

Conversation

piyush-zlai
Contributor

@piyush-zlai piyush-zlai commented Mar 11, 2025

Summary

This PR adds a Flink validation job that reads n rows from Kafka using the standard serde rails the user has configured in their topic info (e.g. Avro with schema registry), runs those rows through both CatalystUtil and a Spark DataFrame based evaluation, and compares the results. We can use this for quick smoke testing of gaps / missing logic in CatalystUtil when a user GroupBy needs to be run in streaming.

Some notes:

  • As the Spark DF evaluation is very slow on a row-by-row basis (I tried both single-task and distributed-task mode), I've gone with invoking it via bulk calls; the existing CatalystUtil code runs very quickly, so the bulk call there is essentially n single calls (a rough sketch follows this list).
  • Currently we ask the user for the number of rows they wish to validate against (with a default value). An alternative would be to read n seconds of Kafka data; I went with the n-rows approach as I think it's easier to reason about.
  • Fixed some dependency issues that were causing the Flink UI to not work ('commons-text' was in the exclusions list but seems to have been dropped as part of the Bazel refactoring we did late last week).
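
To illustrate the bulk-call-and-compare idea from the first bullet, here is a minimal Scala sketch. The evaluator stubs, object name, and canonicalization are assumptions for illustration only, not the PR's actual SparkExpressionEvalFn code:

```scala
// Sketch only: stand-in bulk evaluators and an order-insensitive per-record comparison.
// In the real job, evaluation happens via CatalystUtil and a Spark DataFrame.
object BulkCompareSketch {
  type ResultRow = Map[String, Any]

  // Placeholder bulk evaluators: each input record produces a Seq of result rows.
  def runCatalystBulk(records: Seq[ResultRow]): Seq[Seq[ResultRow]] = records.map(Seq(_))
  def runSparkSQLBulk(records: Seq[ResultRow]): Seq[Seq[ResultRow]] = records.map(Seq(_))

  // Compare the two result sets for one record, ignoring row and key ordering.
  def resultsMatch(catalyst: Seq[ResultRow], sparkDf: Seq[ResultRow]): Boolean = {
    def canonical(rows: Seq[ResultRow]): Seq[Seq[(String, Any)]] =
      rows.map(_.toSeq.sortBy(_._1)).sortBy(_.toString)
    canonical(catalyst) == canonical(sparkDf)
  }

  def main(args: Array[String]): Unit = {
    val batch: Seq[ResultRow] = Seq(Map("listing_id" -> 1L, "view" -> 1))
    val matches = runCatalystBulk(batch)
      .zip(runSparkSQLBulk(batch))
      .count { case (c, s) => resultsMatch(c, s) }
    println(s"Total Records: ${batch.size}, Matches: $matches, Mismatches: ${batch.size - matches}")
  }
}
```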

Tested on Etsy.
Scenario 1 - GroupBy with only explode (no transform)

```
$ zipline run --mode streaming --kafka-bootstrap=kafka.kafka-gke-dev.etsycloud.com:9092 --groupby-name search.beacons.listing.actions --validate --validate-rows 10000
...
Job has been submitted with JobID af8f0a4c6a095f0cdef6f7aeaa25a5b6
**** Validation stats for search.beacons.listing.actions ****

Total Records: 10000
Total Matches: 10000
Total Mismatches: 0
Mismatches:
```

Scenario 2 - GroupBy with explode + transform (current etsy/zipline main):

```
zipline run --mode streaming --kafka-bootstrap=kafka.kafka-gke-dev.etsycloud.com:9092 --groupby-name search.beacons.listing.actions --validate --validate-rows 10000
...
**** Validation stats for search.beacons.listing.actions ****

Total Records: 10000
Total Matches: 0
Total Mismatches: 10000
Mismatches:

RecordId: 586f204d-b4fb-4f0c-9b69-b44c4d4dc8fc
Is Match: false
Catalyst Result: ArrayBuffer(Map(favorite -> 0, listing_id -> 962072674332, ts -> 1741704037533, add_cart -> 0, purchase -> 0, view -> 0))
Spark DF Result: WrappedArray()
Differences (diff_type -> (catalystValue, sparkDfValue) ) : Map(result_count -> (1,0))
...
```
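
Each mismatch entry above corresponds to a per-record comparison result. The PR introduces a ComparisonResult case class; the following is a hypothetical sketch of its shape, with field names inferred from the printed output rather than taken from the actual source:

```scala
// Hypothetical shape only; the real case class lives in
// flink/.../validation/SparkExprEvalComparisonFn.scala and may differ.
case class ComparisonResult(
  recordId: String,                      // e.g. a UUID assigned per consumed Kafka record
  isMatch: Boolean,                      // true when both engines agree
  catalystResult: Seq[Map[String, Any]], // rows produced by CatalystUtil
  sparkDfResult: Seq[Map[String, Any]],  // rows produced by the Spark DF path
  differences: Map[String, (Any, Any)]   // diff_type -> (catalystValue, sparkDfValue)
)
```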

Checklist

- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

Summary by CodeRabbit

  • New Features
    • Enhanced the application’s validation capabilities for streaming and evaluation, including configurable row thresholds and detailed result comparisons.
    • Introduced a new validation job that gathers comprehensive statistics to ensure data consistency.
  • Bug Fixes
    • Resolved dependency compatibility issues to improve the stability of streaming operations.
  • Tests
    • Added extensive unit and integration tests to verify and maintain accurate validation behavior.
  • Chores
    • Improved interoperability between Java and Scala collections for smoother integration.


coderabbitai bot commented Mar 11, 2025

Walkthrough

This pull request adds validation support across multiple components. In Python, new validation parameters are introduced for the Runner and GcpRunner classes along with an updated CLI default. In Scala, additional methods and case classes enhance evaluation and comparison between Catalyst and Spark DataFrame results, and a new Flink validation job is implemented with associated tests. Other improvements include refined exception formatting, visibility updates, dependency adjustments in build files, and new implicit Java‑Scala conversion utilities.

Changes

| File(s) | Change Summary |
| --- | --- |
| api/py/.../default_runner.py, api/py/.../gcp.py, api/py/.../run.py | Added validation parameters: introduced validate and validate_rows attributes in Runner, passed as flags in GcpRunner, and updated the CLI default for validate-rows. |
| flink/src/main/scala/ai/chronon/flink/FlinkJob.scala | Reformatted the exception message for validation failure for improved readability. |
| flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala | Introduced runSparkSQLBulk and runCatalystBulk methods for bulk evaluation and updated metrics tracking. |
| online/src/main/scala/ai/chronon/online/CatalystUtil.scala | Changed visibility of selectClauses and whereClauseOpt from private to public with explicit type annotations. |
| flink/BUILD.bazel, tools/build_rules/flink/BUILD | Added the Apache Commons Lang3 dependency and excluded commons-text to resolve version conflicts. |
| flink/src/main/scala/ai/chronon/flink/validation/SparkExprEvalComparisonFn.scala, flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala, test files | Introduced validation utilities: added a ComparisonResult case class with helper methods, implemented a new Flink-based validation job, and enhanced test coverage. |
| api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala | Added implicit classes (IterableOps and JIterableOps) for conversion between Java and Scala iterables. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant VFJ as ValidationFlinkJob
    participant DS as DataStream (FlinkSource)
    participant SE as SparkExpressionEvalFn
    participant CF as Comparison Function
    App->>VFJ: run(metadataStore, kafkaBootstrap, groupByName, validateRows)
    VFJ->>DS: Retrieve event stream
    DS-->>VFJ: Stream events (EventRecord)
    VFJ->>SE: Evaluate events using runSparkSQLBulk & runCatalystBulk
    SE-->>VFJ: Return evaluation results
    VFJ->>CF: Compare results via compareResultRows
    CF-->>VFJ: Return ComparisonResult
    VFJ->>App: Return ValidationStats
```
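For intuition, the ValidationStats returned in the final step of the diagram is essentially a fold over per-record match outcomes. A minimal sketch, assuming a simple (recordId, isMatch) pair per record rather than the job's actual types:

```scala
// Sketch: aggregate per-record match outcomes into the stats the job prints.
object ValidationStatsSketch {
  case class ValidationStats(totalRecords: Int, matches: Int, mismatches: Int)

  // results: (recordId, isMatch) pairs collected from one count window
  def aggregate(results: Seq[(String, Boolean)]): ValidationStats = {
    val matches = results.count(_._2)
    ValidationStats(results.size, matches, results.size - matches)
  }

  def main(args: Array[String]): Unit =
    // Three records, one mismatch => ValidationStats(3, 2, 1)
    println(aggregate(Seq(("id-1", true), ("id-2", true), ("id-3", false))))
}
```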


Suggested reviewers

  • nikhil-zlai
  • varant-zlai
  • david-zlai

Poem

In the realm of code where logic steers,
New validation flags now shift the gears.
From Python to Scala, checks are in flight,
Ensuring data streams run smooth and right.
A dance of updates, subtle and neat,
Our code sings on with a rhythmic beat! 🎶



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🔭 Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)

127-185: ⚠️ Potential issue

Duplicate methods should be removed.

Methods on lines 127-185 appear to be duplicates of the newly implemented methods above.

Remove these duplicate method implementations that are redundant with the new ones.

🧹 Nitpick comments (9)
api/py/ai/chronon/repo/run.py (2)

164-165: Fix help text formatting.

There's an extra space in the help text.

```diff
-              help="Number of rows to  run the validation on")
+              help="Number of rows to run the validation on")
```

162-165: Consider increasing the default validation row count.

The default value of 1000 rows may be too small for effective validation of complex transformations.

```diff
-@click.option("--validate-rows", default="1000",
+@click.option("--validate-rows", default="10000",
```

This aligns with PR objectives stating that 10,000 rows were used in testing.

flink/src/main/scala/ai/chronon/flink/validation/SparkExprEvalComparisonFn.scala (3)

26-34: Doc comment is concise and clear.
Small tip: For large results, note potential complexity of repeated sorting in this doc.


50-51: Sorting by toString may be expensive.
Consider a custom comparator or more optimal data structure for large results.


62-67: Reflection-based equality checks can be risky.
For complex or nested fields, consider a more deterministic custom comparator for reliability.

flink/src/main/scala/ai/chronon/flink/ValidateFlinkJob.scala (4)

113-113: Refine match logic.
Comparing only sizes may miss deeper mismatches.


116-116: Populate differences.
This field is always empty. Consider tracking actual mismatches.


125-131: Be cautious with countWindowAll.
Large window sizes could accumulate too many records.


182-182: Avoid print() in production.
Use a logging sink instead.

🛑 Comments failed to post (2)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (1)

142-142: ⚠️ Potential issue

Correct an implementation inconsistency.

There appears to be duplicate code for creating KafkaFlinkSource. Lines 140-145 and lines 151-157 contain similar code blocks, with line 142 adding a duration parameter while the second block doesn't.

```diff
-              new KafkaFlinkSource(kafkaBootstrap, deserializationSchema, topicInfo, Some(Duration.apply(1, TimeUnit.SECONDS)))
```

Remove this entire first block (lines 140-145) as it appears to be an incomplete merge.

Committable suggestion skipped: line range outside the PR's diff.

flink/src/main/scala/ai/chronon/flink/ValidateFlinkJob.scala (1)

29-46: 🛠️ Refactor suggestion

Watch for unbounded memory.
Using a global window without triggers can grow unbounded.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2)

40-44: Add null check for consistency.

Missing null check unlike other conversion methods in this file.

```diff
  implicit class IterableOps[T](iterable: java.lang.Iterable[T]) {
    def toScala: Iterable[T] = {
-      iterable.asScala
+      if (iterable == null) {
+        null
+      } else {
+        iterable.asScala
+      }
    }
  }
```

45-49: Add null check for consistency.

Same issue here - missing null check for consistency with other converters.

```diff
  implicit class JIterableOps[T](iterable: Iterable[T]) {
    def toJava: java.lang.Iterable[T] = {
-      iterable.asJava
+      if (iterable == null) {
+        null
+      } else {
+        iterable.asJava
+      }
    }
  }
```

🔇 Additional comments (24)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (1)

40-49: Good addition supporting Java-Scala Iterable conversions.

These converters complete the set of collection conversion utilities, aligning with PR's validation job needs.

flink/src/test/scala/ai/chronon/flink/validation/SparkExprEvalComparisonTest.scala (11)

8-14: Well tested scenario
Covers empty rows.


16-22: Well tested scenario
Checks identical row matches.


24-30: Well tested scenario
Verifies ordering doesn't matter.


32-50: Well tested scenario
Tests complex types thoroughly.


52-60: Well tested scenario
Handles nested structures.


62-69: Well tested scenario
Flags row count mismatch.


71-78: Well tested scenario
Confirms value mismatch detection.


80-87: Well tested scenario
Ensures key mismatch is caught.


89-96: Well tested scenario
Detects extra fields.


98-120: Well tested scenario
Verifies complex mismatches.


122-133: Well tested scenario
Checks nested complex mismatches.

flink/src/main/scala/ai/chronon/flink/validation/SparkExprEvalComparisonFn.scala (5)

8-22: Neat structure
Captures comparison details comprehensively.


24-34: Clear doc
Explains the comparison logic well.


35-46: Concise mismatch detection
Early return clarifies size discrepancies.


48-88: Robust approach
Sorting ensures order-agnostic comparison.


91-105: Thoughtful equality check
Handles nested maps carefully.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3)

19-27: Helpful imports
Needed for new row logic.


123-174: Good bulk approach
Efficiently collects Spark SQL results.


176-186: Consistent with Catalyst
Parallel method ensures matching eval.

flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (4)

23-37: Simple case classes
Clearly define event & stats.


39-71: Focused comparison function
Window-based approach for aggregated validation.


73-120: Well-organized job
Generates stats for requested row batches.


122-181: Comprehensive entry point
Executes validation and logs results.


```scala
// Helper method for deep equality - primarily used to special case types like Maps that don't match correctly
// in EqualsBuilder.reflectionEquals across scala versions 2.12 and 2.13.
private def deepEquals(a: Any, b: Any): Boolean = (a, b) match {
```
Contributor

neat!


```scala
val api = buildApi(onlineClassName, props)
val metadataStore = new MetadataStore(FetchContext(api.genKvStore, MetadataDataset))

if (validateMode) {
```
Contributor

i would pull this out as a function.

Comment on lines +50 to +51

```scala
val sortedCatalystResult = catalystResult.map(m => SortedMap[String, Any]() ++ m).sortBy(_.toString)
val sortedSparkDfResult = sparkDfResult.map(m => SortedMap[String, Any]() ++ m).sortBy(_.toString)
```
Contributor

if we make dataframes out of these - we have a nice Compare.sideBySide method that prints only the differing rows.

Contributor

you will be able to kill the rest of the code under.

Contributor

@nikhil-zlai nikhil-zlai left a comment

very nice PR! can you use the Compare.SideBySide function that we use in tests? will require the objects to be dataframes, but i think it is much simpler to do it that way.

@piyush-zlai
Contributor Author

(Dropping notes from our discussion on slack - we'll explore migrating to Comparison.sideBySide in a follow up as it requires a bit of refactoring of the existing Spark code + reworking things here)

@piyush-zlai piyush-zlai merged commit a5b4300 into main Mar 12, 2025
9 checks passed
@piyush-zlai piyush-zlai deleted the piyush/flink_validation_job branch March 12, 2025 15:28
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025