
Commit 0ee2991

refactor: split fetcher logic into multiple files (#425)
## Summary

fetcher has grown over time into a large file with many large functions that are hard to work with. This refactoring doesn't change any functionality - just placement.

- Made some of the Scala code more idiomatic - `try.recoverWith` instead of `if (try.isFailure)`
- Made Metadata methods more explicit
- `FetcherBase` -> `JoinPartFetcher` + `GroupByFetcher` + `GroupByResponseHandler`
- Added a fetch context - to replace 10 constructor params

## Checklist

- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Introduced a unified configuration context that enhances data fetching, including improved group-by and join operations with more robust error handling.
  - Added a new `FetchContext` class to manage fetching operations and execution contexts.
  - Implemented a new `GroupByFetcher` class for efficient group-by data retrieval.
- **Refactor**
  - Upgraded serialization and deserialization to use a more efficient, compact protocol.
  - Standardized API definitions and type declarations across modules to improve clarity and maintainability.
  - Enhanced error handling in various methods to provide more informative messages.
- **Chores**
  - Removed outdated utilities and reorganized dependency imports.
  - Updated test suites to align with the refactored architecture.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 7311b00 commit 0ee2991


55 files changed (+1182 / -1067 lines)
api/src/main/scala/ai/chronon/api/SerdeUtils.scala

Lines changed: 16 additions & 0 deletions

```diff
@@ -0,0 +1,16 @@
+package ai.chronon.api
+
+import ai.chronon.api.thrift.protocol.{TBinaryProtocol, TCompactProtocol}
+import ai.chronon.api.thrift.{TDeserializer, TSerializer}
+
+object SerdeUtils {
+  @transient
+  lazy val compactSerializer: ThreadLocal[TSerializer] = new ThreadLocal[TSerializer] {
+    override def initialValue(): TSerializer = new TSerializer(new TCompactProtocol.Factory())
+  }
+
+  @transient
+  lazy val compactDeserializer: ThreadLocal[TDeserializer] = new ThreadLocal[TDeserializer] {
+    override def initialValue(): TDeserializer = new TDeserializer(new TCompactProtocol.Factory())
+  }
+}
```
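
Thrift serializers are stateful and not thread-safe, which is what the `ThreadLocal` wrapping above buys: each thread lazily gets its own reusable instance. A minimal round-trip sketch, assuming the vendored `TBase` lives at `ai.chronon.api.thrift.TBase` alongside the serializer classes imported above:

```scala
import ai.chronon.api.SerdeUtils
import ai.chronon.api.thrift.TBase

// Compact-protocol round trip for any generated Thrift struct;
// `freshInstance` supplies an empty struct to deserialize into.
def roundTrip[T <: TBase[_, _]](value: T, freshInstance: () => T): T = {
  // .get() returns the calling thread's own reusable serializer
  val bytes = SerdeUtils.compactSerializer.get().serialize(value)
  val out = freshInstance()
  SerdeUtils.compactDeserializer.get().deserialize(out, bytes)
  out
}
```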

api/src/main/scala/ai/chronon/api/TilingUtils.scala

Lines changed: 3 additions & 19 deletions

```diff
@@ -1,37 +1,21 @@
 package ai.chronon.api
 
-import ai.chronon.api.thrift.TDeserializer
-import ai.chronon.api.thrift.TSerializer
-import ai.chronon.api.thrift.protocol.TBinaryProtocol
-import ai.chronon.api.thrift.protocol.TProtocolFactory
 import ai.chronon.fetcher.TileKey
+import ai.chronon.api.SerdeUtils
 
 import java.io.Serializable
 import scala.jdk.CollectionConverters._
 
 // Convenience functions for working with tiling
 object TilingUtils {
-  class SerializableSerializer(factory: TProtocolFactory) extends TSerializer(factory) with Serializable
-
-  // crazy bug in compact protocol - do not change to compact
-
-  @transient
-  lazy val binarySerializer: ThreadLocal[TSerializer] = new ThreadLocal[TSerializer] {
-    override def initialValue(): TSerializer = new TSerializer(new TBinaryProtocol.Factory())
-  }
-
-  @transient
-  lazy val binaryDeserializer: ThreadLocal[TDeserializer] = new ThreadLocal[TDeserializer] {
-    override def initialValue(): TDeserializer = new TDeserializer(new TBinaryProtocol.Factory())
-  }
 
   def serializeTileKey(key: TileKey): Array[Byte] = {
-    binarySerializer.get().serialize(key)
+    SerdeUtils.compactSerializer.get().serialize(key)
   }
 
   def deserializeTileKey(bytes: Array[Byte]): TileKey = {
     val key = new TileKey()
-    binaryDeserializer.get().deserialize(key, bytes)
+    SerdeUtils.compactDeserializer.get().deserialize(key, bytes)
     key
   }
 
```
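
A quick sketch of the round trip through the updated helpers (a default-constructed `TileKey` is used for brevity; real callers populate its fields):

```scala
import ai.chronon.api.TilingUtils
import ai.chronon.fetcher.TileKey

val key = new TileKey() // generated Thrift struct; fields left unset for brevity
val bytes: Array[Byte] = TilingUtils.serializeTileKey(key)
val decoded: TileKey = TilingUtils.deserializeTileKey(bytes)
// Thrift-generated equals compares field-by-field
assert(decoded == key)
```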

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala

Lines changed: 5 additions & 5 deletions

```diff
@@ -19,9 +19,9 @@ import ai.chronon.flink.window.KeySelectorBuilder
 import ai.chronon.online.Api
 import ai.chronon.online.FlagStoreConstants
 import ai.chronon.online.GroupByServingInfoParsed
-import ai.chronon.online.MetadataStore
 import ai.chronon.online.SparkConversions
 import ai.chronon.online.TopicInfo
+import ai.chronon.online.fetcher.{FetchContext, MetadataStore}
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.configuration.CheckpointingOptions
@@ -243,7 +243,7 @@ object FlinkJob {
   // we set an explicit max parallelism to ensure if we do make parallelism setting updates, there's still room
   // to restore the job from prior state. Number chosen does have perf ramifications if too high (can impact rocksdb perf)
   // so we've chosen one that should allow us to scale to jobs in the 10K-50K events / s range.
-  val MaxParallelism = 1260 // highly composite number
+  val MaxParallelism: Int = 1260 // highly composite number
 
   // We choose to checkpoint frequently to ensure the incremental checkpoints are small in size
   // as well as ensuring the catch-up backlog is fairly small in case of failures
@@ -254,11 +254,11 @@ object FlinkJob {
   val CheckpointTimeout: FiniteDuration = 5.minutes
 
   // We use incremental checkpoints and we cap how many we keep around
-  val MaxRetainedCheckpoints = 10
+  val MaxRetainedCheckpoints: Int = 10
 
   // how many consecutive checkpoint failures can we tolerate - default is 0, we choose a more lenient value
   // to allow us a few tries before we give up
-  val TolerableCheckpointFailures = 5
+  val TolerableCheckpointFailures: Int = 5
 
   // Keep windows open for a bit longer before closing to ensure we don't lose data due to late arrivals (needed in case of
   // tiling implementation)
@@ -306,7 +306,7 @@ object FlinkJob {
     val kafkaBootstrap = jobArgs.kafkaBootstrap.toOption
 
     val api = buildApi(onlineClassName, props)
-    val metadataStore = new MetadataStore(api.genKvStore, MetadataDataset, timeoutMillis = 10000)
+    val metadataStore = new MetadataStore(FetchContext(api.genKvStore, MetadataDataset))
 
     val flinkJob =
       if (useMockedSource) {
```

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -19,10 +19,10 @@ import ai.chronon.api.{StructType => ApiStructType}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.flink.types.WriteResponse
 import ai.chronon.online.Api
-import ai.chronon.online.AvroCodec
 import ai.chronon.online.AvroConversions
 import ai.chronon.online.Extensions.StructTypeOps
 import ai.chronon.online.GroupByServingInfoParsed
+import ai.chronon.online.serde.AvroCodec
 import org.apache.flink.api.common.serialization.DeserializationSchema
 import org.apache.flink.api.common.serialization.SerializationSchema
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
```

flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -8,8 +8,8 @@ import ai.chronon.api.Row
 import ai.chronon.api.ScalaJavaConversions.ListOps
 import ai.chronon.flink.types.TimestampedIR
 import ai.chronon.flink.types.TimestampedTile
-import ai.chronon.online.ArrayRow
 import ai.chronon.online.TileCodec
+import ai.chronon.online.serde.ArrayRow
 import org.apache.flink.api.common.functions.AggregateFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.metrics.Counter
```

flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -2,7 +2,8 @@ package org.apache.spark.sql.avro
 
 import ai.chronon.api.{StructType => ChrononStructType}
 import ai.chronon.flink.test.UserAvroSchema
-import ai.chronon.online.{AvroCodec, AvroConversions, CatalystUtil}
+import ai.chronon.online.serde.AvroCodec
+import ai.chronon.online.{AvroConversions, CatalystUtil}
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.flink.api.common.serialization.DeserializationSchema
```

online/src/main/scala/ai/chronon/online/Api.scala

Lines changed: 22 additions & 15 deletions

```diff
@@ -79,26 +79,33 @@ trait KVStore {
 
   // helper method to blocking read a string - used for fetching metadata & not in hotpath.
   def getString(key: String, dataset: String, timeoutMillis: Long): Try[String] = {
-    val response = getResponse(key, dataset, timeoutMillis)
-    if (response.values.isFailure) {
-      Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
-    } else {
-      response.values.get.length match {
-        case 0 => {
-          Failure(new RuntimeException(s"Empty response from KVStore for key=${key} in dataset=${dataset}."))
-        }
-        case _ => Success(new String(response.latest.get.bytes, Constants.UTF8))
+
+    getResponse(key, dataset, timeoutMillis).values
+      .recoverWith { case ex =>
+        // wrap with more info
+        Failure(new RuntimeException(s"Request for key $key in dataset $dataset failed", ex))
+      }
+      .flatMap { values =>
+        if (values.isEmpty)
+          Failure(new RuntimeException(s"Empty response from KVStore for key=$key in dataset=$dataset."))
+        else
+          Success(new String(values.maxBy(_.millis).bytes, Constants.UTF8))
       }
-    }
   }
 
   def getStringArray(key: String, dataset: String, timeoutMillis: Long): Try[Seq[String]] = {
     val response = getResponse(key, dataset, timeoutMillis)
-    if (response.values.isFailure) {
-      Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
-    } else {
-      Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes))
-    }
+
+    response.values
+      .map { values =>
+        val latestBytes = values.maxBy(_.millis).bytes
+        StringArrayConverter.bytesToStrings(latestBytes)
+      }
+      .recoverWith { case ex =>
+        // Wrap with more info
+        Failure(new RuntimeException(s"Request for key $key in dataset $dataset failed", ex))
+      }
+
   }
 
   private def getResponse(key: String, dataset: String, timeoutMillis: Long): GetResponse = {
```
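
The `recoverWith`/`flatMap` chain above replaces the old `if (response.values.isFailure)` branching - the idiom the commit summary calls out. A self-contained sketch of the same pattern (the `firstNonEmpty` helper is hypothetical, not part of the codebase):

```scala
import scala.util.{Failure, Success, Try}

// recoverWith wraps a failure with context; flatMap validates the payload
def firstNonEmpty(attempt: Try[Seq[String]], key: String): Try[String] =
  attempt
    .recoverWith { case ex =>
      Failure(new RuntimeException(s"Request for key $key failed", ex))
    }
    .flatMap { values =>
      if (values.isEmpty) Failure(new RuntimeException(s"Empty response for key=$key"))
      else Success(values.head)
    }

// A Success passes through recoverWith untouched; a Failure short-circuits flatMap:
firstNonEmpty(Success(Seq("a", "b")), "k")         // Success("a")
firstNonEmpty(Failure(new Exception("boom")), "k") // Failure(RuntimeException: Request for key k failed)
```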

online/src/main/scala/ai/chronon/online/AvroConversions.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -183,7 +183,7 @@ object AvroConversions {
   }
 
   def encodeBytes(schema: StructType, extraneousRecord: Any => Array[Any] = null): Any => Array[Byte] = {
-    val codec: AvroCodec = new AvroCodec(fromChrononSchema(schema).toString(true));
+    val codec: serde.AvroCodec = new serde.AvroCodec(fromChrononSchema(schema).toString(true));
     { data: Any =>
       val record =
         fromChrononRow(data, codec.chrononSchema, codec.schema, extraneousRecord).asInstanceOf[GenericData.Record]
@@ -193,7 +193,7 @@ object AvroConversions {
   }
 
   def encodeJson(schema: StructType, extraneousRecord: Any => Array[Any] = null): Any => String = {
-    val codec: AvroCodec = new AvroCodec(fromChrononSchema(schema).toString(true));
+    val codec: serde.AvroCodec = new serde.AvroCodec(fromChrononSchema(schema).toString(true));
     { data: Any =>
       val record =
         fromChrononRow(data, codec.chrononSchema, codec.schema, extraneousRecord).asInstanceOf[GenericData.Record]
```

online/src/main/scala/ai/chronon/online/CompatParColls.scala

Lines changed: 0 additions & 32 deletions
This file was deleted.

online/src/main/scala/ai/chronon/online/Extensions.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -48,6 +48,6 @@ object Extensions {
 
     def toAvroSchema(name: String = null): Schema = AvroConversions.fromChrononSchema(toChrononSchema(name))
 
-    def toAvroCodec(name: String = null): AvroCodec = new AvroCodec(toAvroSchema(name).toString())
+    def toAvroCodec(name: String = null): serde.AvroCodec = new serde.AvroCodec(toAvroSchema(name).toString())
   }
 }
```

online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala

Lines changed: 1 addition & 2 deletions

```diff
@@ -17,8 +17,7 @@
 package ai.chronon.online
 
 import ai.chronon.api.Constants
-import ai.chronon.online.fetcher.Fetcher.Request
-import ai.chronon.online.fetcher.Fetcher.Response
+import ai.chronon.online.fetcher.Fetcher.{Request, Response}
 
 import scala.collection.Seq
 import scala.collection.mutable
```

online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala

Lines changed: 15 additions & 9 deletions

```diff
@@ -21,9 +21,12 @@ import ai.chronon.api.Constants.ReversalField
 import ai.chronon.api.Constants.TimeField
 import ai.chronon.api.Extensions.GroupByOps
 import ai.chronon.api.Extensions.MetadataOps
+import ai.chronon.api.ScalaJavaConversions.ListOps
 import ai.chronon.api._
 import ai.chronon.online.OnlineDerivationUtil.DerivationFunc
 import ai.chronon.online.OnlineDerivationUtil.buildDerivationFunction
+import ai.chronon.online.serde.AvroCodec
+
 import org.apache.avro.Schema
 
 import scala.collection.JavaConverters.asScalaBufferConverter
@@ -43,7 +46,7 @@ class GroupByServingInfoParsed(val groupByServingInfo: GroupByServingInfo, parti
 
   lazy val aggregator: SawtoothOnlineAggregator = {
     new SawtoothOnlineAggregator(batchEndTsMillis,
-                                 groupByServingInfo.groupBy.aggregations.asScala.toSeq,
+                                 groupByServingInfo.groupBy.aggregations.toScala,
                                  valueChrononSchema.fields.map(sf => (sf.name, sf.fieldType)))
   }
 
@@ -77,11 +80,11 @@ class GroupByServingInfoParsed(val groupByServingInfo: GroupByServingInfo, parti
     AvroConversions.fromChrononSchema(valueChrononSchema).toString()
   }
 
-  def valueAvroCodec: AvroCodec = AvroCodec.of(valueAvroSchema)
-  def selectedCodec: AvroCodec = AvroCodec.of(selectedAvroSchema)
+  def valueAvroCodec: serde.AvroCodec = serde.AvroCodec.of(valueAvroSchema)
+  def selectedCodec: serde.AvroCodec = serde.AvroCodec.of(selectedAvroSchema)
   lazy val irAvroSchema: String = AvroConversions.fromChrononSchema(irChrononSchema).toString()
-  def irCodec: AvroCodec = AvroCodec.of(irAvroSchema)
-  def outputCodec: AvroCodec = AvroCodec.of(outputAvroSchema)
+  def irCodec: serde.AvroCodec = serde.AvroCodec.of(irAvroSchema)
+  def outputCodec: serde.AvroCodec = serde.AvroCodec.of(outputAvroSchema)
 
   // Start tiling specific variables
 
@@ -90,9 +93,12 @@ class GroupByServingInfoParsed(val groupByServingInfo: GroupByServingInfo, parti
 
   // End tiling specific variables
 
-  def outputChrononSchema: StructType = {
-    StructType.from(s"${groupBy.metaData.cleanName}_OUTPUT", aggregator.windowedAggregator.outputSchema)
-  }
+  def outputChrononSchema: StructType =
+    if (groupByServingInfo.groupBy.aggregations == null) {
+      selectedChrononSchema
+    } else {
+      StructType.from(s"${groupBy.metaData.cleanName}_OUTPUT", aggregator.windowedAggregator.outputSchema)
+    }
 
   lazy val outputAvroSchema: String = { AvroConversions.fromChrononSchema(outputChrononSchema).toString() }
 
@@ -118,7 +124,7 @@ class GroupByServingInfoParsed(val groupByServingInfo: GroupByServingInfo, parti
     AvroConversions.toChrononSchema(parser.parse(mutationValueAvroSchema)).asInstanceOf[StructType]
   }
 
-  def mutationValueAvroCodec: AvroCodec = AvroCodec.of(mutationValueAvroSchema)
+  def mutationValueAvroCodec: serde.AvroCodec = serde.AvroCodec.of(mutationValueAvroSchema)
 
   // Schema for data consumed by the streaming job.
   // Needs consistency with mutationDf Schema for backfill group by. (Shared queries)
```

online/src/main/scala/ai/chronon/online/JoinCodec.scala

Lines changed: 5 additions & 3 deletions

```diff
@@ -27,13 +27,15 @@ import ai.chronon.online.OnlineDerivationUtil.DerivationFunc
 import ai.chronon.online.OnlineDerivationUtil.buildDerivationFunction
 import ai.chronon.online.OnlineDerivationUtil.buildDerivedFields
 import ai.chronon.online.OnlineDerivationUtil.buildRenameOnlyDerivationFunction
+import ai.chronon.online.serde.AvroCodec
+
 import com.google.gson.Gson
 
 case class JoinCodec(conf: JoinOps,
                      keySchema: StructType,
                      baseValueSchema: StructType,
-                     keyCodec: AvroCodec,
-                     baseValueCodec: AvroCodec)
+                     keyCodec: serde.AvroCodec,
+                     baseValueCodec: serde.AvroCodec)
     extends Serializable {
 
   @transient lazy val valueSchema: StructType = {
@@ -87,7 +89,7 @@ case class JoinCodec(conf: JoinOps,
 
 object JoinCodec {
 
-  def buildLoggingSchema(joinName: String, keyCodec: AvroCodec, valueCodec: AvroCodec): String = {
+  def buildLoggingSchema(joinName: String, keyCodec: serde.AvroCodec, valueCodec: serde.AvroCodec): String = {
     val schemaMap = Map(
       "join_name" -> joinName,
       "key_schema" -> keyCodec.schemaStr,
```

online/src/main/scala/ai/chronon/online/TTLCache.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -45,6 +45,7 @@ class TTLCache[I, O](f: I => O,
 
   case class Entry(value: O, updatedAtMillis: Long, var markedForUpdate: AtomicBoolean = new AtomicBoolean(false))
   @transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+
   private val updateWhenNull =
     new function.BiFunction[I, Entry, Entry] {
       override def apply(t: I, u: Entry): Entry = {
```

online/src/main/scala/ai/chronon/online/TileCodec.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -25,6 +25,7 @@ import ai.chronon.api.Extensions.WindowUtils
 import ai.chronon.api.GroupBy
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.api.StructType
+import ai.chronon.online.serde.AvroCodec
 import org.apache.avro.generic.GenericData
 
 import scala.collection.JavaConverters._
```

online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala

Lines changed: 36 additions & 0 deletions

```diff
@@ -0,0 +1,36 @@
+package ai.chronon.online.fetcher
+import ai.chronon.api.Constants.MetadataDataset
+import ai.chronon.api.ScalaJavaConversions.JMapOps
+import ai.chronon.online.{FlagStore, FlagStoreConstants, FlexibleExecutionContext, KVStore}
+
+import scala.concurrent.ExecutionContext
+
+case class FetchContext(kvStore: KVStore,
+                        metadataDataset: String = MetadataDataset,
+                        timeoutMillis: Long = 10000,
+                        debug: Boolean = false,
+                        flagStore: FlagStore = null,
+                        disableErrorThrows: Boolean = false,
+                        executionContextOverride: ExecutionContext = null) {
+
+  def isTilingEnabled: Boolean = {
+    Option(flagStore)
+      .map(_.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty[String, String].toJava))
+      .exists(_.asInstanceOf[Boolean])
+  }
+
+  def isCachingEnabled(groupByName: String): Boolean = {
+    Option(flagStore)
+      .exists(_.isSet("enable_fetcher_batch_ir_cache", Map("group_by_streaming_dataset" -> groupByName).toJava))
+  }
+
+  def shouldStreamingDecodeThrow(groupByName: String): Boolean = {
+    Option(flagStore)
+      .exists(
+        _.isSet("disable_streaming_decoding_error_throws", Map("group_by_streaming_dataset" -> groupByName).toJava))
+  }
+
+  def getOrCreateExecutionContext: ExecutionContext = {
+    Option(executionContextOverride).getOrElse(FlexibleExecutionContext.buildExecutionContext)
+  }
+}
```
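
Since `FetchContext` is a case class with defaults, call sites that previously threaded ~10 constructor params through now name only what they override, as the `FlinkJob` hunk above shows. A sketch of the same construction (the `buildMetadataStore` wrapper is hypothetical):

```scala
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.online.KVStore
import ai.chronon.online.fetcher.{FetchContext, MetadataStore}

def buildMetadataStore(kvStore: KVStore): MetadataStore = {
  // timeoutMillis, debug, flagStore, disableErrorThrows and the
  // execution-context override all fall back to the case-class defaults
  val ctx = FetchContext(kvStore, metadataDataset = MetadataDataset)
  new MetadataStore(ctx)
}
```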
