
Commit 1c00dcb

Wire up Flink DataProc job submission (#189)
## Summary

This PR adds support for submitting Flink jobs to DataProc. There's a bit of refactoring of the existing submitter code to support the new Flink job type and its params.

For Flink we need two jars: the main jar (flink-assembly*.jar), which contains our FlinkJob main() code, and the cloud_gcp jar, which contains our BigTable classes.

Flink requires some infra that doesn't currently exist in our canary, such as the source Kafka cluster. In the current version of this code, I've created a TestJob (in TestFlinkJob) that sets up an in-memory E2EEvent source along with a mocked GroupBy / GroupByServingInfo. The rest of the job (Spark eval, Avro conversion, BigTable KV store writes) is all wired up.

Follow-ups are called out in a few places in the code; the major ones are:

* More prod-grade Flink settings (things like checkpointing frequency, watermarking interval, ...)
* Support for IDL encoders (starting with proto, as that's what the Etsy folks need)
* Read GroupByServingInfo from BigTable
* Add support for a Kafka source and leverage existing Chronon code for inferring parallelism etc. from the Kafka topic

As we fix these up, we can get rid of TestFlinkJob and the mocked code / classes there.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

Kicked off Flink jobs using the added test and confirmed that the job comes up, runs successfully, and writes out data to BigTable that I can query:

```
$ cbt -project=canary-443022 -instance=zipline-canary-instance read GROUPBY_STREAMING
E2E_COUNT_STREAMING#test0#1736380800000
  cf:value @ 2025/01/09-15:30:41.199000
    "\x02\x00\x00\x00\x00\x00\x00\x00\x00"
  cf:value @ 2025/01/09-14:55:04.992000
    "\x02\x00\x00\x00\x00\x00\x00\x00\x00"
...
```

## Summary by CodeRabbit (auto-generated release notes)

- **New Features**
  - Enhanced job submission capabilities with support for Spark and Flink job types.
  - Introduced flexible configuration for job properties and parameters.
  - Added command-line argument parsing for Flink jobs.
  - Introduced utility classes for testing Flink jobs, including event generation and data streaming.
- **Improvements**
  - Refined error logging across multiple components for better clarity.
  - Updated dependency management and assembly configurations for improved stability.
  - Improved source and encoder provider abstractions for Flink jobs.
- **Infrastructure**
  - Updated build configurations for better dependency handling and assembly processes.
  - Removed unnecessary configuration entries from the submission configuration file.
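A minimal sketch of how the refactored submitter is invoked for a Flink job, mirroring the "test flink job locally" test added in this PR (the GCS jar URIs are the canary test values used there, not fixed conventions):

```scala
import ai.chronon.integrations.cloud_gcp.DataprocSubmitter
import ai.chronon.spark
import ai.chronon.spark.JobSubmitterConstants.{FlinkMainJarURI, JarURI, MainClass}

// The Flink job type needs both jars: the flink-assembly main jar (FlinkJob.main)
// and the cloud_gcp jar that carries the BigTable classes.
val submitter = DataprocSubmitter()
val jobId = submitter.submit(
  spark.FlinkJob,
  Map(
    MainClass -> "ai.chronon.flink.FlinkJob",
    FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
    JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
  ),
  List.empty, // no extra files
  "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
  "--groupby-name=e2e-count",
  "-ZGCP_PROJECT_ID=bigtable-project-id",
  "-ZGCP_INSTANCE_ID=bigtable-instance-id"
)
```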
1 parent d285791 commit 1c00dcb

11 files changed: +373 additions, −30 deletions

build.sbt

Lines changed: 30 additions & 2 deletions

```diff
@@ -109,7 +109,8 @@ val circe = Seq(
 val flink_all = Seq(
   "org.apache.flink" %% "flink-streaming-scala",
   "org.apache.flink" % "flink-metrics-dropwizard",
-  "org.apache.flink" % "flink-clients"
+  "org.apache.flink" % "flink-clients",
+  "org.apache.flink" % "flink-yarn"
 ).map(_ % flink_1_17)
 
 val vertx_java = Seq(
@@ -213,6 +214,22 @@ lazy val flink = project
   .settings(
     libraryDependencies ++= spark_all,
     libraryDependencies ++= flink_all,
+    assembly / assemblyMergeStrategy := {
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
+      case "reference.conf"                          => MergeStrategy.concat
+      case "application.conf"                        => MergeStrategy.concat
+      case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
+      case _                                         => MergeStrategy.first
+    },
+    // Exclude Hadoop & Guava from the assembled JAR
+    // Else we hit an error - IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its
+    // superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
+    // Or: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(...)
+    // Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor'
+    assembly / assemblyExcludedJars := {
+      val cp = (assembly / fullClasspath).value
+      cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf") }
+    },
     libraryDependencies += "org.apache.flink" % "flink-test-utils" % flink_1_17 % Test excludeAll (
       ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-api"),
       ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-core"),
@@ -236,13 +253,24 @@ lazy val cloud_gcp = project
     libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M11",
     libraryDependencies += "org.json4s" %% "json4s-core" % "3.7.0-M11",
     libraryDependencies += "org.yaml" % "snakeyaml" % "2.3",
+    libraryDependencies += "io.grpc" % "grpc-netty-shaded" % "1.62.2",
     libraryDependencies ++= avro,
     libraryDependencies ++= spark_all_provided,
     dependencyOverrides ++= jackson,
+    // assembly merge settings to allow Flink jobs to kick off
+    assembly / assemblyMergeStrategy := {
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat // Add to include channel provider
+      case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
+      case "reference.conf"                          => MergeStrategy.concat
+      case "application.conf"                        => MergeStrategy.concat
+      case _                                         => MergeStrategy.first
+    },
     libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
     libraryDependencies += "com.google.cloud" % "google-cloud-bigtable-emulator" % "0.178.0" % Test,
     // force a newer version of reload4j to sidestep: https://security.snyk.io/vuln/SNYK-JAVA-CHQOSRELOAD4J-5731326
-    dependencyOverrides += "ch.qos.reload4j" % "reload4j" % "1.2.25"
+    dependencyOverrides ++= Seq(
+      "ch.qos.reload4j" % "reload4j" % "1.2.25",
+    )
   )
 
 lazy val cloud_gcp_submitter = project
```

cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml

Lines changed: 0 additions & 2 deletions

```diff
@@ -2,5 +2,3 @@
 projectId: "canary-443022"
 region: "us-central1"
 clusterName: "canary-2"
-jarUri: "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
-mainClass: "ai.chronon.spark.Driver"
```

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala

Lines changed: 52 additions & 19 deletions

```diff
@@ -1,6 +1,12 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.spark.JobAuth
 import ai.chronon.spark.JobSubmitter
+import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
+import ai.chronon.spark.JobSubmitterConstants.JarURI
+import ai.chronon.spark.JobSubmitterConstants.MainClass
+import ai.chronon.spark.JobType
+import ai.chronon.spark.{FlinkJob => TypeFlinkJob}
+import ai.chronon.spark.{SparkJob => TypeSparkJob}
 import com.google.api.gax.rpc.ApiException
 import com.google.cloud.dataproc.v1._
 import org.json4s._
@@ -14,9 +20,7 @@ import collection.JavaConverters._
 case class SubmitterConf(
     projectId: String,
     region: String,
-    clusterName: String,
-    jarUri: String,
-    mainClass: String
+    clusterName: String
 ) {
 
   def endPoint: String = s"${region}-dataproc.googleapis.com:443"
@@ -49,38 +53,67 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
     job.getDone
   }
 
-  override def submit(files: List[String], args: String*): String = {
-
-    val sparkJob = SparkJob
-      .newBuilder()
-      .setMainClass(conf.mainClass)
-      .addJarFileUris(conf.jarUri)
-      .addAllFileUris(files.asJava)
-      .addAllArgs(args.toIterable.asJava)
-      .build()
+  override def submit(jobType: JobType,
+                      jobProperties: Map[String, String],
+                      files: List[String],
+                      args: String*): String = {
+    val mainClass = jobProperties.getOrElse(MainClass, throw new RuntimeException("Main class not found"))
+    val jarUri = jobProperties.getOrElse(JarURI, throw new RuntimeException("Jar URI not found"))
+
+    val jobBuilder = jobType match {
+      case TypeSparkJob => buildSparkJob(mainClass, jarUri, files, args: _*)
+      case TypeFlinkJob =>
+        val mainJarUri =
+          jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI"))
+        buildFlinkJob(mainClass, mainJarUri, jarUri, args: _*)
+    }
 
     val jobPlacement = JobPlacement
       .newBuilder()
       .setClusterName(conf.clusterName)
       .build()
 
     try {
-      val job = Job
-        .newBuilder()
+      val job = jobBuilder
         .setReference(jobReference)
         .setPlacement(jobPlacement)
-        .setSparkJob(sparkJob)
         .build()
 
       val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job)
       submittedJob.getReference.getJobId
 
     } catch {
       case e: ApiException =>
-        throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
+        throw new RuntimeException(s"Failed to submit job: ${e.getMessage}", e)
     }
   }
 
+  private def buildSparkJob(mainClass: String, jarUri: String, files: List[String], args: String*): Job.Builder = {
+    val sparkJob = SparkJob
+      .newBuilder()
+      .setMainClass(mainClass)
+      .addJarFileUris(jarUri)
+      .addAllFileUris(files.asJava)
+      .addAllArgs(args.toIterable.asJava)
+      .build()
+    Job.newBuilder().setSparkJob(sparkJob)
+  }
+
+  private def buildFlinkJob(mainClass: String, mainJarUri: String, jarUri: String, args: String*): Job.Builder = {
+    val envProps =
+      Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")
+
+    val flinkJob = FlinkJob
+      .newBuilder()
+      .setMainClass(mainClass)
+      .setMainJarFileUri(mainJarUri)
+      .putAllProperties(envProps.asJava)
+      .addJarFileUris(jarUri)
+      .addAllArgs(args.toIterable.asJava)
+      .build()
+    Job.newBuilder().setFlinkJob(flinkJob)
+  }
+
   def jobReference: JobReference = JobReference.newBuilder().build()
 }
 
@@ -146,14 +179,14 @@ object DataprocSubmitter {
     val submitterConf = SubmitterConf(
       projectId,
       region,
-      clusterName,
-      chrononJarUri,
-      "ai.chronon.spark.Driver"
+      clusterName
     )
 
     val a = DataprocSubmitter(submitterConf)
 
     val jobId = a.submit(
+      TypeSparkJob,
+      Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri),
       gcsFiles.toList,
       userArgs: _*
     )
```

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala

Lines changed: 29 additions & 3 deletions

```diff
@@ -1,5 +1,9 @@
 package ai.chronon.integrations.cloud_gcp
 
+import ai.chronon.spark
+import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
+import ai.chronon.spark.JobSubmitterConstants.JarURI
+import ai.chronon.spark.JobSubmitterConstants.MainClass
 import com.google.api.gax.rpc.UnaryCallable
 import com.google.cloud.dataproc.v1._
 import com.google.cloud.dataproc.v1.stub.JobControllerStub
@@ -37,21 +41,39 @@ class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
 
     val submitter = new DataprocSubmitter(
       mockJobControllerClient,
-      SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))
+      SubmitterConf("test-project", "test-region", "test-cluster"))
 
-    val submittedJobId = submitter.submit(List.empty)
+    val submittedJobId = submitter.submit(spark.SparkJob, Map(MainClass -> "test-main-class", JarURI -> "test-jar-uri"), List.empty)
     assertEquals(submittedJobId, jobId)
   }
 
   test("Verify classpath with spark-bigquery-connector") {
     BigQueryUtilScala.validateScalaVersionCompatibility()
   }
 
+  ignore("test flink job locally") {
+    val submitter = DataprocSubmitter()
+    val submittedJobId =
+      submitter.submit(spark.FlinkJob,
+                       Map(MainClass -> "ai.chronon.flink.FlinkJob",
+                           FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
+                           JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
+                       List.empty,
+                       "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
+                       "--groupby-name=e2e-count",
+                       "-ZGCP_PROJECT_ID=bigtable-project-id",
+                       "-ZGCP_INSTANCE_ID=bigtable-instance-id")
+    println(submittedJobId)
+  }
+
   ignore("Used to iterate locally. Do not enable this in CI/CD!") {
 
     val submitter = DataprocSubmitter()
     val submittedJobId =
       submitter.submit(
+        spark.SparkJob,
+        Map(MainClass -> "ai.chronon.spark.Driver",
+            JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
         List("gs://zipline-jars/training_set.v1",
              "gs://zipline-jars/dataproc-submitter-conf.yaml",
              "gs://zipline-jars/additional-confs.yaml"),
@@ -67,7 +89,11 @@ class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
 
     val submitter = DataprocSubmitter()
     val submittedJobId =
-      submitter.submit(List.empty,
+      submitter.submit(
+        spark.SparkJob,
+        Map(MainClass -> "ai.chronon.spark.Driver",
+            JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
+        List.empty,
         "groupby-upload-bulk-load",
         "-ZGCP_PROJECT_ID=bigtable-project-id",
         "-ZGCP_INSTANCE_ID=bigtable-instance-id",
```

flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -119,7 +119,7 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
       // in the KVStore - we log the exception and skip the object to
       // not fail the app
       errorCounter.inc()
-      logger.error(s"Caught exception writing to KVStore for object: $input - $exception")
+      logger.error(s"Caught exception writing to KVStore for object: $input", exception)
       resultFuture.complete(util.Arrays.asList[WriteResponse](WriteResponse(input, status = false)))
     }
   }
```

flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -108,7 +108,7 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
       case e: Exception =>
         // To improve availability, we don't rethrow the exception. We just drop the event
         // and track the errors in a metric. Alerts should be set up on this metric.
-        logger.error(s"Error converting to Avro bytes - $e")
+        logger.error("Error converting to Avro bytes", e)
         eventProcessingErrorCounter.inc()
         avroConversionErrorCounter.inc()
     }
```

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala

Lines changed: 57 additions & 0 deletions

```diff
@@ -9,6 +9,7 @@ import ai.chronon.flink.window.FlinkRowAggProcessFunction
 import ai.chronon.flink.window.FlinkRowAggregationFunction
 import ai.chronon.flink.window.KeySelector
 import ai.chronon.flink.window.TimestampedTile
+import ai.chronon.online.Api
 import ai.chronon.online.GroupByServingInfoParsed
 import ai.chronon.online.KVStore.PutRequest
 import ai.chronon.online.SparkConversions
@@ -22,6 +23,9 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.spark.sql.Encoder
+import org.rogach.scallop.ScallopConf
+import org.rogach.scallop.ScallopOption
+import org.rogach.scallop.Serialization
 import org.slf4j.LoggerFactory
 
 /**
@@ -196,3 +200,56 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
     )
   }
 }
+
+object FlinkJob {
+  // Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137
+  class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization {
+    val onlineClass: ScallopOption[String] =
+      opt[String](required = true,
+                  descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path")
+    val groupbyName: ScallopOption[String] =
+      opt[String](required = true, descr = "The name of the groupBy to process")
+    val mockSource: ScallopOption[Boolean] =
+      opt[Boolean](required = false, descr = "Use a mocked data source instead of a real source", default = Some(true))
+
+    val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API / KV Store")
+
+    verify()
+  }
+
+  def main(args: Array[String]): Unit = {
+    val jobArgs = new JobArgs(args)
+    jobArgs.groupbyName()
+    val onlineClassName = jobArgs.onlineClass()
+    val props = jobArgs.apiProps.map(identity)
+    val useMockedSource = jobArgs.mockSource()
+
+    val api = buildApi(onlineClassName, props)
+    val flinkJob =
+      if (useMockedSource) {
+        // We will yank this conditional block when we wire up our real sources etc.
+        TestFlinkJob.buildTestFlinkJob(api)
+      } else {
+        // TODO - what we need to do when we wire this up for real
+        // lookup groupByServingInfo by groupByName from the kv store
+        // based on the topic type (e.g. kafka / pubsub) and the schema class name:
+        // 1. lookup schema object using SchemaProvider (e.g SchemaRegistry / Jar based)
+        // 2. Create the appropriate Encoder for the given schema type
+        // 3. Invoke the appropriate source provider to get the source, encoder, parallelism
+        throw new IllegalArgumentException("We don't support non-mocked sources like Kafka / PubSub yet!")
+      }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    // TODO add useful configs
+    flinkJob.runGroupByJob(env).addSink(new PrintSink) // TODO wire up a metrics sink / such
+    env.execute(s"${flinkJob.groupByName}")
+  }
+
+  def buildApi(onlineClass: String, props: Map[String, String]): Api = {
+    val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
+    val cls = cl.loadClass(onlineClass)
+    val constructor = cls.getConstructors.apply(0)
+    val onlineImpl = constructor.newInstance(props)
+    onlineImpl.asInstanceOf[Api]
+  }
+}
```
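To make the new CLI surface concrete, here is a small hypothetical snippet showing how the JobArgs class above parses the same style of flags the integration test passes to the Flink job; the project/instance values are placeholders:

```scala
// Hypothetical illustration only: parse flags matching those used in
// DataprocSubmitterTest's local Flink test (values are placeholders).
val jobArgs = new FlinkJob.JobArgs(Seq(
  "--online-class", "ai.chronon.integrations.cloud_gcp.GcpApiImpl",
  "--groupby-name", "e2e-count",
  "-ZGCP_PROJECT_ID=my-project",
  "-ZGCP_INSTANCE_ID=my-instance"
))

jobArgs.onlineClass() // "ai.chronon.integrations.cloud_gcp.GcpApiImpl"
jobArgs.groupbyName() // "e2e-count"
jobArgs.mockSource()  // true - defaults to the mocked TestFlinkJob source for now
jobArgs.apiProps      // Map("GCP_PROJECT_ID" -> "my-project", "GCP_INSTANCE_ID" -> "my-instance")
```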
New file: 23 additions & 0 deletions

```diff
@@ -0,0 +1,23 @@
+package ai.chronon.flink
+
+import ai.chronon.online.GroupByServingInfoParsed
+import org.apache.spark.sql.Encoder
+
+/**
+ * SourceProvider is an abstract class that provides a way to build a source for a Flink job.
+ * It takes the groupByServingInfo as an argument and based on the configured GB details, configures
+ * the Flink source (e.g. Kafka or PubSub) with the right parallelism etc.
+ */
+abstract class SourceProvider[T](maybeGroupByServingInfoParsed: Option[GroupByServingInfoParsed]) {
+  // Returns a tuple of the source, parallelism
+  def buildSource(): (FlinkSource[T], Int)
+}
+
+/**
+ * EncoderProvider is an abstract class that provides a way to build a Spark encoder for a Flink job.
+ * These encoders are used in the SparkExprEval Flink function to convert the incoming stream into types
+ * that are amenable for tiled / untiled processing.
+ */
+abstract class EncoderProvider[T] {
+  def buildEncoder(): Encoder[T]
+}
```
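To illustrate how these abstractions are meant to plug together (the PR's TestFlinkJob does something along these lines with an in-memory E2EEvent source), here is a hedged sketch: E2ETestEvent, E2ETestEncoderProvider, and InMemorySourceProvider are hypothetical names, and a real provider would derive the source and parallelism from the GroupByServingInfo rather than wrapping a prebuilt FlinkSource.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical flat event type, standing in for the in-memory E2EEvent used by TestFlinkJob.
case class E2ETestEvent(id: String, intVal: Int, ts: Long)

// Spark can derive an Encoder for a flat case class directly.
class E2ETestEncoderProvider extends EncoderProvider[E2ETestEvent] {
  override def buildEncoder(): Encoder[E2ETestEvent] = Encoders.product[E2ETestEvent]
}

// Wraps an already-built in-memory FlinkSource and runs it with parallelism 1.
// A real provider would inspect the GroupByServingInfo (topic, schema, ...) instead.
class InMemorySourceProvider(source: FlinkSource[E2ETestEvent])
    extends SourceProvider[E2ETestEvent](maybeGroupByServingInfoParsed = None) {
  override def buildSource(): (FlinkSource[E2ETestEvent], Int) = (source, 1)
}
```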

flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -109,7 +109,7 @@ class SparkExpressionEvalFn[T](encoder: Encoder[T], groupBy: GroupBy) extends Ri
       case e: Exception =>
         // To improve availability, we don't rethrow the exception. We just drop the event
         // and track the errors in a metric. Alerts should be set up on this metric.
-        logger.error(s"Error evaluating Spark expression - $e")
+        logger.error("Error evaluating Spark expression", e)
         exprEvalErrorCounter.inc()
     }
   }
```
