
Add support to drive Kafka avro load for streaming job testing #271


Merged: merged 6 commits into from Jan 23, 2025
Changes from 3 commits
34 changes: 33 additions & 1 deletion build.sbt
@@ -109,7 +109,9 @@ val circe = Seq(
val flink_all = Seq(
"org.apache.flink" % "flink-metrics-dropwizard",
"org.apache.flink" % "flink-clients",
"org.apache.flink" % "flink-yarn"
"org.apache.flink" % "flink-yarn",
"org.apache.flink" % "flink-connector-kafka",
"org.apache.flink" % "flink-avro",
).map(_ % flink_1_17)

val vertx_java = Seq(
@@ -220,6 +222,8 @@ lazy val flink = project
// mark the flink-streaming scala as provided as otherwise we end up with some extra Flink classes in our jar
// and errors at runtime like: java.io.InvalidClassException: org.apache.flink.streaming.api.scala.DataStream$$anon$1; local class incompatible
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flink_1_17 % "provided",
libraryDependencies += "org.apache.flink" % "flink-connector-files" % flink_1_17 % "provided",
libraryDependencies += "org.apache.spark" %% "spark-avro" % spark_3_5,
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
@@ -239,13 +243,38 @@ lazy val flink = project
.startsWith("protobuf")
}
},
assembly / packageOptions += Package.ManifestAttributes(
("Main-Class", "ai.chronon.flink.FlinkJob")
),
libraryDependencies += "org.apache.flink" % "flink-test-utils" % flink_1_17 % Test excludeAll (
ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-api"),
ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-core"),
ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-slf4j-impl")
)
)

// We carve out a separate module for the Flink Kafka ingestion job. It isn't included in the main root module list
// for now, as we use it for ad hoc testing via: sbt "project flink_kafka_ingest" assembly
lazy val flink_kafka_ingest = project
.dependsOn(flink)
.settings(
// Exclude Hadoop & Guava from the assembled JAR
// Else we hit an error - IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its
// superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
// Or: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(...)
// Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor'
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter { jar =>
jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName
.startsWith("protobuf")
}
},
assembly / packageOptions += Package.ManifestAttributes(
("Main-Class", "ai.chronon.flink.FlinkKafkaBeaconEventDriver")
),
)

// GCP requires java 11, can't cross compile higher

javacOptions ++= Seq("-source", "11", "-target", "11")
@@ -266,6 +295,9 @@ lazy val cloud_gcp = project
libraryDependencies += "org.json4s" %% "json4s-core" % "3.7.0-M11",
libraryDependencies += "org.yaml" % "snakeyaml" % "2.3",
libraryDependencies += "io.grpc" % "grpc-netty-shaded" % "1.62.2",
libraryDependencies += "com.google.cloud.hosted.kafka" % "managed-kafka-auth-login-handler" % "1.0.3" excludeAll (
ExclusionRule(organization = "io.confluent", name = "kafka-schema-registry-client")
),
libraryDependencies ++= avro,
libraryDependencies ++= spark_all_provided,
dependencyOverrides ++= jackson,
@@ -52,10 +52,9 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
}

it should "test flink job locally" ignore {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(spark.FlinkJob,
Map(MainClass -> "ai.chronon.flink.FlinkJob",
FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
@@ -64,48 +63,61 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
"--groupby-name=e2e-count",
"-ZGCP_PROJECT_ID=bigtable-project-id",
"-ZGCP_INSTANCE_ID=bigtable-instance-id")
println(submittedJobId)

}

it should "Used to iterate locally. Do not enable this in CI/CD!" ignore {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(
spark.SparkJob,
Map(MainClass -> "ai.chronon.spark.Driver",
JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
List("gs://zipline-jars/training_set.v1",
"gs://zipline-jars/dataproc-submitter-conf.yaml",
"gs://zipline-jars/additional-confs.yaml"),
"join",
"--end-date=2024-12-10",
"--additional-conf-path=additional-confs.yaml",
"--conf-path=training_set.v1"
)
println(submittedJobId)
}

it should "Used to test GBU bulk load locally. Do not enable this in CI/CD!" ignore {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(
spark.SparkJob,
Map(MainClass -> "ai.chronon.spark.Driver",
JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
List.empty,
"groupby-upload-bulk-load",
"-ZGCP_PROJECT_ID=bigtable-project-id",
"-ZGCP_INSTANCE_ID=bigtable-instance-id",
"--online-jar=cloud_gcp-assembly-0.1.0-SNAPSHOT.jar",
"--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
"--src-offline-table=data.test_gbu",
"--groupby-name=quickstart.purchases.v1",
"--partition-string=2024-01-01")
println(submittedJobId)
assertEquals(submittedJobId, "mock-job-id")
it should "test flink kafka ingest job locally" ignore {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(spark.FlinkJob,
Map(MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
List.empty,
"--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
"--kafka-topic=test-beacon-main",
"--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro",
)
println(submittedJobId)
}

it should "Used to iterate locally. Do not enable this in CI/CD!" ignore {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(
spark.SparkJob,
Map(MainClass -> "ai.chronon.spark.Driver",
JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
List("gs://zipline-jars/training_set.v1",
"gs://zipline-jars/dataproc-submitter-conf.yaml",
"gs://zipline-jars/additional-confs.yaml"),
"join",
"--end-date=2024-12-10",
"--additional-conf-path=additional-confs.yaml",
"--conf-path=training_set.v1"
)
println(submittedJobId)
}

it should "Used to test GBU bulk load locally. Do not enable this in CI/CD!" ignore {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(
spark.SparkJob,
Map(MainClass -> "ai.chronon.spark.Driver",
JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
List.empty,
"groupby-upload-bulk-load",
"-ZGCP_PROJECT_ID=bigtable-project-id",
"-ZGCP_INSTANCE_ID=bigtable-instance-id",
"--online-jar=cloud_gcp-assembly-0.1.0-SNAPSHOT.jar",
"--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
"--src-offline-table=data.test_gbu",
"--groupby-name=quickstart.purchases.v1",
"--partition-string=2024-01-01")
println(submittedJobId)
assertEquals(submittedJobId, "mock-job-id")
}
}

}
@@ -0,0 +1,148 @@
package ai.chronon.flink

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat
import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.clients.producer.ProducerConfig
import org.rogach.scallop.ScallopConf
import org.rogach.scallop.ScallopOption
import org.rogach.scallop.Serialization

// Canary test app that can point to a source data file and will emit an event to Kafka periodically with an updated timestamp
object FlinkKafkaBeaconEventDriver {
Collaborator:

If this isn't going to be part of the main Driver.scala, why don't we just make these regular args and kick it off from the unit test framework?

Contributor (author):

I am kicking off from the unit test framework (see "test flink kafka ingest job locally"). Did you have something else in mind? A later step might be to trigger this in our CI setup, but for now this is just manually triggered to create the stream.

Collaborator:

Okay, judging by the use of ScallopConf I'm assuming you'll want to run it as a CLI command at some point? Should work - just that we won't have any of this hooked up elsewhere just yet.

Contributor (author):

Yeah, I added the ScallopConf to allow us to tweak and pass params in the future. For now it could all be hardcoded in the app for sure.
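For reference, a rough sketch of what a standalone CLI invocation could look like once this is wired up elsewhere. The assembly command comes from the build.sbt comment above and the flag values from the "test flink kafka ingest job locally" test; the flink run form and the jar location are assumptions, nothing in this PR sets that up:

# Build the ad hoc ingestion jar
sbt "project flink_kafka_ingest" assembly

# Hypothetical submission via the Flink CLI; adjust the jar path to the local build layout
flink run -c ai.chronon.flink.FlinkKafkaBeaconEventDriver \
  flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar \
  --kafka-bootstrap bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092 \
  --kafka-topic test-beacon-main \
  --data-file-name gs://zl-warehouse/beacon_events/beacon-output.avro \
  --event-delay-millis 1000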

// Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137
class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization {
val dataFileName: ScallopOption[String] =
opt[String](required = true, descr = "Name of the file on GCS to read data from")
val kafkaBootstrap: ScallopOption[String] =
opt[String](required = true, descr = "Kafka bootstrap server in host:port format")
val kafkaTopic: ScallopOption[String] = opt[String](required = true, descr = "Kafka topic to write to")
val eventDelayMillis: ScallopOption[Int] =
opt[Int](required = false,
descr = "Delay to use between event publishes (dictates the eps)",
default = Some(1000))

verify()
}

def main(args: Array[String]): Unit = {
val jobArgs = new JobArgs(args)
val dataFileName = jobArgs.dataFileName()
val bootstrapServers = jobArgs.kafkaBootstrap()
val kafkaTopic = jobArgs.kafkaTopic()
val eventDelayMillis = jobArgs.eventDelayMillis()

val schema = buildAvroSchema()
// Configure GCS source
val avroFormat = new AvroInputFormat[GenericRecord](
new Path(dataFileName),
classOf[GenericRecord]
)

implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)

// Set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig
.enableForceKryo() // use kryo for complex types that Flink's default ser system doesn't support (e.g. case classes)
env.getConfig.enableGenericTypes() // more permissive type checks
env.addDefaultKryoSerializer(classOf[Schema], classOf[AvroKryoSerializerUtils.AvroSchemaSerializer])

val stream = env
.createInput(avroFormat)
.setParallelism(1)

val transformedStream: DataStream[GenericRecord] = stream
.map(new DelayedSourceTransformFn(eventDelayMillis))
.setParallelism(stream.parallelism)

// Configure Kafka sink
val serializationSchema = KafkaRecordSerializationSchema
.builder()
.setTopic(kafkaTopic)
.setValueSerializationSchema(AvroSerializationSchema.forGeneric(schema))
.build()

val producerConfig = new java.util.Properties()
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "3")
producerConfig.setProperty("security.protocol", "SASL_SSL")
producerConfig.setProperty("sasl.mechanism", "OAUTHBEARER")
producerConfig.setProperty("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
producerConfig.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")

val kafkaSink = KafkaSink
.builder()
.setBootstrapServers(bootstrapServers)
.setRecordSerializer(serializationSchema)
.setKafkaProducerConfig(producerConfig)
.build()

// Write to Kafka
transformedStream
.sinkTo(kafkaSink)
.setParallelism(transformedStream.parallelism)

// Execute program
env.execute("Periodic Kafka Beacon Data Producer")
}

def buildAvroSchema(): Schema = {
new Schema.Parser().parse(
"""
{
"type": "record",
"name": "Beacon",
"namespace": "com.etsy",
"fields": [
{"name": "event_name", "type": ["null", "string"], "default": null},
{"name": "timestamp", "type": "long"},
{"name": "browser_id", "type": ["null", "string"], "default": null},
{"name": "primary_event", "type": "boolean"},
{"name": "guid", "type": ["null", "string"], "default": null},
{"name": "page_guid", "type": ["null", "string"], "default": null},
{"name": "event_logger", "type": ["null", "string"], "default": null},
{"name": "event_source", "type": ["null", "string"], "default": null},
{"name": "ip", "type": ["null", "string"], "default": null},
{"name": "user_agent", "type": ["null", "string"], "default": null},
{"name": "loc", "type": ["null", "string"], "default": null},
{"name": "ref", "type": ["null", "string"], "default": null},
{"name": "cookies", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null},
{"name": "ab", "type": ["null", {"type": "map", "values": ["null", {"type": "array", "items": ["null", "string"]}]}], "default": null},
{"name": "user_id", "type": ["null", "long"], "default": null},
{"name": "isMobileRequest", "type": ["null", "boolean"], "default": null},
{"name": "isMobileDevice", "type": ["null", "boolean"], "default": null},
{"name": "isMobileTemplate", "type": ["null", "boolean"], "default": null},
{"name": "detected_currency_code", "type": ["null", "string"], "default": null},
{"name": "detected_language", "type": ["null", "string"], "default": null},
{"name": "detected_region", "type": ["null", "string"], "default": null},
{"name": "listing_ids", "type": ["null", {"type": "array", "items": "long"}], "default": null},
{"name": "event_timestamp", "type": ["null", "long"], "default": null},
{"name": "properties", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null}
]
}
""")
}
}

class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
override def map(value: GenericRecord): GenericRecord = {
val updatedTimestamp = System.currentTimeMillis()
// Update the timestamp field in the record
value.put("timestamp", updatedTimestamp)
Thread.sleep(delayMs)
value
}
}
Comment on lines +140 to +148

Contributor:

⚠️ Potential issue

Avoid using Thread.sleep() in the map function.

Thread.sleep() blocks the task's processing thread and reduces throughput.

Apply this diff to remove the sleep:

 class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
   override def map(value: GenericRecord): GenericRecord = {
     val updatedTimestamp = System.currentTimeMillis()
     // Update the timestamp field in the record
     value.put("timestamp", updatedTimestamp)
-    Thread.sleep(delayMs)
     value
   }
 }

Consider using a non-blocking approach such as event-time characteristics or processing-time timers.

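As a concrete illustration of the timer-based suggestion, here is a minimal sketch (not part of this PR) of a hypothetical TimerPacedEmitFn that buffers records in keyed state and drains one per processing-time timer instead of sleeping on the task thread. It assumes the stream has been keyed first (e.g. via keyBy(_ => 0)) so that keyed state and timers are available:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical non-blocking alternative to DelayedSourceTransformFn
class TimerPacedEmitFn(delayMs: Long) extends KeyedProcessFunction[Int, GenericRecord, GenericRecord] {

  @transient private var buffer: ListState[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    // Keyed state holding records that are waiting to be emitted
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[GenericRecord]("pending-records", classOf[GenericRecord]))
  }

  override def processElement(value: GenericRecord,
                              ctx: KeyedProcessFunction[Int, GenericRecord, GenericRecord]#Context,
                              out: Collector[GenericRecord]): Unit = {
    // Park the record and arm a processing-time timer instead of blocking the task thread
    buffer.add(value)
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + delayMs)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Int, GenericRecord, GenericRecord]#OnTimerContext,
                       out: Collector[GenericRecord]): Unit = {
    val it = buffer.get().iterator()
    if (it.hasNext) {
      // Emit one record per timer firing, refreshing its timestamp on the way out
      val record = it.next()
      record.put("timestamp", System.currentTimeMillis())
      out.collect(record)

      // Keep the rest buffered and re-arm the timer if anything is left
      val remaining = new java.util.ArrayList[GenericRecord]()
      while (it.hasNext) remaining.add(it.next())
      buffer.update(remaining)
      if (!remaining.isEmpty)
        ctx.timerService().registerProcessingTimeTimer(timestamp + delayMs)
    }
  }
}

Whether the extra operator and state bookkeeping is worth it for a throwaway canary driver is a judgment call, which is presumably why the simple sleep was kept here.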
