Swap out AvroCodecOutput + KV store output types from case classes to PoJos #294
Conversation
Walkthrough

The pull request introduces a comprehensive refactoring of data types in the Flink module, transitioning from case classes to PoJo types for the values that flow through, and are persisted by, the streaming pipeline.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Source
    participant AvroCodecFn
    participant AsyncKVStoreWriter
    participant Sink
    Source->>AvroCodecFn: Input data
    AvroCodecFn->>AsyncKVStoreWriter: AvroCodecOutput
    AsyncKVStoreWriter->>Sink: WriteResponse
```
Actionable comments posted: 0
🧹 Nitpick comments (4)

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

- Lines 85-92: Consider a more descriptive name. `DummyInitializationContext` could be renamed to `TestSerializationInitContext` to better reflect its purpose.

flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (1)

- Line 34: Latency calculation is concise; watch clock skew. (A sketch of this style of calculation follows.)
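For reference, a sink-side latency calculation typically subtracts the element's event timestamp from the current processing time. A minimal sketch, assuming the element carries its event time as `tsMillis` (as the types in this PR do); the surrounding sink code is not shown here:

```scala
// Sketch only: event-to-write latency measured at the sink. Caveat from
// the review: if the clock that stamped tsMillis is skewed relative to
// the sink machine's clock, the measurement shifts by that skew.
def latencyMs(tsMillis: Long): Long =
  System.currentTimeMillis() - tsMillis
```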
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (2)

- Lines 20-36: LGTM! Consider adding the `@BeanProperty` annotation. The class correctly implements deep equality and handles the Option type.

```diff
+import scala.beans.BeanProperty

 class TimestampedIR(
+    @BeanProperty var ir: Array[Any],
+    @BeanProperty var latestTsMillis: Option[Long])
```
- Lines 96-120: Consider extending `AvroCodecOutput` to avoid field duplication.

```diff
-class WriteResponse(
-    var keyBytes: Array[Byte],
-    var valueBytes: Array[Byte],
-    var dataset: String,
-    var tsMillis: Long,
-    var status: Boolean)
+class WriteResponse(
+    keyBytes: Array[Byte],
+    valueBytes: Array[Byte],
+    dataset: String,
+    tsMillis: Long,
+    var status: Boolean)
+  extends AvroCodecOutput(keyBytes, valueBytes, dataset, tsMillis)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (10)

- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (8 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (1 hunk)
- flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (5 hunks)
- flink/src/test/scala/ai/chronon/flink/test/AsyncKVStoreWriterTest.scala (4 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (6 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (1 hunk)
✅ Files skipped from review due to trivial changes (1)
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (28)

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (3)

- Line 19: LGTM: Required imports added for new functionality. Also applies to: 25-26, 28, 33-34.
- Lines 80-82: LGTM: Simplified timestamp access.
- Line 110: LGTM: Proper schema initialization added.
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (8)

- Lines 3-4: Imports look good.
- Lines 28-29: Method signature update is consistent.
- Line 72: Switching to AvroCodecOutput is clear.
- Line 99: Timeout log is concise.
- Lines 102-104: Graceful fallback for timeouts (see the sketch after this list).
- Lines 107-110: Constructing PutRequest from AvroCodecOutput is straightforward.
- Lines 119-121: Completes futures correctly.
- Lines 128-130: Logs failures and moves on.
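The timeout fallback approved above maps naturally onto Flink's `AsyncFunction.timeout` hook. A hedged sketch of the pattern, not the PR's exact code; the `WriteResponse` constructor shape is taken from the FlinkTypes nitpick earlier in this review:

```scala
import java.util.Collections
import org.apache.flink.streaming.api.functions.async.ResultFuture

// Sketch: inside a RichAsyncFunction[AvroCodecOutput, WriteResponse].
// On timeout, complete the future with a status=false response instead of
// throwing, so one slow KV write doesn't fail the whole pipeline.
override def timeout(input: AvroCodecOutput, resultFuture: ResultFuture[WriteResponse]): Unit = {
  resultFuture.complete(
    Collections.singleton(
      new WriteResponse(input.keyBytes, input.valueBytes, input.dataset, input.tsMillis, false)))
}
```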
flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (1)

- Line 2: New import is fine.
flink/src/test/scala/ai/chronon/flink/test/AsyncKVStoreWriterTest.scala (7)

- Line 4: AvroCodecOutput import is correct.
- Line 24: Using AvroCodecOutput in source. Good.
- Line 26: Mapped range to AvroCodecOutput. OK.
- Line 39: Timestamp assertion is valid.
- Line 46: Reused AvroCodecOutput for poison tests. Fine.
- Line 48: Same range mapping. Looks good.
- Line 61: Verifies timestamps again.
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (2)

- Lines 47-65: LGTM! Well-structured PoJo implementation. Proper deep equality checks and a helpful toString implementation.
- Lines 71-91: LGTM! Clean and focused implementation. Correct handling of byte array equality in the equals method (see the sketch below).
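For byte-array fields, the default `==` compares references, so a content-based `equals` is needed. A minimal sketch of the pattern being approved; the field names follow the nitpick diff earlier in this review, and this is illustrative rather than the PR's exact code:

```scala
import java.util.Arrays

// Sketch: content-based equality for a byte-array-carrying PoJo.
class AvroCodecOutput(var keyBytes: Array[Byte],
                      var valueBytes: Array[Byte],
                      var dataset: String,
                      var tsMillis: Long) {
  // No-arg constructor, as Flink's PoJo rules require.
  def this() = this(Array.emptyByteArray, Array.emptyByteArray, "", 0L)

  override def equals(other: Any): Boolean = other match {
    case that: AvroCodecOutput =>
      Arrays.equals(keyBytes, that.keyBytes) &&     // compare contents, not references
        Arrays.equals(valueBytes, that.valueBytes) &&
        dataset == that.dataset &&
        tsMillis == that.tsMillis
    case _ => false
  }

  override def hashCode(): Int = {
    var h = Arrays.hashCode(keyBytes)
    h = 31 * h + Arrays.hashCode(valueBytes)
    h = 31 * h + dataset.hashCode
    31 * h + tsMillis.hashCode
  }
}
```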
flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (5)

- Lines 8-9: LGTM! Correct transition to PoJo instantiation. Clean migration from case class to PoJo syntax (illustrated below). Also applies to: 54, 89, 107, 175.
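The mechanical change at these call sites is small: case classes construct via the companion `apply`, while the new PoJos need an explicit `new`. A sketch, with `TimestampedIR`'s constructor shape taken from the nitpick diff above and placeholder values:

```scala
// Before (case class): companion apply, no `new` keyword.
//   val out = TimestampedIR(ir, Some(tsMillis))

// After (PoJo): explicit constructor invocation.
val ir: Array[Any] = Array(1L, 2.0)                   // placeholder IR values
val out = new TimestampedIR(ir, Some(1700000000000L)) // placeholder timestamp
```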
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (1)

- Lines 5-7: LGTM! Tests updated correctly for the new types. Proper adaptation to the WriteResponse type and timestamp access. Also applies to: 32-33, 43-44, 95, 134, 144, 148.
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)

- Lines 8-9: LGTM! Clean transition to AvroCodecOutput. Consistent type updates throughout the class hierarchy. Also applies to: 92, 104, 116, 120, 132, 143, 155, 173.
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4)

- Lines 10-12: LGTM! New type imports align with PR objectives. The imports support the transition from case classes to POJOs.
- Line 62: LGTM! Constructor parameter type updated. Changed from `PutRequest` to `AvroCodecOutput` to support POJO-based state persistence.
- Line range hint 225-230: Verify TiledAvroCodecFn compatibility. Similar type change in the tiled implementation.

  ✅ Verification successful: the TiledAvroCodecFn implementation is correct and compatible. The class properly extends `BaseAvroCodecFn[TimestampedTile, AvroCodecOutput]` and implements all required methods with correct type signatures.

  🏁 Scripts executed for the analysis:

  ```bash
  #!/bin/bash
  # Check if TiledAvroCodecFn handles the new output type correctly
  ast-grep --pattern 'class TiledAvroCodecFn { $$$ def apply($$$): $$$AvroCodecOutput$$$ $$$ }'
  ```

  Length of output: 101

  ```bash
  #!/bin/bash
  # Find TiledAvroCodecFn class and its usage
  rg "TiledAvroCodecFn" -A 5
  rg "AvroCodecOutput" -A 2
  ```

  Length of output: 9495

- Line range hint 121-126: Verify downstream impact of type change. The flatMap operation's output type has changed from `PutRequest` to `AvroCodecOutput`.

  ✅ Verification successful: the type change is properly handled throughout the pipeline. The change from `PutRequest` to `AvroCodecOutput` is safe; AsyncKVStoreWriter correctly converts `AvroCodecOutput` to `PutRequest` before storage, maintaining data flow integrity.

  🏁 Scripts executed for the analysis:

  ```bash
  #!/bin/bash
  # Check if AvroCodecFn handles the new output type correctly
  ast-grep --pattern 'class AvroCodecFn { $$$ def apply($$$): $$$AvroCodecOutput$$$ $$$ }'
  ```

  Length of output: 96

  ```bash
  #!/bin/bash
  # Find AvroCodecFn definition and usages
  rg -t scala "AvroCodecFn|AvroCodecOutput" -A 5
  # Find where putRecordDS is used
  rg -t scala "putRecordDS" -A 5
  ```

  Length of output: 18431
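The conversion the verification describes is a field-by-field mapping. A hedged sketch of what it plausibly looks like inside AsyncKVStoreWriter; `PutRequest`'s exact signature is an assumption here, not something this review confirms:

```scala
// Sketch: adapting the new PoJo to the KV store's request type.
// Assumes PutRequest(keyBytes, valueBytes, dataset, tsMillis: Option[Long]).
def toPutRequest(in: AvroCodecOutput): PutRequest =
  PutRequest(in.keyBytes, in.valueBytes, in.dataset, Some(in.tsMillis))
```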
Summary
We use case classes for many of our intermediate types in Flink, and a few of these are persisted in Flink state; in the tiled setup, the TimestampedIR is one example. In both the untiled and tiled setups, the input and output types of the AsyncKVStoreWriter are persisted to state. If we later need to add fields to these intermediate types (as we're considering for tiling support), we won't be able to, because Flink doesn't support [state schema evolution for case classes](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/). We'd instead face a time-consuming migration: spin up parallel operators with the new types, dual write, and then cut over in a subsequent job. Setting this up before we're in prod should help us avoid that issue down the line.
This PR swaps the case classes for Scala PoJo types for the tiled aggregations, AvroCodecOutput (which feeds into the KV store writer), and the KV store write response. In a subsequent PR we can update the TimestampedTile to include the startTs of the tile (and plumb the latestTs through to the sink so we can track e2e lag).
(We chose the Scala PoJo route here because there's a fair amount of interplay with our aggregator libs and other Scala code around these classes, e.g. the FlinkRowAggregatorFunction, and a bunch of casting is needed to interop. The Flink 2.0 migration will likely be a decent-sized chunk given all our Flink code is in Scala, and I think we can bite that off then.)
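To make the constraint concrete: Flink's POJO serializer supports adding and removing fields across savepoints, and it applies when a class exposes a public no-arg constructor plus public (or getter/setter-accessible) mutable fields; case classes get a serializer with no such support. A minimal sketch of the before/after shape, with field names following the review's FlinkTypes diff and illustrative class names, not the PR's exact code:

```scala
// Before: a case class. Flink serializes this with a case-class serializer,
// which does not support state schema evolution; adding a field later
// breaks restores from existing savepoints.
case class TimestampedIRCase(ir: Array[Any], latestTsMillis: Option[Long])

// After: a Scala "PoJo": public mutable fields plus a no-arg constructor,
// so Flink's PojoSerializer applies and fields can evolve later.
class TimestampedIR(var ir: Array[Any], var latestTsMillis: Option[Long]) {
  def this() = this(Array.empty[Any], None)
}
```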
Checklist

- [ ] Added Unit Tests
- [x] Covered by existing CI
- [x] Integration tested - Confirmed that this works by kicking off our TestFlinkJob on the cluster
- [ ] Documentation update
Summary by CodeRabbit

New Features

- Introduced new data types (`AvroCodecOutput`, `TimestampedTile`, `TimestampedIR`, `WriteResponse`) for improved data handling in Flink jobs
- Enhanced schema evolution support for stateful data processing

Refactor

- Replaced `PutRequest` with `AvroCodecOutput` across multiple Flink processing components
- Updated method signatures and data type handling in various Flink-related classes
- Simplified timestamp retrieval and data structure interactions

Chores

- Reorganized type imports and package structure
- Updated test cases to align with new data type implementations