Add a Flink canary app that can be run on demand #762

Merged
merged 8 commits into main from piyush/flink_event_canary on May 13, 2025

Conversation

piyush-zlai (Contributor) commented on May 12, 2025

Summary

Builds on top of PR: #751.

This PR adds a streaming GroupBy that can be run as a canary to sanity-check and test changes while iterating on Flink. I used it to sanity-check the creation and use of a mock schema SerDe that some users have been asking for.

Can be submitted via:

$ CHRONON_ROOT=`pwd`/api/python/test/canary
$ zipline compile --chronon-root=$CHRONON_ROOT
$ zipline run --repo=$CHRONON_ROOT --version $VERSION --mode streaming --conf compiled/group_bys/gcp/item_event_canary.actions_v1 --kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092 --groupby-name gcp.item_event_canary.actions_v1 --validate

(Needs the Flink event driver to be running, which is triggered via DataprocSubmitterTest.)

Checklist

  • [ ] Added Unit Tests
  • [ ] Covered by existing CI
  • [x] Integration tested
  • [ ] Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced a new group-by aggregation for item event actions, supporting real-time analytics by listing ID with data sourced from GCP Kafka and BigQuery.
    • Added a mock schema provider for testing item event ingestion.
  • Bug Fixes

    • Updated test configurations to use new event schemas, topics, and data paths for improved accuracy in Flink Kafka ingest job tests.
  • Refactor

    • Renamed and restructured the event driver to focus on item events, with a streamlined schema and updated job naming.
  • Chores

    • Added new environment variable for Flink state storage configuration.
    • Updated build configuration to reference the renamed event driver.

coderabbitai bot commented May 12, 2025

Walkthrough

This update introduces a new Python canary group-by test for GCP item events, refactors the Flink Kafka event driver and schema to focus on item events, updates related Scala and Python test configurations, and adds a mock custom schema provider for deserialization. Environment variables and job parameters are revised to match the new item event flow.

Changes

  • api/python/test/canary/teams.py: Added FLINK_STATE_URI environment variable to the canary team config.
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala: Updated Flink Kafka ingest job test: uses new project/region/cluster, main class now FlinkKafkaItemEventDriver, new Kafka topic, data file path, and added event delay argument.
  • flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala: Renamed object to FlinkKafkaItemEventDriver, replaced and simplified Avro schema to focus on item events, updated all references and job naming accordingly.
  • flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala: Added MockCustomSchemaProvider class for item event schema, using Avro-to-Chronon conversion and Avro deserialization; updated example topic config comment.
  • api/python/test/canary/group_bys/gcp/item_event_canary.py: New file: defines action event constants, builds a BigQuery-backed Source, configures Kafka topic with GCP security and schema provider, and sets up a GroupBy for item event aggregations by listing ID.
  • flink/BUILD.bazel: Updated flink_kafka_assembly target main class from FlinkKafkaBeaconEventDriver to FlinkKafkaItemEventDriver.

Sequence Diagram(s)

sequenceDiagram
    participant Test as Canary Python Test
    participant Source as BigQuery Source
    participant Kafka as GCP Kafka
    participant Flink as FlinkKafkaItemEventDriver
    participant SerDe as MockCustomSchemaProvider

    Test->>Source: Query item events (BigQuery)
    Test->>Kafka: Connect with TLS/OAuth, subscribe to topic
    Kafka->>Flink: Stream item event messages
    Flink->>SerDe: Deserialize Avro bytes to Mutation
    Flink->>Test: Aggregate and output group-by results

Possibly related PRs

Suggested reviewers

  • chewy-zlai
  • nikhil-zlai

Poem

In the land of events, items take the stage,
Flink and Python join, their schemas to engage.
Kafka streams with vigor, BigQuery stands tall,
Canary flies onward, now item events for all!
🦜🪄



📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 497d48b and 7fa9c19.

📒 Files selected for processing (1)
  • api/python/test/canary/group_bys/gcp/item_event_canary.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/python/test/canary/group_bys/gcp/item_event_canary.py
⏰ Context from checks skipped due to timeout of 90000ms (18)
  • GitHub Check: service_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: service_tests
  • GitHub Check: online_tests
  • GitHub Check: flink_tests
  • GitHub Check: online_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: flink_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: api_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: api_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: python_tests
  • GitHub Check: enforce_triggered_workflows


coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (10)
flink/src/test/scala/ai/chronon/flink/test/deser/SourceIdentityDeSerializationSupportSpec.scala (3)

32-32: Consider adding null/empty check before accessing head.

This could throw if deserialization fails.

-  val row = resultList.toScala.head
+  assert(resultList.nonEmpty, "Deserialized result list shouldn't be empty")
+  val row = resultList.toScala.head

43-46: Add assertion messages for easier debugging.

These assertions lack descriptive messages.

-  assert(schema.fieldNames sameElements avroSchemaFields.map(_.name()))
-  schema.fieldNames.foreach(name => assert(schema(name).dataType == avroSchemaToSparkDataTypes(name)))
-  // spot check a couple of fields
-  assert(row.get(0) == 12345)
-  assert(row.getString(1) == "johndoe")
+  assert(schema.fieldNames sameElements avroSchemaFields.map(_.name()), "Field names should match")
+  schema.fieldNames.foreach(name => assert(schema(name).dataType == avroSchemaToSparkDataTypes(name), s"Data type mismatch for field $name"))
+  // spot check a couple of fields
+  assert(row.get(0) == 12345, "ID field should match expected value")
+  assert(row.getString(1) == "johndoe", "Username field should match expected value")

65-65: Use consistent assertion style.

Switch from JUnit to ScalaTest assertion.

-  assertTrue(resultList.isEmpty)
+  assert(resultList.isEmpty, "Result list should be empty for corrupted data")
flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (2)

28-30: Hard-coded schema guard limits reuse.

require(schemaName == "beacon") blocks other schemas. Consider accepting any name or validating against a list.
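
A minimal sketch of the relaxed guard; the allow-list contents and class shape are assumptions, not the PR's actual code:

```scala
// Sketch only: validate against an allow-list instead of hard-coding "beacon".
object MockCustomSchemaProvider {
  // Assumed set of schemas this mock provider knows how to serve.
  private val SupportedSchemas = Set("beacon", "item_event")
}

class MockCustomSchemaProvider(schemaName: String) {
  import MockCustomSchemaProvider.SupportedSchemas
  require(
    SupportedSchemas.contains(schemaName),
    s"Unsupported schema '$schemaName'; expected one of: ${SupportedSchemas.mkString(", ")}"
  )
  // ... existing schema lookup / SerDe wiring stays unchanged
}
```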


31-40: Redundant schema conversion each JVM.

Both chrononSchema and avroSerDe recompute on every class-loader instantiation. Cache in companion object or pass prebuilt schema if many topics share it.
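
A sketch of the companion-object caching idea, shown here for the parsed Avro schema only; the placeholder schema JSON is an assumption, and the derived Chronon schema and AvroSerDe instance could be cached the same way:

```scala
import org.apache.avro.Schema

object MockCustomSchemaProvider {
  // A tiny placeholder schema so the sketch parses; the real item-event schema is larger.
  private val itemEventAvroSchemaJson: String =
    """{"type":"record","name":"ItemEvent","namespace":"example","fields":[{"name":"event_type","type":"string"}]}"""

  // Parsed once per classloader; every provider instance reuses it. The same pattern
  // applies to the Avro-to-Chronon conversion and the AvroSerDe instance.
  lazy val parsedAvroSchema: Schema = new Schema.Parser().parse(itemEventAvroSchemaJson)
}

class MockCustomSchemaProvider(schemaName: String) {
  // Reuse the cached schema instead of recomputing on every instantiation.
  def avroSchema: Schema = MockCustomSchemaProvider.parsedAvroSchema
}
```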

api/python/test/canary/group_bys/gcp/beacon_canary.py (1)

37-40: String-built URI is fragile.

Manually concatenating with '/' and ';' is error-prone and obscures parameter parsing. Prefer urllib.parse or a helper builder to encode/escape values.

from urllib.parse import quote_plus
kafka_topic = (
    f"kafka://test-beacon-main/"
    f"{quote_plus(schema_provider_cfgs)}/"
    f"{quote_plus(google_kafka_cfgs)}"
)
flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala (1)

49-69: Network call executed during lazy val may block job start.

retrieveTopicSchema hits the registry in object construction. Consider asynchronous fetch or retry back-off to avoid long startup stalls.
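
One way to bound startup stalls is a small retry-with-backoff wrapper around the registry call; the helper below is a generic sketch, and the usage line assumes method/field names from the class under review:

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object StartupRetry {
  // Sketch only: retry a flaky startup call with exponential backoff so a slow
  // Schema Registry does not stall (or immediately fail) job startup.
  def withRetry[T](op: => T, maxAttempts: Int = 5, initialBackoffMs: Long = 500): T = {
    @tailrec
    def attempt(n: Int, backoffMs: Long): T =
      Try(op) match {
        case Success(result) => result
        case Failure(_) if n < maxAttempts =>
          Thread.sleep(backoffMs) // one-off startup wait, not a per-record sleep
          attempt(n + 1, backoffMs * 2)
        case Failure(e) => throw e
      }
    attempt(1, initialBackoffMs)
  }
}

// Possible usage inside SchemaRegistrySerDe (names assumed):
// lazy val topicSchema = StartupRetry.withRetry(retrieveTopicSchema(topicName))
```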

flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)

29-38: Consider making the builder return ChrononDeserializationSchema[_] instead of concrete types.
Returning the generic parent type allows for future polymorphism (e.g., passing either schema to a common method) without an explicit cast.
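
A sketch of what the widened signatures might look like; the builder object name, argument names, and concrete constructors below are assumptions:

```scala
// Sketch only: both builders return the generic parent type so downstream code can
// accept either variant via ChrononDeserializationSchema[_] without casting.
object DeserializationSchemaBuilder {
  def buildSourceIdentityDeserSchema(serDe: SerDe, groupBy: GroupBy): ChrononDeserializationSchema[_] =
    new SourceIdentityDeserializationSchema(serDe, groupBy)

  def buildSourceProjectionDeserSchema(serDe: SerDe, groupBy: GroupBy): ChrononDeserializationSchema[_] =
    new SourceProjectionDeserializationSchema(serDe, groupBy)
}
```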

flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)

71-75: rowSerializer is set but never used.
Dead code adds cognitive load; remove unless a follow-up change will reference it.

-  @transient private var rowSerializer: ExpressionEncoder.Serializer[Row] = _
...
-    rowSerializer = eventExprEncoder.createSerializer()

77-82: Avoid recreating heavy SparkExpressionEval on every call.
projectedSchema instantiates a new evaluator each invocation; cache the result to save driver & TM memory/CPU.

-private def projectedSchemaLazy =
-  new SparkExpressionEval[Row](sourceEventEncoder, groupBy).getOutputSchema
-override def projectedSchema = projectedSchemaLazy.fields.map( ... )
+lazy val outputSchema = new SparkExpressionEval[Row](sourceEventEncoder, groupBy).getOutputSchema
+override def projectedSchema: Array[(String, DataType)] =
+  outputSchema.fields.map(f => (f.name, SparkConversions.toChrononType(f.name, f.dataType)))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 0545a42 and 9de5069.

📒 Files selected for processing (33)
  • api/python/test/canary/group_bys/gcp/beacon_canary.py (1 hunks)
  • api/python/test/canary/teams.py (1 hunks)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1 hunks)
  • cloud_gcp/BUILD.bazel (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/deser/FlinkSerDeProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (0 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (0 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/deser/AvroDeSerTestUtils.scala (2 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/deser/SchemaRegistryDeSerSchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/deser/SourceIdentityDeSerializationSupportSpec.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/deser/SourceProjectionDeSerializationSupportSpec.scala (4 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala (0 hunks)
  • online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/serde/AvroSerDe.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/serde/SerDe.scala (2 hunks)
  • service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
💤 Files with no reviewable changes (5)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
🧰 Additional context used
🧬 Code Graph Analysis (10)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)
  • ChrononDeserializationSchema (16-20)
online/src/main/scala/ai/chronon/online/Api.scala (3)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1)
  • streamDecoder (45-45)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
  • streamDecoder (42-45)
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)
  • streamDecoder (106-111)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
  • streamDecoder (42-45)
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)
  • streamDecoder (106-111)
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (4)
online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (4)
  • get (32-32)
  • SparkConversions (56-159)
  • fromChrononSchema (126-131)
  • fromChrononSchema (131-136)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (2)
  • fromChrononSchema (82-134)
  • AvroConversions (32-274)
online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (1)
  • streamChrononSchema (140-145)
online/src/main/scala/ai/chronon/online/Extensions.scala (1)
  • catalogString (27-29)
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)
online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala (1)
  • SparkInternalRowConversions (32-184)
online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (1)
online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (4)
  • OtelMetricsReporter (19-79)
  • OtelMetricsReporter (81-149)
  • buildOtelMetricReader (97-118)
  • buildOpenTelemetryClient (120-148)
online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (1)
api/src/main/java/ai/chronon/api/thrift/Option.java (2)
  • Option (25-143)
  • Some (93-111)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (4)
  • performSql (97-99)
  • performSql (131-136)
  • performSql (138-141)
  • performSql (143-147)
flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala (3)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
  • AvroConversions (32-274)
online/src/main/scala/ai/chronon/online/serde/AvroSerDe.scala (3)
  • AvroSerDe (11-38)
  • schema (37-37)
  • fromBytes (24-35)
online/src/main/scala/ai/chronon/online/serde/SerDe.scala (2)
  • Mutation (51-51)
  • SerDe (11-18)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (3)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5)
  • SparkExpressionEval (37-191)
  • getOutputSchema (116-118)
  • initialize (65-84)
  • performSql (86-93)
  • performSql (95-98)
online/src/main/scala/ai/chronon/online/serde/SerDe.scala (2)
  • Mutation (51-51)
  • SerDe (11-18)
online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (3)
  • SparkConversions (56-159)
  • toSparkRow (136-158)
  • toChrononType (61-87)
🔇 Additional comments (38)
service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java (3)

30-30: Improved defensive check

Adding URL validation prevents metric initialization with invalid configuration.


32-32: Enhanced condition for metrics initialization

Now properly checks both if metrics are enabled and URL is defined.


39-39: Safe access to optional URL

This change properly accesses the URL using .get() after verification, avoiding potential NPE.

online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (1)

138-143: Improved handling of optional metric reader

Now safely handles cases when metric reader isn't available by:

  1. Using map for transformation
  2. Falling back to no-op client

This makes metrics initialization more robust against missing configurations.

online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (4)

93-95: Better exporter URL handling

Returns Option[String] for safer null handling.


97-97: Enhanced return type safety

Return type Option[MetricReader] propagates configuration optionality up the call stack.


101-106: Conditional metric reader creation

Creates metric reader only when URL is defined using functional map approach.


111-114: Consistent option wrapping

Prometheus server properly wrapped in Some() to match return type.

cloud_gcp/BUILD.bazel (1)

38-38: Added Avro dependency correctly.

Maven artifact for Apache Avro is properly inserted in alphabetical order.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

3-3: Added necessary import.

Import aligns with existing class usage in KafkaFlinkSource and ProjectedKafkaFlinkSource.

api/python/test/canary/teams.py (1)

28-28: Added Flink state URI configuration.

GCS path for Flink state storage supports the canary app requirements.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)

95-98: Good method overload implementation.

Cleanly converts Array[Any] to InternalRow before delegating to existing method.

online/src/main/scala/ai/chronon/online/Api.scala (1)

175-175: Method return type updated to SerDe

Method signature now returns SerDe instead of Serde, aligning with similar changes in implementing classes.

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1)

45-45: Return type updated to match parent class

Changed return type from Serde to SerDe to match the abstract method in Api.scala.

online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)

122-122: Made inputArrEncoder public with explicit type

Changed from private val to public val with explicit Any => Any type, enabling access from Flink deserialization schemas.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2)

79-80: Updated DataprocSubmitter initialization

Now uses explicit SubmitterConf with canary project parameters instead of default constructor.


95-96: Updated test parameters for Flink canary app

Changed data file path to canary events location and added event delay parameter, supporting the new Flink canary application.

spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)

106-110: Updated SerDe interface.

Correctly updates return type and implementation to match the renamed SerDe interface and AvroSerDe implementation with explicit schema conversion.

flink/src/test/scala/ai/chronon/flink/test/deser/AvroDeSerTestUtils.scala (2)

1-5: Relocated package and updated imports.

Package changed from Apache Spark to Chronon's namespace with appropriate imports.


22-28: Added SerDe test implementation.

New test utility class properly implements SerDe interface by delegating to AvroSerDe.

flink/src/main/scala/ai/chronon/flink/deser/FlinkSerDeProvider.scala (1)

9-26: Good factory pattern implementation.

This provider centralizes SerDe creation logic with proper validation for mutually exclusive configuration options.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2)

10-10: Updated import statement.

Consolidated imports for serialization-related classes.


42-43: Updated SerDe interface implementation.

Similar to changes in MockApi, correctly implements the renamed SerDe interface with explicit schema conversion.

flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3)

21-21: Clean import for new SerDe abstractions.

Appropriate import for the refactored deserialization components.


142-142: Better abstraction with FlinkSerDeProvider.

Centralizes schema provider creation logic, improving maintainability.


144-145: Simplified deserialization schema creation.

Uses builder pattern for deserialization schema creation, making the code more readable.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3)

11-11: Updated imports for new deserialization framework.

Adds required imports for the refactored components.


315-315: Improved schema provider creation.

Uses centralized provider for consistent schema handling.


317-318: Standardized deserialization schema creation.

Uses builder pattern for projection-based deserialization, maintaining consistency with ValidationFlinkJob.

flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala (3)

101-101: Improved efficiency with lazy val.

Avoids repeated schema parsing by caching the parsed schema.


105-107: Updated schema identity.

Changed record name and namespace for better organization.


108-120: Enhanced schema definition.

Updated field definitions to support canary app requirements.

online/src/main/scala/ai/chronon/online/serde/AvroSerDe.scala (1)

11-38: Well-designed SerDe implementation.

Clean implementation with:

  • Lazy initialization for expensive conversions
  • Clear separation of concerns
  • Proper Mutation object construction

The class properly handles reversal column checks for before/after state determination.

online/src/main/scala/ai/chronon/online/serde/SerDe.scala (1)

5-11: Rename looks good.

The renaming and improved documentation clarify the role of SerDe implementations.

flink/src/test/scala/ai/chronon/flink/test/deser/SourceProjectionDeSerializationSupportSpec.scala (1)

26-28: Builder pattern adoption looks good.

Clean transition to the new deserialization schema builder.

flink/src/test/scala/ai/chronon/flink/test/deser/SchemaRegistryDeSerSchemaProviderSpec.scala (2)

14-20: Good mock design.

Clean mock implementation for testing.


62-73: Test failure path for unsupported schema types.

Good coverage of expected failure case.

flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)

54-57: Verify toSparkRow signature.
toSparkRow is invoked with a StructType; earlier code shows it requires an api.DataType. Confirm an overload exists; otherwise the call will not compile.

val recordBytes = createDummyRecordBytes(schemaStr)
deserSchema.deserialize(recordBytes, listCollector)

// sanity check projected schemas is what we expect
val projectedSchema = deserSchema.projectedSchema
val projectedSchema = deserSchema.asInstanceOf[SourceProjectionDeserializationSchema].projectedSchema
🛠️ Refactor suggestion

Avoid unsafe cast.

This cast could fail if implementation changes.

-  val projectedSchema = deserSchema.asInstanceOf[SourceProjectionDeserializationSchema].projectedSchema
+  deserSchema match {
+    case schema: SourceProjectionDeserializationSchema =>
+      val projectedSchema = schema.projectedSchema
+      // rest of the code
+    case _ => fail("Expected a SourceProjectionDeserializationSchema")
+  }

Comment on lines 76 to 84
override def fromBytes(message: Array[Byte]): Mutation = {
  val messageBytes =
    if (schemaRegistryWireFormat) {
      // schema id is set, we skip the first byte and read the schema id based on the wire format:
      // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format
      // unfortunately we need to drop the first 5 bytes (and thus copy the rest of the byte array) as the AvroDataToCatalyst
      // interface takes a byte array and the methods to do the Row conversion etc are all private so we can't reach in
      message.drop(5)
    } else {
🛠️ Refactor suggestion

Guard against short messages, avoid copy.

message.drop(5) allocates and will throw if message.length < 5.

-    val messageBytes =
-      if (schemaRegistryWireFormat) {
-        message.drop(5)
-      } else message
+    val messageBytes =
+      if (schemaRegistryWireFormat) {
+        require(message.length >= 5, "Message too short for wire-format")
+        java.util.Arrays.copyOfRange(message, 5, message.length)
+      } else message

Comment on lines 24 to 26
override def sourceEventEncoder: Encoder[Row] =
  Encoders.row(SparkConversions.fromChrononSchema(deserSchemaProvider.schema))

⚠️ Potential issue

fromChrononSchema likely takes a Chronon schema, but a Spark StructType is being passed.
This mismatched type will not compile unless an overloaded method exists. Prefer supplying the existing Spark schema directly:

-  override def sourceEventEncoder: Encoder[Row] =
-    Encoders.row(SparkConversions.fromChrononSchema(deserSchemaProvider.schema))
+  override def sourceEventEncoder: Encoder[Row] =
+    Encoders.row(deserSchemaProvider.schema)

Committable suggestion skipped: line range outside the PR's diff.

@piyush-zlai piyush-zlai force-pushed the piyush/flink_event_canary branch from 9de5069 to eeb6ae2 Compare May 12, 2025 20:38
coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala (1)

126-133: 🛠️ Refactor suggestion

Thread.sleep inside MapFunction blocks the operator thread.

Sleeping per record throttles throughput but also stalls checkpointing & watermark progress.
Prefer a ProcessFunction that uses timers or throttle upstream reading speed instead.
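
A sketch of the timer-based alternative: the element type (raw bytes), key type, and the constant-key keyBy in the usage comment are assumptions for illustration, not the driver's actual wiring.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Sketch only: emit at most one buffered record per delayMs using processing-time timers,
// keeping the operator thread free for checkpoint barriers instead of sleeping in a MapFunction.
class ThrottledEmitter(delayMs: Long)
  extends KeyedProcessFunction[Int, Array[Byte], Array[Byte]] {

  @transient private var pending: ListState[Array[Byte]] = _
  @transient private var timerArmed: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor[Array[Byte]]("pending-events", classOf[Array[Byte]]))
    timerArmed = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("timer-armed", classOf[java.lang.Boolean]))
  }

  override def processElement(event: Array[Byte],
                              ctx: KeyedProcessFunction[Int, Array[Byte], Array[Byte]]#Context,
                              out: Collector[Array[Byte]]): Unit = {
    pending.add(event)
    if (timerArmed.value() == null) { // arm a single timer per key
      timerArmed.update(true)
      ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + delayMs)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Int, Array[Byte], Array[Byte]]#OnTimerContext,
                       out: Collector[Array[Byte]]): Unit = {
    val buffered = pending.get().asScala.toList
    buffered.headOption.foreach(out.collect) // emit exactly one record per timer firing
    val rest = buffered.drop(1)
    pending.update(rest.asJava)
    if (rest.nonEmpty) {
      ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + delayMs)
    } else {
      timerArmed.clear()
    }
  }
}

// Hypothetical wiring: events.keyBy(_ => 0).process(new ThrottledEmitter(eventDelayMs))
```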

🧹 Nitpick comments (2)
flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala (2)

68-74: Consider specifying a key serializer as well.

With no key serializer, all records will have a null key which can hamper partitioning & compaction in Kafka.
If you do not actually need keyed partitioning, please add a short comment clarifying that intent.
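
If keyed partitioning is desired, a sketch along these lines could work; the topic name, key extraction, and the assumption that the driver builds its sink via KafkaRecordSerializationSchema are all illustrative:

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.{SerializationSchema, SimpleStringSchema}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema

// Sketch only: derive a stable record key (here, a hypothetical id in the first CSV field)
// so Kafka partitioning/compaction has something to work with instead of null keys.
val keySchema = new SerializationSchema[String] {
  override def serialize(element: String): Array[Byte] =
    element.split(",")(0).getBytes(StandardCharsets.UTF_8)
}

val recordSerializer = KafkaRecordSerializationSchema.builder[String]()
  .setTopic("test-item-event-data")            // topic name assumed
  .setKeySerializationSchema(keySchema)
  .setValueSerializationSchema(new SimpleStringSchema())
  .build()
```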


98-99: Job name now mismatches the new purpose.

"Periodic Kafka Data Producer" still references the old beacon-style driver. Renaming keeps the Flink Web-UI and logs self-describing.

-    env.execute("Periodic Kafka Data Producer")
+    env.execute("Periodic Kafka Item-Event Producer")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 9de5069 and eeb6ae2.

📒 Files selected for processing (5)
  • api/python/test/canary/group_bys/gcp/item_event_canary.py (1 hunks)
  • api/python/test/canary/teams.py (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala (4 hunks)
  • flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • api/python/test/canary/teams.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala
🔇 Additional comments (1)
api/python/test/canary/group_bys/gcp/item_event_canary.py (1)

37-40: Slash-delimited property string can confuse the URI parser.

Several Kafka clients interpret / literally, causing one long property key.
Consider using ; or & separators, or supply properties via extra_configs rather than the URI.
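
A sketch of the dict-based alternative; the property names, topic, and separator are assumptions for illustration, not the canonical Chronon topic-URI format:

```python
from urllib.parse import quote_plus

# Illustrative only: build each config segment from a dict so every key=value pair is
# explicitly delimited and escaped, instead of hand-concatenating one long string.
def encode_props(props: dict, sep: str = "&") -> str:
    return sep.join(f"{key}={quote_plus(str(value))}" for key, value in props.items())

schema_provider_cfgs = encode_props({
    "provider_class": "MockCustomSchemaProvider",  # assumed property name
    "schema_name": "item_event",                   # assumed property name
})
google_kafka_cfgs = encode_props({
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "OAUTHBEARER",
})

# Hypothetical topic name; mirrors the segment layout used by the canary configs.
kafka_topic = f"kafka://test-item-event-data/{schema_provider_cfgs}/{google_kafka_cfgs}"
```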

Comment on lines +24 to +28
listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)",
view="IF(event_type = 'view_listing', 1, 0)",
purchase="IF(event_type = 'backend_cart_payment', 1, 0)",
favorite="IF(event_type = 'backend_favorite_item2', 1, 0)",
⚠️ Potential issue

BigQuery type alias LONG is invalid – use INT64.

Standard SQL accepts INT64, INTEGER, or BIGINT; LONG will fail at parse time.

-                    listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
+                    listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS INT64)))",
📝 Committable suggestion
Suggested change
listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)",
view="IF(event_type = 'view_listing', 1, 0)",
purchase="IF(event_type = 'backend_cart_payment', 1, 0)",
favorite="IF(event_type = 'backend_favorite_item2', 1, 0)",
listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS INT64)))",
add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)",
view="IF(event_type = 'view_listing', 1, 0)",
purchase="IF(event_type = 'backend_cart_payment', 1, 0)",
favorite="IF(event_type = 'backend_favorite_item2', 1, 0)",

@piyush-zlai piyush-zlai force-pushed the piyush/flink_event_canary branch from eeb6ae2 to f21ba4b Compare May 13, 2025 13:59
@piyush-zlai piyush-zlai merged commit 998add7 into main May 13, 2025
21 checks passed
@piyush-zlai piyush-zlai deleted the piyush/flink_event_canary branch May 13, 2025 20:42
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025