package ai.chronon.flink

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat
import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.clients.producer.ProducerConfig
import org.rogach.scallop.ScallopConf
import org.rogach.scallop.ScallopOption
import org.rogach.scallop.Serialization

// Canary test app that points at a source data file and periodically emits each record to Kafka with an updated timestamp
object FlinkKafkaBeaconEventDriver {
  // Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137
  class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization {
    val dataFileName: ScallopOption[String] =
      opt[String](required = true, descr = "Name of the file on GCS to read data from")
    val kafkaBootstrap: ScallopOption[String] =
      opt[String](required = true, descr = "Kafka bootstrap server in host:port format")
    val kafkaTopic: ScallopOption[String] = opt[String](required = true, descr = "Kafka topic to write to")
    val eventDelayMillis: ScallopOption[Int] =
      opt[Int](required = false,
               descr = "Delay in milliseconds between event publishes (dictates the events-per-second rate)",
               default = Some(1000))

    verify()
  }
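
  // Example invocation (hypothetical jar/paths/hosts; flag names assume Scallop's default camelCase -> kebab-case mapping):
  //   flink run -c ai.chronon.flink.FlinkKafkaBeaconEventDriver <job-jar> \
  //     --data-file-name gs://<bucket>/beacons.avro \
  //     --kafka-bootstrap <broker-host>:9092 \
  //     --kafka-topic <topic> \
  //     --event-delay-millis 1000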

  def main(args: Array[String]): Unit = {
    val jobArgs = new JobArgs(args)
    val dataFileName = jobArgs.dataFileName()
    val bootstrapServers = jobArgs.kafkaBootstrap()
    val kafkaTopic = jobArgs.kafkaTopic()
    val eventDelayMillis = jobArgs.eventDelayMillis()

    val schema = buildAvroSchema()
    // Configure GCS source
    val avroFormat = new AvroInputFormat[GenericRecord](
      new Path(dataFileName),
      classOf[GenericRecord]
    )

    implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
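    // Notes on the source setup above:
    //  - the data file path can be a gs:// URI, assuming GCS filesystem support (e.g. the flink-gs-fs-hadoop plugin) is available in the deployment
    //  - GenericRecordAvroTypeInfo lets Flink serialize GenericRecords with Avro directly rather than falling back to Kryo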

    // Set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig
      .enableForceKryo() // use Kryo for complex types that Flink's default serialization stack doesn't support (e.g. case classes)
    env.getConfig.enableGenericTypes() // allow generic types (serialized via Kryo) rather than failing type checks
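    // Avro's Schema isn't Kryo-friendly out of the box; register Flink's helper serializer, which round-trips the schema through its JSON string form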
    env.addDefaultKryoSerializer(classOf[Schema], classOf[AvroKryoSerializerUtils.AvroSchemaSerializer])

    val stream = env
      .createInput(avroFormat)
      .setParallelism(1)

    val transformedStream: DataStream[GenericRecord] = stream
      .map(new DelayedSourceTransformFn(eventDelayMillis))
      .setParallelism(stream.parallelism)

    // Configure Kafka sink
    val serializationSchema = KafkaRecordSerializationSchema
      .builder()
      .setTopic(kafkaTopic)
      .setValueSerializationSchema(AvroSerializationSchema.forGeneric(schema))
      .build()

    val producerConfig = new java.util.Properties()
    producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
    producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "3")
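    // The SASL/OAUTHBEARER settings below authenticate with Google Cloud credentials via the GCP login callback handler
    // (as used with Google's managed Kafka offering); adjust or drop these for other Kafka deployments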
    producerConfig.setProperty("security.protocol", "SASL_SSL")
    producerConfig.setProperty("sasl.mechanism", "OAUTHBEARER")
    producerConfig.setProperty("sasl.login.callback.handler.class",
                               "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
    producerConfig.setProperty("sasl.jaas.config",
                               "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")

    val kafkaSink = KafkaSink
      .builder()
      .setBootstrapServers(bootstrapServers)
      .setRecordSerializer(serializationSchema)
      .setKafkaProducerConfig(producerConfig)
      .build()

    // Write to Kafka
    transformedStream
      .sinkTo(kafkaSink)
      .setParallelism(transformedStream.parallelism)

    // Execute program
    env.execute("Periodic Kafka Beacon Data Producer")
  }

  def buildAvroSchema(): Schema = {
    new Schema.Parser().parse(
      """
      {
        "type": "record",
        "name": "Beacon",
        "namespace": "com.etsy",
        "fields": [
          {"name": "event_name", "type": ["null", "string"], "default": null},
          {"name": "timestamp", "type": "long"},
          {"name": "browser_id", "type": ["null", "string"], "default": null},
          {"name": "primary_event", "type": "boolean"},
          {"name": "guid", "type": ["null", "string"], "default": null},
          {"name": "page_guid", "type": ["null", "string"], "default": null},
          {"name": "event_logger", "type": ["null", "string"], "default": null},
          {"name": "event_source", "type": ["null", "string"], "default": null},
          {"name": "ip", "type": ["null", "string"], "default": null},
          {"name": "user_agent", "type": ["null", "string"], "default": null},
          {"name": "loc", "type": ["null", "string"], "default": null},
          {"name": "ref", "type": ["null", "string"], "default": null},
          {"name": "cookies", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null},
          {"name": "ab", "type": ["null", {"type": "map", "values": ["null", {"type": "array", "items": ["null", "string"]}]}], "default": null},
          {"name": "user_id", "type": ["null", "long"], "default": null},
          {"name": "isMobileRequest", "type": ["null", "boolean"], "default": null},
          {"name": "isMobileDevice", "type": ["null", "boolean"], "default": null},
          {"name": "isMobileTemplate", "type": ["null", "boolean"], "default": null},
          {"name": "detected_currency_code", "type": ["null", "string"], "default": null},
          {"name": "detected_language", "type": ["null", "string"], "default": null},
          {"name": "detected_region", "type": ["null", "string"], "default": null},
          {"name": "listing_ids", "type": ["null", {"type": "array", "items": "long"}], "default": null},
          {"name": "event_timestamp", "type": ["null", "long"], "default": null},
          {"name": "properties", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null}
        ]
      }
      """)
  }
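
  // For ad-hoc local testing, a conforming record could be built by hand (hypothetical snippet, not used by the job;
  // requires org.apache.avro.generic.GenericRecordBuilder; only the fields without defaults must be set):
  //   val record = new GenericRecordBuilder(buildAvroSchema())
  //     .set("event_name", "test_event")
  //     .set("timestamp", System.currentTimeMillis())
  //     .set("primary_event", true)
  //     .build()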
}

class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
  override def map(value: GenericRecord): GenericRecord = {
    val updatedTimestamp = System.currentTimeMillis()
    // Update the timestamp field in the record
    value.put("timestamp", updatedTimestamp)
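    // Blocking the task thread here is what throttles throughput: roughly one event per delayMs per parallel subtask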
    Thread.sleep(delayMs)
    value
  }
}