
Wire up Flink DataProc job submission #189


Merged
merged 13 commits on Jan 15, 2025

Conversation

piyush-zlai (Contributor) commented on Jan 9, 2025

Summary

This PR adds support for submitting Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars: the main jar (flink-assembly*.jar), which contains our FlinkJob main() code, and the cloud_gcp jar, which contains our BigTable classes.
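For a concrete picture, here is a minimal sketch of what a Flink submission through the refactored submitter might look like. It is pieced together from the constants and pattern match quoted in the review diffs below (TypeFlinkJob, MainClass, JarURI, FlinkMainJarURI); the bucket paths, argument list, and exact submit signature are illustrative assumptions, not the PR's final API:

```scala
import ai.chronon.spark.JobSubmitterConstants._

// Sketch only: bucket paths are placeholders and the submit signature is
// inferred from the review diffs below, so it may not match the final code.
val submitter = DataprocSubmitter()
submitter.submit(
  TypeFlinkJob,
  Map(
    MainClass       -> "ai.chronon.flink.FlinkJob",            // main() in the Flink assembly jar
    JarURI          -> "gs://<bucket>/cloud_gcp-assembly.jar", // jar with the BigTable classes
    FlinkMainJarURI -> "gs://<bucket>/flink-assembly.jar"      // jar with the FlinkJob code
  ),
  List.empty,                // no extra files
  "--groupby-name=e2e-count" // job args are illustrative
)
```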

Flink requires some infra that doesn't currently exist in our canary environment, such as the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with mocked GroupBy / GroupByServingInfo. The rest of the job (spark eval, avro conversion, BT kv store writes) is all wired up.
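As a rough illustration of that mocked setup, here is a hedged sketch of an in-memory event source. The E2ETestEvent fields mirror the case class quoted in the review comments below; the source object itself and the event-generation loop are assumptions, since TestFlinkJob's real source is richer:

```scala
import org.apache.flink.streaming.api.scala._

// Mirrors the E2ETestEvent case class referenced later in this review.
case class E2ETestEvent(id: String, int_val: Int, double_val: Double, created: Long)

object InMemoryE2ESourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A bounded, in-memory stand-in for the Kafka source: a handful of synthetic events.
    val events = (0 until 10).map(i => E2ETestEvent(s"test$i", i, i * 0.5, System.currentTimeMillis()))
    env.fromCollection(events).print()
    env.execute("in-mem-e2e-source-sketch")
  }
}
```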

Follow-ups are called out in a few places in the code; the major ones are:

  • More prod-grade Flink settings (things like checkpointing frequency, watermarking interval, ...)
  • Support for IDL encoders (will start with proto as that's what the Etsy folks need)
  • Read GroupByServingInfo from BigTable
  • Add support for a Kafka source and leverage existing Chronon code for inferring parallelism etc. from the Kafka topic

As we fix some of these up, we can get rid of the TestFlinkJob and the mocked code / classes there.

Checklist

  • [ ] Added Unit Tests
  • [ ] Covered by existing CI
  • [x] Integration tested
  • [ ] Documentation update

Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully, and writes out data to BT that I can query:

$ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING
E2E_COUNT_STREAMING#
test0#1736380800000
  cf:value                                 @ 2025/01/09-15:30:41.199000
    "\x02\x00\x00\x00\x00\x00\x00\x00\x00"
  cf:value                                 @ 2025/01/09-14:55:04.992000
    "\x02\x00\x00\x00\x00\x00\x00\x00\x00"
...

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced job submission capabilities with support for Spark and Flink job types.
    • Introduced flexible configuration for job properties and parameters.
    • Added command-line argument parsing for Flink jobs.
    • Introduced utility classes for testing Flink jobs, including event generation and data streaming.
  • Improvements

    • Refined error logging across multiple components for better clarity.
    • Updated dependency management and assembly configurations for improved stability.
    • Improved source and encoder provider abstractions for Flink jobs.
  • Infrastructure

    • Updated build configurations for better dependency handling and assembly processes.
    • Removed unnecessary configuration entries from the submission configuration file.


coderabbitai bot commented Jan 9, 2025

Walkthrough

This pull request introduces enhancements to the project's build configuration, job submission mechanisms, and Flink integration. Key changes include improved dependency management in build.sbt, a refactored DataprocSubmitter to support both Spark and Flink job types, and the addition of utility classes for Flink job processing. The modifications aim to increase flexibility, improve error handling, and provide robust job submission capabilities across different job types.

Changes

  • build.sbt - added flink-yarn dependency; updated dependency overrides; enhanced assembly merge strategies and exclusions
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml - removed jarUri and mainClass configurations
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala - refactored submit method for job type support; added buildSparkJob and buildFlinkJob methods
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala - updated tests for new submit method signatures and added Flink job tests
  • flink/* - improved error logging in multiple files; added FlinkJob, SourceProvider, and TestFlinkJob classes; enhanced job submission and testing utilities
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala - introduced JobType trait; added job submission constants; updated submit method signature


Suggested Reviewers

  • nikhil-zlai
  • tchow-zlai
  • david-zlai

Poem

🚀 Flink and Spark, dancing in the cloud,
Dependencies aligned, configurations proud.
Code refactored with surgical might,
Job submission now shines so bright!
Builders rejoice, the system's complete 🌟




coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (6)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)

112-112: Consider adding context to error message.

-        logger.error("Error evaluating Spark expression", e)
+        logger.error(s"Error evaluating Spark expression for event: $inputEvent", e)
flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (2)

7-15: Consider sealed trait pattern for source types

Clean design! Consider using sealed trait/enum for supported source types (Kafka, PubSub) to enforce type safety.

sealed trait SourceType
case object KafkaSource extends SourceType
case object PubSubSource extends SourceType

17-24: Add encoder validation

Add validation to ensure encoder compatibility with Flink types.

def buildEncoder(): Either[String, Encoder[T]]
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

56-57: Unify exception messages for consistency.

Use consistent error messages when keys are missing.

Apply this diff:

-throw new RuntimeException("Main class not found")
+throw new RuntimeException(s"Missing expected $MainClass")
-throw new RuntimeException("Jar URI not found")
+throw new RuntimeException(s"Missing expected $JarURI")

Also applies to: 63-63

build.sbt (2)

217-220: Assembly exclusions could be more specific.

The exclusion pattern could accidentally exclude non-Hadoop/Guava/Protobuf JARs.

-      cp filter { jar =>  jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")}
+      cp filter { jar => 
+        val name = jar.data.getName
+        name.matches("hadoop-.*\\.jar") || 
+        name.matches("guava-.*\\.jar") || 
+        name.matches("protobuf-.*\\.jar")
+      }

259-261: Consider using a variable for reload4j version.

The version should be defined at the top with other versions.

+lazy val reload4jVersion = "1.2.25"
+
 dependencyOverrides ++= Seq(
-  "ch.qos.reload4j" % "reload4j" % "1.2.25",
+  "ch.qos.reload4j" % "reload4j" % reload4jVersion,
 )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 30eee10 and 33bf165.

📒 Files selected for processing (11)
  • build.sbt (3 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (0 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
💤 Files with no reviewable changes (1)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
✅ Files skipped from review due to trivial changes (1)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: other_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: no_spark_scala_tests
🔇 Additional comments (13)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (2)

122-122: LGTM! Better error logging.

Separating exception from message improves stack trace visibility.


Line range hint 1-124: Solid async I/O implementation.

Well-structured implementation with:

  • Proper error handling and metrics
  • Configurable timeout and concurrency
  • Clean separation of concerns
flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (1)

3-5: Verify Spark-Flink integration approach

Using Spark's Encoder in Flink jobs is unconventional. Confirm this hybrid approach is intentional and tested.
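One plausible reading, for anyone puzzled by the hybrid: the Spark Encoder supplies the Catalyst schema and serializer that the Spark-SQL expression eval (SparkExpressionEvalFn) consumes, even though the stream itself runs on Flink. A minimal sketch, assuming a product-type event; the class and output are illustrative:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

object EncoderSchemaSketch {
  case class E2ETestEvent(id: String, int_val: Int, double_val: Double, created: Long)

  def main(args: Array[String]): Unit = {
    // Deriving a Spark Encoder yields the Catalyst schema for the event type,
    // which is what the Spark expression evaluation runs against.
    val encoder: Encoder[E2ETestEvent] = Encoders.product[E2ETestEvent]
    println(encoder.schema.treeString)
  }
}
```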

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

4-9: Imports added appropriately.

Necessary imports for job types and constants are included.


87-112: Helper methods enhance modularity.

buildSparkJob and buildFlinkJob improve code organization.

spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (3)

3-6: JobType trait improves type safety.

Defining job types enhances robustness.


9-9: submit method signature updated effectively.

Including jobType and jobProperties increases flexibility.


20-24: Constants centralize configuration.

JobSubmitterConstants improves maintainability.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)

3-6: Imports updated appropriately.

Necessary imports for constants and job types added.


46-46: Test updated to use new submit method.

The submit call reflects the new signature.


54-67: Flink job test added.

Testing Flink job submission ensures functionality.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

Line range hint 244-246: TODO comments need to be addressed.

The code contains TODO comments about configurations and metrics sink.

Would you like me to help implement these configurations and metrics sink?

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

234-240: TODO block needs implementation.

The non-mocked source path is not implemented yet.

Would you like me to help implement the real source integration?

),
wheres = filters,
timeColumn = "created",
startPartition = "20231106"

🛠️ Refactor suggestion

Hardcoded partition date should be configurable.

The start partition is hardcoded to "20231106".

-            startPartition = "20231106"
+            startPartition = jobConfig.getOrElse("startPartition", LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")))

Committable suggestion skipped: line range outside the PR's diff.


override def buildSource(): (FlinkSource[E2ETestEvent], Int) = {
val eventSrc = makeSource()
val parallelism = 2 // TODO - take parallelism as a job param

🛠️ Refactor suggestion

Hardcoded parallelism should be configurable.

The parallelism value is hardcoded to 2.

-    val parallelism = 2 // TODO - take parallelism as a job param
+    val parallelism = jobConfig.getOrElse("parallelism", "2").toInt

Committable suggestion skipped: line range outside the PR's diff.

def buildApi(onlineClass: String, props: Map[String, String]): Api = {
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
val cls = cl.loadClass(onlineClass)
val constructor = cls.getConstructors.apply(0)

🛠️ Refactor suggestion

Constructor lookup could be more robust.

Using apply(0) assumes only one constructor exists.

-    val constructor = cls.getConstructors.apply(0)
+    val constructor = cls.getConstructors.find(_.getParameterCount == 1)
+      .getOrElse(throw new IllegalArgumentException(s"No suitable constructor found for $onlineClass"))

piyush-zlai force-pushed the piyush/flink_dataproc branch from 33bf165 to 581fddc on January 13, 2025 at 15:14
coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

248-254: ⚠️ Potential issue

Add error handling and improve constructor lookup.

Constructor lookup needs to be more robust and reflection operations need proper error handling.

🧹 Nitpick comments (6)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

233-239: Implement production features as outlined in TODOs.

Would you like me to help create GitHub issues to track the implementation of:

  • GroupByServingInfo lookup
  • Schema handling
  • Source provider integration
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)

52-65: Add exhaustive pattern matching

Add a case for unhandled job types to future-proof the code.

   val jobBuilder = jobType match {
     case TypeSparkJob => buildSparkJob(mainClass, jarUri, files, args: _*)
     case TypeFlinkJob =>
       val mainJarUri =
         jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI"))
       buildFlinkJob(mainClass, mainJarUri, jarUri, args: _*)
+    case _ => throw new IllegalArgumentException(s"Unsupported job type: $jobType")
   }

56-57: Use more specific exceptions

Replace RuntimeException with IllegalArgumentException for missing properties.

-    val mainClass = jobProperties.getOrElse(MainClass, throw new RuntimeException("Main class not found"))
-    val jarUri = jobProperties.getOrElse(JarURI, throw new RuntimeException("Jar URI not found"))
+    val mainClass = jobProperties.getOrElse(MainClass, throw new IllegalArgumentException(s"Required property '$MainClass' not found"))
+    val jarUri = jobProperties.getOrElse(JarURI, throw new IllegalArgumentException(s"Required property '$JarURI' not found"))

184-185: Make main class configurable

Allow main class override through environment variable.

-      Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri),
+      Map(MainClass -> sys.env.getOrElse("CHRONON_MAIN_CLASS", "ai.chronon.spark.Driver"), JarURI -> chrononJarUri),
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)

34-42: Add timestamp validation in E2ETestEvent.

Consider validating that created is not in the future.

 case class E2ETestEvent(id: String, int_val: Int, double_val: Double, created: Long) {
+  require(created <= System.currentTimeMillis(), "Event timestamp cannot be in the future")
 }

90-96: Simplify schema lookup with getOrElse.

Replace pattern matching with getOrElse for cleaner code.

-          val keyColStructType = outputSchema.fields.find(field => field.name == keyCol)
-          keyColStructType match {
-            case Some(col) => col
-            case None =>
-              throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol")
-          }
+          outputSchema.fields.find(_.name == keyCol).getOrElse(
+            throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol")
+          )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 33bf165 and 581fddc.

📒 Files selected for processing (11)
  • build.sbt (3 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (0 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
💤 Files with no reviewable changes (1)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
🚧 Files skipped from review as they are similar to previous changes (7)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
  • flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
  • build.sbt
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: join_spark_tests
🔇 Additional comments (7)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

206-218: LGTM! Well-structured command line argument parsing.

Good use of Scallop with clear parameter descriptions and proper validation.
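For readers unfamiliar with Scallop, a hedged sketch of the pattern: the flag names below are inferred from other comments in this review (mockSource, the online class consumed by buildApi) and may not match the PR's exact definitions:

```scala
import org.rogach.scallop.ScallopConf

// Illustrative arg parser; option names and defaults are assumptions.
class FlinkJobArgsSketch(arguments: Seq[String]) extends ScallopConf(arguments) {
  val groupbyName = opt[String](required = true, descr = "Name of the GroupBy to run")
  val onlineClass = opt[String](required = true, descr = "Fully qualified Api implementation to instantiate")
  val mockSource  = opt[Boolean](descr = "Use the in-memory mocked source", default = Some(true))
  verify()
}

// Usage: val jobArgs = new FlinkJobArgsSketch(args); jobArgs.groupbyName()
```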


220-246: Add error handling and production configurations.

Missing critical production features:

  • Error handling for argument parsing
  • Flink configurations (checkpointing, watermarking)
  • Proper sink implementation
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

4-9: Clean import organization

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (4)

1-33: LGTM! Well-organized imports and clear file documentation.


62-62: Hardcoded parallelism should be configurable.


129-129: Hardcoded partition date should be configurable.


149-167: Consider adding error handling for API calls.

The AsyncKVStoreWriter might fail. Consider wrapping with error handling.

Comment on lines +99 to +105
val envProps =
Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")


🛠️ Refactor suggestion

Make Flink environment properties configurable

Move hardcoded values to configuration.

-    val envProps =
-      Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")
+    val envProps = jobProperties.getOrElse("flink.properties", 
+      Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST"))

Comment on lines +44 to +51
class PrintSink extends SinkFunction[WriteResponse] {
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = {
val elapsedTime = System.currentTimeMillis() - value.putRequest.tsMillis.get
logger.info(s"Received write response with status ${value.status}; elapsedTime = $elapsedTime ms")
}
}

⚠️ Potential issue

Handle potential null timestamp in PrintSink.

Add null check for value.putRequest.tsMillis.

-    val elapsedTime = System.currentTimeMillis() - value.putRequest.tsMillis.get
+    val elapsedTime = value.putRequest.tsMillis.map(ts => System.currentTimeMillis() - ts).getOrElse(-1L)

tchow-zlai (Collaborator) left a comment

LGTM

// Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor'
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")}
Collaborator

so pretty much we don't trust any of the transitives matching these prefixes?

piyush-zlai (Contributor, Author)

yeah for each of those there's a submit / runtime failure of the jobs (even with user jars first..)

piyush-zlai force-pushed the piyush/flink_dataproc branch from 581fddc to b62401a on January 15, 2025 at 14:25
coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)

54-67: Externalize test configuration.

Move hardcoded GCS paths and configuration values to a test configuration file.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

233-239: Create JIRA tickets for TODO items.

These TODOs represent critical production features:

  • GroupByServingInfo lookup
  • Schema handling
  • Source configuration

Would you like me to help create GitHub issues for tracking these tasks?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 581fddc and b62401a.

📒 Files selected for processing (11)
  • build.sbt (3 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (0 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
💤 Files with no reviewable changes (1)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
🚧 Files skipped from review as they are similar to previous changes (6)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
  • flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
  • build.sbt
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: other_spark_tests
🔇 Additional comments (12)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)

3-6: LGTM! Clean refactoring of imports and constructor.

Also applies to: 44-44


46-46: LGTM! Test updated to match new submit method signature.


74-76: LGTM! Ignored tests properly updated.

Also applies to: 92-95

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

56-69: LGTM! Improved type safety and validation.


87-87: LGTM! Better error context with cause.


91-100: LGTM! Clean encapsulation of Spark job building.


102-115: Make Flink environment properties configurable.

Move hardcoded values to configuration.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

212-213: Consider defaulting mockSource to false.

Setting mockSource=true by default could lead to accidental use of mock data in production.


251-251: Constructor lookup could be more robust.

Using apply(0) assumes only one constructor exists.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (3)

48-48: Handle possible null timestamp

Add null check for value.putRequest.tsMillis to avoid exceptions.


62-62: Make parallelism configurable

Avoid hardcoding; make parallelism a configurable parameter.


129-129: Make startPartition configurable

Avoid hardcoding; make startPartition configurable or use the current date.

Comment on lines +242 to +245
val env = StreamExecutionEnvironment.getExecutionEnvironment
// TODO add useful configs
flinkJob.runGroupByJob(env).addSink(new PrintSink) // TODO wire up a metrics sink / such
env.execute(s"${flinkJob.groupByName}")

⚠️ Potential issue

Add essential Flink configurations.

Missing critical Flink settings (a hedged sketch follows the list):

  • Checkpointing
  • Restart strategy
  • State backend
  • Watermark strategy
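
A minimal sketch of wiring these settings, assuming stock Flink 1.x APIs; the intervals and retry counts are placeholders, not values from this PR:

```scala
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FlinkConfigSketch {
  def configure(env: StreamExecutionEnvironment): Unit = {
    // Checkpointing: exactly-once snapshots every 10s, with a minimum pause between them.
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000L)
    // Restart strategy: retry a few times with a delay instead of failing outright.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
    // State backend and watermark strategy also belong here; both depend on the
    // deployment (e.g. RocksDB vs heap state), so they are left as follow-ups.
  }
}
```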

Comment on lines +248 to +254
def buildApi(onlineClass: String, props: Map[String, String]): Api = {
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
val cls = cl.loadClass(onlineClass)
val constructor = cls.getConstructors.apply(0)
val onlineImpl = constructor.newInstance(props)
onlineImpl.asInstanceOf[Api]
}

⚠️ Potential issue

Add error handling for class loading.

Wrap class loading operations in try-catch to handle ClassNotFoundException and InstantiationException.

 def buildApi(onlineClass: String, props: Map[String, String]): Api = {
+  try {
     val cl = Thread.currentThread().getContextClassLoader
     val cls = cl.loadClass(onlineClass)
     val constructor = cls.getConstructors.apply(0)
     val onlineImpl = constructor.newInstance(props)
     onlineImpl.asInstanceOf[Api]
+  } catch {
+    case e: ClassNotFoundException => 
+      throw new IllegalArgumentException(s"Class $onlineClass not found", e)
+    case e: InstantiationException => 
+      throw new IllegalArgumentException(s"Failed to instantiate $onlineClass", e)
+  }
 }

kumar-zlai pushed a commit that referenced this pull request on Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request on Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request on May 15, 2025
chewy-zlai pushed a commit that referenced this pull request on May 15, 2025
chewy-zlai pushed a commit that referenced this pull request on May 16, 2025
coderabbitai bot mentioned this pull request on May 28, 2025