-
Notifications
You must be signed in to change notification settings - Fork 0
perf: Online + Avro path optimizations #655
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThis update introduces new utility methods for cached and schema-aware data conversion, particularly enhancing Avro-to-Chronon row transformations. It modularizes derivation and logging within the fetcher, improves null safety, and optimizes mapping logic in tile expansion. The smallest tail hop calculation is clarified and propagated across APIs, online, and test code. Dependencies for Avro FastSerde are added, with associated Bazel and Maven configuration updates. Minor refactoring and comment adjustments are made for clarity and maintainability, with no changes to public API signatures except for extended case classes and new methods. Changes
Sequence Diagram(s)sequenceDiagram
participant AvroCodec
participant AvroConversions
participant Row
participant ChrononRow
AvroCodec->>AvroConversions: toChrononRowFunc(schema)
AvroConversions->>Row: fromCached(dataType, ...)
Row-->>AvroConversions: conversion function
AvroConversions-->>AvroCodec: conversion function
AvroCodec->>AvroConversions: conversionFunc(avroRecord)
AvroConversions->>ChrononRow: Convert Avro to Chronon row
ChrononRow-->>AvroCodec: Array[Any]
Poem
Warning Review ran into problems🔥 ProblemsGitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (1)
101-101
: Same null pointer concern.
🧹 Nitpick comments (1)
api/src/main/scala/ai/chronon/api/Row.scala (1)
126-197
: Recursive converter looks flexible.
Consider numeric type edge cases.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (11)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
(4 hunks)api/src/main/scala/ai/chronon/api/Extensions.scala
(2 hunks)api/src/main/scala/ai/chronon/api/Row.scala
(1 hunks)maven_install.json
(10 hunks)online/BUILD.bazel
(2 hunks)online/src/main/scala/ai/chronon/online/TileCodec.scala
(3 hunks)online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala
(1 hunks)online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
(4 hunks)online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
(1 hunks)tools/build_rules/dependencies/load_dependencies.bzl
(1 hunks)tools/build_rules/dependencies/maven_repository.bzl
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
online/src/main/scala/ai/chronon/online/TileCodec.scala (2)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/TwoStackLiteAggregator.scala (1)
init
(42-42)aggregator/src/main/scala/ai/chronon/aggregator/row/RowAggregator.scala (2)
init
(69-69)clone
(154-154)
⏰ Context from checks skipped due to timeout of 90000ms (21)
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: hub_tests
- GitHub Check: hub_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: online_tests
- GitHub Check: bazel_config_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: orchestration_tests
- GitHub Check: orchestration_tests
- GitHub Check: aggregator_tests
🔇 Additional comments (24)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
259-259
: Improved design by adding milliseconds to WindowMapping case classAdding the millis field eliminates the need for repeated computation of window.millis in downstream code.
299-299
: Good handling of null window caseSetting millis to -1 when window is null provides a clear sentinel value for code using this field.
aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala (4)
112-115
: Performance optimization by caching window.millisPre-computing window.millis reduces repeated access and calculations in this hot loop.
131-135
: Consistent optimization pattern across methodSame pattern applied here - caching window.millis to avoid repeated property access.
148-153
: Performance improvement in tail hops processingCaching window.millis optimizes this performance-sensitive code path.
159-159
: Consistent use of cached valueThe optimization is consistently applied throughout the code.
tools/build_rules/dependencies/maven_repository.bzl (1)
116-116
: Dependency addition looks good.Fastserde is correctly added for Avro optimizations.
tools/build_rules/dependencies/load_dependencies.bzl (1)
8-8
: Repository addition is correct.New Maven repo is needed for fastserde; no issues.
online/BUILD.bazel (1)
31-31
: Build dependency update is correct.fastserde is properly added to both targets.
Also applies to: 70-70
maven_install.json (1)
1-26992
: All good—dependency lockfile updates look correct and expected.online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (1)
195-201
: Potential edge case iflatest.millis
==batchEndTsMillis
.
Might produce a null when equality occurs.online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (2)
154-163
: Neat wrapper around the cached function.
Keeps logic minimal.
165-190
: Efficient Avro decoding.
Confirm concurrency safety if invoked from multiple threads.online/src/main/scala/ai/chronon/online/TileCodec.scala (4)
32-32
: Minimal import is fine.
99-100
: Case class is concise.
101-124
: Good precomputation for mappings.
Improves runtime.
126-132
: Cloning logic in expansion step.
Double-check thread safety.online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (7)
19-19
: Looks good.
Appropriate import for StructType.
24-24
: No issues.
Necessary import for GenericRecord.
37-37
: DatumWriter usage looks correct.
No obvious issues.
38-38
: DatumReader usage looks correct.
Schema reference is consistent.
94-94
: Potential null pointer.
decode(bytes)
can be null; confirm safe usage.
26-27
:✅ Verification successful
Confirm fastserde version stability.
FastGenericDatumReader/Writer improves performance. Verify the library version is pinned to avoid regressions.
🏁 Script executed:
#!/bin/bash rg "com.linkedin.avroutil1" -A 5Length of output: 10539
Fastserde version pinned (0.4.25) confirmed.
- Dependency is explicitly set to 0.4.25 in both
maven_install.json
andtools/build_rules/dependencies/maven_repository.bzl
.
48-49
:✅ Verification successful
Check null handling in
toChrononRowFunc
.
decode
can return null. Ensure safety.
🏁 Script executed:
#!/bin/bash rg "def toChrononRowFunc" -A 10Length of output: 1029
Null safety in
toChrononRowFunc
is verified.
The lambda inAvroConversions.toChrononRowFunc
explicitly checks for a null input and returns null accordingly, ensuring that any null value fromdecode
is safely handled. No further action is needed on this code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wdyt of carving out the LI avro serde libs in a different change that we can take the time to test out in the various paths in our lambda that exercise the avro ser/de?
// cache these mapping out of hot-path | ||
private case class ExpanderMapping(irPos: Int, bucketPos: Int) | ||
|
||
private val expanderMappings: Array[ExpanderMapping] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very cool!
f755f67
to
6cb5471
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
api/src/main/scala/ai/chronon/api/TilingUtils.scala
(2 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
(12 hunks)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
(8 hunks)online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
(1 hunks)spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
🧰 Additional context used
🧬 Code Graph Analysis (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (3)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (8)
ScalaJavaConversions
(6-97)IteratorOps
(51-55)toScala
(16-22)toScala
(32-38)toScala
(41-43)toScala
(52-54)toScala
(62-68)toScala
(80-86)online/src/main/scala/ai/chronon/online/Api.scala (1)
KVStore
(34-51)api/src/main/scala/ai/chronon/api/TilingUtils.scala (2)
TilingUtils
(9-43)deserializeTileKey
(15-19)
⏰ Context from checks skipped due to timeout of 90000ms (21)
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: online_tests
- GitHub Check: hub_tests
- GitHub Check: hub_tests
- GitHub Check: online_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: orchestration_tests
- GitHub Check: aggregator_tests
- GitHub Check: aggregator_tests
- GitHub Check: orchestration_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: spark_tests
🔇 Additional comments (3)
api/src/main/scala/ai/chronon/api/TilingUtils.scala (3)
5-5
: Import added for new utility method.The java.util import is necessary for ArrayList usage.
21-30
: New utility method improves array-to-list conversion.The imperative while loop approach avoids intermediate collections and preallocates the ArrayList to the correct size, which optimizes performance for a potentially hot code path.
38-38
: Simplified keyBytes conversion.Replaced complex conversion chain with direct utility method call, improving readability and likely performance.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
Outdated
Show resolved
Hide resolved
@@ -113,6 +113,7 @@ maven_repository = repository( | |||
|
|||
# Avro | |||
"org.apache.avro:avro:1.11.3", | |||
"com.linkedin.avroutil1:avro-fastserde:0.4.25", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dannnggg
d9f7777
to
789342b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
148-152
: Null sentinel pattern retained despite previous feedback.The zipping logic continues to use null when external responses are missing. Consider returning an empty sequence instead.
409-411
: 🛠️ Refactor suggestionEarly exit optimization returns null.
While performance is improved by avoiding unnecessary processing, returning
null
breaks type safety.- if (validRequests.isEmpty) { return Future.successful(null) } + if (validRequests.isEmpty) { return Future.successful(Seq.empty) }
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
125-125
: Changed log level from debug to info.Consider potential impact on log volume from this higher visibility setting.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (21)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
(1 hunks)aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
(4 hunks)api/src/main/scala/ai/chronon/api/Extensions.scala
(2 hunks)api/src/main/scala/ai/chronon/api/Row.scala
(1 hunks)api/src/main/scala/ai/chronon/api/TilingUtils.scala
(2 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
(7 hunks)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
(1 hunks)maven_install.json
(10 hunks)online/BUILD.bazel
(2 hunks)online/src/main/scala/ai/chronon/online/Api.scala
(1 hunks)online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
(2 hunks)online/src/main/scala/ai/chronon/online/TileCodec.scala
(3 hunks)online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
(1 hunks)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
(8 hunks)online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala
(1 hunks)online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala
(1 hunks)online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
(4 hunks)online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
(1 hunks)spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
(1 hunks)tools/build_rules/dependencies/load_dependencies.bzl
(1 hunks)tools/build_rules/dependencies/maven_repository.bzl
(1 hunks)
✅ Files skipped from review due to trivial changes (3)
- online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
🚧 Files skipped from review as they are similar to previous changes (16)
- tools/build_rules/dependencies/load_dependencies.bzl
- online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala
- online/BUILD.bazel
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
- online/src/main/scala/ai/chronon/online/Api.scala
- api/src/main/scala/ai/chronon/api/TilingUtils.scala
- tools/build_rules/dependencies/maven_repository.bzl
- api/src/main/scala/ai/chronon/api/Extensions.scala
- online/src/main/scala/ai/chronon/online/TileCodec.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
- online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala
- maven_install.json
- online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
- api/src/main/scala/ai/chronon/api/Row.scala
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
⏰ Context from checks skipped due to timeout of 90000ms (36)
- GitHub Check: service_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: hub_tests
- GitHub Check: hub_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: api_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: orchestration_tests
- GitHub Check: bazel_config_tests
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: orchestration_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (4)
179-180
: Good modularization of derivation logic.Extraction of processing logic into
applyDerivations
improves code maintainability.
191-252
: Well-structured derivation logic with proper fallbacks.The extracted method handles derivation with appropriate fallbacks and error handling. Metrics are correctly incremented for monitoring.
293-317
: Clean separation of logging responsibility.Refactored
logResponse
to delegate encoding and publishing to a dedicated method.
319-378
: Well-extracted logging logic.The new
encodeAndPublishLog
method correctly encapsulates encoding, sampling, and publishing functionality.cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (3)
176-186
: Simplified multi-get with bulk reads batcher.Good usage of BigTable's batcher API with proper resource management through
batcher.close()
.
291-293
: Standardized future conversion pattern.Consistently uses
ApiFutureUtils.toCompletableFuture
andFutureConverters.toScala
for cleaner async handling.
358-360
: Consistent future conversion pattern in multiPut.Same standardized conversion approach applied to put operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (2)
166-172
: Utility function for ArrayList creationHelper method to create ArrayList from iterator, but not used in this file.
Consider removing this unused method or using it to reduce code duplication in the primitive array conversions below.
174-244
: Optimized Avro to Chronon row conversionLeverages the new Row.fromCached to create efficient conversion functions with optimizations for:
- Special handling of Avro FastSerde's primitive array types
- Efficient record field iteration with AbstractIterator
- Type-specific optimized converters
Comment on line 190 acknowledges potential performance concerns with case matching.
Consider using the
buildArray
helper to reduce repetition in array conversion cases.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (7)
api/src/main/scala/ai/chronon/api/Row.scala
(1 hunks)online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
(3 hunks)online/src/main/scala/ai/chronon/online/TileCodec.scala
(3 hunks)online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala
(1 hunks)online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
(4 hunks)online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
(2 hunks)online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- online/src/main/scala/ai/chronon/online/TileCodec.scala
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
🧰 Additional context used
🧬 Code Graph Analysis (1)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
api/src/main/scala/ai/chronon/api/Row.scala (2)
Row
(72-126)fromCached
(129-203)
⏰ Context from checks skipped due to timeout of 90000ms (36)
- GitHub Check: service_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: online_tests
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: hub_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: analyzer_tests
- GitHub Check: hub_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: join_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: batch_tests
- GitHub Check: orchestration_tests
- GitHub Check: aggregator_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: bazel_config_tests
- GitHub Check: api_tests
- GitHub Check: aggregator_tests
- GitHub Check: orchestration_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1)
300-316
: Optimized and simplified Avro-to-Chronon row conversionUses the precomputed converter function from the serving info rather than doing conversion on the fly, which aligns with PR performance objectives.
online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (2)
57-57
: Caching converter for better performanceCached converter eliminates repeated construction of conversion logic for the same schema.
69-69
: Simplified row conversion using cached converterReplaces complex mapping logic with simple function call.
api/src/main/scala/ai/chronon/api/Row.scala (2)
126-126
: Added utility identity functionSimple pass-through function for optimization paths.
128-203
: Added efficient cached converter generatorCreates reusable conversion functions instead of performing conversions directly. Major performance optimization through:
- Pre-computing field conversion functions
- Optimizing list traversal when element conversion is identity
- Applying null guards efficiently
This addresses a key performance bottleneck identified in the PR objectives.
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (2)
30-30
: Added FastSerde dependency importIntegrates LinkedIn's fast serialization/deserialization library as noted in PR objectives.
155-164
: Added high-level converter functionClean wrapper that handles null case and type conversion.
9d667ad
to
98819a7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
166-172
: Helper method extraction opportunityThis utility could replace duplicate code in array type handlers below.
def buildArray(size: Int, iterator: util.Iterator[Any]): util.ArrayList[Any] = { val arr = new util.ArrayList[Any](size) while (iterator.hasNext) { arr.add(iterator.next()) } arr }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (22)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
(1 hunks)aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
(4 hunks)api/src/main/scala/ai/chronon/api/Extensions.scala
(2 hunks)api/src/main/scala/ai/chronon/api/Row.scala
(1 hunks)api/src/main/scala/ai/chronon/api/TilingUtils.scala
(2 hunks)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
(1 hunks)maven_install.json
(10 hunks)online/BUILD.bazel
(2 hunks)online/src/main/scala/ai/chronon/online/Api.scala
(1 hunks)online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
(3 hunks)online/src/main/scala/ai/chronon/online/TileCodec.scala
(3 hunks)online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
(1 hunks)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
(7 hunks)online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala
(1 hunks)online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala
(1 hunks)online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala
(1 hunks)online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
(4 hunks)online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
(2 hunks)online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala
(2 hunks)spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
(1 hunks)tools/build_rules/dependencies/load_dependencies.bzl
(1 hunks)tools/build_rules/dependencies/maven_repository.bzl
(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
- tools/build_rules/dependencies/maven_repository.bzl
🚧 Files skipped from review as they are similar to previous changes (19)
- online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- online/src/main/scala/ai/chronon/online/Api.scala
- online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala
- online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
- online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala
- api/src/main/scala/ai/chronon/api/Extensions.scala
- api/src/main/scala/ai/chronon/api/TilingUtils.scala
- online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
- tools/build_rules/dependencies/load_dependencies.bzl
- maven_install.json
- api/src/main/scala/ai/chronon/api/Row.scala
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
- online/src/main/scala/ai/chronon/online/TileCodec.scala
- online/BUILD.bazel
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
⏰ Context from checks skipped due to timeout of 90000ms (36)
- GitHub Check: service_tests
- GitHub Check: hub_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: streaming_tests
- GitHub Check: api_tests
- GitHub Check: hub_tests
- GitHub Check: groupby_tests
- GitHub Check: online_tests
- GitHub Check: online_tests
- GitHub Check: analyzer_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: join_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: api_tests
- GitHub Check: aggregator_tests
- GitHub Check: fetcher_tests
- GitHub Check: aggregator_tests
- GitHub Check: spark_tests
- GitHub Check: flink_tests
- GitHub Check: batch_tests
- GitHub Check: streaming_tests
- GitHub Check: bazel_config_tests
- GitHub Check: spark_tests
- GitHub Check: orchestration_tests
- GitHub Check: join_tests
- GitHub Check: orchestration_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (3)
30-30
: Added FastSerde primitive array supportImporting LinkedIn's fastserde primitives for optimized array handling.
155-164
: Optimized converter with cachingCached conversion function improves performance by avoiding redundant schema processing.
174-244
: Optimized cached conversion implementationEfficient implementation ordering cases by frequency and handling specialized primitive arrays.
{ // cases are ordered by most frequent use | ||
// TODO: Leverage type info if this case match proves to be expensive | ||
case doubles: fastavro.PrimitiveDoubleArrayList => | ||
val arr = new util.ArrayList[Any](doubles.size) | ||
val iterator = doubles.iterator() | ||
while (iterator.hasNext) { | ||
arr.add(iterator.next()) | ||
} | ||
arr | ||
|
||
case longs: fastavro.PrimitiveLongArrayList => | ||
val arr = new util.ArrayList[Any](longs.size) | ||
val iterator = longs.iterator() | ||
while (iterator.hasNext) { | ||
arr.add(iterator.next()) | ||
} | ||
arr | ||
|
||
case genericArray: GenericData.Array[Any] => | ||
val arr = new util.ArrayList[Any](genericArray.size) | ||
val iterator = genericArray.iterator() | ||
while (iterator.hasNext) { | ||
arr.add(iterator.next()) | ||
} | ||
arr | ||
|
||
case ints: fastavro.PrimitiveIntArrayList => | ||
val arr = new util.ArrayList[Any](ints.size) | ||
val iterator = ints.iterator() | ||
while (iterator.hasNext) { | ||
arr.add(iterator.next()) | ||
} | ||
arr | ||
|
||
case floats: fastavro.PrimitiveFloatArrayList => | ||
val arr = new util.ArrayList[Any](floats.size) | ||
val iterator = floats.iterator() | ||
while (iterator.hasNext) { | ||
arr.add(iterator.next()) | ||
} | ||
arr | ||
|
||
case bools: fastavro.PrimitiveBooleanArrayList => | ||
val arr = new util.ArrayList[Any](bools.size) | ||
val iterator = bools.iterator() | ||
while (iterator.hasNext) { | ||
arr.add(iterator.next()) | ||
} | ||
arr | ||
|
||
case valueOfUnknownType => | ||
throw new RuntimeException(s"Found unknown list type in avro record: ${valueOfUnknownType.getClass.getName}") | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Repeated array conversion patterns
Each array type uses identical conversion pattern. Use helper method.
{ // cases are ordered by most frequent use
// TODO: Leverage type info if this case match proves to be expensive
case doubles: fastavro.PrimitiveDoubleArrayList =>
- val arr = new util.ArrayList[Any](doubles.size)
- val iterator = doubles.iterator()
- while (iterator.hasNext) {
- arr.add(iterator.next())
- }
- arr
+ buildArray(doubles.size, doubles.iterator())
case longs: fastavro.PrimitiveLongArrayList =>
- val arr = new util.ArrayList[Any](longs.size)
- val iterator = longs.iterator()
- while (iterator.hasNext) {
- arr.add(iterator.next())
- }
- arr
+ buildArray(longs.size, longs.iterator())
case genericArray: GenericData.Array[Any] =>
- val arr = new util.ArrayList[Any](genericArray.size)
- val iterator = genericArray.iterator()
- while (iterator.hasNext) {
- arr.add(iterator.next())
- }
- arr
+ buildArray(genericArray.size, genericArray.iterator())
case ints: fastavro.PrimitiveIntArrayList =>
- val arr = new util.ArrayList[Any](ints.size)
- val iterator = ints.iterator()
- while (iterator.hasNext) {
- arr.add(iterator.next())
- }
- arr
+ buildArray(ints.size, ints.iterator())
case floats: fastavro.PrimitiveFloatArrayList =>
- val arr = new util.ArrayList[Any](floats.size)
- val iterator = floats.iterator()
- while (iterator.hasNext) {
- arr.add(iterator.next())
- }
- arr
+ buildArray(floats.size, floats.iterator())
case bools: fastavro.PrimitiveBooleanArrayList =>
- val arr = new util.ArrayList[Any](bools.size)
- val iterator = bools.iterator()
- while (iterator.hasNext) {
- arr.add(iterator.next())
- }
- arr
+ buildArray(bools.size, bools.iterator())
Committable suggestion skipped: line range outside the PR's diff.
## Summary guided by flamegraph from one of our customers - pre-compute window.millis - eliminate branching in AvroConversions.toChrononRow - by pre-building a row generator func `Row.fromCached` - optimize `expandWindowedTileIr` by pre-computing the expander indexes. - bring-in fast-serde from linkedin which does generic data decoding, by pre-generating decoder classes. - optimize FetcherCache.getBatchBytes logic - early exit when externalParts are null - bigtablekvstore impl optimizations on how we construct the keys ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new method for converting cached data to Chronon row format. - Introduced a public field to expose the smallest tail hop duration in milliseconds for group-by serving information. - **Bug Fixes** - Improved error handling and null safety in batch response processing and fetcher logic. - Enhanced logging with better exception handling and sampling in online fetcher responses. - Added exception logging around synchronous KVStore response retrieval. - **Refactor** - Optimized windowing and mapping logic for performance and clarity in several aggregation and expansion methods. - Modularized derivation and logging logic in the fetcher for better maintainability. - Updated method and field names for clarity regarding window resolution and tail hop durations. - Refactored Avro serialization/deserialization to use LinkedIn Fast Avro SerDe with cached conversion functions. - Improved conversion utilities for Avro to Chronon row representations. - Centralized Scala-to-Java byte array conversions in tiling utilities. - **Chores** - Added LinkedIn Fast Avro SerDe as a dependency and updated related build and dependency files. - Updated utility methods for more consistent data type conversions. - **Style** - Improved code comments and documentation for clarity. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary guided by flamegraph from one of our customers - pre-compute window.millis - eliminate branching in AvroConversions.toChrononRow - by pre-building a row generator func `Row.fromCached` - optimize `expandWindowedTileIr` by pre-computing the expander indexes. - bring-in fast-serde from linkedin which does generic data decoding, by pre-generating decoder classes. - optimize FetcherCache.getBatchBytes logic - early exit when externalParts are null - bigtablekvstore impl optimizations on how we construct the keys ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new method for converting cached data to Chronon row format. - Introduced a public field to expose the smallest tail hop duration in milliseconds for group-by serving information. - **Bug Fixes** - Improved error handling and null safety in batch response processing and fetcher logic. - Enhanced logging with better exception handling and sampling in online fetcher responses. - Added exception logging around synchronous KVStore response retrieval. - **Refactor** - Optimized windowing and mapping logic for performance and clarity in several aggregation and expansion methods. - Modularized derivation and logging logic in the fetcher for better maintainability. - Updated method and field names for clarity regarding window resolution and tail hop durations. - Refactored Avro serialization/deserialization to use LinkedIn Fast Avro SerDe with cached conversion functions. - Improved conversion utilities for Avro to Chronon row representations. - Centralized Scala-to-Java byte array conversions in tiling utilities. - **Chores** - Added LinkedIn Fast Avro SerDe as a dependency and updated related build and dependency files. - Updated utility methods for more consistent data type conversions. - **Style** - Improved code comments and documentation for clarity. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary guided by flamegraph from one of our customers - pre-compute window.millis - eliminate branching in AvroConversions.toChrononRow - by pre-building a row generator func `Row.fromCached` - optimize `expandWindowedTileIr` by pre-computing the expander indexes. - bring-in fast-serde from linkedin which does generic data decoding, by pre-generating decoder classes. - optimize FetcherCache.getBatchBytes logic - early exit when externalParts are null - bigtablekvstore impl optimizations on how we construct the keys ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new method for converting cached data to Chronon row format. - Introduced a public field to expose the smallest tail hop duration in milliseconds for group-by serving information. - **Bug Fixes** - Improved error handling and null safety in batch response processing and fetcher logic. - Enhanced logging with better exception handling and sampling in online fetcher responses. - Added exception logging around synchronous KVStore response retrieval. - **Refactor** - Optimized windowing and mapping logic for performance and clarity in several aggregation and expansion methods. - Modularized derivation and logging logic in the fetcher for better maintainability. - Updated method and field names for clarity regarding window resolution and tail hop durations. - Refactored Avro serialization/deserialization to use LinkedIn Fast Avro SerDe with cached conversion functions. - Improved conversion utilities for Avro to Chronon row representations. - Centralized Scala-to-Java byte array conversions in tiling utilities. - **Chores** - Added LinkedIn Fast Avro SerDe as a dependency and updated related build and dependency files. - Updated utility methods for more consistent data type conversions. - **Style** - Improved code comments and documentation for clarity. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary guided by flamegraph from one of our customers - pre-compute window.millis - eliminate branching in AvroConversions.toChrononRow - by pre-building a row generator func `Row.fromCached` - optimize `expandWindowedTileIr` by pre-computing the expander indexes. - bring-in fast-serde from linkedin which does generic data decoding, by pre-generating decoder classes. - optimize FetcherCache.getBatchBytes logic - early exit when externalParts are null - bigtablekvstore impl optimizations on how we construct the keys ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new method for converting cached data to Chronon row format. - Introduced a public field to expose the smallest tail hop duration in milliseconds for group-by serving information. - **Bug Fixes** - Improved error handling and null safety in batch response processing and fetcher logic. - Enhanced logging with better exception handling and sampling in online fetcher responses. - Added exception logging around synchronous KVStore response retrieval. - **Refactor** - Optimized windowing and mapping logic for performance and clarity in several aggregation and expansion methods. - Modularized derivation and logging logic in the fetcher for better maintainability. - Updated method and field names for clarity regarding window resolution and tail hop durations. - Refactored Avro serialization/deserialization to use LinkedIn Fast Avro SerDe with cached conversion functions. - Improved conversion utilities for Avro to Chronon row representations. - Centralized Scala-to-Java byte array conversions in tiling utilities. - **Chores** - Added LinkedIn Fast Avro SerDe as a dependency and updated related build and dependency files. - Updated utility methods for more consistent data type conversions. - **Style** - Improved code comments and documentation for clarity. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary guided by flamegraph from one of our customers - pre-compute window.millis - eliminate branching in AvroConversions.toChrononRow - by pre-building a row generator func `Row.fromCached` - optimize `expandWindowedTileIr` by pre-computing the expander indexes. - bring-in fast-serde from linkedin which does generic data decoding, by pre-generating decoder classes. - optimize FetcherCache.getBatchBytes logic - early exit when externalParts are null - bigtablekvstore impl optimizations on how we construct the keys ## Cheour clientslist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new method for converting cached data to Chronon row format. - Introduced a public field to expose the smallest tail hop duration in milliseconds for group-by serving information. - **Bug Fixes** - Improved error handling and null safety in batch response processing and fetcher logic. - Enhanced logging with better exception handling and sampling in online fetcher responses. - Added exception logging around synchronous KVStore response retrieval. - **Refactor** - Optimized windowing and mapping logic for performance and clarity in several aggregation and expansion methods. - Modularized derivation and logging logic in the fetcher for better maintainability. - Updated method and field names for clarity regarding window resolution and tail hop durations. - Refactored Avro serialization/deserialization to use LinkedIn Fast Avro SerDe with cached conversion functions. - Improved conversion utilities for Avro to Chronon row representations. - Centralized Scala-to-Java byte array conversions in tiling utilities. - **Chores** - Added LinkedIn Fast Avro SerDe as a dependency and updated related build and dependency files. - Updated utility methods for more consistent data type conversions. - **Style** - Improved code comments and documentation for clarity. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
Summary
guided by flamegraph from one of our customers
Row.fromCached
expandWindowedTileIr
by pre-computing the expander indexes.Checklist
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Chores
Style