refactor: split fetcher logic into multiple files #425
Conversation
Walkthrough

The PR introduces a new …

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant C as Client
    participant F as Fetcher (JoinPartFetcher/GroupByFetcher)
    participant FC as FetchContext
    participant M as MetadataStore
    participant S as SerdeUtils
    C->>F: Submit data request (GroupBy/Join)
    F->>FC: Wraps configuration into FetchContext
    F->>M: Retrieve metadata & join config via FC
    M-->>F: Return metadata
    F->>S: Serialize/Deserialize using compact serializers
    F-->>C: Return processed response
```
```diff
 def serializeTileKey(key: TileKey): Array[Byte] = {
-  binarySerializer.get().serialize(key)
+  SerdeUtils.compactSerializer.get().serialize(key)
```
no bugs here - just needed thread local
```diff
@@ -306,7 +306,7 @@ object FlinkJob {
   val kafkaBootstrap = jobArgs.kafkaBootstrap.toOption

   val api = buildApi(onlineClassName, props)
   val metadataStore = new MetadataStore(api.genKvStore, MetadataDataset, timeoutMillis = 10000)
```
10k is the default - there was a compiler warning
Actionable comments posted: 0
🧹 Nitpick comments (12)
online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1)

8-36: LGTM! Consider adding class documentation. Well-structured case class with clear responsibilities and thread-safe operations. Add scaladoc to describe the class purpose and parameters:

```diff
+/**
+ * Context for fetch operations containing configuration and utilities.
+ *
+ * @param kvStore Key-value store for data access
+ * @param metadataDataset Dataset for metadata storage
+ * @param timeoutMillis Timeout for fetch operations
+ * @param debug Enable debug mode
+ * @param flagStore Store for feature flags
+ * @param disableErrorThrows Disable error throwing
+ * @param executionContextOverride Custom execution context
+ */
```

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (1)
60-63: Consider local or lazy init instead of vars.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2)
38-113: Consider using a monotonic clock for duration measurement.

```diff
- val startTimeMs = System.currentTimeMillis()
+ val startTimeMs = System.nanoTime()
```
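A minimal sketch of the monotonic-clock suggestion, separate from the PR's code: `System.nanoTime` cannot jump backwards when the wall clock is adjusted (e.g. by NTP), unlike `System.currentTimeMillis`, so it is the safer basis for elapsed-time measurement. The `timeIt` helper below is illustrative.

```scala
// Measure elapsed time with the monotonic clock; nanoTime readings are
// only meaningful as a difference between two calls in the same JVM.
def timeIt[T](block: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = block
  val elapsedMs = (System.nanoTime() - start) / 1000000L
  (result, elapsedMs)
}
```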
115-147: Using random sampling for logging could be replaced with debug logs.

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (4)
127-131: Put request. No explicit error handling if the put fails; consider adding `.recoverWith`.
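The `.recoverWith` pattern the comment refers to can be sketched as follows; `putFn` is a hypothetical stand-in for the store's async put, not the real API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

// Recover a failed async write into a logged fallback value instead of
// letting the Future fail silently.
def putWithRecovery(putFn: () => Future[Boolean]): Future[Boolean] =
  putFn().recoverWith { case e: Exception =>
    Console.err.println(s"put failed: ${e.getMessage}")
    Future.successful(false)
  }
```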
263-265: Potential `.get` throw. Consider safer handling than `.get` to avoid unexpected errors.
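One safer alternative to `.get`, sketched with a toy parser rather than the PR's code: `Try(...).toEither` (or `fold`/`getOrElse`) keeps the failure as a value instead of rethrowing the captured exception.

```scala
import scala.util.Try

// Try#get rethrows the captured exception on Failure; converting to
// Either keeps the error at the value level for the caller to handle.
def parsePort(raw: String): Either[String, Int] =
  Try(raw.toInt).toEither.left.map(_.getMessage)
```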
343-343: multiPut usage. No `.recover` if partial writes fail; possibly handle failures.

352-352: Create dataset. Async, with no guarantee of success; consider a wait or verification.

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (3)
125-127: New fetchGroupBys. Consider adding dedicated tests.

145-146: Assertion context. Include the request ID for clarity.

```diff
- s"Logic error. Responses are not aligned..."
+ s"Logic error (req=$requestId). Responses are not aligned..."
```

326-326: joinConfTry. Log the missing conf for debug.

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1)
71-169: mergeWithStreaming. Add tests if possible.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (54)
- api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/TilingUtils.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (1 hunks)
- flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/AvroConversions.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/CompatParColls.scala (0 hunks)
- online/src/main/scala/ai/chronon/online/Extensions.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (5 hunks)
- online/src/main/scala/ai/chronon/online/JoinCodec.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/TileCodec.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (12 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala (0 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (3 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (11 hunks)
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (5 hunks)
- online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (11 hunks)
- online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (5 hunks)
- online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala (1 hunks)
- online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (4 hunks)
- spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (4 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (5 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2 hunks)
💤 Files with no reviewable changes (2)
- online/src/main/scala/ai/chronon/online/CompatParColls.scala
- online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala
✅ Files skipped from review due to trivial changes (15)
- spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala
- online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala
- online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala
- online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala
- online/src/main/scala/ai/chronon/online/TileCodec.scala
- flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala
- spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala
- flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
- spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
- flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala
- online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
🔇 Additional comments (137)
online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (2)

17-25: Clean package organization and import consolidation.

105-117: Good refactoring of decode methods. Clean extraction of common functionality into `decodeArray` with functional composition in `decodeMap`.

spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala (1)

108-108: LGTM! Good architectural separation. Moving join configuration to a dedicated metadata store improves modularity.

spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1)

78-78: LGTM! Consistent implementation. Change aligns with the metadata store pattern used in ObservabilityDemoDataLoader.

online/src/main/scala/ai/chronon/online/Extensions.scala (1)

51-51: LGTM! Better type specificity. Using serde.AvroCodec provides clearer serialization implementation intent.

spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala (1)

3-3: LGTM! Better serialization organization. Consolidating serialization utilities in SerdeUtils improves code maintainability.

Also applies to: 40-40
spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (2)

98-98: LGTM! Improved metadata handling. The change to use `metadataStore` aligns with the refactoring goals.

103-106: LGTM! Better integer instantiation. Using `Integer.valueOf()` instead of `new Integer()` improves performance through value caching.

spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1)

92-92: LGTM! Improved metadata handling. The change to use `metadataStore` aligns with the refactoring goals.

online/src/main/scala/ai/chronon/online/AvroConversions.scala (1)

186-186: LGTM! Better package organization. The change to use `serde.AvroCodec` improves code organization.

Also applies to: 196-196

spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (1)

83-83: LGTM! Improved metadata handling. The change to use `metadataStore` aligns with the refactoring goals.

api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1)

6-26: LGTM! Thread-safe serialization implementation. The implementation correctly uses ThreadLocal for thread safety and lazy initialization for performance optimization.
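The ThreadLocal-plus-lazy-init pattern praised here can be illustrated with stdlib types; `SimpleDateFormat` stands in for Thrift's `TSerializer`, which is likewise not thread-safe, so each thread lazily gets its own private instance. This sketch is not the codebase's `SerdeUtils`.

```scala
import java.text.SimpleDateFormat
import java.util.Date

object ThreadLocalPattern {
  // withInitial defers construction until a thread first calls get(),
  // after which that thread keeps its own instance.
  val formatter: ThreadLocal[SimpleDateFormat] =
    ThreadLocal.withInitial(() => new SimpleDateFormat("yyyy-MM-dd"))

  def format(millis: Long): String = formatter.get().format(new Date(millis))
}
```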
online/src/main/scala/ai/chronon/online/Api.scala (2)

81-94: LGTM! Improved error handling in getString. The error handling now provides better context for debugging.

96-109: LGTM! Enhanced error handling in getStringArray. The error handling is now consistent with the getString method.

spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (1)

117-117: LGTM! Simplified MetadataStore access. Using fetcher.metadataStore aligns with the refactoring goals.

spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2)

87-87: LGTM! Consistent MetadataStore access. Using fetcher.metadataStore maintains consistency with other changes.

172-175: LGTM! Improved method chaining. The chained method calls improve readability while maintaining the same functionality.

spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (3)

24-24: LGTM! Import change aligns with the refactoring of fetcher logic.

90-90: LGTM! Return type change maintains consistency with the fetcher package.

99-99: LGTM! Return statements correctly updated to use fetcher.DataMetrics.

Also applies to: 138-138

spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (2)

191-191: LGTM! Centralizes metadata handling through metadataStore.

217-217: LGTM! Consistently uses metadataStore for serving info retrieval.

spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (1)

424-424: LGTM! Consistently uses fetcher.metadataStore, aligning with the refactoring.

spark/src/main/scala/ai/chronon/spark/Driver.scala (3)

31-31: LGTM! Import statement is now more specific, aligning with the refactoring objective.

464-472: LGTM! Multi-line formatting improves readability.

613-615: LGTM! FetchContext encapsulates the KV store and metadata dataset, improving code organization.

online/src/main/scala/ai/chronon/online/JoinCodec.scala (3)

30-30: LGTM! Import statement is appropriately specific.

37-38: LGTM! Parameter types consistently use serde.AvroCodec.

92-92: LGTM! Method signature consistently uses serde.AvroCodec.

spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (3)

26-27: LGTM! Import statements are well-organized and specific.

241-241: LGTM! MetadataStore instantiation properly uses FetchContext.

266-266: LGTM! Using Long.valueOf is more efficient as it may reuse cached values.

spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2)

20-20: LGTM! Import statement is appropriately specific.

117-117: LGTM! Return type correctly uses fetcher.DataMetrics.

spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2)

23-23: Looks good.

109-109: Neat usage of fetch context.

api/src/main/scala/ai/chronon/api/TilingUtils.scala (3)

4-4: Import is consistent.

13-13: Ensure no null key.

18-18: Check bytes not null.

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (25)
29-29: Import is fine.

71-73: Test setup is logical.

88-88: Mock is correct.

90-92: Tests look fine.

97-97: Good verification.

118-118: Same mocking approach.

120-120: Batch fetch logic is consistent.

127-127: Similar verification to line 97.

128-128: Capturing requests is fine.

147-147: Repeated mocking approach.

149-149: Same fetch logic.

158-158: Same verification step.

159-159: Looks fine.

170-170: Neat stubbing.

174-178: Good coverage of serving info.

185-185: Mocking is valid.

186-186: Setup is straightforward.

197-202: Checks caching scenario properly.

222-222: Mock is fine.

223-223: Includes flagStore, looks good.

224-226: Valid usage of fetcher with new store.

232-234: Cache flags tested well.

238-239: Mock usage is fine.

253-253: Constructor usage is consistent.

269-269: Same approach.

online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (3)
12-12: LGTM! Type updates align with refactoring goals. The change from `GroupByRequestMeta` to `LambdaKvRequest` is consistent across the codebase.

Also applies to: 124-125

266-266: LGTM! Codec update aligns with serialization changes. Updated to use `serde.AvroCodec` consistently.

159-160: LGTM! Test case updates maintain type consistency. Test cases properly updated to use `LambdaKvRequest`.

spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java (2)

22-22: LGTM! Import update aligns with refactoring. Updated to use `FetchTypes` consistently.

42-42: LGTM! Class and type updates maintain consistency. Class name and response type properly updated to use `FetchTypes`.

Also applies to: 73-73

online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (2)

6-7: LGTM! Import cleanup and method signature updates. Improved code organization with better type definitions and helper methods.

Also applies to: 113-117

123-145: LGTM! Request handling logic maintains functionality. Updated to use `LambdaKvRequest` while preserving core caching behavior.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

24-24: LGTM! Type annotations improve clarity. Added explicit Int types for constants.

Also applies to: 246-246, 257-257, 261-261

309-309: LGTM! MetadataStore initialization updated. Using `FetchContext` consistently for initialization.

spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3)

24-26: Imports look aligned with the new structure.

266-268: Clear calls might need synchronization if invoked concurrently. Ensure no races occur when multiple threads clear caches simultaneously.

328-329: Instantiating FetchContext and MetadataStore is consistent with the refactoring.

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (4)

28-29: New imports match the refactored classes.

80-80: Context-based instantiation is a cleaner approach.

101-102: Good: consistent pattern for directoryMetadataStore.

743-743: MetadataStore usage is now centralized via FetchContext.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2)

1-15: License header is standard.

17-37: Class and imports are well-structured.

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (12)
17-17: No issues here.

19-19: Star import. No pipeline flags; all good.

27-28: Imports added. Looks consistent with the rest of the code.

32-33: New collection imports. All fine.

35-35: Utility imports. Nothing problematic.

56-56: Refactored constructor. The single-parameter approach is neater.

73-73: Implicit ExecutionContext. Looks correct for Future usage.

87-88: KV fetch logic. Exception handling is present via `.recoverWith`.

99-99: Method scope changed to private. Ensure no external classes rely on it.

102-103: Fetching string array. Implementation is straightforward.

258-258: Switch to serde.AvroCodec. Consistent with the new approach.

274-275: TTLCache usage. Seems valid for caching codec objects.

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (12)
1-2: Package line & blank line. New file starts cleanly.

3-16: Imports block. Covers aggregator, concurrency, and logging.

18-20: Doc block for class. Helpful context for the fetcher.

22-23: Class declaration. Straightforward usage of `FetchContext` and `MetadataStore`.

25-25: Implicit execution context. Works for async tasks.

27-27: Logger instance. No concerns.

29-40: isCachingEnabled method. Simple boolean logic, no issues.

42-113: toLambdaKvRequest method. Encodes keys, handles fallback casting. Nicely structured.

114-152: attemptDerivations method. Gracefully handles derivation failures.

154-172: fetchGroupBys method. MultiGet logic, caching checks, robust usage of Futures.

Also applies to: 173-185, 188-272

274-335: fetchColumns method. Batch queries for groupBy data. Good flow.

336-342: LambdaKvRequest case class. Clear and concise.

online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (9)
4-4: Extensions import. No concerns here.

7-7: Observability import. Needed for logging/metrics.

10-10: Fetcher imports. Pulls in `FetchContext` & `MetadataStore`.

13-14: Concurrency & Try usage. Looks valid.

18-18: metadataDataset param. Default set to `Constants.MetadataDataset`.

20-22: FetchContext & MetadataStore initialization. Centralizes config neatly.

188-188: getDriftSeries signature. Uses stored join configs.

194-194: Retrieving JoinConf. `map` usage ensures non-blocking flow.

210-210: getSummarySeries with joinConf. Access pattern is consistent with the rest.

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (11)
26-27: Imports look fine. No issues.

33-33: No concerns.

36-36: Tailrec import. Seems fine.

88-90: Check concurrency usage. Ensure overriding doesn't introduce thread-safety risks.

92-97: fetchContext usage. Looks good.

112-112: joinCodecCache. No issues.

129-131: fetchJoin override. Check that `joinConf` usage aligns with metadata.

210-211: Using serde.AvroCodec. Matches the overall refactor.

234-239: Retry logic. Double-check the edge case when tries=0.
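The tries=0 edge case flagged here can be pinned down in a small retry sketch (illustrative, not the PR's implementation): clamping to at least one attempt guarantees the operation runs and a `Try` is always produced.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

def retry[T](tries: Int)(op: () => T): Try[T] = {
  @tailrec
  def loop(remaining: Int): Try[T] = Try(op()) match {
    case s @ Success(_)                   => s
    case f @ Failure(_) if remaining <= 1 => f // out of attempts
    case Failure(_)                       => loop(remaining - 1)
  }
  loop(math.max(1, tries)) // tries = 0 still runs op exactly once
}
```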
316-317: fetchExternal signature. No concerns.

348-352: onlineExternalParts. Implementation looks fine.

online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (6)
24-24: ListOps import. No issues.

28-29: serde.AvroCodec. Migration looks correct.

49-49: toScala call. No problems.

83-84: AvroCodec references. Change is consistent.

Also applies to: 86-87

96-101: Conditional aggregator logic. Seems fine.

127-127: mutationValueAvroCodec. No concerns.

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (5)
1-16: Class setup. Implementation is straightforward.

18-26: Logger & RequestContext. Looks okay.

28-69: decodeAndMerge. Ensure empty streaming is handled correctly.

171-211: mergeTiledIrsFromStreaming. Logic seems sound.

213-285: reportKvResponse, getServingInfo, toBatchIr. All looks fine.
Actionable comments posted: 1
🧹 Nitpick comments (6)
.bazelrc (1)

9-11: Update comment to reflect the new default Scala version. The comment incorrectly states the default is 2.12.

```diff
-# Default scala version to 2.12
+# Default scala version to 2.13
```

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (5)
21-28: Consider typed keys. Replacing `Map[String, Any]` with typed structures could reduce runtime errors.

72-98: Avoid returning null. Use an `Option` return to align with Scala idioms. Example fix:

```diff
- return null
+ return None
```

159-167: Reuse Gson. Creating a new `Gson()` per debug call might impact performance. Cache it if frequently used.

214-225: Track zero responses. If the response is empty, consider logging zero metrics for better observability.

266-285: Return Option from toBatchIr. Returning `null` can be error-prone in Scala. Example fix:

```diff
- if (bytes == null) return null
+ if (bytes == null) return None
```
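The suggested Option-based signature might look like this simplified stand-in (the real `toBatchIr` decodes an IR, not a String): `Option(bytes)` maps null to `None`, so callers compose with `map`/`getOrElse` instead of null checks.

```scala
// Simplified sketch of a null-free decode signature.
def decodeBatch(bytes: Array[Byte]): Option[String] =
  Option(bytes).map(b => new String(b, "UTF-8"))
```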
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- .bazelrc (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: bazel_config_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (4)
17-19: Review class instantiation carefully. Ensure no concurrency hazards when reusing `fetchContext` or `metadataStore` across requests.

48-55: Guard empty streaming data. `streamingResponsesOpt` could be `Some(Seq.empty)`, skipping the merge logic. Double-check edge cases.

133-147: Validate failed decodes. Dropping rows on decode failure may hide data issues. Confirm this is acceptable.

227-264: Handle forced refresh carefully. Frequent refresh might be costly. Confirm that a forced fetch is needed in all scenarios.
.bazelrc (outdated)

```
common --repo_env=SCALA_VERSION=2.13.12
common --define=SCALA_VERSION=2.13.12
```
💡 Verification agent
🧩 Analysis chain
Verify dependency compatibility with Scala 2.13.
Ensure all dependencies support Scala 2.13.12.
🏁 Script executed:

```shell
#!/bin/bash
# Check build files for Scala version dependencies
rg -g '*.scala' -g 'BUILD' -g '*.sbt' '2\.12'
```

Length of output: 126
Action: Align Scala versions

- `.bazelrc` (lines 12-13) sets Scala to 2.13.12.
- Found `quickstart/mongo-online-impl/build.sbt` using Scala 2.12.12.
- Update dependencies for consistency.
Eyeballed changes - seem to look ok if tests are passing.
One worry is rebasing / cherry-picking / upstreaming stuff to OSS will get super hard.
Compare: b87fdbe to 9cfa908
Actionable comments posted: 1
🧹 Nitpick comments (12)
api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1)

3-4: Consider importing specific Thrift protocols.

```diff
-import ai.chronon.api.thrift.protocol.{TBinaryProtocol, TCompactProtocol}
+import ai.chronon.api.thrift.protocol.TCompactProtocol
```

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (2)
17-36: Imports are broad. Consider explicit imports instead of wildcards for clarity.

288-297: Robust error logging. Check repeated log calls for spam potential.

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (3)
119-130: Batch columns fetch. Logic is correct. More tests might be helpful.

185-205: TTLCache usage. Looks consistent. Might confirm the test covers multiple calls.
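For context, TTL-cache semantics like those these tests exercise can be reduced to a minimal sketch; the codebase's actual TTLCache will differ in API and eviction details.

```scala
import java.util.concurrent.ConcurrentHashMap

// A value is recomputed once its entry is older than ttlMillis;
// fresh entries are served from the map without calling compute.
class SimpleTtlCache[K, V](ttlMillis: Long, compute: K => V) {
  private val entries = new ConcurrentHashMap[K, (Long, V)]()
  def apply(key: K): V = {
    val now = System.currentTimeMillis()
    val cached = entries.get(key)
    if (cached == null || now - cached._1 > ttlMillis) {
      val value = compute(key) // recompute on miss or expiry
      entries.put(key, (now, value))
      value
    } else cached._2
  }
}
```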
236-237: FetchContext mocking. Seems fine. Possibly mock only what's needed.

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (2)
159-272: Core fetch flow. Aggregates batch + streaming. Simplify if possible. Consider concurrency checks.

287-334: Column fetch. Mapping approach is clean. Possibly unify with fetchGroupBys for dryness.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2)
30-36: Consider constructor injection for easier testing. Instantiating `GroupByFetcher` inside the class limits test flexibility.
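The injection suggestion, sketched with hypothetical names (the real classes take different parameters): accepting the collaborator as a constructor parameter lets tests substitute a stub without touching the production wiring.

```scala
// Hypothetical collaborator interface; the PR's GroupByFetcher has a
// richer API than this single method.
trait GroupByClient {
  def fetch(request: String): String
}

// Constructor injection: the dependency arrives from outside instead
// of being constructed internally.
class JoinFetcherSketch(groupByClient: GroupByClient) {
  def fetchJoin(request: String): String = groupByClient.fetch(request)
}
```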
135-166: Prefer logging over println.

```diff
- if (fetchContext.debug || Math.random() < 0.001) {
-   println(s"Failed to fetch $groupByRequest with \n${ex.traceString}")
- }
+ if (fetchContext.debug || Math.random() < 0.001) {
+   logger.warn(s"Failed to fetch $groupByRequest", ex)
+ }
```

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2)
29-70: Consider clarifying the return structure. Returning a map without a quick reference might be confusing for new developers.

121-170: Gracefully handle partial decoding failure. Discarding null rows is fine, but be careful about silent data loss.
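One way to keep dropped rows from being silent, as the comment urges, is to surface a drop count next to the decoded values. This is a toy decoder, not the PR's code.

```scala
import scala.util.Try

// flatMap over Try#toOption discards rows that fail to decode, while
// the returned count makes the loss visible instead of hiding it.
def decodeAll(raw: Seq[String]): (Seq[Int], Int) = {
  val decoded = raw.flatMap(s => Try(s.toInt).toOption)
  (decoded, raw.size - decoded.size) // (values, dropped-row count)
}
```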
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (55)
- api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/TilingUtils.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (1 hunks)
- flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/AvroConversions.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/CompatParColls.scala (0 hunks)
- online/src/main/scala/ai/chronon/online/Extensions.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (5 hunks)
- online/src/main/scala/ai/chronon/online/JoinCodec.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/TTLCache.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/TileCodec.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (12 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala (0 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (3 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (11 hunks)
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (6 hunks)
- online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (10 hunks)
- online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (5 hunks)
- online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala (1 hunks)
- online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (4 hunks)
- spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (4 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (5 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2 hunks)
💤 Files with no reviewable changes (2)
- online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala
- online/src/main/scala/ai/chronon/online/CompatParColls.scala
✅ Files skipped from review due to trivial changes (1)
- online/src/main/scala/ai/chronon/online/TTLCache.scala
🚧 Files skipped from review as they are similar to previous changes (41)
- spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala
- online/src/main/scala/ai/chronon/online/Extensions.scala
- online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala
- online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala
- flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala
- online/src/main/scala/ai/chronon/online/Api.scala
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala
- spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
- flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
- online/src/main/scala/ai/chronon/online/TileCodec.scala
- flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala
- online/src/main/scala/ai/chronon/online/AvroConversions.scala
- spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala
- online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala
- api/src/main/scala/ai/chronon/api/TilingUtils.scala
- spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala
- spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
- online/src/main/scala/ai/chronon/online/JoinCodec.scala
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java
- spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala
- spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala
- spark/src/main/scala/ai/chronon/spark/Driver.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
- online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala
- spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala
- online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- online/src/main/scala/ai/chronon/online/stats/DriftStore.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: streaming_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (45)
api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1)
6-16: ✅ Verification successful

Thread-local implementation looks good!
Clean implementation using thread-local storage and lazy initialization. The @transient annotation correctly prevents serialization of these fields.
However, verify thread safety:

🏁 Script executed:

```shell
#!/bin/bash
# Check for any concurrent usage of SerdeUtils that might need synchronization
rg -A 5 "SerdeUtils\.(compact(De)?[sS]erializer)"
```

Length of output: 3757

Thread-local usage confirmed as safe.
- Lazy initialization with `ThreadLocal` and `@transient` is correctly implemented.
- Grep results show consistent, isolated usage via `.get()` across the codebase.

No further changes needed.
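The pattern under review can be sketched in a self-contained way. The real `SerdeUtils` wraps Thrift's `TSerializer`/`TCompactProtocol`; here a plain `StringBuilder` stands in for the serializer so the example runs without Thrift on the classpath, and the object layout is an illustrative assumption rather than the project's source:

```scala
// Thread-local, lazily-initialized "serializer" sketch.
// Each thread gets its own instance, so no locking is needed around .get().
object SerdeUtilsSketch {
  // @transient keeps the field out of any enclosing serialization,
  // mirroring the annotation discussed in the review above.
  @transient lazy val compactSerializer: ThreadLocal[StringBuilder] =
    ThreadLocal.withInitial(() => new StringBuilder())
}
```

Because `ThreadLocal.withInitial` builds one instance per thread on first access, repeated `.get()` calls on the same thread return the same object while other threads see fresh ones, which is why the grep-confirmed `.get()` usage is safe without synchronization.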
spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2)
23-23: LGTM! Import statement updated to include FetchContext.

109-109: LGTM! MetadataStore initialization refactored to use FetchContext, simplifying the parameter passing.
online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (3)
113-114: LGTM! Method signature updated to use LambdaKvRequest, aligning with the refactoring.

116-116: LGTM! Good practice: Local variable defined to avoid repeated Map.empty calls.

123-124: LGTM! Case pattern updated to handle LambdaKvRequest type.
Also applies to: 125-125
online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (3)
124-125: LGTM! Test case updated to use LambdaKvRequest.

159-160: LGTM! Test case updated to use LambdaKvRequest.

266-266: LGTM! Mock updated to use serde.AvroCodec.
Also applies to: 311-311
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1)
618-618: LGTM! MetadataStore initialization refactored to use FetchContext.
Also applies to: 743-743
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3)
24-26: Imports look fine.
No obvious duplication or overshadowing issues.

265-269: Ensure thread safety when clearing caches.
Check for concurrent calls that might lead to race conditions.

328-329: Good context-based initialization.
Ensure all callers set up `fetchContext` properly.

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (8)
56-57: Constructor refactor looks solid.
Better to have one context object than multiple params.

73-73: Validate concurrency.
Reusing `fetchContext`'s ExecutionContext is fine. Confirm no deadlock risk.

87-88: Single point for KVStore & timeouts.
Check negative or large timeout handling.

99-111: Method now private.
Ensure no external usage is broken. Possibly update docs.

170-174: Consolidated dataset usage.
Ensure correct dataset name from `fetchContext.metadataDataset`.

264-273: Serde usage is consistent.
No issues spotted.

274-279: Lazy TTLCache.
Consider concurrency if multiple threads initialize it at once.

353-353: Dataset creation triggered.
Confirm no race with other initializations.

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (10)
29-29: New imports.
All good. Keep them minimal.

60-61: Separate fetchers.
Makes logic clearer. Great separation.

63-65: New context fields.
Check that required fields are all set.

72-76: Test setup revised.
Spies on specialized fetchers. Good for coverage.

89-99: Mock fetchGroupBys.
No issues. Straight to the point.

147-164: Missing response check.
Throws correct exception. Good approach.

168-180: Serving info refresh.
Ensure concurrency is safe if multiple fetchers refresh simultaneously.

206-233: Flag-based cache toggle.
Neat. Validate flags for potential misconfig.

251-251: Validation for null keys.
Logic is minimal but correct.

267-281: Key missing scenario.
Throws expected exception key. Good coverage.

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (4)
1-41: Package & caching logic.
Short and clear. No major issues.

42-113: Key encoding approach.
Casts keys if parse fails. Very defensive. Good job.

114-153: Derivative handling.
Catches exceptions and reverts to rename-only. Smart fallback.

338-343: Case class for requests.
Sufficiently clear. No concerns.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (4)
37-40: Pass-through method looks fine.

41-46: Fetch columns delegation is straightforward.

47-50: Method usage looks correct.

51-53: Cache size check approach is clear.

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (6)
17-27: Case class usage is concise.

72-119: Verify error handling on potential Future failures.
Ensure no uncaught exceptions occur during merging.

172-212: Tiled merges look correct.

214-226: Null guard is acceptable here.

227-265: Refresh logic is solid for batch updates.

266-286: Check for potential null return at line 267.
Caller logic must handle a null `FinalBatchIr`.
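The null-return concern above can be contained at the boundary by converting to `Option` once, so every caller is forced to handle absence explicitly. The types below are simplified stand-ins for Chronon's classes, not the project's actual definitions:

```scala
// Simplified stand-in for Chronon's FinalBatchIr.
final case class FinalBatchIr(collapsed: Array[Any])

// Stand-in for the method flagged in the review: may return null
// when there is no batch data to decode.
def getBatchIr(bytes: Array[Byte]): FinalBatchIr =
  if (bytes.isEmpty) null else FinalBatchIr(Array[Any](bytes.length))

// Safer boundary: wrap the possibly-null result in Option once,
// then callers pattern-match or map instead of null-checking.
def getBatchIrOption(bytes: Array[Byte]): Option[FinalBatchIr] =
  Option(getBatchIr(bytes))
```

`Option(x)` maps `null` to `None`, so downstream merge logic never sees a raw null.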
```scala
def fetchJoins(requests: Seq[Request], joinConf: Option[Join] = None): Future[Seq[Response]] = {
  val startTimeMs = System.currentTimeMillis()
  // convert join requests to groupBy requests
  val joinDecomposed: Seq[(Request, Try[Seq[Either[PrefixedRequest, KeyMissingException]]])] =
    requests.map { request =>
      // use passed-in join or fetch one
      val joinTry: Try[JoinOps] = joinConf
        .map(conf => Success(JoinOps(conf)))
        .getOrElse(metadataStore.getJoinConf(request.name))

      var joinContext: Option[Metrics.Context] = None

      val decomposedTry = joinTry.map { join =>
        joinContext = Some(Metrics.Context(Metrics.Environment.JoinFetching, join.join))
        joinContext.get.increment("join_request.count")

        join.joinPartOps.map { part =>
          val joinContextInner = Metrics.Context(joinContext.get, part)
          val missingKeys = part.leftToRight.keys.filterNot(request.keys.contains)

          if (missingKeys.nonEmpty) {
            Right(KeyMissingException(part.fullPrefix, missingKeys.toSeq, request.keys))
          } else {
            val rightKeys = part.leftToRight.map { case (leftKey, rightKey) => rightKey -> request.keys(leftKey) }
            Left(
              PrefixedRequest(
                part.fullPrefix,
                Request(part.groupBy.getMetaData.getName, rightKeys, request.atMillis, Some(joinContextInner))))
          }
        }
      }
      request.copy(context = joinContext) -> decomposedTry
    }

  val groupByRequests = joinDecomposed.flatMap { case (_, gbTry) =>
    gbTry match {
      case Failure(_)        => Iterator.empty
      case Success(requests) => requests.iterator.flatMap(_.left.toOption).map(_.request)
    }
  }

  val groupByResponsesFuture = groupByFetcher.fetchGroupBys(groupByRequests)

  // re-attach groupBy responses to join
  groupByResponsesFuture
    .map { groupByResponses =>
      val responseMap = groupByResponses.iterator.map { response => response.request -> response.values }.toMap
      val responses = joinDecomposed.iterator.map { case (joinRequest, decomposedRequestsTry) =>
        val joinValuesTry = decomposedRequestsTry.map { groupByRequestsWithPrefix =>
          groupByRequestsWithPrefix.iterator.flatMap {

            case Right(keyMissingException) =>
              Map(keyMissingException.requestName + "_exception" -> keyMissingException.getMessage)

            case Left(PrefixedRequest(prefix, groupByRequest)) =>
              parseGroupByResponse(prefix, groupByRequest, responseMap)
          }.toMap
        }
        joinValuesTry match {
          case Failure(ex) => joinRequest.context.foreach(_.incrementException(ex))
          case Success(responseMap) =>
            joinRequest.context.foreach { ctx =>
              ctx.distribution("response.keys.count", responseMap.size)
            }
        }
        joinRequest.context.foreach { ctx =>
          ctx.distribution("internal.latency.millis", System.currentTimeMillis() - startTimeMs)
          ctx.increment("internal.request.count")
        }
        Response(joinRequest, joinValuesTry)
      }.toSeq
      responses
    }
}
```
🛠️ Refactor suggestion
This method is lengthy.
Splitting logic (join decomposition, groupBy fetching, response mapping) would boost clarity.
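The first stage of that suggested split (join decomposition) can be sketched as a standalone function: each join part either yields a groupBy request (`Left`) or records which keys were missing (`Right`), mirroring the `Either[PrefixedRequest, KeyMissingException]` shape in the method above. `Request` and `KeyMissing` here are simplified stand-ins for Chronon's types:

```scala
// Simplified stand-ins for Chronon's Request and KeyMissingException.
final case class Request(name: String, keys: Map[String, String])
final case class KeyMissing(part: String, missing: Seq[String])

// Stage 1 of fetchJoins, isolated: map each join part to either a
// groupBy request with the remapped keys, or a missing-key record.
def decompose(req: Request,
              partKeys: Map[String, Seq[String]]): Seq[Either[Request, KeyMissing]] =
  partKeys.toSeq.sortBy(_._1).map { case (part, required) =>
    val missing = required.filterNot(req.keys.contains)
    if (missing.nonEmpty) Right(KeyMissing(part, missing))
    else Left(Request(part, required.map(k => k -> req.keys(k)).toMap))
  }
```

With decomposition factored out, the remaining stages (batch groupBy fetch, response re-attachment) become separately testable functions of its output.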
@nikhil-zlai Understood! I'll move on from the refactoring suggestion. Let me know if you need any other assistance!
Summary
fetcher has grown over time into a large file with many large functions that are hard to work with. This refactoring doesn't change any functionality - just placement.
Made some of the Scala code more idiomatic - `if (try.isFailure)` vs `try.recoverWith`
Made Metadata methods more explicit
FetcherBase -> JoinPartFetcher + GroupByFetcher + GroupByResponseHandler
Added fetch context - to replace 10 constructor params
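The `isFailure`-vs-`recoverWith` idiom mentioned above can be shown side by side. `primary` and `fallback` are hypothetical helpers, not Chronon code:

```scala
import scala.util.{Failure, Success, Try}

def primary(name: String): Try[String] =
  if (name.isEmpty) Failure(new IllegalArgumentException("empty name"))
  else Success(name)

def fallback: Try[String] = Success("default")

// before: manually branch on the Try's failure state
def lookupBranching(name: String): Try[String] = {
  val attempt = primary(name)
  if (attempt.isFailure) fallback else attempt
}

// after: chain the fallback declaratively with recoverWith
def lookupIdiomatic(name: String): Try[String] =
  primary(name).recoverWith { case _: IllegalArgumentException => fallback }
```

Both forms behave the same; `recoverWith` keeps the happy path and the fallback in one expression and scopes the recovery to specific exception types.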
Checklist
Summary by CodeRabbit
- Added a new `FetchContext` class to manage fetching operations and execution contexts.
- Implemented a new `GroupByFetcher` class for efficient group-by data retrieval.
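The "fetch context replaces 10 constructor params" idea from the summary is the classic parameter-object refactor: collapse a long constructor signature into one case class that threads through every fetcher component. Field names and defaults below are illustrative guesses, not the project's actual `FetchContext`:

```scala
// Hypothetical context object bundling what were separate constructor params.
final case class FetchContext(
    metadataDataset: String = "CHRONON_METADATA",
    timeoutMillis: Long = 10000L,
    debug: Boolean = false
)

// before: class MetadataStore(kvStore, dataset, timeoutMillis, debug, ...)
// after:  one context object, easy to extend without touching every caller
class MetadataStoreSketch(ctx: FetchContext) {
  def describe: String =
    s"dataset=${ctx.metadataDataset} timeoutMs=${ctx.timeoutMillis} debug=${ctx.debug}"
}
```

Adding a new configuration knob then means adding one field with a default, instead of changing the signature of `MetadataStore`, `JoinPartFetcher`, and `GroupByFetcher` in lockstep.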