-
Notifications
You must be signed in to change notification settings - Fork 0
Wire up Flink DataProc job submission #189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2c627a5
8b898b9
cc23b7b
cf66f1c
b6e0bc1
3006552
530cfe2
634a7d4
3af944e
d3f66b1
3571efb
a011b98
b62401a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,12 @@ | ||||||||||||
package ai.chronon.integrations.cloud_gcp | ||||||||||||
import ai.chronon.spark.JobAuth | ||||||||||||
import ai.chronon.spark.JobSubmitter | ||||||||||||
import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI | ||||||||||||
import ai.chronon.spark.JobSubmitterConstants.JarURI | ||||||||||||
import ai.chronon.spark.JobSubmitterConstants.MainClass | ||||||||||||
import ai.chronon.spark.JobType | ||||||||||||
import ai.chronon.spark.{FlinkJob => TypeFlinkJob} | ||||||||||||
import ai.chronon.spark.{SparkJob => TypeSparkJob} | ||||||||||||
import com.google.api.gax.rpc.ApiException | ||||||||||||
import com.google.cloud.dataproc.v1._ | ||||||||||||
import org.json4s._ | ||||||||||||
|
@@ -14,9 +20,7 @@ import collection.JavaConverters._ | |||||||||||
case class SubmitterConf( | ||||||||||||
projectId: String, | ||||||||||||
region: String, | ||||||||||||
clusterName: String, | ||||||||||||
jarUri: String, | ||||||||||||
mainClass: String | ||||||||||||
clusterName: String | ||||||||||||
) { | ||||||||||||
|
||||||||||||
def endPoint: String = s"${region}-dataproc.googleapis.com:443" | ||||||||||||
|
@@ -49,38 +53,67 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte | |||||||||||
job.getDone | ||||||||||||
} | ||||||||||||
|
||||||||||||
override def submit(files: List[String], args: String*): String = { | ||||||||||||
|
||||||||||||
val sparkJob = SparkJob | ||||||||||||
.newBuilder() | ||||||||||||
.setMainClass(conf.mainClass) | ||||||||||||
.addJarFileUris(conf.jarUri) | ||||||||||||
.addAllFileUris(files.asJava) | ||||||||||||
.addAllArgs(args.toIterable.asJava) | ||||||||||||
.build() | ||||||||||||
override def submit(jobType: JobType, | ||||||||||||
jobProperties: Map[String, String], | ||||||||||||
files: List[String], | ||||||||||||
args: String*): String = { | ||||||||||||
val mainClass = jobProperties.getOrElse(MainClass, throw new RuntimeException("Main class not found")) | ||||||||||||
val jarUri = jobProperties.getOrElse(JarURI, throw new RuntimeException("Jar URI not found")) | ||||||||||||
|
||||||||||||
val jobBuilder = jobType match { | ||||||||||||
case TypeSparkJob => buildSparkJob(mainClass, jarUri, files, args: _*) | ||||||||||||
case TypeFlinkJob => | ||||||||||||
val mainJarUri = | ||||||||||||
jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI")) | ||||||||||||
buildFlinkJob(mainClass, mainJarUri, jarUri, args: _*) | ||||||||||||
} | ||||||||||||
|
||||||||||||
val jobPlacement = JobPlacement | ||||||||||||
.newBuilder() | ||||||||||||
.setClusterName(conf.clusterName) | ||||||||||||
.build() | ||||||||||||
|
||||||||||||
try { | ||||||||||||
val job = Job | ||||||||||||
.newBuilder() | ||||||||||||
val job = jobBuilder | ||||||||||||
.setReference(jobReference) | ||||||||||||
.setPlacement(jobPlacement) | ||||||||||||
.setSparkJob(sparkJob) | ||||||||||||
.build() | ||||||||||||
|
||||||||||||
val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job) | ||||||||||||
submittedJob.getReference.getJobId | ||||||||||||
|
||||||||||||
} catch { | ||||||||||||
case e: ApiException => | ||||||||||||
throw new RuntimeException(s"Failed to submit job: ${e.getMessage}") | ||||||||||||
throw new RuntimeException(s"Failed to submit job: ${e.getMessage}", e) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
private def buildSparkJob(mainClass: String, jarUri: String, files: List[String], args: String*): Job.Builder = { | ||||||||||||
val sparkJob = SparkJob | ||||||||||||
.newBuilder() | ||||||||||||
.setMainClass(mainClass) | ||||||||||||
.addJarFileUris(jarUri) | ||||||||||||
.addAllFileUris(files.asJava) | ||||||||||||
.addAllArgs(args.toIterable.asJava) | ||||||||||||
.build() | ||||||||||||
Job.newBuilder().setSparkJob(sparkJob) | ||||||||||||
} | ||||||||||||
|
||||||||||||
private def buildFlinkJob(mainClass: String, mainJarUri: String, jarUri: String, args: String*): Job.Builder = { | ||||||||||||
val envProps = | ||||||||||||
Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST") | ||||||||||||
|
||||||||||||
Comment on lines
+103
to
+105
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Make Flink environment properties configurable Move hardcoded values to configuration. - val envProps =
- Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")
+ val envProps = jobProperties.getOrElse("flink.properties",
+ Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")) 📝 Committable suggestion
Suggested change
|
||||||||||||
val flinkJob = FlinkJob | ||||||||||||
.newBuilder() | ||||||||||||
.setMainClass(mainClass) | ||||||||||||
.setMainJarFileUri(mainJarUri) | ||||||||||||
.putAllProperties(envProps.asJava) | ||||||||||||
.addJarFileUris(jarUri) | ||||||||||||
.addAllArgs(args.toIterable.asJava) | ||||||||||||
.build() | ||||||||||||
Job.newBuilder().setFlinkJob(flinkJob) | ||||||||||||
} | ||||||||||||
|
||||||||||||
def jobReference: JobReference = JobReference.newBuilder().build() | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
@@ -146,14 +179,14 @@ object DataprocSubmitter { | |||||||||||
val submitterConf = SubmitterConf( | ||||||||||||
projectId, | ||||||||||||
region, | ||||||||||||
clusterName, | ||||||||||||
chrononJarUri, | ||||||||||||
"ai.chronon.spark.Driver" | ||||||||||||
clusterName | ||||||||||||
) | ||||||||||||
|
||||||||||||
val a = DataprocSubmitter(submitterConf) | ||||||||||||
|
||||||||||||
val jobId = a.submit( | ||||||||||||
TypeSparkJob, | ||||||||||||
Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri), | ||||||||||||
gcsFiles.toList, | ||||||||||||
userArgs: _* | ||||||||||||
) | ||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ import ai.chronon.flink.window.FlinkRowAggProcessFunction | |||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.flink.window.FlinkRowAggregationFunction | ||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.flink.window.KeySelector | ||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.flink.window.TimestampedTile | ||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.Api | ||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.GroupByServingInfoParsed | ||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.KVStore.PutRequest | ||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.SparkConversions | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -22,6 +23,9 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner | |||||||||||||||||||||||||||||||||||||||||||
import org.apache.flink.streaming.api.windowing.time.Time | ||||||||||||||||||||||||||||||||||||||||||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow | ||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.sql.Encoder | ||||||||||||||||||||||||||||||||||||||||||||
import org.rogach.scallop.ScallopConf | ||||||||||||||||||||||||||||||||||||||||||||
import org.rogach.scallop.ScallopOption | ||||||||||||||||||||||||||||||||||||||||||||
import org.rogach.scallop.Serialization | ||||||||||||||||||||||||||||||||||||||||||||
import org.slf4j.LoggerFactory | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -196,3 +200,56 @@ class FlinkJob[T](eventSrc: FlinkSource[T], | |||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
object FlinkJob { | ||||||||||||||||||||||||||||||||||||||||||||
// Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137 | ||||||||||||||||||||||||||||||||||||||||||||
class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization { | ||||||||||||||||||||||||||||||||||||||||||||
val onlineClass: ScallopOption[String] = | ||||||||||||||||||||||||||||||||||||||||||||
opt[String](required = true, | ||||||||||||||||||||||||||||||||||||||||||||
descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path") | ||||||||||||||||||||||||||||||||||||||||||||
val groupbyName: ScallopOption[String] = | ||||||||||||||||||||||||||||||||||||||||||||
opt[String](required = true, descr = "The name of the groupBy to process") | ||||||||||||||||||||||||||||||||||||||||||||
val mockSource: ScallopOption[Boolean] = | ||||||||||||||||||||||||||||||||||||||||||||
opt[Boolean](required = false, descr = "Use a mocked data source instead of a real source", default = Some(true)) | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API / KV Store") | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
verify() | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def main(args: Array[String]): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||
val jobArgs = new JobArgs(args) | ||||||||||||||||||||||||||||||||||||||||||||
jobArgs.groupbyName() | ||||||||||||||||||||||||||||||||||||||||||||
val onlineClassName = jobArgs.onlineClass() | ||||||||||||||||||||||||||||||||||||||||||||
val props = jobArgs.apiProps.map(identity) | ||||||||||||||||||||||||||||||||||||||||||||
val useMockedSource = jobArgs.mockSource() | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
val api = buildApi(onlineClassName, props) | ||||||||||||||||||||||||||||||||||||||||||||
val flinkJob = | ||||||||||||||||||||||||||||||||||||||||||||
if (useMockedSource) { | ||||||||||||||||||||||||||||||||||||||||||||
// We will yank this conditional block when we wire up our real sources etc. | ||||||||||||||||||||||||||||||||||||||||||||
TestFlinkJob.buildTestFlinkJob(api) | ||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||
// TODO - what we need to do when we wire this up for real | ||||||||||||||||||||||||||||||||||||||||||||
// lookup groupByServingInfo by groupByName from the kv store | ||||||||||||||||||||||||||||||||||||||||||||
// based on the topic type (e.g. kafka / pubsub) and the schema class name: | ||||||||||||||||||||||||||||||||||||||||||||
// 1. lookup schema object using SchemaProvider (e.g SchemaRegistry / Jar based) | ||||||||||||||||||||||||||||||||||||||||||||
// 2. Create the appropriate Encoder for the given schema type | ||||||||||||||||||||||||||||||||||||||||||||
// 3. Invoke the appropriate source provider to get the source, encoder, parallelism | ||||||||||||||||||||||||||||||||||||||||||||
throw new IllegalArgumentException("We don't support non-mocked sources like Kafka / PubSub yet!") | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
val env = StreamExecutionEnvironment.getExecutionEnvironment | ||||||||||||||||||||||||||||||||||||||||||||
// TODO add useful configs | ||||||||||||||||||||||||||||||||||||||||||||
flinkJob.runGroupByJob(env).addSink(new PrintSink) // TODO wire up a metrics sink / such | ||||||||||||||||||||||||||||||||||||||||||||
env.execute(s"${flinkJob.groupByName}") | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+242
to
+245
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add essential Flink configurations. Missing critical Flink settings:
|
||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def buildApi(onlineClass: String, props: Map[String, String]): Api = { | ||||||||||||||||||||||||||||||||||||||||||||
val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader | ||||||||||||||||||||||||||||||||||||||||||||
val cls = cl.loadClass(onlineClass) | ||||||||||||||||||||||||||||||||||||||||||||
val constructor = cls.getConstructors.apply(0) | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Constructor lookup could be more robust. Using - val constructor = cls.getConstructors.apply(0)
+ val constructor = cls.getConstructors.find(_.getParameterCount == 1)
+ .getOrElse(throw new IllegalArgumentException(s"No suitable constructor found for $onlineClass")) 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||
val onlineImpl = constructor.newInstance(props) | ||||||||||||||||||||||||||||||||||||||||||||
onlineImpl.asInstanceOf[Api] | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+248
to
+254
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for class loading. Wrap class loading operations in try-catch to handle ClassNotFoundException and InstantiationException. def buildApi(onlineClass: String, props: Map[String, String]): Api = {
+ try {
val cl = Thread.currentThread().getContextClassLoader
val cls = cl.loadClass(onlineClass)
val constructor = cls.getConstructors.apply(0)
val onlineImpl = constructor.newInstance(props)
onlineImpl.asInstanceOf[Api]
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new IllegalArgumentException(s"Class $onlineClass not found", e)
+ case e: InstantiationException =>
+ throw new IllegalArgumentException(s"Failed to instantiate $onlineClass", e)
+ }
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package ai.chronon.flink | ||
|
||
import ai.chronon.online.GroupByServingInfoParsed | ||
import org.apache.spark.sql.Encoder | ||
|
||
/** | ||
* SourceProvider is an abstract class that provides a way to build a source for a Flink job. | ||
* It takes the groupByServingInfo as an argument and based on the configured GB details, configures | ||
* the Flink source (e.g. Kafka or PubSub) with the right parallelism etc. | ||
*/ | ||
abstract class SourceProvider[T](maybeGroupByServingInfoParsed: Option[GroupByServingInfoParsed]) { | ||
// Returns a tuple of the source, parallelism | ||
def buildSource(): (FlinkSource[T], Int) | ||
} | ||
|
||
/** | ||
* EncoderProvider is an abstract class that provides a way to build an Spark encoder for a Flink job. | ||
* These encoders are used in the SparkExprEval Flink function to convert the incoming stream into types | ||
* that are amenable for tiled / untiled processing. | ||
*/ | ||
abstract class EncoderProvider[T] { | ||
def buildEncoder(): Encoder[T] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so pretty much we don't trust any of the transitives matching these prefixes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah for each of those there's a submit / runtime failure of the jobs (even with user jars first..)