Commit 0a4115a
perf: Online + Avro path optimizations (#655)
## Summary

Guided by a flamegraph from one of our customers:

- pre-compute `window.millis`
- eliminate branching in `AvroConversions.toChrononRow` by pre-building a row generator func, `Row.fromCached`
- optimize `expandWindowedTileIr` by pre-computing the expander indexes
- bring in fast-serde from LinkedIn, which does generic data decoding by pre-generating decoder classes
- optimize `FetcherCache.getBatchBytes` logic
- early exit when `externalParts` are null
- BigTable KV store impl optimizations in how we construct the keys

## Checklist

- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Added a new method for converting cached data to Chronon row format.
  - Introduced a public field to expose the smallest tail hop duration in milliseconds for group-by serving information.
- **Bug Fixes**
  - Improved error handling and null safety in batch response processing and fetcher logic.
  - Enhanced logging with better exception handling and sampling in online fetcher responses.
  - Added exception logging around synchronous KVStore response retrieval.
- **Refactor**
  - Optimized windowing and mapping logic for performance and clarity in several aggregation and expansion methods.
  - Modularized derivation and logging logic in the fetcher for better maintainability.
  - Updated method and field names for clarity regarding window resolution and tail hop durations.
  - Refactored Avro serialization/deserialization to use LinkedIn Fast Avro SerDe with cached conversion functions.
  - Improved conversion utilities for Avro to Chronon row representations.
  - Centralized Scala-to-Java byte array conversions in tiling utilities.
- **Chores**
  - Added LinkedIn Fast Avro SerDe as a dependency and updated related build and dependency files.
  - Updated utility methods for more consistent data type conversions.
- **Style**
  - Improved code comments and documentation for clarity.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
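The common thread across these changes is to pay for schema and type dispatch once, when the conversion is set up, and then reuse a plain closure in the per-row hot path. A minimal sketch of that pattern (the types here are illustrative stand-ins, not this commit's API):

```scala
// Minimal sketch of "dispatch once, reuse per row" — illustrative types only.
sealed trait SchemaType
case object SStr  extends SchemaType
case object SPass extends SchemaType

// Before: the schema is matched for every single value (per-row branching).
def convertSlow(t: SchemaType, v: Any): Any = t match {
  case SStr  => v.toString
  case SPass => v
}

// After: match once per schema and return a closure; the hot loop only
// invokes the closure, with no branching on the schema per element.
def buildConverter(t: SchemaType): Any => Any = t match {
  case SStr  => (v: Any) => v.toString
  case SPass => identity
}

val convert = buildConverter(SStr)  // built once, when the schema is known
Seq[Any]("a", "b").map(convert)     // hot path: plain function calls
```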
1 parent 6b482ea commit 0a4115a

File tree

22 files changed: +2010 −197 lines


aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala

Lines changed: 16 additions & 8 deletions

```diff
@@ -67,12 +67,20 @@ object ResolutionUtils {
   /** Find the smallest tail window resolution in a GroupBy. Returns 1D if the GroupBy does not define any windows (all-time aggregates).
     * The window resolutions are: 5 min for a window < 12 hrs, 1 hr for < 12 days, 1 day for > 12 days.
     */
-  def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Long =
-    Option(
-      groupBy.aggregations.toScala.toArray
-        .flatMap(aggregation =>
-          if (aggregation.windows != null) aggregation.windows.toScala
-          else None)
-        .map(FiveMinuteResolution.calculateTailHop)
-    ).filter(_.nonEmpty).map(_.min).getOrElse(WindowUtils.Day.millis)
+  def getSmallestTailHopMillis(groupBy: GroupBy): Long = {
+
+    val tailHops =
+      for (
+        aggs <- Option(groupBy.aggregations).toSeq;
+        agg <- aggs.iterator().toScala;
+        windows <- Option(agg.windows).toSeq;
+        window <- windows.iterator().toScala
+      ) yield {
+        FiveMinuteResolution.calculateTailHop(window)
+      }
+
+    if (tailHops.isEmpty) WindowUtils.Day.millis
+    else tailHops.min
+
+  }
 }
```
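Reading the doc comment as a worked example: a 6-hour window gets 5-minute tail hops, a 7-day window gets 1-hour hops, and a 30-day window gets 1-day hops. A hedged sketch of that banding rule as stated in the comment (the real logic lives in `FiveMinuteResolution.calculateTailHop`; this is not that implementation):

```scala
import scala.concurrent.duration._

// Sketch of the documented banding: 5 min for windows < 12 hours,
// 1 hr for windows < 12 days, 1 day otherwise.
def tailHopMillis(windowMillis: Long): Long =
  if (windowMillis < 12.hours.toMillis) 5.minutes.toMillis
  else if (windowMillis < 12.days.toMillis) 1.hour.toMillis
  else 1.day.toMillis

tailHopMillis(7.days.toMillis) // 3600000L — a 7-day window hops hourly
```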

aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala

Lines changed: 9 additions & 6 deletions

```diff
@@ -31,7 +31,7 @@ case class FinalBatchIr(collapsed: Array[Any], tailHops: HopsAggregator.OutputAr
  *
  * update/merge/finalize are related to snapshot data. As such they follow the snapshot Schema
  * and aggregators.
- * However mutations come into play later in the group by and a finalized version of the snapshot
+ * However, mutations come into play later in the group by and a finalized version of the snapshot
  * data is created to be processed with the mutations rows.
  * Since the dataframe inputs are aligned between mutations and snapshot (input) no additional schema is needed.
  */
@@ -109,9 +109,10 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
   def updateIr(ir: Array[Any], row: Row, queryTs: Long, hasReversal: Boolean = false): Unit = {
     var i: Int = 0
     while (i < windowedAggregator.length) {
+      val windowMillis = windowMappings(i).millis
       val window = windowMappings(i).aggregationPart.window
       val hopIndex = tailHopIndices(i)
-      val rowInWindow = (row.ts >= TsUtils.round(queryTs - window.millis, hopSizes(hopIndex)) && row.ts < queryTs)
+      val rowInWindow = (row.ts >= TsUtils.round(queryTs - windowMillis, hopSizes(hopIndex)) && row.ts < queryTs)
       if (window == null || rowInWindow) {
         if (hasReversal && row.isBefore) {
           windowedAggregator(i).delete(ir, row)
@@ -127,10 +128,11 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
     val otherIrTs = otherIr.ts
     var i: Int = 0
     while (i < windowedAggregator.length) {
+      val windowMillis = windowMappings(i).millis
       val window = windowMappings(i).aggregationPart.window
       val hopIndex = tailHopIndices(i)
-      val irInWindow =
-        (otherIrTs >= TsUtils.round(queryTs - window.millis, hopSizes(hopIndex)) && otherIrTs < queryTs)
+      lazy val irInWindow =
+        (otherIrTs >= TsUtils.round(queryTs - windowMillis, hopSizes(hopIndex)) && otherIrTs < queryTs)
       if (window == null || irInWindow) {
         ir(i) = windowedAggregator(i).merge(ir(i), otherIr.ir(i))
       }
@@ -143,17 +145,18 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
   def mergeTailHops(ir: Array[Any], queryTs: Long, batchEndTs: Long, batchIr: FinalBatchIr): Array[Any] = {
     var i: Int = 0
     while (i < windowedAggregator.length) {
+      val windowMillis = windowMappings(i).millis
       val window = windowMappings(i).aggregationPart.window
       if (window != null) { // no hops for unwindowed
         val hopIndex = tailHopIndices(i)
-        val queryTail = TsUtils.round(queryTs - window.millis, hopSizes(hopIndex))
+        val queryTail = TsUtils.round(queryTs - windowMillis, hopSizes(hopIndex))
         val hopIrs = batchIr.tailHops(hopIndex)
         val relevantHops = mutable.ArrayBuffer[Any](ir(i))
         var idx: Int = 0
         while (idx < hopIrs.length) {
           val hopIr = hopIrs(idx)
           val hopStart = hopIr.last.asInstanceOf[Long]
-          if ((batchEndTs - window.millis) + tailBufferMillis > hopStart && hopStart >= queryTail) {
+          if ((batchEndTs - windowMillis) + tailBufferMillis > hopStart && hopStart >= queryTail) {
             relevantHops += hopIr(baseIrIndices(i))
           }
           idx += 1
```
api/src/main/scala/ai/chronon/api/Extensions.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -256,7 +256,7 @@ object Extensions {
     }
   }

-  case class WindowMapping(aggregationPart: AggregationPart, baseIrIndex: Int)
+  case class WindowMapping(aggregationPart: AggregationPart, baseIrIndex: Int, millis: Long)

   case class UnpackedAggregations(perBucket: Array[AggregationPart], perWindow: Array[WindowMapping])

@@ -295,7 +295,8 @@ object Extensions {
             )
             .orNull,
           bucket),
-        counter
+        counter,
+        if (window != null) window.millis else -1
       )
     }
     counter += 1
```
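`WindowMapping.millis` is the pre-computed value the aggregator loops above read; `-1` serves as a sentinel for unwindowed (all-time) aggregations, which is safe because their `window == null` guard fires before the millis value is ever used in a bounds check.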

api/src/main/scala/ai/chronon/api/Row.scala

Lines changed: 79 additions & 0 deletions

```diff
@@ -123,6 +123,85 @@ object Row {
     }
   }

+  private val passThroughFunc: Any => Any = { value: Any => value }
+
+  // recursively traverse a logical struct, and convert it to chronon's row type
+  def fromCached[CompositeType, BinaryType, ArrayType, StringType](dataType: DataType,
+                                                                   decomposer: (CompositeType, Int) => Iterator[Any],
+                                                                   debinarizer: BinaryType => Array[Byte],
+                                                                   delister: ArrayType => util.ArrayList[Any],
+                                                                   deStringer: StringType => String): Any => Any = {
+
+    def edit(dataType: DataType): Any => Any =
+      fromCached(dataType, decomposer, debinarizer, delister, deStringer)
+
+    def guard(func: Any => Any): Any => Any = { value =>
+      if (value == null) value else func(value)
+    }
+
+    val baseFunc: Any => Any = dataType match {
+      case StructType(_, fields) =>
+        val length = fields.length
+        val funcs: Array[Any => Any] = fields.map(_.fieldType).map(edit)
+
+        guard { value: Any =>
+          val iter = decomposer(value.asInstanceOf[CompositeType], length)
+
+          val newArr = new Array[Any](length)
+          var idx = 0
+          while (iter.hasNext) {
+            val value = iter.next()
+            newArr.update(idx, funcs(idx)(value))
+            idx += 1
+          }
+          newArr
+        }
+
+      case ListType(elemType) =>
+        val func = edit(elemType)
+
+        guard { value: Any =>
+          val arr = delister(value.asInstanceOf[ArrayType])
+
+          if (func != passThroughFunc) {
+            var idx = 0
+            while (idx < arr.size) {
+              arr.set(idx, func(arr.get(idx)))
+              idx += 1
+            }
+          }
+
+          arr
+
+        }
+
+      case MapType(keyType, valueType) =>
+        val keyFunc = edit(keyType)
+        val valueFunc = edit(valueType)
+
+        guard { value: Any =>
+          val newMap = new util.HashMap[Any, Any]()
+          val map = value.asInstanceOf[util.Map[Any, Any]]
+          val iter = map.entrySet().iterator()
+          while (iter.hasNext) {
+            val entry = iter.next()
+            newMap.put(keyFunc(entry.getKey), valueFunc(entry.getValue))
+          }
+          newMap
+        }
+
+      case BinaryType =>
+        guard { value: Any =>
+          debinarizer(value.asInstanceOf[BinaryType])
+        }
+
+      case StringType => guard { value: Any => deStringer(value.asInstanceOf[StringType]) }
+      case _          => passThroughFunc
+    }
+
+    baseFunc
+  }
+
   // recursively traverse a chronon dataType value, and convert it to an external type
   def to[StructType, BinaryType, ListType, MapType, OutputSchema](
       value: Any,
```
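To make the cached-conversion idea concrete, here is a hedged sketch of how a caller might wire `Row.fromCached` to Avro's generic records. The adapter lambdas are assumptions for illustration; the commit's actual wiring lives in `AvroConversions.toChrononRow`:

```scala
// Hypothetical wiring of Row.fromCached to Avro — adapter bodies are
// illustrative, not the commit's AvroConversions implementation.
import java.nio.ByteBuffer
import java.util

import ai.chronon.api._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

def buildAvroToRow(chrononSchema: DataType): Any => Any =
  Row.fromCached[GenericRecord, ByteBuffer, util.List[Any], Utf8](
    chrononSchema,
    // pull fields out of a record positionally
    decomposer = (record, numFields) => (0 until numFields).iterator.map(i => record.get(i)),
    // copy Avro's ByteBuffer payloads into plain byte arrays
    debinarizer = (buf: ByteBuffer) => {
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)
      bytes
    },
    // materialize Avro lists as mutable ArrayLists for in-place rewriting
    delister = (list: util.List[Any]) => new util.ArrayList[Any](list),
    // Avro hands strings back as Utf8
    deStringer = (utf8: Utf8) => utf8.toString
  )

// Built once per schema; each record conversion is then branch-free:
// val toRow = buildAvroToRow(valueSchema)
// val rowValues = toRow(genericRecord).asInstanceOf[Array[Any]]
```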

api/src/main/scala/ai/chronon/api/TilingUtils.scala

Lines changed: 13 additions & 1 deletion

```diff
@@ -2,6 +2,7 @@ package ai.chronon.api

 import ai.chronon.fetcher.TileKey

+import java.util
 import scala.jdk.CollectionConverters._

 // Convenience functions for working with tiling
@@ -17,13 +18,24 @@ object TilingUtils {
     key
   }

+  private def toList(arr: Array[Byte]): java.util.ArrayList[java.lang.Byte] = {
+    if (arr == null) return null
+    val result = new util.ArrayList[java.lang.Byte](arr.length)
+    var idx = 0
+    while (idx < arr.length) {
+      result.add(arr(idx))
+      idx += 1
+    }
+    result
+  }
+
   def buildTileKey(dataset: String,
                    keyBytes: Array[Byte],
                    tileSizeMs: Option[Long],
                    tileStartTs: Option[Long]): TileKey = {
     val tileKey = new TileKey()
     tileKey.setDataset(dataset)
-    tileKey.setKeyBytes(keyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]])
+    tileKey.setKeyBytes(toList(keyBytes))
     tileSizeMs.foreach(tileKey.setTileSizeMillis)
     tileStartTs.foreach(tileKey.setTileStartTimestampMillis)
     tileKey
```
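The old `keyBytes.toList.asJava` path built an immutable Scala `List` (one cons cell plus one boxed `java.lang.Byte` per key byte) and then wrapped it; the new `toList` boxes each byte once into a single pre-sized `ArrayList` and also tolerates null keys. Callers are unchanged. A usage sketch (the dataset name and values are made up):

```scala
// Hypothetical call — dataset name and timestamps are illustrative.
val tileKey = TilingUtils.buildTileKey(
  dataset = "MY_GROUPBY_STREAMING",
  keyBytes = Array[Byte](1, 2, 3),
  tileSizeMs = Some(5 * 60 * 1000L),  // 5-minute tiles
  tileStartTs = Some(1700000000000L)
)
```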

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -96,7 +96,7 @@ class FlinkJob(eventSrc: FlinkSource[Map[String, Any]],
       "Tiling is enabled.")

     val tilingWindowSizeInMillis: Long =
-      ResolutionUtils.getSmallestWindowResolutionInMillis(groupByServingInfoParsed.groupBy)
+      ResolutionUtils.getSmallestTailHopMillis(groupByServingInfoParsed.groupBy)

     // we expect parallelism on the source stream to be set by the source provider
     val sourceSparkProjectedStream: DataStream[Map[String, Any]] =
```
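The rename propagates here to make the contract explicit: the Flink tiling window is sized to the smallest tail hop, presumably so that the tiles written by streaming line up with the hops the serving side reads at fetch time.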
