-
Notifications
You must be signed in to change notification settings - Fork 0
Wire up Flink DataProc job submission #189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThis pull request introduces enhancements to the project's build configuration, job submission mechanisms, and Flink integration. Key changes include improved dependency management in Changes
Possibly related PRs
Suggested Reviewers
Poem
Warning Review ran into problems🔥 ProblemsGitHub Actions: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (6)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
112-112
: Consider adding context to error message.- logger.error("Error evaluating Spark expression", e) + logger.error(s"Error evaluating Spark expression for event: $inputEvent", e)flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (2)
7-15
: Consider sealed trait pattern for source typesClean design! Consider using sealed trait/enum for supported source types (Kafka, PubSub) to enforce type safety.
sealed trait SourceType case object KafkaSource extends SourceType case object PubSubSource extends SourceType
17-24
: Add encoder validationAdd validation to ensure encoder compatibility with Flink types.
def buildEncoder(): Either[String, Encoder[T]]cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
56-57
: Unify exception messages for consistency.Use consistent error messages when keys are missing.
Apply this diff:
-throw new RuntimeException("Main class not found") +throw new RuntimeException(s"Missing expected $MainClass") -throw new RuntimeException("Jar URI not found") +throw new RuntimeException(s"Missing expected $JarURI")Also applies to: 63-63
build.sbt (2)
217-220
: Assembly exclusions could be more specific.The exclusion pattern could accidentally exclude non-Hadoop/Guava/Protobuf JARs.
- cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")} + cp filter { jar => + val name = jar.data.getName + name.matches("hadoop-.*\\.jar") || + name.matches("guava-.*\\.jar") || + name.matches("protobuf-.*\\.jar") + }
259-261
: Consider using a variable for reload4j version.The version should be defined at the top with other versions.
+lazy val reload4jVersion = "1.2.25" + dependencyOverrides ++= Seq( - "ch.qos.reload4j" % "reload4j" % "1.2.25", + "ch.qos.reload4j" % "reload4j" % reload4jVersion, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (11)
build.sbt
(3 hunks)cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
(0 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
(3 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
(3 hunks)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
(3 hunks)flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
(2 hunks)
💤 Files with no reviewable changes (1)
- cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
✅ Files skipped from review due to trivial changes (1)
- flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: other_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: no_spark_scala_tests
🔇 Additional comments (13)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (2)
122-122
: LGTM! Better error logging.Separating exception from message improves stack trace visibility.
Line range hint
1-124
: Solid async I/O implementation.Well-structured implementation with:
- Proper error handling and metrics
- Configurable timeout and concurrency
- Clean separation of concerns
flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (1)
3-5
: Verify Spark-Flink integration approachUsing Spark's Encoder in Flink jobs is unconventional. Confirm this hybrid approach is intentional and tested.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)
4-9
: Imports added appropriately.Necessary imports for job types and constants are included.
87-112
: Helper methods enhance modularity.
buildSparkJob
andbuildFlinkJob
improve code organization.spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (3)
3-6
:JobType
trait improves type safety.Defining job types enhances robustness.
9-9
:submit
method signature updated effectively.Including
jobType
andjobProperties
increases flexibility.
20-24
: Constants centralize configuration.
JobSubmitterConstants
improves maintainability.cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)
3-6
: Imports updated appropriately.Necessary imports for constants and job types added.
46-46
: Test updated to use newsubmit
method.The
submit
call reflects the new signature.
54-67
: Flink job test added.Testing Flink job submission ensures functionality.
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)
Line range hint
244-246
: TODO comments need to be addressed.The code contains TODO comments about configurations and metrics sink.
Would you like me to help implement these configurations and metrics sink?
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
234-240
: TODO block needs implementation.The non-mocked source path is not implemented yet.
Would you like me to help implement the real source integration?
), | ||
wheres = filters, | ||
timeColumn = "created", | ||
startPartition = "20231106" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Hardcoded partition date should be configurable.
The start partition is hardcoded to "20231106".
- startPartition = "20231106"
+ startPartition = jobConfig.getOrElse("startPartition", LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")))
Committable suggestion skipped: line range outside the PR's diff.
|
||
override def buildSource(): (FlinkSource[E2ETestEvent], Int) = { | ||
val eventSrc = makeSource() | ||
val parallelism = 2 // TODO - take parallelism as a job param |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Hardcoded parallelism should be configurable.
The parallelism value is hardcoded to 2.
- val parallelism = 2 // TODO - take parallelism as a job param
+ val parallelism = jobConfig.getOrElse("parallelism", "2").toInt
Committable suggestion skipped: line range outside the PR's diff.
def buildApi(onlineClass: String, props: Map[String, String]): Api = { | ||
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader | ||
val cls = cl.loadClass(onlineClass) | ||
val constructor = cls.getConstructors.apply(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Constructor lookup could be more robust.
Using apply(0)
assumes only one constructor exists.
- val constructor = cls.getConstructors.apply(0)
+ val constructor = cls.getConstructors.find(_.getParameterCount == 1)
+ .getOrElse(throw new IllegalArgumentException(s"No suitable constructor found for $onlineClass"))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
val constructor = cls.getConstructors.apply(0) | |
val constructor = cls.getConstructors.find(_.getParameterCount == 1) | |
.getOrElse(throw new IllegalArgumentException(s"No suitable constructor found for $onlineClass")) |
33bf165
to
581fddc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
248-254
:⚠️ Potential issueAdd error handling and improve constructor lookup.
Constructor lookup needs to be more robust and reflection operations need proper error handling.
🧹 Nitpick comments (6)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
233-239
: Implement production features as outlined in TODOs.Would you like me to help create GitHub issues to track the implementation of:
- GroupByServingInfo lookup
- Schema handling
- Source provider integration
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)
52-65
: Add exhaustive pattern matchingAdd a case for unhandled job types to future-proof the code.
val jobBuilder = jobType match { case TypeSparkJob => buildSparkJob(mainClass, jarUri, files, args: _*) case TypeFlinkJob => val mainJarUri = jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI")) buildFlinkJob(mainClass, mainJarUri, jarUri, args: _*) + case _ => throw new IllegalArgumentException(s"Unsupported job type: $jobType") }
56-57
: Use more specific exceptionsReplace RuntimeException with IllegalArgumentException for missing properties.
- val mainClass = jobProperties.getOrElse(MainClass, throw new RuntimeException("Main class not found")) - val jarUri = jobProperties.getOrElse(JarURI, throw new RuntimeException("Jar URI not found")) + val mainClass = jobProperties.getOrElse(MainClass, throw new IllegalArgumentException(s"Required property '$MainClass' not found")) + val jarUri = jobProperties.getOrElse(JarURI, throw new IllegalArgumentException(s"Required property '$JarURI' not found"))
184-185
: Make main class configurableAllow main class override through environment variable.
- Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri), + Map(MainClass -> sys.env.getOrElse("CHRONON_MAIN_CLASS", "ai.chronon.spark.Driver"), JarURI -> chrononJarUri),flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)
34-42
: Add timestamp validation in E2ETestEvent.Consider validating that
created
is not in the future.case class E2ETestEvent(id: String, int_val: Int, double_val: Double, created: Long) { + require(created <= System.currentTimeMillis(), "Event timestamp cannot be in the future") }
90-96
: Simplify schema lookup with getOrElse.Replace pattern matching with getOrElse for cleaner code.
- val keyColStructType = outputSchema.fields.find(field => field.name == keyCol) - keyColStructType match { - case Some(col) => col - case None => - throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol") - } + outputSchema.fields.find(_.name == keyCol).getOrElse( + throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol") + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (11)
build.sbt
(3 hunks)cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
(0 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
(4 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
(3 hunks)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
(3 hunks)flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
(2 hunks)
💤 Files with no reviewable changes (1)
- cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
🚧 Files skipped from review as they are similar to previous changes (7)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
- flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
- build.sbt
- flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: fetcher_spark_tests
- GitHub Check: join_spark_tests
🔇 Additional comments (7)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
206-218
: LGTM! Well-structured command line argument parsing.Good use of Scallop with clear parameter descriptions and proper validation.
220-246
: Add error handling and production configurations.Missing critical production features:
- Error handling for argument parsing
- Flink configurations (checkpointing, watermarking)
- Proper sink implementation
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
4-9
: Clean import organizationflink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (4)
1-33
: LGTM! Well-organized imports and clear file documentation.
62-62
: Hardcoded parallelism should be configurable.
129-129
: Hardcoded partition date should be configurable.
149-167
: Consider adding error handling for API calls.The AsyncKVStoreWriter might fail. Consider wrapping with error handling.
val envProps = | ||
Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Make Flink environment properties configurable
Move hardcoded values to configuration.
- val envProps =
- Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")
+ val envProps = jobProperties.getOrElse("flink.properties",
+ Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST"))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
val envProps = | |
Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST") | |
val envProps = jobProperties.getOrElse("flink.properties", | |
Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")) | |
class PrintSink extends SinkFunction[WriteResponse] { | ||
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) | ||
|
||
override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = { | ||
val elapsedTime = System.currentTimeMillis() - value.putRequest.tsMillis.get | ||
logger.info(s"Received write response with status ${value.status}; elapsedTime = $elapsedTime ms") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential null timestamp in PrintSink.
Add null check for value.putRequest.tsMillis
.
- val elapsedTime = System.currentTimeMillis() - value.putRequest.tsMillis.get
+ val elapsedTime = value.putRequest.tsMillis.map(ts => System.currentTimeMillis() - ts).getOrElse(-1)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
class PrintSink extends SinkFunction[WriteResponse] { | |
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) | |
override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = { | |
val elapsedTime = System.currentTimeMillis() - value.putRequest.tsMillis.get | |
logger.info(s"Received write response with status ${value.status}; elapsedTime = $elapsedTime ms") | |
} | |
} | |
class PrintSink extends SinkFunction[WriteResponse] { | |
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) | |
override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = { | |
val elapsedTime = value.putRequest.tsMillis.map(ts => System.currentTimeMillis() - ts).getOrElse(-1) | |
logger.info(s"Received write response with status ${value.status}; elapsedTime = $elapsedTime ms") | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
// Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor' | ||
assembly / assemblyExcludedJars := { | ||
val cp = (assembly / fullClasspath).value | ||
cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so pretty much we don't trust any of the transitives matching these prefixes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah for each of those there's a submit / runtime failure of the jobs (even with user jars first..)
581fddc
to
b62401a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)
54-67
: Externalize test configuration.Move hardcoded GCS paths and configuration values to a test configuration file.
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
233-239
: Create JIRA tickets for TODO items.These TODOs represent critical production features:
- GroupByServingInfo lookup
- Schema handling
- Source configuration
Would you like me to help create GitHub issues for tracking these tasks?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (11)
build.sbt
(3 hunks)cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
(0 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
(4 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
(3 hunks)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
(3 hunks)flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
(1 hunks)flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
(1 hunks)spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
(2 hunks)
💤 Files with no reviewable changes (1)
- cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
🚧 Files skipped from review as they are similar to previous changes (6)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
- flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
- build.sbt
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
- flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: no_spark_scala_tests
- GitHub Check: other_spark_tests
🔇 Additional comments (12)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)
3-6
: LGTM! Clean refactoring of imports and constructor.Also applies to: 44-44
46-46
: LGTM! Test updated to match new submit method signature.
74-76
: LGTM! Ignored tests properly updated.Also applies to: 92-95
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)
56-69
: LGTM! Improved type safety and validation.
87-87
: LGTM! Better error context with cause.
91-100
: LGTM! Clean encapsulation of Spark job building.
102-115
: Make Flink environment properties configurable.Move hardcoded values to configuration.
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
212-213
: Consider defaulting mockSource to false.Setting mockSource=true by default could lead to accidental use of mock data in production.
251-251
: Constructor lookup could be more robust.Using
apply(0)
assumes only one constructor exists.flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (3)
48-48
: Handle possible null timestampAdd null check for
value.putRequest.tsMillis
to avoid exceptions.
62-62
: Make parallelism configurableAvoid hardcoding; make
parallelism
a configurable parameter.
129-129
: Make startPartition configurableAvoid hardcoding; make
startPartition
configurable or use the current date.
val env = StreamExecutionEnvironment.getExecutionEnvironment | ||
// TODO add useful configs | ||
flinkJob.runGroupByJob(env).addSink(new PrintSink) // TODO wire up a metrics sink / such | ||
env.execute(s"${flinkJob.groupByName}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add essential Flink configurations.
Missing critical Flink settings:
- Checkpointing
- Restart strategy
- State backend
- Watermark strategy
def buildApi(onlineClass: String, props: Map[String, String]): Api = { | ||
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader | ||
val cls = cl.loadClass(onlineClass) | ||
val constructor = cls.getConstructors.apply(0) | ||
val onlineImpl = constructor.newInstance(props) | ||
onlineImpl.asInstanceOf[Api] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for class loading.
Wrap class loading operations in try-catch to handle ClassNotFoundException and InstantiationException.
def buildApi(onlineClass: String, props: Map[String, String]): Api = {
+ try {
val cl = Thread.currentThread().getContextClassLoader
val cls = cl.loadClass(onlineClass)
val constructor = cls.getConstructors.apply(0)
val onlineImpl = constructor.newInstance(props)
onlineImpl.asInstanceOf[Api]
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new IllegalArgumentException(s"Class $onlineClass not found", e)
+ case e: InstantiationException =>
+ throw new IllegalArgumentException(s"Failed to instantiate $onlineClass", e)
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def buildApi(onlineClass: String, props: Map[String, String]): Api = { | |
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader | |
val cls = cl.loadClass(onlineClass) | |
val constructor = cls.getConstructors.apply(0) | |
val onlineImpl = constructor.newInstance(props) | |
onlineImpl.asInstanceOf[Api] | |
} | |
def buildApi(onlineClass: String, props: Map[String, String]): Api = { | |
try { | |
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader | |
val cls = cl.loadClass(onlineClass) | |
val constructor = cls.getConstructors.apply(0) | |
val onlineImpl = constructor.newInstance(props) | |
onlineImpl.asInstanceOf[Api] | |
} catch { | |
case e: ClassNotFoundException => | |
throw new IllegalArgumentException(s"Class $onlineClass not found", e) | |
case e: InstantiationException => | |
throw new IllegalArgumentException(s"Failed to instantiate $onlineClass", e) | |
} | |
} |
## Summary This PR adds support to submit Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars - one is the main jar (flink-assembly*.jar) which contains our FlinkJob main() code. The second is the cloud_gcp jar which contains our BigTable classes. Flink requires some infra that doesn't currently exist in our canary like the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with a mocked GroupBy / GroupByServing info. The rest of the job (spark eval, avro conversion, BT kv store writes) are all wired up. Follow ups are called out in a few places in the code, listing the major ones out: * More prod grade Flink settings (things like checkpointing frequency, watermarking interval, ..) * Support for IDL encoders (will start with proto as that's what the Etsy folks need) * Read GroupByServingInfo from BigTable * Add support for Kafka source and leverage existing Chronon code for inferring parallelism etc from the Kafka topic As we fix some of these up, we can get rid of the TestFlinkJob and the mocked code / classes there. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully and writes out data to BT that I can query: ``` $ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING E2E_COUNT_STREAMING# test0#1736380800000 cf:value @ 2025/01/09-15:30:41.199000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" cf:value @ 2025/01/09-14:55:04.992000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" ... ``` <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced job submission capabilities with support for Spark and Flink job types. - Introduced flexible configuration for job properties and parameters. - Added command-line argument parsing for Flink jobs. - Introduced utility classes for testing Flink jobs, including event generation and data streaming. - **Improvements** - Refined error logging across multiple components for better clarity. - Updated dependency management and assembly configurations for improved stability. - Improved source and encoder provider abstractions for Flink jobs. - **Infrastructure** - Updated build configurations for better dependency handling and assembly processes. - Removed unnecessary configuration entries from the submission configuration file. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary This PR adds support to submit Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars - one is the main jar (flink-assembly*.jar) which contains our FlinkJob main() code. The second is the cloud_gcp jar which contains our BigTable classes. Flink requires some infra that doesn't currently exist in our canary like the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with a mocked GroupBy / GroupByServing info. The rest of the job (spark eval, avro conversion, BT kv store writes) are all wired up. Follow ups are called out in a few places in the code, listing the major ones out: * More prod grade Flink settings (things like checkpointing frequency, watermarking interval, ..) * Support for IDL encoders (will start with proto as that's what the Etsy folks need) * Read GroupByServingInfo from BigTable * Add support for Kafka source and leverage existing Chronon code for inferring parallelism etc from the Kafka topic As we fix some of these up, we can get rid of the TestFlinkJob and the mocked code / classes there. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully and writes out data to BT that I can query: ``` $ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING E2E_COUNT_STREAMING# test0#1736380800000 cf:value @ 2025/01/09-15:30:41.199000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" cf:value @ 2025/01/09-14:55:04.992000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" ... ``` <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced job submission capabilities with support for Spark and Flink job types. - Introduced flexible configuration for job properties and parameters. - Added command-line argument parsing for Flink jobs. - Introduced utility classes for testing Flink jobs, including event generation and data streaming. - **Improvements** - Refined error logging across multiple components for better clarity. - Updated dependency management and assembly configurations for improved stability. - Improved source and encoder provider abstractions for Flink jobs. - **Infrastructure** - Updated build configurations for better dependency handling and assembly processes. - Removed unnecessary configuration entries from the submission configuration file. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary This PR adds support to submit Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars - one is the main jar (flink-assembly*.jar) which contains our FlinkJob main() code. The second is the cloud_gcp jar which contains our BigTable classes. Flink requires some infra that doesn't currently exist in our canary like the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with a mocked GroupBy / GroupByServing info. The rest of the job (spark eval, avro conversion, BT kv store writes) are all wired up. Follow ups are called out in a few places in the code, listing the major ones out: * More prod grade Flink settings (things like checkpointing frequency, watermarking interval, ..) * Support for IDL encoders (will start with proto as that's what the our clients folks need) * Read GroupByServingInfo from BigTable * Add support for Kafka source and leverage existing Chronon code for inferring parallelism etc from the Kafka topic As we fix some of these up, we can get rid of the TestFlinkJob and the mocked code / classes there. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully and writes out data to BT that I can query: ``` $ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING E2E_COUNT_STREAMING# test0#1736380800000 cf:value @ 2025/01/09-15:30:41.199000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" cf:value @ 2025/01/09-14:55:04.992000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" ... ``` <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced job submission capabilities with support for Spark and Flink job types. - Introduced flexible configuration for job properties and parameters. - Added command-line argument parsing for Flink jobs. - Introduced utility classes for testing Flink jobs, including event generation and data streaming. - **Improvements** - Refined error logging across multiple components for better clarity. - Updated dependency management and assembly configurations for improved stability. - Improved source and encoder provider abstractions for Flink jobs. - **Infrastructure** - Updated build configurations for better dependency handling and assembly processes. - Removed unnecessary configuration entries from the submission configuration file. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary This PR adds support to submit Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars - one is the main jar (flink-assembly*.jar) which contains our FlinkJob main() code. The second is the cloud_gcp jar which contains our BigTable classes. Flink requires some infra that doesn't currently exist in our canary like the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with a mocked GroupBy / GroupByServing info. The rest of the job (spark eval, avro conversion, BT kv store writes) are all wired up. Follow ups are called out in a few places in the code, listing the major ones out: * More prod grade Flink settings (things like checkpointing frequency, watermarking interval, ..) * Support for IDL encoders (will start with proto as that's what the our clients folks need) * Read GroupByServingInfo from BigTable * Add support for Kafka source and leverage existing Chronon code for inferring parallelism etc from the Kafka topic As we fix some of these up, we can get rid of the TestFlinkJob and the mocked code / classes there. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully and writes out data to BT that I can query: ``` $ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING E2E_COUNT_STREAMING# test0#1736380800000 cf:value @ 2025/01/09-15:30:41.199000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" cf:value @ 2025/01/09-14:55:04.992000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" ... ``` <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced job submission capabilities with support for Spark and Flink job types. - Introduced flexible configuration for job properties and parameters. - Added command-line argument parsing for Flink jobs. - Introduced utility classes for testing Flink jobs, including event generation and data streaming. - **Improvements** - Refined error logging across multiple components for better clarity. - Updated dependency management and assembly configurations for improved stability. - Improved source and encoder provider abstractions for Flink jobs. - **Infrastructure** - Updated build configurations for better dependency handling and assembly processes. - Removed unnecessary configuration entries from the submission configuration file. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary This PR adds support to submit Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars - one is the main jar (flink-assembly*.jar) which contains our FlinkJob main() code. The second is the cloud_gcp jar which contains our BigTable classes. Flink requires some infra that doesn't currently exist in our canary like the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with a moour clientsed GroupBy / GroupByServing info. The rest of the job (spark eval, avro conversion, BT kv store writes) are all wired up. Follow ups are called out in a few places in the code, listing the major ones out: * More prod grade Flink settings (things like cheour clientspointing frequency, watermarking interval, ..) * Support for IDL encoders (will start with proto as that's what the our clients folks need) * Read GroupByServingInfo from BigTable * Add support for Kafka source and leverage existing Chronon code for inferring parallelism etc from the Kafka topic As we fix some of these up, we can get rid of the TestFlinkJob and the moour clientsed code / classes there. ## Cheour clientslist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Kiour clientsed off Flink jobs using the added test and confirmed that the job comes up, runs successfully and writes out data to BT that I can query: ``` $ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING E2E_COUNT_STREAMING# test0#1736380800000 cf:value @ 2025/01/09-15:30:41.199000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" cf:value @ 2025/01/09-14:55:04.992000 "\x02\x00\x00\x00\x00\x00\x00\x00\x00" ... ``` <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced job submission capabilities with support for Spark and Flink job types. - Introduced flexible configuration for job properties and parameters. - Added command-line argument parsing for Flink jobs. - Introduced utility classes for testing Flink jobs, including event generation and data streaming. - **Improvements** - Refined error logging across multiple components for better clarity. - Updated dependency management and assembly configurations for improved stability. - Improved source and encoder provider abstractions for Flink jobs. - **Infrastructure** - Updated build configurations for better dependency handling and assembly processes. - Removed unnecessary configuration entries from the submission configuration file. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
Summary
This PR adds support to submit Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params. For Flink we need two jars - one is the main jar (flink-assembly*.jar) which contains our FlinkJob main() code. The second is the cloud_gcp jar which contains our BigTable classes.
Flink requires some infra that doesn't currently exist in our canary like the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-mem E2EEvent source along with a mocked GroupBy / GroupByServing info. The rest of the job (spark eval, avro conversion, BT kv store writes) are all wired up.
Follow ups are called out in a few places in the code, listing the major ones out:
As we fix some of these up, we can get rid of the TestFlinkJob and the mocked code / classes there.
Checklist
Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully and writes out data to BT that I can query:
Summary by CodeRabbit
Release Notes
New Features
Improvements
Infrastructure