refactor: split fetcher logic into multiple files #425


Merged
merged 7 commits on Feb 25, 2025

Conversation

nikhil-zlai
Contributor

@nikhil-zlai nikhil-zlai commented Feb 24, 2025

Summary

Fetcher has grown over time into a large file with many large functions that are hard to work with. This refactoring doesn't change any functionality, just placement.

  • Made some of the Scala code more idiomatic, e.g. replacing if (tryResult.isFailure) checks with tryResult.recoverWith
  • Made metadata methods more explicit
  • Split FetcherBase into JoinPartFetcher + GroupByFetcher + GroupByResponseHandler
  • Added a FetchContext to replace 10 constructor params
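A fetch context along these lines would collapse the ten constructor params into one value. This is a hypothetical sketch, not the actual class: the field names follow the scaladoc suggested in the review comments on FetchContext.scala, and KVStore/FlagStore are stubbed so the snippet stands alone.

```scala
// Hypothetical sketch of the FetchContext shape. KVStore and FlagStore are
// stubs standing in for the real interfaces.
trait KVStore
trait FlagStore

case class FetchContext(
    kvStore: KVStore,                    // key-value store for data access
    metadataDataset: String,             // dataset for metadata storage
    timeoutMillis: Long = 10000L,        // timeout for fetch operations
    debug: Boolean = false,              // enable debug mode
    flagStore: Option[FlagStore] = None, // store for feature flags
    disableErrorThrows: Boolean = false, // disable error throwing
    executionContextOverride: Option[scala.concurrent.ExecutionContext] = None
)
```

Call sites then pass a single context value instead of ten positional arguments, and defaults live in one place.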

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features
    • Introduced a unified configuration context that enhances data fetching, including improved group-by and join operations with more robust error handling.
    • Added a new FetchContext class to manage fetching operations and execution contexts.
    • Implemented a new GroupByFetcher class for efficient group-by data retrieval.
  • Refactor
    • Upgraded serialization and deserialization to use a more efficient, compact protocol.
    • Standardized API definitions and type declarations across modules to improve clarity and maintainability.
    • Enhanced error handling in various methods to provide more informative messages.
  • Chores
    • Removed outdated utilities and reorganized dependency imports.
    • Updated test suites to align with the refactored architecture.

coderabbitai bot commented Feb 24, 2025

Walkthrough

The PR introduces a new SerdeUtils object providing thread-local Thrift-based serializers/deserializers and updates various modules to use its compact serializers. It refactors MetadataStore initialization by wrapping configuration into a new FetchContext, updates import paths (e.g., for AvroCodec and ArrayRow), and adjusts types throughout. Additionally, new fetcher-related classes (GroupByFetcher, GroupByResponseHandler, JoinPartFetcher) are added while obsolete files are removed and tests are updated accordingly.
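The thread-local pattern behind SerdeUtils can be sketched as follows. This is an illustration under assumptions, not the real implementation: the Serializer stub stands in for Thrift's TSerializer (which keeps mutable internal state and is not thread-safe).

```scala
// Sketch of the SerdeUtils thread-local pattern. Sharing one TSerializer
// across threads is unsafe, so each thread lazily gets its own instance.
object SerdeUtilsSketch {
  // Stand-in for org.apache.thrift.TSerializer(new TCompactProtocol.Factory)
  final class Serializer {
    def serialize(payload: String): Array[Byte] = payload.getBytes("UTF-8")
  }

  // Each thread calling compactSerializer.get() receives its own instance.
  lazy val compactSerializer: ThreadLocal[Serializer] =
    ThreadLocal.withInitial(() => new Serializer)
}
```

Call sites then use compactSerializer.get().serialize(...), matching the TilingUtils change shown later in the review.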

Changes

File(s) Change Summary
api/.../SerdeUtils.scala Added a new object with lazy thread-local instances for binary and compact serialization.
api/.../TilingUtils.scala, online/.../AvroConversions.scala, online/.../Extensions.scala, spark/.../LoggingSchema.scala, online/.../TileCodec.scala Replaced old serializer/deserializer and updated AvroCodec usage/imports to use the new SerdeUtils and serde.AvroCodec.
flink/.../FlinkJob.scala, spark/.../Driver.scala, online/.../Api.scala, online/.../DriftStore.scala, online/.../MetadataStore.scala, online/.../FetchContext.scala Refactored MetadataStore initialization to use a new FetchContext; added explicit type declarations and updated error handling.
online/.../Fetcher.scala, online/.../GroupByFetcher.scala, online/.../GroupByResponseHandler.scala, online/.../JoinPartFetcher.scala, online/.../FetcherCache.scala, (FetcherBase.scala removed) Updated Fetcher methods to use FetchContext and SerdeUtils, added new classes for group-by and join operations, and streamlined caching/error handling.
flink/.../TestFlinkJob.scala, flink/.../FlinkRowAggregators.scala, online/.../AvroDeSerializationSupportSpec.scala, plus various test files under spark/src/test/... Updated imports and instantiation in tests to match new MetadataStore, AvroCodec paths, and serializer usage; adjusted types for request metadata.
Various files (e.g., online/.../ExternalSourceRegistry.scala, spark/.../CompareJob.scala, spark/.../ObservabilityDemo*.scala) Consolidated import statements, syntax, and comment formatting for improved clarity and consistency.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant F as Fetcher (JoinPartFetcher/GroupByFetcher)
    participant FC as FetchContext
    participant M as MetadataStore
    participant S as SerdeUtils
    C->>F: Submit data request (GroupBy/Join)
    F->>FC: Wraps configuration into FetchContext
    F->>M: Retrieve metadata & join config via FC
    M-->>F: Return metadata
    F->>S: Serialize/Deserialize using compact serializers
    F-->>C: Return processed response

Suggested reviewers

  • piyush-zlai
  • david-zlai
  • varant-zlai

Poem

In code we trust, our types refined,
New serializers bright, in thread-local kind.
FetchContext leads the way,
Metadata flows, clear as day.
Join and GroupBy in harmony align 😊
CodeRabbit’s spirit makes our changes shine!

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.




 def serializeTileKey(key: TileKey): Array[Byte] = {
-  binarySerializer.get().serialize(key)
+  SerdeUtils.compactSerializer.get().serialize(key)
 }
Contributor Author


no bugs here - just needed thread local

@@ -306,7 +306,7 @@ object FlinkJob {
val kafkaBootstrap = jobArgs.kafkaBootstrap.toOption

val api = buildApi(onlineClassName, props)
val metadataStore = new MetadataStore(api.genKvStore, MetadataDataset, timeoutMillis = 10000)
Contributor Author


10k is the default - there was a compiler warning

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (12)
online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1)

8-36: LGTM! Consider adding class documentation.

Well-structured case class with clear responsibilities and thread-safe operations.

Add scaladoc to describe the class purpose and parameters:

+/**
+ * Context for fetch operations containing configuration and utilities.
+ *
+ * @param kvStore Key-value store for data access
+ * @param metadataDataset Dataset for metadata storage
+ * @param timeoutMillis Timeout for fetch operations
+ * @param debug Enable debug mode
+ * @param flagStore Store for feature flags
+ * @param disableErrorThrows Disable error throwing
+ * @param executionContextOverride Custom execution context
+ */
online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (1)

60-63: Consider local or lazy init instead of vars.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2)

38-113: Consider using a monotonic clock for duration measurement. System.nanoTime() is unaffected by wall-clock adjustments; rename the variable and convert elapsed nanos back to millis when reporting.

- val startTimeMs = System.currentTimeMillis()
+ val startTimeNs = System.nanoTime()

115-147: Using random sampling for logging could be replaced with debug logs.

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (4)

127-131: Put request.
No explicit error handling if put fails. Consider adding .recoverWith.


263-265: Potential .get throw.
Consider safer handling over .get to avoid unexpected errors.


343-343: multiPut usage.
No .recover if partial writes fail. Possibly handle failures.


352-352: Create dataset.
Async, no guarantee on success. Consider wait or verification.
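The .recoverWith handling suggested for the put/multiPut calls above might take roughly this shape. The Future[Boolean] return type is an assumption about the store's put signature, so treat this as a pattern sketch rather than the real API.

```scala
// Sketch: wrap a KV write so a failure is logged and surfaced as a
// failed-write result instead of an unhandled exception.
import scala.concurrent.{ExecutionContext, Future}

def putWithRecovery(doPut: () => Future[Boolean])(implicit ec: ExecutionContext): Future[Boolean] =
  doPut().recoverWith { case e: Exception =>
    println(s"KV put failed: ${e.getMessage}") // real code would use the logger
    Future.successful(false)
  }
```

Callers then see false for a failed write and can decide whether to retry, rather than having the Future fail the whole request.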

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (3)

125-127: New fetchGroupBys
Consider adding dedicated tests.


145-146: Assertion context
Include request ID for clarity.

- s"Logic error. Responses are not aligned..."
+ s"Logic error (req=$requestId). Responses are not aligned..."

326-326: joinConfTry
Log missing conf for debug.

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1)

71-169: mergeWithStreaming
Add tests if possible.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 45bae2f and 24c7c8a.

📒 Files selected for processing (54)
  • api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/AvroConversions.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/CompatParColls.scala (0 hunks)
  • online/src/main/scala/ai/chronon/online/Extensions.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (5 hunks)
  • online/src/main/scala/ai/chronon/online/JoinCodec.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TileCodec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (12 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala (0 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (3 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (11 hunks)
  • online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (5 hunks)
  • online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (11 hunks)
  • online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (5 hunks)
  • online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (4 hunks)
  • spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (4 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (5 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2 hunks)
💤 Files with no reviewable changes (2)
  • online/src/main/scala/ai/chronon/online/CompatParColls.scala
  • online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala
✅ Files skipped from review due to trivial changes (15)
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala
  • online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala
  • online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala
  • online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala
  • online/src/main/scala/ai/chronon/online/TileCodec.scala
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala
  • spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
  • spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala
  • spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala
  • online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
🔇 Additional comments (137)
online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (2)

17-25: Clean package organization and import consolidation.


105-117: Good refactoring of decode methods.

Clean extraction of common functionality into decodeArray with functional composition in decodeMap.

spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala (1)

108-108: LGTM! Good architectural separation.

Moving join configuration to a dedicated metadata store improves modularity.

spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1)

78-78: LGTM! Consistent implementation.

Change aligns with the metadata store pattern used in ObservabilityDemoDataLoader.

online/src/main/scala/ai/chronon/online/Extensions.scala (1)

51-51: LGTM! Better type specificity.

Using serde.AvroCodec provides clearer serialization implementation intent.

spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala (1)

3-3: LGTM! Better serialization organization.

Consolidating serialization utilities in SerdeUtils improves code maintainability.

Also applies to: 40-40

spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (2)

98-98: LGTM! Improved metadata handling.

The change to use metadataStore aligns with the refactoring goals.


103-106: LGTM! Better integer instantiation.

Using Integer.valueOf() instead of new Integer() improves performance through value caching.

spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1)

92-92: LGTM! Improved metadata handling.

The change to use metadataStore aligns with the refactoring goals.

online/src/main/scala/ai/chronon/online/AvroConversions.scala (1)

186-186: LGTM! Better package organization.

The change to use serde.AvroCodec improves code organization.

Also applies to: 196-196

spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (1)

83-83: LGTM! Improved metadata handling.

The change to use metadataStore aligns with the refactoring goals.

api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1)

6-26: LGTM! Thread-safe serialization implementation.

The implementation correctly uses ThreadLocal for thread safety and lazy initialization for performance optimization.

online/src/main/scala/ai/chronon/online/Api.scala (2)

81-94: LGTM! Improved error handling in getString.

The error handling now provides better context for debugging.


96-109: LGTM! Enhanced error handling in getStringArray.

The error handling is now consistent with getString method.

spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (1)

117-117: LGTM! Simplified MetadataStore access.

Using fetcher.metadataStore aligns with the refactoring goals.

spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2)

87-87: LGTM! Consistent MetadataStore access.

Using fetcher.metadataStore maintains consistency with other changes.


172-175: LGTM! Improved method chaining.

The chained method calls improve readability while maintaining the same functionality.

spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (3)

24-24: LGTM!

Import change aligns with the refactoring of fetcher logic.


90-90: LGTM!

Return type change maintains consistency with the fetcher package.


99-99: LGTM!

Return statements correctly updated to use fetcher.DataMetrics.

Also applies to: 138-138

spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (2)

191-191: LGTM!

Centralizes metadata handling through metadataStore.


217-217: LGTM!

Consistently uses metadataStore for serving info retrieval.

spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (1)

424-424: LGTM!

Consistently uses fetcher.metadataStore, aligning with the refactoring.

spark/src/main/scala/ai/chronon/spark/Driver.scala (3)

31-31: LGTM!

Import statement is now more specific, aligning with the refactoring objective.


464-472: LGTM!

Multi-line formatting improves readability.


613-615: LGTM!

FetchContext encapsulates KV store and metadata dataset, improving code organization.

online/src/main/scala/ai/chronon/online/JoinCodec.scala (3)

30-30: LGTM!

Import statement is appropriately specific.


37-38: LGTM!

Parameter types consistently use serde.AvroCodec.


92-92: LGTM!

Method signature consistently uses serde.AvroCodec.

spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (3)

26-27: LGTM!

Import statements are well-organized and specific.


241-241: LGTM!

MetadataStore instantiation properly uses FetchContext.


266-266: LGTM!

Using Long.valueOf is more efficient as it may reuse cached values.

spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2)

20-20: LGTM!

Import statement is appropriately specific.


117-117: LGTM!

Return type correctly uses fetcher.DataMetrics.

spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2)

23-23: Looks good.


109-109: Neat usage of fetch context.

api/src/main/scala/ai/chronon/api/TilingUtils.scala (3)

4-4: Import is consistent.


13-13: Ensure no null key.


18-18: Check bytes not null.

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (25)

29-29: Import is fine.


71-73: Test setup is logical.


88-88: Mock is correct.


90-92: Tests look fine.


97-97: Good verification.


118-118: Same mocking approach.


120-120: Batch fetch logic is consistent.


127-127: Similar verification to line 97.


128-128: Capturing requests is fine.


147-147: Repeated mocking approach.


149-149: Same fetch logic.


158-158: Same verification step.


159-159: Looks fine.


170-170: Neat stubbing.


174-178: Good coverage of serving info.


185-185: Mocking is valid.


186-186: Setup is straightforward.


197-202: Checks caching scenario properly.


222-222: Mock is fine.


223-223: Includes flagStore, looks good.


224-226: Valid usage of fetcher with new store.


232-234: Cache flags tested well.


238-239: Mock usage is fine.


253-253: Constructor usage is consistent.


269-269: Same approach.

online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (3)

12-12: LGTM! Type updates align with refactoring goals.

The change from GroupByRequestMeta to LambdaKvRequest is consistent across the codebase.

Also applies to: 124-125


266-266: LGTM! Codec update aligns with serialization changes.

Updated to use serde.AvroCodec consistently.


159-160: LGTM! Test case updates maintain type consistency.

Test cases properly updated to use LambdaKvRequest.

spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java (2)

22-22: LGTM! Import update aligns with refactoring.

Updated to use FetchTypes consistently.


42-42: LGTM! Class and type updates maintain consistency.

Class name and response type properly updated to use FetchTypes.

Also applies to: 73-73

online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (2)

6-7: LGTM! Import cleanup and method signature updates.

Improved code organization with better type definitions and helper methods.

Also applies to: 113-117


123-145: LGTM! Request handling logic maintains functionality.

Updated to use LambdaKvRequest while preserving core caching behavior.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

24-24: LGTM! Type annotations improve clarity.

Added explicit Int types for constants.

Also applies to: 246-246, 257-257, 261-261


309-309: LGTM! MetadataStore initialization updated.

Using FetchContext consistently for initialization.

spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3)

24-26: Imports look aligned with new structure.


266-268: Clear calls might need synchronization if invoked concurrently.

Ensure no races occur when multiple threads clear caches simultaneously.


328-329: Instantiating FetchContext and MetadataStore is consistent with refactoring.

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (4)

28-29: New imports match refactored classes.


80-80: Context-based instantiation is a cleaner approach.


101-102: Good: consistent pattern for directoryMetadataStore.


743-743: MetadataStore usage is now centralized via FetchContext.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2)

1-15: License header is standard.


17-37: Class and imports well-structured.

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (12)

17-17: No issues here.


19-19: Star import.
No pipeline flags; all good.


27-28: Imports added.
Looks consistent with the rest of the code.


32-33: New collection imports.
All fine.


35-35: Utility imports.
Nothing problematic.


56-56: Refactored constructor.
Single-parameter approach is neater.


73-73: Implicit ExecutionContext.
Looks correct for Future usage.


87-88: KV fetch logic.
Exception handling is present via .recoverWith.


99-99: Method scope changed to private.
Ensure no external classes rely on it.


102-103: Fetching string array.
Implementation is straightforward.


258-258: Switch to serde.AvroCodec.
Consistent with the new approach.


274-275: TTLCache usage.
Seems valid for caching codec objects.

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (12)

1-2: Package line & blank line.
New file starts cleanly.


3-16: Imports block.
Covers aggregator, concurrency, and logging.


18-20: Doc block for class.
Helpful context for the fetcher.


22-23: Class declaration.
Straightforward usage of FetchContext and MetadataStore.


25-25: Implicit execution context.
Works for async tasks.


27-27: Logger instance.
No concerns.


29-40: isCachingEnabled method.
Simple boolean logic, no issues.


42-113: toLambdaKvRequest method.
Encodes keys, handles fallback casting. Nicely structured.


114-152: attemptDerivations method.
Gracefully handles derivation failures.


154-172: fetchGroupBys method.
MultiGet logic, caching checks, robust usage of Futures.

Also applies to: 173-185, 188-272


274-335: fetchColumns method.
Batch queries for groupBy data. Good flow.


336-342: LambdaKvRequest case class.
Clear and concise.

online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (9)

4-4: Extensions import.
No concerns here.


7-7: Observability import.
Needed for logging/metrics.


10-10: Fetcher imports.
Pulls in FetchContext & MetadataStore.


13-14: Concurrency & Try usage.
Looks valid.


18-18: metadataDataset param.
Default set to Constants.MetadataDataset.


20-22: FetchContext & MetadataStore initialization.
Centralizes config neatly.


188-188: getDriftSeries signature.
Uses stored join configs.


194-194: Retrieving JoinConf.
map usage ensures non-blocking flow.


210-210: getSummarySeries with joinConf.
Access pattern is consistent with the rest.

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (11)

26-27: Imports look fine
No issues.


33-33: No concerns


36-36: Tailrec import
Seems fine.


88-90: Check concurrency usage
Ensure overriding doesn't introduce thread-safety risks.


92-97: fetchContext usage
Looks good.


112-112: joinCodecCache
No issues.


129-131: fetchJoin override
Check if joinConf usage aligns with metadata.


210-211: Using serde.AvroCodec
Matches overall refactor.


234-239: retry logic
Double-check edge case when tries=0.


316-317: fetchExternal signature
No concerns.


348-352: onlineExternalParts
Implementation looks fine.

online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (6)

24-24: ListOps import
No issues.


28-29: serde.AvroCodec
Migration looks correct.


49-49: toScala call
No problems.


83-84: AvroCodec references
Change is consistent.

Also applies to: 86-87


96-101: Conditional aggregator logic
Seems fine.


127-127: mutationValueAvroCodec
No concerns.

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (5)

1-16: Class setup
Implementation is straightforward.


18-26: Logger & RequestContext
Looks okay.


28-69: decodeAndMerge
Ensure empty streaming is handled correctly.


171-211: mergeTiledIrsFromStreaming
Logic seems sound.


213-285: reportKvResponse, getServingInfo, toBatchIr
All looks fine.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (6)
.bazelrc (1)

9-11: Update comment to reflect new default Scala version.

Comment incorrectly states default is 2.12.

-# Default scala version to 2.12
+# Default scala version to 2.13
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (5)

21-28: Consider typed keys.
Replacing Map[String, Any] with typed structures could reduce runtime errors.


72-98: Avoid returning null.
Use an Option return to align with Scala idioms.

Example fix:

-      return null
+      return None

159-167: Reuse Gson.
Creating a new Gson() per debug call might impact performance. Cache it if frequently used.


214-225: Track zero responses.
If response is empty, consider logging zero metrics for better observability.


266-285: Return Option from toBatchIr.
Returning null can be error-prone in Scala.

Example fix:

-  if (bytes == null) return null
+  if (bytes == null) return None
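Both null-returning fixes above also imply a return-type change from the raw value to an Option. A minimal sketch of that shape, where BatchIr is a placeholder for the real intermediate-representation type:

```scala
// Sketch: return Option instead of null so absence is explicit at the type
// level. BatchIr is a placeholder, not the actual class.
final case class BatchIr(fields: Seq[Byte])

def toBatchIrSketch(bytes: Array[Byte]): Option[BatchIr] =
  Option(bytes).map(b => BatchIr(b.toSeq)) // Option(null) == None
```

Callers then map or pattern-match over the result instead of null-checking.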
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between d324397 and 56118f0.

📒 Files selected for processing (2)
  • .bazelrc (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: bazel_config_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (4)

17-19: Review class instantiation carefully.
Ensure no concurrency hazards when reusing fetchContext or metadataStore across requests.


48-55: Guard empty streaming data.
streamingResponsesOpt could be Some(Seq.empty), skipping merge logic. Double-check edge cases.


133-147: Validate failed decodes.
Dropping rows on decode failure may hide data issues. Confirm acceptable.


227-264: Handle forced refresh carefully.
Frequent refresh might be costly. Confirm that forced fetch is needed in all scenarios.

.bazelrc Outdated
Comment on lines 12 to 13
common --repo_env=SCALA_VERSION=2.13.12
common --define=SCALA_VERSION=2.13.12

💡 Verification agent

🧩 Analysis chain

Verify dependency compatibility with Scala 2.13.

Ensure all dependencies support Scala 2.13.12.


🏁 Script executed:

#!/bin/bash
# Check build files for Scala version dependencies
rg -g '*.scala' -g 'BUILD' -g '*.sbt' '2\.12' 

Length of output: 126


Action: Align Scala Versions

  • .bazelrc (lines 12-13) sets Scala to 2.13.12.
  • Found quickstart/mongo-online-impl/build.sbt using Scala 2.12.12.
  • Update dependencies for consistency.

@piyush-zlai (Contributor) left a comment

Eyeballed the changes - seem to look ok if tests are passing.
One worry is that rebasing / cherry-picking / upstreaming stuff to OSS will get super hard.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (12)
api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1)

3-4: Consider importing specific Thrift protocols.

-import ai.chronon.api.thrift.protocol.{TBinaryProtocol, TCompactProtocol}
+import ai.chronon.api.thrift.protocol.TCompactProtocol
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (2)

17-36: Imports are broad.
Consider explicit imports instead of wildcards for clarity.


288-297: Robust error logging.
Check repeated log calls for spam potential.

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (3)

119-130: Batch columns fetch.
Logic is correct. More tests might be helpful.


185-205: TTLCache usage.
Looks consistent. Might confirm test covers multiple calls.


236-237: FetchContext mocking.
Seems fine. Possibly mock only what's needed.

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (2)

159-272: Core fetch flow.
Aggregates batch + streaming. Simplify if possible. Consider concurrency checks.


287-334: Column fetch.
Mapping approach is clean. Possibly unify with fetchGroupBys to keep things DRY.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2)

30-36: Consider constructor injection for easier testing.

Instantiating GroupByFetcher inside the class limits test flexibility.


135-166: Prefer logging over println.

- if (fetchContext.debug || Math.random() < 0.001) {
-   println(s"Failed to fetch $groupByRequest with \n${ex.traceString}")
- }
+ if (fetchContext.debug || Math.random() < 0.001) {
+   logger.warn(s"Failed to fetch $groupByRequest", ex)
+ }
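The ~0.1% sampling gate used in the snippet above can be factored into a small helper; a sketch under assumed names (not an existing helper in the codebase):

```scala
import scala.util.Random

object SampledLog {
  // Log unconditionally when debugging; otherwise on roughly one call in a
  // thousand, keeping hot-path failures observable without flooding the logs.
  // The rng parameter is injectable so tests can be deterministic.
  def shouldLog(debug: Boolean, rate: Double = 0.001, rng: Random = Random): Boolean =
    debug || rng.nextDouble() < rate
}
```

Usage would then read `if (SampledLog.shouldLog(fetchContext.debug)) logger.warn(...)`.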
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2)

29-70: Consider clarifying return structure.

Returning a map without quick reference might be confusing for new developers.


121-170: Gracefully handle partial decoding failure.

Discarding null rows is fine but careful about silent data loss.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a355eb5 and 9cfa908.

📒 Files selected for processing (55)
  • api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/AvroConversions.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/CompatParColls.scala (0 hunks)
  • online/src/main/scala/ai/chronon/online/Extensions.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (5 hunks)
  • online/src/main/scala/ai/chronon/online/JoinCodec.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TTLCache.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/TileCodec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (12 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala (0 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (3 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (11 hunks)
  • online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (6 hunks)
  • online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (10 hunks)
  • online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (5 hunks)
  • online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (4 hunks)
  • spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (4 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (5 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2 hunks)
💤 Files with no reviewable changes (2)
  • online/src/main/scala/ai/chronon/online/fetcher/FetcherBase.scala
  • online/src/main/scala/ai/chronon/online/CompatParColls.scala
✅ Files skipped from review due to trivial changes (1)
  • online/src/main/scala/ai/chronon/online/TTLCache.scala
🚧 Files skipped from review as they are similar to previous changes (41)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala
  • online/src/main/scala/ai/chronon/online/Extensions.scala
  • online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala
  • online/src/test/scala/ai/chronon/online/test/TileCodecTest.scala
  • flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala
  • online/src/main/scala/ai/chronon/online/Api.scala
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala
  • spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
  • online/src/main/scala/ai/chronon/online/TileCodec.scala
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
  • spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala
  • online/src/main/scala/ai/chronon/online/AvroConversions.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala
  • online/src/test/scala/ai/chronon/online/test/LRUCacheTest.scala
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala
  • spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
  • online/src/main/scala/ai/chronon/online/JoinCodec.scala
  • spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/JavaFetchTypesTest.java
  • spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala
  • spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
  • online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
  • online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala
  • online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala
  • spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala
  • online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala
  • online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
  • online/src/main/scala/ai/chronon/online/stats/DriftStore.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: streaming_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (45)
api/src/main/scala/ai/chronon/api/SerdeUtils.scala (1)

6-16:

✅ Verification successful

Thread-local implementation looks good!

Clean implementation using thread-local storage and lazy initialization. The @transient annotation correctly prevents serialization of these fields.

However, verify thread safety:


🏁 Script executed:

#!/bin/bash
# Check for any concurrent usage of SerdeUtils that might need synchronization
rg -A 5 "SerdeUtils\.(compact(De)?[sS]erializer)" 

Length of output: 3757


Thread-local usage confirmed as safe.

  • Lazy initialization with ThreadLocal and @transient is correctly implemented.
  • Grep results show consistent, isolated usage via .get() across the codebase.

No further changes needed.
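The thread-local pattern verified above can be sketched with a stdlib analogue. SimpleDateFormat stands in here for Thrift's TSerializer/TDeserializer — both are cheap to create, stateful, and not thread-safe — and the names are illustrative, not the actual SerdeUtils API:

```scala
import java.text.SimpleDateFormat
import java.util.Date

object SerdePattern {
  // Each thread lazily gets its own private instance, so no synchronization is
  // needed on a shared one. @transient mirrors the reviewed code: it keeps the
  // field out of any serialized closures (e.g. in Spark/Flink jobs).
  @transient lazy val formatter: ThreadLocal[SimpleDateFormat] =
    ThreadLocal.withInitial(() => new SimpleDateFormat("yyyy-MM-dd"))

  def serialize(d: Date): String = formatter.get().format(d)
}
```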

spark/src/test/scala/ai/chronon/spark/test/streaming/StreamingTest.scala (2)

23-23: LGTM!

Import statement updated to include FetchContext.


109-109: LGTM!

MetadataStore initialization refactored to use FetchContext, simplifying the parameter passing.

online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (3)

113-114: LGTM!

Method signature updated to use LambdaKvRequest, aligning with the refactoring.


116-116: LGTM!

Good practice: Local variable defined to avoid repeated Map.empty calls.


123-124: LGTM!

Case pattern updated to handle LambdaKvRequest type.

Also applies to: 125-125

online/src/test/scala/ai/chronon/online/test/FetcherCacheTest.scala (3)

124-125: LGTM!

Test case updated to use LambdaKvRequest.


159-160: LGTM!

Test case updated to use LambdaKvRequest.


266-266: LGTM!

Mock updated to use serde.AvroCodec.

Also applies to: 311-311

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1)

618-618: LGTM!

MetadataStore initialization refactored to use FetchContext.

Also applies to: 743-743

spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (3)

24-26: Imports look fine.
No obvious duplication or overshadowing issues.


265-269: Ensure thread safety when clearing caches.
Check for concurrent calls that might lead to race conditions.


328-329: Good context-based initialization.
Ensure all callers set up fetchContext properly.

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (8)

56-57: Constructor refactor looks solid.
Better to have one context object than multiple params.
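The parameter-object shape praised here can be sketched as follows (field names are illustrative, not the real FetchContext):

```scala
// Before: every constructor threads ~10 individual params through call sites.
// After: one case class bundles them, so adding a setting touches one place.
case class FetchContext(
    metadataDataset: String = "CHRONON_METADATA",
    timeoutMillis: Long = 10000L,
    debug: Boolean = false
)

class MetadataStore(ctx: FetchContext) {
  def describe: String =
    s"dataset=${ctx.metadataDataset} timeout=${ctx.timeoutMillis}ms debug=${ctx.debug}"
}
```

Case-class `copy` also makes per-test overrides cheap, e.g. `FetchContext(debug = true)`.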


73-73: Validate concurrency.
Reusing fetchContext's ExecutionContext is fine. Confirm no deadlock risk.


87-88: Single point for KVStore & timeouts.
Check negative or large timeout handling.


99-111: Method now private.
Ensure no external usage is broken. Possibly update docs.


170-174: Consolidated dataset usage.
Ensure correct dataset name from fetchContext.metadataDataset.


264-273: Serde usage is consistent.
No issues spotted.


274-279: Lazy TTLCache.
Consider concurrency if multiple threads initialize it at once.


353-353: Dataset creation triggered.
Confirm no race with other initializations.

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala (10)

29-29: New imports.
All good. Keep them minimal.


60-61: Separate fetchers.
Makes logic clearer. Great separation.


63-65: New context fields.
Check that required fields are all set.


72-76: Test setup revised.
Spies on specialized fetchers. Good for coverage.


89-99: Mock fetchGroupBys.
No issues. Straight to the point.


147-164: Missing response check.
Throws correct exception. Good approach.


168-180: Serving info refresh.
Ensure concurrency is safe if multiple fetchers refresh simultaneously.


206-233: Flag-based cache toggle.
Neat. Validate flags for potential misconfig.


251-251: Validation for null keys.
Logic is minimal but correct.


267-281: Key missing scenario.
Throws expected exception key. Good coverage.

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (4)

1-41: Package & caching logic.
Short and clear. No major issues.


42-113: Key encoding approach.
Casts keys if parse fails. Very defensive. Good job.


114-153: Derivative handling.
Catches exceptions and reverts to rename-only. Smart fallback.


338-343: Case class for requests.
Sufficiently clear. No concerns.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (4)

37-40: Pass-through method looks fine.


41-46: Fetch columns delegation is straightforward.


47-50: Method usage looks correct.


51-53: Cache size check approach is clear.

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (6)

17-27: Case class usage is concise.


72-119: Verify error handling on potential Future failures.

Ensure no uncaught exceptions occur during merging.


172-212: Tiled merges look correct.


214-226: Null guard is acceptable here.


227-265: Refresh logic is solid for batch updates.


266-286: Check for potential null return at line 267.

Caller logic must handle a null FinalBatchIr.

Comment on lines +58 to +133
def fetchJoins(requests: Seq[Request], joinConf: Option[Join] = None): Future[Seq[Response]] = {
val startTimeMs = System.currentTimeMillis()
// convert join requests to groupBy requests
val joinDecomposed: Seq[(Request, Try[Seq[Either[PrefixedRequest, KeyMissingException]]])] =
requests.map { request =>
// use passed-in join or fetch one
val joinTry: Try[JoinOps] = joinConf
.map(conf => Success(JoinOps(conf)))
.getOrElse(metadataStore.getJoinConf(request.name))

var joinContext: Option[Metrics.Context] = None

val decomposedTry = joinTry.map { join =>
joinContext = Some(Metrics.Context(Metrics.Environment.JoinFetching, join.join))
joinContext.get.increment("join_request.count")

join.joinPartOps.map { part =>
val joinContextInner = Metrics.Context(joinContext.get, part)
val missingKeys = part.leftToRight.keys.filterNot(request.keys.contains)

if (missingKeys.nonEmpty) {
Right(KeyMissingException(part.fullPrefix, missingKeys.toSeq, request.keys))
} else {
val rightKeys = part.leftToRight.map { case (leftKey, rightKey) => rightKey -> request.keys(leftKey) }
Left(
PrefixedRequest(
part.fullPrefix,
Request(part.groupBy.getMetaData.getName, rightKeys, request.atMillis, Some(joinContextInner))))
}

}
}
request.copy(context = joinContext) -> decomposedTry
}

val groupByRequests = joinDecomposed.flatMap { case (_, gbTry) =>
gbTry match {
case Failure(_) => Iterator.empty
case Success(requests) => requests.iterator.flatMap(_.left.toOption).map(_.request)
}
}

val groupByResponsesFuture = groupByFetcher.fetchGroupBys(groupByRequests)

// re-attach groupBy responses to join
groupByResponsesFuture
.map { groupByResponses =>
val responseMap = groupByResponses.iterator.map { response => response.request -> response.values }.toMap
val responses = joinDecomposed.iterator.map { case (joinRequest, decomposedRequestsTry) =>
val joinValuesTry = decomposedRequestsTry.map { groupByRequestsWithPrefix =>
groupByRequestsWithPrefix.iterator.flatMap {

case Right(keyMissingException) =>
Map(keyMissingException.requestName + "_exception" -> keyMissingException.getMessage)

case Left(PrefixedRequest(prefix, groupByRequest)) =>
parseGroupByResponse(prefix, groupByRequest, responseMap)
}.toMap

}
joinValuesTry match {
case Failure(ex) => joinRequest.context.foreach(_.incrementException(ex))
case Success(responseMap) =>
joinRequest.context.foreach { ctx =>
ctx.distribution("response.keys.count", responseMap.size)
}
}
joinRequest.context.foreach { ctx =>
ctx.distribution("internal.latency.millis", System.currentTimeMillis() - startTimeMs)
ctx.increment("internal.request.count")
}
Response(joinRequest, joinValuesTry)
}.toSeq
responses
}
}

@coderabbitai coderabbitai bot Feb 25, 2025


🛠️ Refactor suggestion

This method is lengthy.

Splitting logic (join decomposition, groupBy fetching, response mapping) would boost clarity.

nikhil-zlai (Contributor, Author) replied with an image.

coderabbitai bot: @nikhil-zlai Understood! I'll move on from the refactoring suggestion. Let me know if you need any other assistance!

@nikhil-zlai nikhil-zlai merged commit f473147 into main Feb 25, 2025
20 checks passed
@nikhil-zlai nikhil-zlai deleted the fetcher_refactor_2 branch February 25, 2025 03:18
@coderabbitai coderabbitai bot mentioned this pull request Feb 25, 2025
4 tasks
kumar-zlai pushed a commit that referenced this pull request Feb 26, 2025
## Summary
fetcher has grown over time into a large file with many large functions
that are hard to work with. This refactoring doesn't change any
functionality - just placement.

Made some of the scala code more idiomatic - if(try.isFailed) - vs
try.recoverWith
Made Metadata methods more explicit
FetcherBase -> JoinPartFetcher + GroupByFetcher + GroupByResponseHandler
Added fetch context - to replace 10 constructor params


## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit


- **New Features**
- Introduced a unified configuration context that enhances data
fetching, including improved group-by and join operations with more
robust error handling.
- Added a new `FetchContext` class to manage fetching operations and
execution contexts.
- Implemented a new `GroupByFetcher` class for efficient group-by data
retrieval.
- **Refactor**
- Upgraded serialization and deserialization to use a more efficient,
compact protocol.
- Standardized API definitions and type declarations across modules to
improve clarity and maintainability.
- Enhanced error handling in various methods to provide more informative
messages.
- **Chores**
	- Removed outdated utilities and reorganized dependency imports.
	- Updated test suites to align with the refactored architecture.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
@coderabbitai coderabbitai bot mentioned this pull request Apr 8, 2025
4 tasks
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants