Add Flink validation job + expose verb in streaming #495
Conversation
# Conflicts: # flink/BUILD.bazel
Walkthrough

This pull request adds validation support across multiple components. In Python, new validation parameters are introduced for the Runner and GcpRunner classes along with an updated CLI default. In Scala, additional methods and case classes enhance evaluation and comparison between Catalyst and Spark DataFrame results, and a new Flink validation job is implemented with associated tests. Other improvements include refined exception formatting, visibility updates, dependency adjustments in build files, and new implicit Java-Scala conversion utilities.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant VFJ as ValidationFlinkJob
    participant DS as DataStream (FlinkSource)
    participant SE as SparkExpressionEvalFn
    participant CF as Comparison Function

    App->>VFJ: run(metadataStore, kafkaBootstrap, groupByName, validateRows)
    VFJ->>DS: Retrieve event stream
    DS-->>VFJ: Stream events (EventRecord)
    VFJ->>SE: Evaluate events using runSparkSQLBulk & runCatalystBulk
    SE-->>VFJ: Return evaluation results
    VFJ->>CF: Compare results via compareResultRows
    CF-->>VFJ: Return ComparisonResult
    VFJ->>App: Return ValidationStats
```
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
127-185: ⚠️ Potential issue: Duplicate methods should be removed.

The methods on lines 127-185 appear to duplicate the newly implemented methods above; remove the redundant implementations.
🧹 Nitpick comments (9)
api/py/ai/chronon/repo/run.py (2)
164-165: Fix help text formatting. There's an extra space in the help text.

```diff
- help="Number of rows to run the validation on")
+ help="Number of rows to run the validation on")
```
162-165: Consider increasing the default validation row count. The default value of 1000 rows may be too small for effective validation of complex transformations.

```diff
-@click.option("--validate-rows", default="1000",
+@click.option("--validate-rows", default="10000",
```

This aligns with the PR objectives stating that 10,000 rows were used in testing.
flink/src/main/scala/ai/chronon/flink/validation/SparkExprEvalComparisonFn.scala (3)
26-34: Doc comment is concise and clear. Small tip: for large results, note the potential cost of repeated sorting in this doc.
50-51: Sorting by `toString` may be expensive. Consider a custom comparator or a more efficient data structure for large results.
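To make the suggestion concrete, here is a minimal Scala sketch (illustrative only, with assumed names; not the PR's implementation) that builds the canonical string key once per row and sorts on the cached key, instead of re-deriving it inside the comparator:

```scala
import scala.collection.immutable.SortedMap

// Sketch: compute the canonical (sorted-key) string form of each result row exactly once,
// then sort by that cached key, avoiding repeated toString calls during comparisons.
def sortResultRows(rows: Seq[Map[String, Any]]): Seq[Map[String, Any]] =
  rows
    .map(row => (SortedMap[String, Any]() ++ row).toString -> row)
    .sortBy(_._1)
    .map(_._2)
```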
62-67: Reflection-based equality checks can be risky. For complex or nested fields, consider a more deterministic custom comparator for reliability.

flink/src/main/scala/ai/chronon/flink/ValidateFlinkJob.scala (4)
113-113: Refine match logic. Comparing only sizes may miss deeper mismatches.
116-116: Populate `differences`. This field is always empty; consider tracking actual mismatches.
125-131: Be cautious with `countWindowAll`. Large window sizes could accumulate too many records.
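One way to keep the count window from buffering every record is to fold elements into running stats with an incremental `AggregateFunction`. The sketch below is illustrative only; the stream and case class names are assumptions, not the PR's code:

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Assumed accumulator type for illustration.
case class RunningStats(total: Long, matches: Long, mismatches: Long)

// Incrementally folds per-record match results, so the window keeps one small accumulator
// instead of buffering all records until it fires.
class ValidationStatsAggregator extends AggregateFunction[Boolean, RunningStats, RunningStats] {
  override def createAccumulator(): RunningStats = RunningStats(0L, 0L, 0L)

  override def add(isMatch: Boolean, acc: RunningStats): RunningStats =
    RunningStats(acc.total + 1,
                 acc.matches + (if (isMatch) 1L else 0L),
                 acc.mismatches + (if (isMatch) 0L else 1L))

  override def getResult(acc: RunningStats): RunningStats = acc

  override def merge(a: RunningStats, b: RunningStats): RunningStats =
    RunningStats(a.total + b.total, a.matches + b.matches, a.mismatches + b.mismatches)
}

// Hypothetical usage: matchResults.countWindowAll(validateRows).aggregate(new ValidationStatsAggregator)
```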
182-182: Avoid `print()` in production. Use a logging sink instead.
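As a concrete alternative, a small SLF4J-backed sink could replace the `print()` call. This is only a sketch under assumed names (e.g. `ValidationStats`, `statsStream`), not the PR's code:

```scala
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.slf4j.LoggerFactory

// Routes each element through SLF4J instead of writing to stdout via print().
class LoggingSink[T] extends SinkFunction[T] {
  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

  override def invoke(value: T, context: SinkFunction.Context): Unit =
    logger.info(s"Validation result: $value")
}

// Hypothetical usage: statsStream.addSink(new LoggingSink[ValidationStats]).name("validation-logging-sink")
```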
🛑 Comments failed to post (2)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (1)
142-142: ⚠️ Potential issue: Correct an implementation inconsistency.

There appears to be duplicate code for creating KafkaFlinkSource: lines 140-145 and lines 151-157 contain similar blocks, with line 142 adding a duration parameter while the second block doesn't.

```diff
- new KafkaFlinkSource(kafkaBootstrap, deserializationSchema, topicInfo, Some(Duration.apply(1, TimeUnit.SECONDS)))
```

Remove the entire first block (lines 140-145), as it appears to be the result of an incomplete merge.
Committable suggestion skipped: line range outside the PR's diff.
flink/src/main/scala/ai/chronon/flink/ValidateFlinkJob.scala (1)

29-46: 🛠️ Refactor suggestion: Watch for unbounded memory. A global window without triggers can accumulate state without bound.
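For reference, bounding a global window usually comes down to attaching a purging count trigger. The sketch below is a hedged illustration against the standard Flink API (stream and bound names are assumptions, not the PR's code):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Fires and purges the window once maxRows elements have arrived, so buffered state
// is released instead of growing for the lifetime of the job.
def boundedGlobalWindow[T](stream: DataStream[T], maxRows: Long): AllWindowedStream[T, GlobalWindow] =
  stream
    .windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](maxRows)))
```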
Actionable comments posted: 0
🧹 Nitpick comments (2)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2)
40-44: Add null check for consistency. This converter is missing the null check that the other conversion methods in this file have.

```diff
 implicit class IterableOps[T](iterable: java.lang.Iterable[T]) {
   def toScala: Iterable[T] = {
-    iterable.asScala
+    if (iterable == null) {
+      null
+    } else {
+      iterable.asScala
+    }
   }
 }
```
45-49: Add null check for consistency. Same issue here: a null check is missing, unlike the other converters.

```diff
 implicit class JIterableOps[T](iterable: Iterable[T]) {
   def toJava: java.lang.Iterable[T] = {
-    iterable.asJava
+    if (iterable == null) {
+      null
+    } else {
+      iterable.asJava
+    }
   }
 }
```
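For reference, a minimal usage sketch of the two new implicit classes (the import path and object wrapper are assumed; with the suggested guards, a null input would simply pass through as null):

```scala
import ai.chronon.api.ScalaJavaConversions._

object ConversionExample {
  val javaIterable: java.lang.Iterable[String] = java.util.Arrays.asList("a", "b", "c")

  // IterableOps adds toScala on java.lang.Iterable; JIterableOps adds toJava on scala Iterable.
  val asScala: Iterable[String] = javaIterable.toScala
  val backToJava: java.lang.Iterable[String] = asScala.toJava
}
```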
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/SparkExprEvalComparisonFn.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/SparkExprEvalComparisonTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: bazel_config_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: python_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (24)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (1)
40-49: Good addition supporting Java-Scala Iterable conversions. These converters complete the set of collection conversion utilities, aligning with the PR's validation job needs.
flink/src/test/scala/ai/chronon/flink/validation/SparkExprEvalComparisonTest.scala (11)
8-14: Well tested scenario. Covers empty rows.
16-22: Well tested scenario. Checks identical row matches.
24-30: Well tested scenario. Verifies ordering doesn't matter.
32-50: Well tested scenario. Tests complex types thoroughly.
52-60: Well tested scenario. Handles nested structures.
62-69: Well tested scenario. Flags row count mismatch.
71-78: Well tested scenario. Confirms value mismatch detection.
80-87: Well tested scenario. Ensures key mismatch is caught.
89-96: Well tested scenario. Detects extra fields.
98-120: Well tested scenario. Verifies complex mismatches.
122-133: Well tested scenario. Checks nested complex mismatches.

flink/src/main/scala/ai/chronon/flink/validation/SparkExprEvalComparisonFn.scala (5)
8-22: Neat structure. Captures comparison details comprehensively.
24-34: Clear doc. Explains the comparison logic well.
35-46: Concise mismatch detection. Early return clarifies size discrepancies.
48-88: Robust approach. Sorting ensures order-agnostic comparison.
91-105: Thoughtful equality check. Handles nested maps carefully.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3)
19-27: Helpful imports. Needed for the new row logic.
123-174: Good bulk approach. Efficiently collects Spark SQL results (a rough sketch of the bulk-evaluation idea follows after this comment list).
176-186: Consistent with Catalyst. The parallel method ensures matching evaluation.

flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (4)
23-37: Simple case classes. Clearly define event & stats.
39-71: Focused comparison function. Window-based approach for aggregated validation.
73-120: Well-organized job. Generates stats for requested row batches.
122-181: Comprehensive entry point. Executes validation and logs results.
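As referenced above, here is a rough sketch of the bulk-evaluation idea behind these comments: build one DataFrame for the whole batch and apply the select/where expressions once, rather than invoking Spark per row. All names and parameters here are assumptions for illustration; the PR's actual `runSparkSQLBulk` may differ:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Evaluates a batch of rows in a single Spark SQL pass: one createDataFrame, one selectExpr,
// then the where clauses folded on top, instead of a per-row round trip.
def runSparkSqlBulkSketch(spark: SparkSession,
                          rows: Seq[Row],
                          schema: StructType,
                          selects: Map[String, String],
                          wheres: Seq[String]): Array[Row] = {
  val df: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  val selected = df.selectExpr(selects.map { case (name, expr) => s"$expr AS $name" }.toSeq: _*)
  wheres.foldLeft(selected)((acc, clause) => acc.where(clause)).collect()
}
```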
…ush/flink_validation_job
```scala
// Helper method for deep equality - primarily used to special case types like Maps that don't match correctly
// in EqualsBuilder.reflectionEquals across scala versions 2.12 and 2.13.
private def deepEquals(a: Any, b: Any): Boolean = (a, b) match {
```
neat!
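For readers skimming the thread, a hypothetical sketch of what such a map-aware deep equality can look like; the PR's actual `deepEquals` is only excerpted above, so treat the body below purely as an illustration:

```scala
import org.apache.commons.lang3.builder.EqualsBuilder

object DeepEqualsSketch {
  // Special-cases Maps (and Seqs) whose wrapper classes differ between Scala 2.12 and 2.13,
  // falling back to reflection-based equality for everything else.
  def deepEquals(a: Any, b: Any): Boolean = (a, b) match {
    case (m1: Map[_, _], m2: Map[_, _]) =>
      m1.size == m2.size && m1.asInstanceOf[Map[Any, Any]].forall { case (k, v) =>
        m2.asInstanceOf[Map[Any, Any]].get(k).exists(deepEquals(v, _))
      }
    case (s1: Seq[_], s2: Seq[_]) =>
      s1.size == s2.size && s1.zip(s2).forall { case (x, y) => deepEquals(x, y) }
    case _ =>
      EqualsBuilder.reflectionEquals(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef])
  }
}
```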
```scala
val api = buildApi(onlineClassName, props)
val metadataStore = new MetadataStore(FetchContext(api.genKvStore, MetadataDataset))

if (validateMode) {
```
i would pull this out as a function.
```scala
val sortedCatalystResult = catalystResult.map(m => SortedMap[String, Any]() ++ m).sortBy(_.toString)
val sortedSparkDfResult = sparkDfResult.map(m => SortedMap[String, Any]() ++ m).sortBy(_.toString)
```
if we make dataframes out of these - we have a nice Compare.sideBySide
method that prints only the differing rows.
you will be able to kill the rest of the code under.
very nice PR! can you use the Compare.SideBySide function that we use in tests? will require the objects to be dataframes, but i think it is much simpler to do it that way.
(Dropping notes from our discussion on slack - we'll explore migrating to Comparison.sideBySide in a follow up as it requires a bit of refactoring of the existing Spark code + reworking things here)
## Summary

This PR adds a Flink validation job that we can use to read n rows from Kafka using the standard serde rails the user has configured in their topic info (e.g. avro using schema registry) and runs these rows through catalyst util as well as a Spark DF based approach and compares the results. This is something we can use for quick smoke testing of gaps / missing logic in CU when we have a user group by that needs to be run in streaming.

Some notes:

* As the Spark DF eval is super slow on a row-by-row basis (I tried this in single task and distributed task mode), I've gone with invoking these via bulk calls - the existing CU code runs very quickly so the bulk call there is essentially n single calls.
* Currently we ask the user for the number of rows they wish to validate against (with a default value) - an alternative here is to read n seconds of Kafka data. Went with the n rows approach as I thought it's easier to reason about.
* Fixed some dep issues causing the Flink UI to not work ('commons-text' was added in the exclusions list but seems to have been dropped as part of the bazel refactoring we did late last week).

Tested on Etsy.

Scenario 1 - GroupBy with only explode (no transform)

```
$ zipline run --mode streaming --kafka-bootstrap=kafka.kafka-gke-dev.etsycloud.com:9092 --groupby-name search.beacons.listing.actions --validate --validate-rows 10000
...
Job has been submitted with JobID af8f0a4c6a095f0cdef6f7aeaa25a5b6
**** Validation stats for search.beacons.listing.actions ****
Total Records: 10000
Total Matches: 10000
Total Mismatches: 0
Mismatches:
```

Scenario 2 - GroupBy with explode + transform (current etsy/zipline main):

```
zipline run --mode streaming --kafka-bootstrap=kafka.kafka-gke-dev.etsycloud.com:9092 --groupby-name search.beacons.listing.actions --validate --validate-rows 10000
...
**** Validation stats for search.beacons.listing.actions ****
Total Records: 10000
Total Matches: 0
Total Mismatches: 10000
Mismatches:
RecordId: 586f204d-b4fb-4f0c-9b69-b44c4d4dc8fc
Is Match: false
Catalyst Result: ArrayBuffer(Map(favorite -> 0, listing_id -> 962072674332, ts -> 1741704037533, add_cart -> 0, purchase -> 0, view -> 0))
Spark DF Result: WrappedArray()
Differences (diff_type -> (catalystValue, sparkDfValue)): Map(result_count -> (1,0))
...
```

## Checklist

- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Enhanced the application’s validation capabilities for streaming and evaluation, including configurable row thresholds and detailed result comparisons.
  - Introduced a new validation job that gathers comprehensive statistics to ensure data consistency.
- **Bug Fixes**
  - Resolved dependency compatibility issues to improve the stability of streaming operations.
- **Tests**
  - Added extensive unit and integration tests to verify and maintain accurate validation behavior.
- **Chores**
  - Improved interoperability between Java and Scala collections for smoother integration.