
Commit 5c3abf2

Rework Flink Schema providers to use existing SerDe and Mutation interfaces (#751)
## Summary

Refactor the schema-provider-shaped code to:

* Use the existing SerDe class interfaces we have.
* Work with Mutation types via the SerDe classes. The primary shuffling is pulling the Avro deserialization out of the existing BaseAvroDeserializationSchema and delegating it to the SerDe to get a Mutation back, as well as shifting things a bit to call CatalystUtil with the Mutation Array[Any] types (see the sketch after this section).
* Provide rails for users to supply a custom schema provider. I used this to test a version of the beacon app out in canary - I'll put up a separate PR for the test job in a follow-up.
* Other misc piled-up fixes - check that GBUs don't compute empty results; turn our OTel metrics code off by default to reduce log spam.

## Checklist

- [X] Added Unit Tests
- [X] Covered by existing CI
- [X] Integration tested -- tested via canary on our env / customer env; confirmed we pass the validation piece, see the jobs come up, and write data out to BT.
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added Avro serialization and deserialization support for online data processing.
  - Introduced flexible schema registry and custom schema provider selection for Flink streaming sources.
- **Refactor**
  - Unified and renamed the serialization/deserialization interface to `SerDe` across modules.
  - Centralized and simplified schema provider and deserialization logic for Flink jobs.
  - Improved visibility and type safety for internal utilities.
- **Bug Fixes**
  - Enhanced error handling and robustness in metrics initialization and deserialization workflows.
- **Tests**
  - Added and updated tests for Avro deserialization and schema registry integration.
  - Removed outdated or redundant test suites.
- **Chores**
  - Updated external dependencies to include Avro support.
  - Cleaned up unused files and legacy code.
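As a rough orientation for the refactor described above, the sketch below strings the pieces together: a `SerDe` decodes the raw payload into a `Mutation`, and the decoded `Array[Any]` values are handed to the Spark expression evaluation via the new `performSql(Array[Any])` overload. The `fromBytes` method and the `after` field are assumptions about the `SerDe`/`Mutation` interfaces (only the types themselves appear in this diff), so treat this as an illustration rather than the actual deserialization schema implementation.

```scala
import ai.chronon.flink.SparkExpressionEval
import ai.chronon.online.serde.SerDe

// Illustrative sketch only. `serDe.fromBytes` and `mutation.after` are assumed member names;
// the real wiring lives in the new deser package (SourceProjectionDeserializationSchema et al).
class MutationProjectionSketch[T](serDe: SerDe, eval: SparkExpressionEval[T]) {

  def deserializeAndProject(payload: Array[Byte]): Seq[Map[String, Any]] = {
    // 1. The SerDe owns the wire format (e.g. Avro) and hands back a Chronon Mutation.
    val mutation = serDe.fromBytes(payload)

    // 2. The Mutation carries the decoded row as Array[Any]; feed it to CatalystUtil
    //    through SparkExpressionEval.performSql(Array[Any]) to apply the GroupBy's expressions.
    Option(mutation.after).map(eval.performSql).getOrElse(Seq.empty)
  }
}
```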
1 parent f21d0cc commit 5c3abf2

27 files changed (+569 / -538 lines)

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -42,7 +42,7 @@ class AwsApiImpl(conf: Map[String, String]) extends Api(conf) {
   /** The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before
     * we can spin up the Aws streaming Chronon stack
     */
-  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): Serde = ???
+  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): SerDe = ???
 
   /** The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up
     * a fully functional Chronon serving stack in Aws
```

cloud_gcp/BUILD.bazel

Lines changed: 1 addition & 0 deletions
```diff
@@ -35,6 +35,7 @@ shared_deps = [
     maven_artifact("ch.qos.reload4j:reload4j"),
     maven_artifact("org.threeten:threetenbp"),
     maven_artifact("org.apache.kafka:kafka-clients"),
+    maven_artifact("org.apache.avro:avro"),
     maven_artifact("com.google.cloud.spark:spark-3.5-bigquery"),
     maven_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5"),
     maven_artifact("org.objenesis:objenesis"),
```

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala

Lines changed: 3 additions & 4 deletions
```diff
@@ -7,8 +7,7 @@ import ai.chronon.online.FlagStoreConstants
 import ai.chronon.online.GroupByServingInfoParsed
 import ai.chronon.online.KVStore
 import ai.chronon.online.LoggableResponse
-import ai.chronon.online.serde.Serde
-import ai.chronon.online.serde.AvroSerde
+import ai.chronon.online.serde.{AvroConversions, AvroSerDe, SerDe}
 import com.google.api.gax.core.{InstantiatingExecutorProvider, NoCredentialsProvider}
 import com.google.api.gax.retrying.RetrySettings
 import com.google.cloud.bigquery.BigQueryOptions
@@ -40,8 +39,8 @@ class GcpApiImpl(conf: Map[String, String]) extends Api(conf) {
   // We set the flag store to always return true for tiling enabled
   setFlagStore(tilingEnabledFlagStore)
 
-  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): Serde =
-    new AvroSerde(groupByServingInfoParsed.streamChrononSchema)
+  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): SerDe =
+    new AvroSerDe(AvroConversions.fromChrononSchema(groupByServingInfoParsed.streamChrononSchema))
 
   override def genKvStore: KVStore = {
```

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -76,7 +76,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
 
   it should "test flink kafka ingest job locally" ignore {
 
-    val submitter = DataprocSubmitter()
+    val submitterConf = SubmitterConf("canary-443022", "us-central1", "zipline-canary-cluster")
+    val submitter = DataprocSubmitter(submitterConf)
     val submittedJobId =
       submitter.submit(
         spark.submission.FlinkJob,
@@ -91,7 +92,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
         List.empty,
         "--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
         "--kafka-topic=test-beacon-main",
-        "--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro"
+        "--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro",
+        "--event-delay-millis=10",
       )
     println(submittedJobId)
   }
```

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala

Lines changed: 4 additions & 9 deletions
```diff
@@ -8,7 +8,7 @@ import ai.chronon.api.Extensions.GroupByOps
 import ai.chronon.api.Extensions.SourceOps
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.flink.FlinkJob.watermarkStrategy
-import ai.chronon.flink.SourceIdentitySchemaRegistrySchemaProvider.RegistryHostKey
+import ai.chronon.flink.deser.{DeserializationSchemaBuilder, FlinkSerDeProvider, SourceProjection}
 import ai.chronon.flink.types.AvroCodecOutput
 import ai.chronon.flink.types.TimestampedTile
 import ai.chronon.flink.types.WriteResponse
@@ -312,15 +312,10 @@ object FlinkJob {
     val topicUri = servingInfo.groupBy.streamingSource.get.topic
     val topicInfo = TopicInfo.parse(topicUri)
 
-    val schemaProvider =
-      topicInfo.params.get(RegistryHostKey) match {
-        case Some(_) => new ProjectedSchemaRegistrySchemaProvider(topicInfo.params)
-        case None =>
-          throw new IllegalArgumentException(
-            s"We only support schema registry based schema lookups. Missing $RegistryHostKey in topic config")
-      }
+    val schemaProvider = FlinkSerDeProvider.build(topicInfo)
 
-    val deserializationSchema = schemaProvider.buildDeserializationSchema(servingInfo.groupBy)
+    val deserializationSchema =
+      DeserializationSchemaBuilder.buildSourceProjectionDeserSchema(schemaProvider, servingInfo.groupBy)
     require(
       deserializationSchema.isInstanceOf[SourceProjection],
       s"Expect created deserialization schema for groupBy: $groupByName with $topicInfo to mixin SourceProjection. " +
```

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,5 +1,6 @@
 package ai.chronon.flink
 
+import ai.chronon.flink.deser.ChrononDeserializationSchema
 import ai.chronon.online.TopicChecker
 import ai.chronon.online.TopicInfo
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
```

flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala

Lines changed: 0 additions & 111 deletions
This file was deleted.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -92,6 +92,11 @@ class SparkExpressionEval[EventType](encoder: Encoder[EventType], groupBy: Group
     result
   }
 
+  def performSql(row: Array[Any]): Seq[Map[String, Any]] = {
+    val internalRow = catalystUtil.inputArrEncoder(row).asInstanceOf[InternalRow]
+    performSql(internalRow)
+  }
+
   def evaluateExpressions(inputEvent: EventType,
                           rowSerializer: ExpressionEncoder.Serializer[EventType]): Seq[Map[String, Any]] = {
     try {
```
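The new overload lets callers that already hold a decoded `Mutation` row as `Array[Any]` run the GroupBy's Spark SQL expressions without building an `InternalRow` themselves. A minimal, illustrative usage (the `eval` instance and field values below are placeholders, not from this diff):

```scala
// Assumes `eval` is a SparkExpressionEval already constructed for the GroupBy's streaming source.
val decodedRow: Array[Any] = Array("listing_123", 42L, 1715000000000L) // hypothetical decoded Mutation values
val projected: Seq[Map[String, Any]] = eval.performSql(decodedRow)
// Each Map holds one output row of the GroupBy's select expressions, keyed by column name.
```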
Lines changed: 38 additions & 0 deletions
```diff
@@ -0,0 +1,38 @@
+package ai.chronon.flink.deser
+
+import ai.chronon.api
+import ai.chronon.api.GroupBy
+import ai.chronon.online.serde.SerDe
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
+import org.apache.spark.sql.{Encoder, Row}
+
+/** DeserializationSchema for use within Chronon. Includes details such as the source event encoder and if projection is
+  * enabled, the projected schema. This is used to both build the Flink sources as well as in the downstream processing
+  * operators (e.g. SparkExprEval).
+  *
+  * @tparam T - Type of the object returned after deserialization. Can be event type (no projection)
+  *           or Map[String, Any] (with projection)
+  */
+abstract class ChrononDeserializationSchema[T] extends AbstractDeserializationSchema[T] {
+  def sourceProjectionEnabled: Boolean
+
+  def sourceEventEncoder: Encoder[Row]
+}
+
+/** Trait that is mixed in with DeserializationSchemas that support projection pushdown. This trait provides the projected
+  * schema that the source event will be projected to.
+  */
+trait SourceProjection {
+  def projectedSchema: Array[(String, api.DataType)]
+}
+
+object DeserializationSchemaBuilder {
+  def buildSourceIdentityDeserSchema(provider: SerDe, groupBy: GroupBy): ChrononDeserializationSchema[Row] = {
+    new SourceIdentityDeserializationSchema(provider, groupBy)
+  }
+
+  def buildSourceProjectionDeserSchema(provider: SerDe,
+                                       groupBy: GroupBy): ChrononDeserializationSchema[Map[String, Any]] = {
+    new SourceProjectionDeserializationSchema(provider, groupBy)
+  }
+}
```
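This builder is what the `FlinkJob` diff above consumes. A brief usage sketch, assuming a `serDe` from `Api.streamDecoder` (or `FlinkSerDeProvider.build`) and the job's `groupBy` conf are in scope:

```scala
// Projection pushdown: deserialize straight into the projected Map[String, Any] rows.
val projectingSchema: ChrononDeserializationSchema[Map[String, Any]] =
  DeserializationSchemaBuilder.buildSourceProjectionDeserSchema(serDe, groupBy)

// Identity: keep the raw source events as Spark Rows when no projection is wanted.
val identitySchema: ChrononDeserializationSchema[org.apache.spark.sql.Row] =
  DeserializationSchemaBuilder.buildSourceIdentityDeserSchema(serDe, groupBy)
```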
Lines changed: 21 additions & 0 deletions
```diff
@@ -0,0 +1,21 @@
+package ai.chronon.flink.deser
+
+import ai.chronon.online.TopicInfo
+import ai.chronon.online.serde.SerDe
+
+// Configured in topic config in this fashion:
+// kafka://test-beacon-main/provider_class=ai.chronon.flink.deser.MockCustomSchemaProvider/schema_name=beacon
+object CustomSchemaSerDe {
+  val ProviderClass = "provider_class"
+  val SchemaName = "schema_name"
+
+  def buildCustomSchemaSerDe(topicInfo: TopicInfo): SerDe = {
+    val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
+    val providerClass =
+      topicInfo.params.getOrElse(ProviderClass, throw new IllegalArgumentException(s"$ProviderClass not set"))
+    val cls = cl.loadClass(providerClass)
+    val constructor = cls.getConstructors.apply(0)
+    val provider = constructor.newInstance(topicInfo)
+    provider.asInstanceOf[SerDe]
+  }
+}
```
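The loader above instantiates the configured class reflectively, passing the `TopicInfo` as the single constructor argument and casting the result to `SerDe`. A user-supplied provider (named after the `MockCustomSchemaProvider` referenced in the comment) would therefore take roughly this shape; the `schema`/`fromBytes` members and the `Mutation` import path are assumptions about the `SerDe` interface, which isn't printed on this page:

```scala
package com.example

import ai.chronon.api
import ai.chronon.flink.deser.CustomSchemaSerDe
import ai.chronon.online.TopicInfo
import ai.chronon.online.serde.{Mutation, SerDe} // Mutation's package is assumed here

// Hypothetical custom provider, wired up via:
// kafka://my-topic/provider_class=com.example.MyCustomSchemaProvider/schema_name=beacon
// The single-TopicInfo-arg constructor is the only requirement visible in CustomSchemaSerDe.
class MyCustomSchemaProvider(topicInfo: TopicInfo) extends SerDe {
  private val schemaName = topicInfo.params.getOrElse(CustomSchemaSerDe.SchemaName, "beacon")

  // Assumed SerDe members: the Chronon schema of the payload and the byte -> Mutation decoder.
  override def schema: api.StructType = ???
  override def fromBytes(bytes: Array[Byte]): Mutation = ???
}
```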
