Commit 811bc41

Add a Flink canary app that can be run on demand (#762)
## Summary

Builds on top of PR #751. This PR adds a streaming GroupBy that can be run as a canary to sanity-check and test things out while making Flink changes. I used this to sanity-check the creation and use of a mock schema SerDe that some users have been asking for.

Can be submitted via:

```
$ CHRONON_ROOT=`pwd`/api/python/test/canary
$ zipline compile --chronon-root=$CHRONON_ROOT
$ zipline run --repo=$CHRONON_ROOT --version $VERSION --mode streaming --conf compiled/group_bys/gcp/item_event_canary.actions_v1 --kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092 --groupby-name gcp.item_event_canary.actions_v1 --validate
```

(Needs the Flink event driver to be running, triggered via DataprocSubmitterTest.)

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced a new group-by aggregation for item event actions, supporting real-time analytics by listing ID with data sourced from GCP Kafka and BigQuery.
  - Added a mock schema provider for testing item event ingestion.
- **Bug Fixes**
  - Updated test configurations to use new event schemas, topics, and data paths for improved accuracy in Flink Kafka ingest job tests.
- **Refactor**
  - Renamed and restructured the event driver to focus on item events, with a streamlined schema and updated job naming.
- **Chores**
  - Added a new environment variable for Flink state storage configuration.
  - Updated build configuration to reference the renamed event driver.
1 parent 02a10f4 · commit 811bc41

File tree

6 files changed: +106, -37 lines

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
from ai.chronon.api.ttypes import EventSource, Source
from ai.chronon.group_by import Aggregation, GroupBy, Operation
from ai.chronon.query import Query, selects
from ai.chronon.types import ConfigProperties

_action_events = [
    "backend_add_to_cart",
    "view_listing",
    "backend_cart_payment",
    "backend_favorite_item2",
]
_action_events_csv = ", ".join([f"'{event}'" for event in _action_events])
_action_events_filter = f"event_type in ({_action_events_csv})"


def build_source(topic: str) -> Source:
    return Source(
        events=EventSource(
            # This source table contains a custom struct ('attributes') that enables
            # attributes['key'] style access pattern in a BQ native table.
            table="data.item_events_parquet_compat",
            topic=topic,
            query=Query(
                selects=selects(
                    listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
                    add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)",
                    view="IF(event_type = 'view_listing', 1, 0)",
                    purchase="IF(event_type = 'backend_cart_payment', 1, 0)",
                    favorite="IF(event_type = 'backend_favorite_item2', 1, 0)",
                ),
                wheres=[_action_events_filter],
                time_column="timestamp",
            ),
        )
    )


# GCP Kafka clusters require TLS
google_kafka_cfgs = "security.protocol=SASL_SSL/sasl.mechanism=OAUTHBEARER/sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler/sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
schema_provider_cfgs = "provider_class=ai.chronon.flink.deser.MockCustomSchemaProvider/schema_name=item_event"
kafka_topic = f"kafka://test-item-event-data/{schema_provider_cfgs}/{google_kafka_cfgs}"
actions_source = build_source(kafka_topic)

actions_v1 = GroupBy(
    sources=[actions_source],
    keys=["listing_id"],
    online=True,
    aggregations=[
        Aggregation(input_column="add_cart", operation=Operation.SUM, windows=["1d"]),
        Aggregation(input_column="view", operation=Operation.SUM, windows=["1d"]),
        Aggregation(input_column="purchase", operation=Operation.SUM, windows=["7d"]),
        Aggregation(input_column="favorite", operation=Operation.SUM, windows=["1d"]),
    ],
    conf=ConfigProperties(
        common={
            "spark.chronon.partition.column": "_DATE",
        }
    ),
)

api/python/test/canary/teams.py

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@
     "GCP_REGION": "us-central1",
     "GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster",
     "GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance",
+    "FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state",
     },
   ),
 )
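The new `FLINK_STATE_URI` points at the GCS prefix used for Flink checkpoint/state storage (the Dataproc submitter test below describes it as where checkpoints are written while the job runs). How the Zipline/Chronon runner consumes this variable is not shown in this diff; as a hedged sketch only, a Flink job would typically wire such a URI into its checkpoint configuration like this (the 30s interval is illustrative, not from this PR):

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object CheckpointConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint periodically and persist state under the configured GCS prefix.
    // The actual interval and the wiring of FLINK_STATE_URI are handled by the runner, not shown here.
    env.enableCheckpointing(30 * 1000L)
    env.getCheckpointConfig.setCheckpointStorage("gs://zipline-warehouse-canary/flink-state")
  }
}
```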

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala

Lines changed: 3 additions & 3 deletions
@@ -82,7 +82,7 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
     submitter.submit(
       spark.submission.FlinkJob,
       Map(
-        MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
+        MainClass -> "ai.chronon.flink.FlinkKafkaItemEventDriver",
         FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
         JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar",
         // This is where we write out checkpoints / persist state while the job is running
@@ -91,8 +91,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
       Map.empty,
       List.empty,
       "--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
-      "--kafka-topic=test-beacon-main",
-      "--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro",
+      "--kafka-topic=test-item-event-data",
+      "--data-file-name=gs://zl-warehouse/canary_item_events/events-output.avro",
       "--event-delay-millis=10",
     )
     println(submittedJobId)

flink/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -88,6 +88,6 @@ jvm_binary(
     # To exclude runtime dependencies not needed for flink environment in the cluster
     # otherwise we run into version conflict errors
     deploy_env = ["//tools/build_rules/flink:flink"],
-    main_class = "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
+    main_class = "ai.chronon.flink.FlinkKafkaItemEventDriver",
     runtime_deps = [":lib"],
 )

flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala renamed to flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala

Lines changed: 18 additions & 31 deletions
@@ -19,7 +19,7 @@ import org.rogach.scallop.ScallopOption
 import org.rogach.scallop.Serialization

 // Canary test app that can point to a source data file and will emit an event to Kafka periodically with an updated timestamp
-object FlinkKafkaBeaconEventDriver {
+object FlinkKafkaItemEventDriver {
   // Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137
   class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization {
     val dataFileName: ScallopOption[String] =
@@ -42,14 +42,13 @@ object FlinkKafkaBeaconEventDriver {
     val kafkaTopic = jobArgs.kafkaTopic()
     val eventDelayMillis = jobArgs.eventDelayMillis()

-    val schema = buildAvroSchema()
     // Configure GCS source
     val avroFormat = new AvroInputFormat[GenericRecord](
       new Path(dataFileName),
       classOf[GenericRecord]
     )

-    implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
+    implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)

     // Set up the streaming execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -70,7 +69,7 @@ object FlinkKafkaBeaconEventDriver {
     val serializationSchema = KafkaRecordSerializationSchema
       .builder()
       .setTopic(kafkaTopic)
-      .setValueSerializationSchema(AvroSerializationSchema.forGeneric(schema))
+      .setValueSerializationSchema(AvroSerializationSchema.forGeneric(avroSchema))
       .build()

     val producerConfig = new java.util.Properties()
@@ -96,40 +95,28 @@ object FlinkKafkaBeaconEventDriver {
       .setParallelism(transformedStream.getParallelism)

     // Execute program
-    env.execute("Periodic Kafka Beacon Data Producer")
+    env.execute("Periodic Kafka Data Producer")
   }

-  def buildAvroSchema(): Schema = {
+  lazy val avroSchema: Schema = {
     new Schema.Parser().parse("""
       {
         "type": "record",
-        "name": "Beacon",
-        "namespace": "com.customer",
+        "name": "Event",
+        "namespace": "ai.chronon",
         "fields": [
-          {"name": "event_name", "type": ["null", "string"], "default": null},
+          {"name": "event_type", "type": ["null", "string"], "default": null},
           {"name": "timestamp", "type": "long"},
-          {"name": "browser_id", "type": ["null", "string"], "default": null},
-          {"name": "primary_event", "type": "boolean"},
-          {"name": "guid", "type": ["null", "string"], "default": null},
-          {"name": "page_guid", "type": ["null", "string"], "default": null},
-          {"name": "event_logger", "type": ["null", "string"], "default": null},
-          {"name": "event_source", "type": ["null", "string"], "default": null},
-          {"name": "ip", "type": ["null", "string"], "default": null},
-          {"name": "user_agent", "type": ["null", "string"], "default": null},
-          {"name": "loc", "type": ["null", "string"], "default": null},
-          {"name": "ref", "type": ["null", "string"], "default": null},
-          {"name": "cookies", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null},
-          {"name": "ab", "type": ["null", {"type": "map", "values": ["null", {"type": "array", "items": ["null", "string"]}]}], "default": null},
-          {"name": "user_id", "type": ["null", "long"], "default": null},
-          {"name": "isMobileRequest", "type": ["null", "boolean"], "default": null},
-          {"name": "isMobileDevice", "type": ["null", "boolean"], "default": null},
-          {"name": "isMobileTemplate", "type": ["null", "boolean"], "default": null},
-          {"name": "detected_currency_code", "type": ["null", "string"], "default": null},
-          {"name": "detected_language", "type": ["null", "string"], "default": null},
-          {"name": "detected_region", "type": ["null", "string"], "default": null},
-          {"name": "listing_ids", "type": ["null", {"type": "array", "items": "long"}], "default": null},
-          {"name": "event_timestamp", "type": ["null", "long"], "default": null},
-          {"name": "properties", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null}
+          {"name": "visitor_id", "type": ["null", "string"], "default": null},
+          {"name": "is_primary", "type": "boolean"},
+          {"name": "logger_name", "type": ["null", "string"], "default": null},
+          {"name": "source", "type": ["null", "string"], "default": null},
+          {"name": "is_mobile_req", "type": ["null", "boolean"], "default": null},
+          {"name": "is_mobile_device", "type": ["null", "boolean"], "default": null},
+          {"name": "is_mobile_view", "type": ["null", "boolean"], "default": null},
+          {"name": "item_ids", "type": ["null", {"type": "array", "items": "long"}], "default": null},
+          {"name": "created_at", "type": ["null", "long"], "default": null},
+          {"name": "attributes", "type": ["null", {"type": "map", "values": ["null", "string"]}], "default": null}
         ]
       }
     """)

flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala

Lines changed: 22 additions & 2 deletions
@@ -1,10 +1,12 @@
 package ai.chronon.flink.deser

+import ai.chronon.api.StructType
+import ai.chronon.flink.FlinkKafkaItemEventDriver
 import ai.chronon.online.TopicInfo
-import ai.chronon.online.serde.SerDe
+import ai.chronon.online.serde.{AvroConversions, AvroSerDe, Mutation, SerDe}

 // Configured in topic config in this fashion:
-// kafka://test-beacon-main/provider_class=ai.chronon.flink.deser.MockCustomSchemaProvider/schema_name=beacon
+// kafka://my-test-topic/provider_class=ai.chronon.flink.deser.MockCustomSchemaProvider/schema_name=item_event
 object CustomSchemaSerDe {
   val ProviderClass = "provider_class"
   val SchemaName = "schema_name"
@@ -19,3 +21,21 @@ object CustomSchemaSerDe {
     provider.asInstanceOf[SerDe]
   }
 }
+
+/** Mock custom schema provider that vends out a custom hardcoded event schema
+  */
+class MockCustomSchemaProvider(topicInfo: TopicInfo) extends SerDe {
+  private val schemaName = topicInfo.params.getOrElse(CustomSchemaSerDe.SchemaName, "item_event")
+  require(schemaName == "item_event", s"Schema name must be 'item_event', but got $schemaName")
+
+  lazy val chrononSchema: StructType =
+    AvroConversions.toChrononSchema(FlinkKafkaItemEventDriver.avroSchema).asInstanceOf[StructType]
+
+  lazy val avroSerDe = new AvroSerDe(FlinkKafkaItemEventDriver.avroSchema)
+
+  override def schema: StructType = chrononSchema
+
+  override def fromBytes(messageBytes: Array[Byte]): Mutation = {
+    avroSerDe.fromBytes(messageBytes)
+  }
+}
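MockCustomSchemaProvider shows the shape a custom schema provider takes: it is constructed with the `TopicInfo`, reads its parameters, and implements `schema` and `fromBytes`. Below is a hedged sketch of what a user-supplied provider could look like, mirroring the mock; the `InlineAvroSchemaProvider` class and its `schema_json` topic param are hypothetical illustrations, not part of this PR, and passing a full schema through the topic URI would need escaping in practice:

```scala
package ai.chronon.flink.deser

import ai.chronon.api.StructType
import ai.chronon.online.TopicInfo
import ai.chronon.online.serde.{AvroConversions, AvroSerDe, Mutation, SerDe}
import org.apache.avro.Schema

// Hypothetical user-supplied provider, following the same pattern as MockCustomSchemaProvider.
// Wired up via: kafka://my-topic/provider_class=ai.chronon.flink.deser.InlineAvroSchemaProvider/schema_json=<escaped Avro schema>
class InlineAvroSchemaProvider(topicInfo: TopicInfo) extends SerDe {
  private val schemaJson =
    topicInfo.params.getOrElse("schema_json", throw new IllegalArgumentException("schema_json param is required"))

  private lazy val avroSchema: Schema = new Schema.Parser().parse(schemaJson)
  private lazy val avroSerDe = new AvroSerDe(avroSchema)

  // Expose the Chronon view of the Avro schema, as the mock provider does.
  override def schema: StructType =
    AvroConversions.toChrononSchema(avroSchema).asInstanceOf[StructType]

  // Delegate byte decoding to the Avro SerDe built from the same schema.
  override def fromBytes(messageBytes: Array[Byte]): Mutation =
    avroSerDe.fromBytes(messageBytes)
}
```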
