chore: moving join codec from fetcher into metadata store #401
Conversation
Walkthrough

This change introduces a caching mechanism for join codecs across multiple components. In the Fetcher, a lazy cache streamlines join operations with improved error handling and readability. The MetadataStore adds helper methods for join codec creation and caching. TTLCache is updated to support an optional callback during entry creation. Tests in SchemaEvolutionTest are adjusted to use the new cache interface.
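To make the walkthrough concrete, here is a minimal sketch of the pattern it describes: a TTL cache whose creation function is wrapped so that an optional callback fires whenever a new entry is built. The names below (`SimpleTTLCache`, `buildJoinCodec`) are illustrative placeholders, not Chronon's actual API, and the lookup is deliberately non-atomic to keep the sketch short.

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative TTL cache: an entry is rebuilt once it is older than ttlMillis,
// and the optional onCreateFunc callback fires every time the creator runs.
class SimpleTTLCache[K, V](creator: K => V,
                           ttlMillis: Long,
                           onCreateFunc: Option[V => Unit] = None) {
  private case class Entry(value: V, createdAt: Long)
  private val underlying = new ConcurrentHashMap[K, Entry]()

  private val wrappedCreator: K => V = { key =>
    val value = creator(key)
    onCreateFunc.foreach(f => f(value)) // e.g. publish a schema control event
    value
  }

  def apply(key: K): V = {
    val now = System.currentTimeMillis()
    val fresh = Option(underlying.get(key)).filter(e => now - e.createdAt <= ttlMillis)
    fresh
      .getOrElse {
        val rebuilt = Entry(wrappedCreator(key), now)
        underlying.put(key, rebuilt)
        rebuilt
      }
      .value
  }

  // Drop the entry so the next apply() rebuilds it (useful after a failed fetch).
  def refresh(key: K): Unit = underlying.remove(key)
}

// Hypothetical wiring, mirroring the walkthrough's description:
// val joinCodecCache = new SimpleTTLCache[String, Try[JoinCodec]](buildJoinCodec, 2L * 3600 * 1000, Some(logControlEvent))
```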
Actionable comments posted: 0
🧹 Nitpick comments (5)
online/src/main/scala/ai/chronon/online/Fetcher.scala (2)
122-124: Consider adding a TTL duration parameter.

The cache initialization could benefit from a configurable TTL duration.

```diff
-lazy val joinCodecCache: TTLCache[String, Try[JoinCodec]] = buildJoinCodecCache(
-  Some(logControlEvent)
-)
+lazy val joinCodecCache: TTLCache[String, Try[JoinCodec]] = buildJoinCodecCache(
+  Some(logControlEvent),
+  ttlDurationMs = config.getJoinCodecCacheTtl
+)
```
430-448: Consider adding metrics for control events.

Track success/failure rates of control event logging.

```diff
 private def logControlEvent(encTry: Try[JoinCodec]): Unit = {
-  if (encTry.isFailure) return
+  val ctx = Metrics.Context(Environment.Fetcher)
+  if (encTry.isFailure) {
+    ctx.incrementException(encTry.failed.get)
+    return
+  }
```

online/src/main/scala/ai/chronon/online/MetadataStore.scala (3)
166-187: Add documentation for the onCreateFunc parameter.

The callback parameter's purpose and usage should be documented.

```diff
 def buildJoinCodecCache(
+    /** Callback invoked when a new join codec is created.
+      * @param onCreateFunc Optional callback that receives the Try[JoinCodec] result
+      */
     onCreateFunc: Option[Try[JoinCodec] => Unit]): TTLCache[String, Try[JoinCodec]] = {
```
189-254: Consider a more functional approach.

- Replace mutable collections with immutable ones.
- Use Option instead of a null check for aggregations (see the small example after the diff).

```diff
-val keyFields = new mutable.LinkedHashSet[StructField]
-val valueFields = new mutable.ListBuffer[StructField]
+val keyFields = joinConf.joinPartOps.flatMap { joinPart =>
+  // ... existing logic returning Seq[StructField]
+}.toSet
+val valueFields = joinConf.joinPartOps.flatMap { joinPart =>
+  // ... existing logic returning Seq[StructField]
+}

-if (servingInfo.groupBy.aggregations == null) {
+Option(servingInfo.groupBy.aggregations).fold(servingInfo.selectedChrononSchema) { _ =>
+  servingInfo.outputChrononSchema
+}
```
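A small, self-contained illustration of the Option-over-null part of this suggestion; `Schema`, the parameter names, and the values are placeholders rather than Chronon types:

```scala
case class Schema(name: String)

// Option(x) lifts a possibly-null Java value into an Option; fold returns the
// default when it is None and applies the function when it is Some.
def outputSchema(aggregations: java.util.List[String],
                 selectedSchema: Schema,
                 fullSchema: Schema): Schema =
  Option(aggregations).fold(selectedSchema)(_ => fullSchema)

// outputSchema(null, Schema("selected"), Schema("output"))                      // Schema("selected")
// outputSchema(java.util.Collections.singletonList("sum"),
//              Schema("selected"), Schema("output"))                            // Schema("output")
```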
173-179: Use structured logging for errors.

Consider using structured logging to make error tracking easier.

```diff
-logger.error(
-  s"Failed to fetch joinName = ${joinName} or build join codec due to ${th.traceString}")
+logger.error("Failed to build join codec",
+  Map("joinName" -> joinName, "error" -> th.getMessage, "trace" -> th.traceString))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- online/src/main/scala/ai/chronon/online/Fetcher.scala (16 hunks)
- online/src/main/scala/ai/chronon/online/MetadataStore.scala (4 hunks)
- online/src/main/scala/ai/chronon/online/TTLCache.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1 hunk)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
🔇 Additional comments (7)
online/src/main/scala/ai/chronon/online/TTLCache.scala (3)
37-38: LGTM: Clean parameter addition. The optional callback parameter with a sensible default maintains backward compatibility.
40-44: LGTM: Well-structured callback wrapper. A clean implementation that safely handles the optional callback.
55-55: LGTM: Consistent usage of wrappedCreator. Correctly applies the callback at both entry creation points.
Also applies to: 82-82
online/src/main/scala/ai/chronon/online/Fetcher.scala (2)
87-88: LGTM! Clean case class formatting. The case class formatting improves readability.
161-165: Improved error handling with fallback strategy. The code now gracefully handles derivation failures by attempting rename-only operations; a sketch of this fallback shape follows below.
Also applies to: 177-200
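A minimal sketch of the fallback shape described in this comment, assuming hypothetical `deriveFunc` and `renameOnlyDeriveFunc` signatures; it illustrates the Try-then-fallback pattern only, not the actual Fetcher code:

```scala
import scala.util.{Failure, Success, Try}

// Attempt the full derivation first; if it throws, fall back to a rename-only
// pass and record the original failure in the result map.
def deriveWithFallback(
    deriveFunc: Map[String, Any] => Map[String, Any],           // hypothetical
    renameOnlyDeriveFunc: Map[String, Any] => Map[String, Any], // hypothetical
    baseMap: Map[String, Any]): Map[String, Any] =
  Try(deriveFunc(baseMap)) match {
    case Success(derived) => derived
    case Failure(ex) =>
      val renamed = Try(renameOnlyDeriveFunc(baseMap)).getOrElse(baseMap)
      renamed + ("derivation_fetch_exception" -> ex.getMessage)
  }
```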
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1)
264-264: LGTM! The change aligns with moving the join codec from the fetcher to the metadata store.
online/src/main/scala/ai/chronon/online/MetadataStore.scala (1)
20-30: Clean import organization. Imports are well organized and support the new join codec functionality.
Actionable comments posted: 0
🧹 Nitpick comments (2)
online/src/main/scala/ai/chronon/online/Fetcher.scala (2)
160-164: Improve error handling with specific exception types.

The current implementation uses generic exception handling. Consider creating specific exception types for join part and external part failures.

```diff
-val internalMap = internalResponse.values.getOrElse(
-  Map("join_part_fetch_exception" -> internalResponse.values.failed.get.traceString))
-val externalMap = externalResponse.values.getOrElse(Map(
-  "external_part_fetch_exception" -> externalResponse.values.failed.get.traceString))
+val internalMap = internalResponse.values.getOrElse {
+  val exception = internalResponse.values.failed.get
+  Map("join_part_fetch_exception" -> JoinPartFetchException(exception).traceString)
+}
+val externalMap = externalResponse.values.getOrElse {
+  val exception = externalResponse.values.failed.get
+  Map("external_part_fetch_exception" -> ExternalPartFetchException(exception).traceString)
+}
```
316-317: Consider adding a retry mechanism.

The cache refresh on failure is good, but consider adding a retry mechanism with exponential backoff; a sketch of such a helper follows the diff.

```diff
-joinCodecCache.refresh(resp.request.name)
+RetryUtils.withExponentialBackoff {
+  joinCodecCache.refresh(resp.request.name)
+}
```
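The diff above references a `RetryUtils.withExponentialBackoff` helper that does not exist in the PR; a minimal, blocking sketch of what such a helper could look like:

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetryUtils {
  // Retry `op` up to maxAttempts times, doubling the delay between attempts.
  @tailrec
  def withExponentialBackoff[T](maxAttempts: Int = 3, initialDelayMs: Long = 100)(op: => T): T =
    Try(op) match {
      case Success(result) => result
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(initialDelayMs)
        withExponentialBackoff(maxAttempts - 1, initialDelayMs * 2)(op)
      case Failure(ex) => throw ex
    }
}

// Usage, mirroring the suggestion:
// RetryUtils.withExponentialBackoff() { joinCodecCache.refresh(resp.request.name) }
```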
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- online/src/main/scala/ai/chronon/online/Fetcher.scala (16 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
online/src/main/scala/ai/chronon/online/Fetcher.scala (3)
122-124: LGTM: Efficient caching implementation. The lazy initialization with a callback for control event logging is a good pattern.
281-283: LGTM: Improved readability. The formatting changes make the code more readable.
428-432: LGTM: Improved error handling in logControlEvent. The signature change to accept Try[JoinCodec] and the early return on failure is a good improvement.
Actionable comments posted: 0
🧹 Nitpick comments (2)
online/src/main/scala/ai/chronon/online/Fetcher.scala (2)
121-123: Consider adding TTL configuration.

The cache initialization lacks TTL configuration, which could lead to unbounded memory growth.

```diff
 lazy val joinCodecCache: TTLCache[String, Try[JoinCodec]] = buildJoinCodecCache(
-  Some(logControlEvent)
+  Some(logControlEvent),
+  ttlSeconds = 3600 // Configure based on your needs
 )
```
180-188: Improve error handling readability.

The nested error handling is complex. Consider extracting it into a separate method.

```diff
+  private def handleRenameOnlyDeriveError(
+      ctx: Metrics.Context,
+      joinCodec: JoinCodec,
+      request: Request,
+      baseMap: Map[String, AnyRef]
+  ): Map[String, AnyRef] = {
+    Try {
+      joinCodec
+        .renameOnlyDeriveFunc(request.keys, baseMap)
+        .mapValues(_.asInstanceOf[AnyRef])
+        .toMap
+    } match {
+      case Success(renameOnlyDerivedMap) => renameOnlyDerivedMap
+      case Failure(exception) =>
+        ctx.incrementException(exception)
+        Map("derivation_rename_exception" -> exception.traceString.asInstanceOf[AnyRef])
+    }
+  }
+
   val renameOnlyDerivedMap: Map[String, AnyRef] =
-    renameOnlyDerivedMapTry match {
-      case Success(renameOnlyDerivedMap) =>
-        renameOnlyDerivedMap
-      case Failure(exception) =>
-        ctx.incrementException(exception)
-        Map("derivation_rename_exception" -> exception.traceString
-          .asInstanceOf[AnyRef])
-    }
+    handleRenameOnlyDeriveError(ctx, joinCodec, internalResponse.request, baseMap)
```

Also applies to: 190-191
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- online/src/main/scala/ai/chronon/online/Fetcher.scala (14 hunks)
- online/src/main/scala/ai/chronon/online/MetadataStore.scala (6 hunks)
- spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (2)
- spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala
- online/src/main/scala/ai/chronon/online/MetadataStore.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: join_tests
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
online/src/main/scala/ai/chronon/online/Fetcher.scala (3)
109-110: LGTM: Improved string formatting. The string interpolation is now more readable.
415-417: LGTM: Improved error handling. Early return on failure is good practice.
162-162: ❓ Verification inconclusive. Verify cache consistency.
The cache is used in multiple places and refreshed on failures. Ensure consistent error handling across all cache access points.
Also applies to: 252-252, 308-309
🏁 Script executed:

```bash
#!/bin/bash
# Find all cache access points
rg -A 2 'joinCodecCache'
```

Length of output: 1467
Action: Verify Cache Handling Consistency

- Confirm that error handling for joinCodecCache is uniform across all access points (e.g., at lines 162, 252, and 308).
- Ensure that retrieval, refresh, and clear operations follow a consistent strategy in both main code and tests.
## Summary

Building the join output schema should belong to the metadata store; moving it there also reduces the size of the fetcher.

## Checklist

- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced an optimized caching mechanism for data join operations, resulting in improved performance and reliability.
  - Added new methods to facilitate the creation and management of join codecs.
- **Bug Fixes**
  - Enhanced error handling for join codec operations, ensuring clearer context for failures.
- **Documentation**
  - Improved code readability and clarity through updated comments and method signatures.