Add support to drive Kafka avro load for streaming job testing #271
Conversation
Walkthrough: This pull request introduces significant enhancements to the project's build configuration and data processing capabilities, focusing on Apache Flink and Kafka integration. The changes include adding new dependencies for Flink connectors and creating a new flink_kafka_ingest project with a driver for streaming avro events to Kafka.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
build.sbt
(4 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
(2 hunks)flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: no_spark_scala_tests
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (6)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)
67-81: No issues found in the test case. The test setup is appropriate for local testing.
build.sbt (5)
112-114: Dependencies added correctly. The added Flink dependencies enhance Kafka integration.
225-226: Included necessary dependencies. The Flink and Spark dependencies are appropriate.
246-248: Main-Class specified appropriately. The Main-Class for the Flink project is set correctly.
256-277: Configuration for flink_kafka_ingest project looks good. The project settings are appropriate.
298-300: Kafka authentication dependency added appropriately. The exclusion of the schema registry client is correct.
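The build.sbt hunks themselves are not reproduced in this conversation, so the following is only a hedged sketch of what the dependency and project additions referenced above might look like. The artifact coordinates, versions, and the managed-Kafka auth group id are assumptions, not values copied from the actual file.

// Sketch only: the real build.sbt is not shown here; versions and coordinates are assumed.
lazy val flinkVersion = "1.17.1" // assumed

lazy val flink_kafka_ingest = (project in file("flink_kafka_ingest"))
  .settings(
    libraryDependencies ++= Seq(
      "org.apache.flink" % "flink-connector-kafka" % "3.0.2-1.17", // Kafka source/sink connector (version assumed)
      "org.apache.flink" % "flink-avro"            % flinkVersion, // Avro (de)serialization
      "org.apache.flink" % "flink-connector-files" % flinkVersion, // file source for the GCS-backed avro input
      // Managed Kafka auth handler, excluding the schema registry client as the review notes;
      // the group/artifact ids here are assumptions.
      ("com.google.cloud.hosted.kafka" % "managed-kafka-auth-login-handler" % "1.0.2")
        .exclude("io.confluent", "kafka-schema-registry-client")
    ),
    // Main-Class for the assembled jar, matching the driver referenced in the test further below.
    assembly / mainClass := Some("ai.chronon.flink.FlinkKafkaBeaconEventDriver")
  )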
class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
  override def map(value: GenericRecord): GenericRecord = {
    val updatedTimestamp = System.currentTimeMillis()
    // Update the timestamp field in the record
    value.put("timestamp", updatedTimestamp)
    Thread.sleep(delayMs)
    value
  }
}
Avoid using Thread.sleep() in the map function.
Thread.sleep() blocks the task's processing thread and reduces throughput.
Apply this diff to remove the sleep:
class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
override def map(value: GenericRecord): GenericRecord = {
val updatedTimestamp = System.currentTimeMillis()
// Update the timestamp field in the record
value.put("timestamp", updatedTimestamp)
- Thread.sleep(delayMs)
value
}
}
Consider using a non-blocking approach such as event-time characteristics or processing-time timers; a timer-based sketch follows the committable suggestion below.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
  override def map(value: GenericRecord): GenericRecord = {
    val updatedTimestamp = System.currentTimeMillis()
    // Update the timestamp field in the record
    value.put("timestamp", updatedTimestamp)
    value
  }
}
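For reference, here is a minimal sketch of the timer-based alternative mentioned above. It assumes the stream is keyed (for example via a constant key with keyBy) so that Flink's keyed state and processing-time timers are available; the class and state names are illustrative and not part of the PR.

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class TimerDelayedTransformFn(delayMs: Long)
    extends KeyedProcessFunction[Int, GenericRecord, GenericRecord] {

  @transient private var buffered: ListState[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    buffered = getRuntimeContext.getListState(
      new ListStateDescriptor[GenericRecord]("buffered-records", classOf[GenericRecord]))
  }

  override def processElement(value: GenericRecord,
                              ctx: KeyedProcessFunction[Int, GenericRecord, GenericRecord]#Context,
                              out: Collector[GenericRecord]): Unit = {
    // Park the record in state and schedule its emission delayMs from now,
    // instead of blocking the task thread with Thread.sleep().
    buffered.add(value)
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + delayMs)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Int, GenericRecord, GenericRecord]#OnTimerContext,
                       out: Collector[GenericRecord]): Unit = {
    // Stamp and emit everything that was waiting on this timer.
    val it = buffered.get().iterator()
    while (it.hasNext) {
      val record = it.next()
      record.put("timestamp", System.currentTimeMillis())
      out.collect(record)
    }
    buffered.clear()
  }
}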
import org.rogach.scallop.Serialization

// Canary test app that can point to a source data file and will emit an event to Kafka periodically with an updated timestamp
object FlinkKafkaBeaconEventDriver {
If this isn't going to be part of the main Driver.scala, why don't we just make these regular args and kick it off from the unit test framework?
I am kicking off from the unit test framework (see "test flink kafka ingest job locally"). Did you have something else in mind? Later state might be to trigger this in our 'CI' setup or such but for now this is just manually triggered to create the stream.
okay, judging by the use of scallopconf i'm assuming you'll want to run it as a CLI command at some point? Should work - just that we won't have any of this hooked up elsewhere just yet.
yeah I added the scallop conf to allow us to tweak and pass params in the future. For now it could all be hardcoded in the app for sure.
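As an illustration only, a minimal ScallopConf sketch of how such parameters could be declared. The option names mirror the CLI arguments passed in DataprocSubmitterTest below; the delay option and the defaults are assumptions, not code from the PR.

import org.rogach.scallop.{ScallopConf, ScallopOption}

class BeaconDriverArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
  // Option names mirror the args used in the submitter test; descriptions and defaults are illustrative.
  val kafkaBootstrap: ScallopOption[String] = opt[String](name = "kafka-bootstrap", required = true)
  val kafkaTopic: ScallopOption[String]     = opt[String](name = "kafka-topic", required = true)
  val dataFileName: ScallopOption[String]   = opt[String](name = "data-file-name", required = true)
  val eventDelayMillis: ScallopOption[Int]  = opt[Int](name = "event-delay-millis", default = Some(1000)) // assumed knob
  verify()
}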
Actionable comments posted: 1
🧹 Nitpick comments (1)
build.sbt (1)
225-226: Consider removing the spark-avro dependency. The Flink project already has flink-avro; the Spark Avro dependency might be unnecessary.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
build.sbt
(5 hunks)cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
(1 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: fetcher_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: mutation_spark_tests
🔇 Additional comments (4)
build.sbt (3)
112-114
: LGTM: Required Flink dependencies added.Dependencies for Kafka connectivity and Avro serialization are correctly versioned.
314-319
: LGTM: Security vulnerabilities addressed.Appropriate version overrides for reload4j and kafka-clients to fix CVEs.
256-276
: LGTM: Well-structured Flink Kafka project.Project configuration looks good with proper jar exclusions and main class setup.
Let's verify the main class exists:
✅ Verification successful
✓ Main class exists and is properly implemented. Found at flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala; it implements a complete Flink streaming job with Kafka sink configuration.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify FlinkKafkaBeaconEventDriver implementation
ast-grep --pattern 'class FlinkKafkaBeaconEventDriver'
Length of output: 56
Script:
#!/bin/bash
# Try multiple search strategies
echo "=== Searching with ripgrep ==="
rg -i "class.*FlinkKafkaBeaconEventDriver"
echo -e "\n=== Searching Scala files ==="
fd -e scala -x rg -l "FlinkKafkaBeaconEventDriver" {}
echo -e "\n=== Searching for partial match ==="
rg -i "FlinkKafkaBeacon"
Length of output: 1108
Script:
#!/bin/bash
# Check main class implementation
cat flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala
Length of output: 6991
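The driver itself (roughly 7 KB per the output above) is not reproduced in this conversation. Purely as a hedged sketch of the kind of wiring such a GCS-avro-to-Kafka canary job involves, assuming flink-avro and flink-connector-kafka on the classpath and reusing the DelayedSourceTransformFn from the diff above; the object name, placeholder schema, and connector choices are assumptions, not the actual implementation.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.{AvroInputFormat, AvroSerializationSchema}
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object BeaconEventDriverSketch {
  def main(args: Array[String]): Unit = {
    // Values copied from the submitter test below; the schema is a placeholder for the real beacon schema.
    val dataFile  = "gs://zl-warehouse/beacon_events/beacon-output.avro"
    val bootstrap = "bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092"
    val topic     = "test-beacon-main"
    val schema    = new Schema.Parser().parse(
      """{"type":"record","name":"Beacon","fields":[{"name":"timestamp","type":"long"}]}""")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read avro GenericRecords from the source file.
    val source = env.createInput(
      new AvroInputFormat[GenericRecord](new Path(dataFile), classOf[GenericRecord]))

    // Re-stamp each record and pace the emission (see the Thread.sleep discussion above).
    val paced = source
      .map(new DelayedSourceTransformFn(1000))
      .returns(new GenericRecordAvroTypeInfo(schema))

    // Emit the records to the Kafka topic as avro.
    val sink = KafkaSink.builder[GenericRecord]()
      .setBootstrapServers(bootstrap)
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder[GenericRecord]()
          .setTopic(topic)
          .setValueSerializationSchema(AvroSerializationSchema.forGeneric(schema))
          .build())
      .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build()

    paced.sinkTo(sink)
    env.execute("flink-kafka-beacon-event-driver-sketch")
  }
}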
cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (1)
4-4: LGTM: Cluster name updated. Configuration change aligns with the PR objective.
  it should "test flink kafka ingest job locally" ignore {

    val submitter = DataprocSubmitter()
    val submittedJobId =
      submitter.submit(spark.FlinkJob,
-                      Map(MainClass -> "ai.chronon.flink.FlinkJob",
-                          FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
+                      Map(MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
+                          FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
                           JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
                       List.empty,
-                      "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
-                      "--groupby-name=e2e-count",
-                      "-ZGCP_PROJECT_ID=bigtable-project-id",
-                      "-ZGCP_INSTANCE_ID=bigtable-instance-id")
+                      "--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
+                      "--kafka-topic=test-beacon-main",
+                      "--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro",
+      )
    println(submittedJobId)
🛠️ Refactor suggestion
Enable and enhance the Flink Kafka test.
The test is currently ignored. Consider the following (a sketch follows the list below):
- Removing the ignore annotation
- Adding assertions beyond just printing the job ID
- Adding error case tests
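As a sketch of what the assertion change could look like inside the existing spec, with argument values copied from the diff above; plain ScalaTest assertions are assumed, and the ignore annotation is kept here because the test targets a live Dataproc cluster, per the follow-up comment.

  it should "submit the flink kafka ingest job and surface a job id" ignore {
    val submitter = DataprocSubmitter()
    val submittedJobId =
      submitter.submit(spark.FlinkJob,
                       Map(MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
                           FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
                           JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
                       List.empty,
                       "--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
                       "--kafka-topic=test-beacon-main",
                       "--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro")
    // Assert on the submission result instead of only printing it.
    assert(submittedJobId != null && submittedJobId.nonEmpty)
  }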
LGTM, assuming it's tested on dataproc through the ignorable unit tests.
Summary
Set up a Flink job that can take beacon data as avro (configured in GCS) and emit it at a configurable rate to Kafka. We can use this stream in our GroupBy streaming jobs.
Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested: kicked off the job and you can see events flowing in topic [test-beacon-main](https://console.cloud.google.com/managedkafka/us-central1/clusters/zipline-kafka-cluster/topics/test-beacon-main?hl=en&invt=Abnpeg&project=canary-443022)
- [ ] Documentation update
Summary by CodeRabbit
Release Notes
New Features
- Added Kafka data ingestion capabilities using Apache Flink.
- Introduced a new driver for streaming events from GCS to Kafka with a configurable delay.
Dependencies
- Added Apache Flink connectors for Kafka, Avro, and file integration.
- Integrated a managed Kafka authentication handler for cloud environments.
Infrastructure
- Created a new project configuration for Kafka data ingestion.
- Updated build settings to support advanced streaming workflows.
- Updated the cluster name configuration for the Dataproc submitter.