Swap out AvroCodecOutput + KV store output types from case classes to PoJos #294


Merged
merged 7 commits into main from piyush/flink_pojos on Jan 29, 2025

Conversation

piyush-zlai
Contributor

@piyush-zlai piyush-zlai commented Jan 29, 2025

Summary

We use case classes for many of our intermediate types in Flink, and a few of these are persisted in Flink state: in the tiled setup, TimestampedIR is one example, and in both the untiled and tiled setups the input and output types of the AsyncKVStoreWriter are persisted to state. If we later need to extend these intermediate types with additional fields (as we're considering to support tiling), we won't be able to, since Flink doesn't support state schema evolution for case classes. We'd instead face a time-consuming migration: spin up parallel operators with the new types, dual-write, and then cut over in a subsequent job. Setting this up now, before we're in prod, should minimize the chance of hitting that issue down the line.

This PR swaps case classes for Scala PoJo types in the tiled aggregations, in AvroCodecOutput (which feeds the KV store writer), and in the KV store write response. In a subsequent PR we can update TimestampedTile to include the tile's startTs (and plumb latestTs through to the sink so we can track e2e lag).

(We chose the Scala PoJo route here because there's some interplay between these classes, our aggregator libs, and other Scala code (e.g. the FlinkRowAggregatorFunction), and a fair amount of casting is needed for interop. The Flink 2.0 migration will likely be a decent-sized chunk of work given that all our Flink code is in Scala; I think we can bite that off then.)
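
For context, a Flink-evolvable PoJo looks roughly like the sketch below. The field names mirror the TimestampedIR definition reviewed later in this thread; the no-arg constructor and equality details are illustrative assumptions, not code copied from the PR. Flink's PojoSerializer supports schema evolution only for types with mutable public fields (or getters/setters) and a public no-arg constructor, which is exactly what case classes lack.

import java.util

// Minimal sketch of a Flink-friendly Scala PoJo (assumed shape, see above).
class TimestampedIR(
    var ir: Array[Any],              // intermediate aggregation result
    var latestTsMillis: Option[Long] // timestamp of the latest contributing event
) {
  // Public no-arg constructor: required by Flink's POJO serializer.
  def this() = this(Array.empty, None)

  // Deep equality over the IR array; reference equality would break tests.
  override def equals(other: Any): Boolean = other match {
    case that: TimestampedIR =>
      ir.sameElements(that.ir) && latestTsMillis == that.latestTsMillis
    case _ => false
  }

  override def hashCode(): Int =
    31 * util.Arrays.deepHashCode(ir.asInstanceOf[Array[AnyRef]]) + latestTsMillis.hashCode()
}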

Checklist

  • [ ] Added Unit Tests
  • [x] Covered by existing CI
  • [x] Integration tested - Confirmed that this works by kicking off our TestFlinkJob on the cluster
  • [ ] Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced new data types (AvroCodecOutput, TimestampedTile, TimestampedIR, WriteResponse) for improved data handling in Flink jobs
    • Enhanced schema evolution support for stateful data processing
  • Refactor

    • Replaced PutRequest with AvroCodecOutput across multiple Flink processing components
    • Updated method signatures and data type handling in various Flink-related classes
    • Simplified timestamp retrieval and data structure interactions
  • Chores

    • Reorganized type imports and package structure
    • Updated test cases to align with new data type implementations

Contributor

coderabbitai bot commented Jan 29, 2025

Walkthrough

The pull request introduces a comprehensive refactoring of data types in the Flink module, transitioning from PutRequest to AvroCodecOutput across multiple files. This change impacts how data is processed, serialized, and written in asynchronous operations, particularly in the AsyncKVStoreWriter, AvroCodecFn, and related classes. The modifications ensure a more consistent and flexible approach to handling data in Flink jobs, with a focus on improving type safety and reducing complexity.

Changes

File | Change Summary
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala | Replaced PutRequest with AvroCodecOutput in method signatures and processing logic
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala | Updated conversion methods to return AvroCodecOutput instead of PutRequest
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala | Modified input stream types from PutRequest to AvroCodecOutput
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala | Added new type definitions: TimestampedIR, TimestampedTile, AvroCodecOutput, WriteResponse

Sequence Diagram

sequenceDiagram
    participant Source
    participant AvroCodecFn
    participant AsyncKVStoreWriter
    participant Sink

    Source->>AvroCodecFn: Input data
    AvroCodecFn->>AsyncKVStoreWriter: AvroCodecOutput
    AsyncKVStoreWriter->>Sink: WriteResponse
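
In Scala Flink code, the chain above would be wired roughly as in the sketch below. The operator and type names come from this PR, but the helper name, constructor details, timeout, and capacity are placeholder assumptions rather than the actual FlinkJob.scala wiring.

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncFunction
import ai.chronon.flink.types.{AvroCodecOutput, WriteResponse}

// Illustrative wiring only; AvroCodecFn / AsyncKVStoreWriter construction
// details live in FlinkJob.scala and are not reproduced here.
def wirePipeline[T](
    source: DataStream[T],
    avroCodecFn: FlatMapFunction[T, AvroCodecOutput],
    kvWriter: AsyncFunction[AvroCodecOutput, WriteResponse]): DataStream[WriteResponse] = {
  // Source ->> AvroCodecFn: encode each event into KV-store-ready bytes.
  val encoded: DataStream[AvroCodecOutput] = source.flatMap(avroCodecFn)
  // AvroCodecFn ->> AsyncKVStoreWriter ->> Sink: async writes may complete out
  // of order; unorderedWait trades ordering for throughput. The timeout and
  // capacity values here are placeholders.
  AsyncDataStream.unorderedWait(encoded, kvWriter, 1000L, TimeUnit.MILLISECONDS, 100)
}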


Suggested reviewers

  • nikhil-zlai
  • tchow-zlai

Poem

🌊 Streams of data, flowing free
From PutRequest to AvroCodec's glee
Flink dances with types anew
Refactoring's magical hue
Code evolves, complexity bows 🚀


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

85-92: Consider a more descriptive name.

DummyInitializationContext could be renamed to TestSerializationInitContext to better reflect its purpose.

flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (1)

34-34: Latency calculation is concise; watch clock skew.
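
The line in question presumably computes something like the snippet below: a sink-side wall-clock reading minus the event's tsMillis, which was stamped on a different host. Any NTP skew between the two machines shows up directly in the reported latency. This is an illustrative reconstruction, not the actual MetricsSink.scala code.

import ai.chronon.flink.types.WriteResponse

// Two hosts, two clocks: skew between them inflates or deflates the metric.
def e2eLatencyMs(resp: WriteResponse): Long =
  System.currentTimeMillis() - resp.tsMillis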

flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (2)

20-36: LGTM! Consider adding @BeanProperty annotation.

The class correctly implements deep equality and handles Option type.

+import scala.beans.BeanProperty
 class TimestampedIR(
+  @BeanProperty
   var ir: Array[Any],
+  @BeanProperty
   var latestTsMillis: Option[Long])

96-120: Consider extending AvroCodecOutput.

Avoid field duplication by inheriting from AvroCodecOutput.

-class WriteResponse(
-  var keyBytes: Array[Byte],
-  var valueBytes: Array[Byte],
-  var dataset: String,
-  var tsMillis: Long,
-  var status: Boolean)
+class WriteResponse(
+  keyBytes: Array[Byte],
+  valueBytes: Array[Byte],
+  dataset: String,
+  tsMillis: Long,
+  var status: Boolean)
+  extends AvroCodecOutput(keyBytes, valueBytes, dataset, tsMillis)
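
One caveat with the inheritance route, assuming Flink's usual POJO analysis: inherited fields only count if they stay visible to the serializer, so the parent's fields must remain public vars on AvroCodecOutput, and both classes still need public no-arg constructors for state schema evolution to keep working.
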
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 38ea292 and 6bdc043.

📒 Files selected for processing (10)
  • flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (5 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (8 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
  • flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (5 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/AsyncKVStoreWriterTest.scala (4 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (6 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (28)
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (3)

19-19: LGTM: Required imports added for new functionality.

Also applies to: 25-26, 28-28, 33-34


80-82: LGTM: Simplified timestamp access.


110-110: LGTM: Proper schema initialization added.

flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (8)

3-4: Imports look good.


28-29: Method signature update is consistent.


72-72: Switching to AvroCodecOutput is clear.


99-99: Timeout log is concise.


102-104: Graceful fallback for timeouts.


107-110: Constructing PutRequest from AvroCodecOutput is straightforward.
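
The conversion presumably looks something like the sketch below. PutRequest belongs to Chronon's KVStore API; the field names are inferred from AvroCodecOutput, so treat this as an assumption rather than the actual AsyncKVStoreWriter.scala code.

import ai.chronon.flink.types.AvroCodecOutput

// Hypothetical reconstruction: map the PoJo's fields onto a KVStore PutRequest.
def toPutRequest(output: AvroCodecOutput): PutRequest =
  PutRequest(
    keyBytes = output.keyBytes,
    valueBytes = output.valueBytes,
    dataset = output.dataset,
    tsMillis = Some(output.tsMillis) // assumed to be Option[Long] on PutRequest
  )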


119-121: Completes futures correctly.


128-130: Logs failures and moves on.

flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (1)

2-2: New import is fine.

flink/src/test/scala/ai/chronon/flink/test/AsyncKVStoreWriterTest.scala (7)

4-4: AvroCodecOutput import is correct.


24-24: Using AvroCodecOutput in source. Good.


26-26: Mapped range to AvroCodecOutput. OK.


39-39: Timestamp assertion is valid.


46-46: Reused AvroCodecOutput for poison tests. Fine.


48-48: Same range mapping. Looks good.


61-61: Verifies timestamps again.

flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (2)

47-65: LGTM! Well-structured PoJo implementation.

Proper deep equality checks and helpful toString implementation.


71-91: LGTM! Clean and focused implementation.

Correct handling of byte array equality in equals method.
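
For byte arrays, == compares references, so contents-based equality needs java.util.Arrays.equals. A sketch of the pattern the reviewer is approving, sitting inside AvroCodecOutput, with the body written as an illustrative assumption:

// Assumed shape of the equals override being praised above.
override def equals(o: Any): Boolean = o match {
  case that: AvroCodecOutput =>
    java.util.Arrays.equals(keyBytes, that.keyBytes) &&
      java.util.Arrays.equals(valueBytes, that.valueBytes) &&
      dataset == that.dataset &&
      tsMillis == that.tsMillis
  case _ => false
}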

flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (1)

8-9: LGTM! Correct transition to PoJo instantiation.

Clean migration from case class to PoJo syntax.

Also applies to: 54-54, 89-89, 107-107, 175-175

flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (1)

5-7: LGTM! Tests updated correctly for new types.

Proper adaptation to WriteResponse type and timestamp access.

Also applies to: 32-33, 43-44, 95-95, 134-134, 144-144, 148-148

flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)

8-9: LGTM! Clean transition to AvroCodecOutput.

Consistent type updates throughout the class hierarchy.

Also applies to: 92-92, 104-104, 116-116, 120-120, 132-132, 143-143, 155-155, 173-173

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4)

10-12: LGTM! New type imports align with PR objectives.

The imports support the transition from case classes to POJOs.


62-62: LGTM! Constructor parameter type updated.

Changed from PutRequest to AvroCodecOutput to support POJO-based state persistence.


Line range hint 225-230: Verify TiledAvroCodecFn compatibility.

Similar type change in the tiled implementation.

✅ Verification successful

TiledAvroCodecFn implementation is correct and compatible

The class properly extends BaseAvroCodecFn[TimestampedTile, AvroCodecOutput] and implements all required methods with correct type signatures.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if TiledAvroCodecFn handles the new output type correctly
ast-grep --pattern 'class TiledAvroCodecFn {
  $$$
  def apply($$$): $$$AvroCodecOutput$$$
  $$$
}'

Length of output: 101


Script:

#!/bin/bash
# Find TiledAvroCodecFn class and its usage
rg "TiledAvroCodecFn" -A 5
rg "AvroCodecOutput" -A 2

Length of output: 9495


Line range hint 121-126: Verify downstream impact of type change.

The flatMap operation's output type has changed from PutRequest to AvroCodecOutput.

✅ Verification successful

Type change is properly handled throughout the pipeline

The change from PutRequest to AvroCodecOutput is safe. AsyncKVStoreWriter correctly converts AvroCodecOutput to PutRequest before storage, maintaining data flow integrity.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if AvroCodecFn handles the new output type correctly
ast-grep --pattern 'class AvroCodecFn {
  $$$
  def apply($$$): $$$AvroCodecOutput$$$
  $$$
}'

Length of output: 96


Script:

#!/bin/bash
# Find AvroCodecFn definition and usages
rg -t scala "AvroCodecFn|AvroCodecOutput" -A 5

# Find where putRecordDS is used
rg -t scala "putRecordDS" -A 5

Length of output: 18431

@piyush-zlai piyush-zlai merged commit aa990c2 into main Jan 29, 2025
6 checks passed
@piyush-zlai piyush-zlai deleted the piyush/flink_pojos branch January 29, 2025 22:19
nikhil-zlai pushed a commit that referenced this pull request Feb 4, 2025
… PoJos (#294)

@coderabbitai coderabbitai bot mentioned this pull request Feb 13, 2025

kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
… PoJos (#294)

kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
… PoJos (#294)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
… PoJos (#294)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
… PoJos (#294)

chewy-zlai pushed a commit that referenced this pull request May 16, 2025
… PoJos (#294)