-
Notifications
You must be signed in to change notification settings - Fork 0
BigTable / Fetcher updates - use closeAsync, setTimeouts, allow bulkRead / readRows choice #630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe pull request enhances the GCP integration by introducing configurable options and additional methods in the BigTableKVStoreImpl. The multiGet functionality now conditionally uses bulk or standard row reads based on a new configuration parameter. New helper methods have been added for query filtering and setting retry settings in GcpApiImpl, with corresponding updates in tests using property-based approaches. Additionally, Fetcher has been refactored to simplify combined response mapping and improve latency metric logging. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant KVStore
Client->>KVStore: multiGet(requests)
alt useBulkReadRows true
KVStore->>KVStore: bulkReadRowsMultiGet(requestsByDataset)
else
KVStore->>KVStore: readRowsMultiGet(requestsByDataset)
end
KVStore-->>Client: responses
sequenceDiagram
participant Client
participant GcpApiImpl
participant ConfigHelper
participant KVStore
Client->>GcpApiImpl: Initialize service with conf
GcpApiImpl->>ConfigHelper: getOptional/getOrElseThrow
ConfigHelper-->>GcpApiImpl: Config values
GcpApiImpl->>GcpApiImpl: setClientRetrySettings(builder, conf)
GcpApiImpl->>KVStore: new BigTableKVStoreImpl(dataClient, adminClient, bqClient, conf)
Possibly related PRs
Suggested reviewers
Poem
Warning Review ran into problems🔥 ProblemsGitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings. 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (4)
🚧 Files skipped from review as they are similar to previous changes (1)
🧰 Additional context used🧠 Learnings (1)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
🧬 Code Graph Analysis (1)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (3)
⏰ Context from checks skipped due to timeout of 90000ms (21)
🔇 Additional comments (42)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
146-249
: bulkReadRowsMultiGet.Flush via closeAsync is good; consider partial failures if close fails.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
(6 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
(6 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
(7 hunks)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
(3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (4)
create
(94-94)create
(96-125)multiPut
(460-510)multiGet
(127-144)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2)
GcpApiImpl
(26-195)GcpApiImpl
(197-246)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ApiFutureUtils.scala (2)
ApiFutureUtils
(12-40)toCompletableFuture
(14-39)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (37)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (3)
147-147
: Clean merging logic.Straightforward approach to combine internal and external responses.
206-206
: Metric rename is consistent.Renaming clarifies the difference between request-level and overall latency.
216-216
: New context usage.Good practice to isolate metrics for join-fetching.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (21)
5-5
: Added import for bulk read config.Enables bulk read rows testing.
26-26
: Table-driven tests support.Good approach for multiple configurations.
106-135
: Property-based test for blob data.Solid coverage verifying serialization round trip.
139-173
: Blob data update test.Ensures correct overwrite behavior.
317-337
: Time series range query test.Validates range correctness.
341-359
: Multiple dataset time series test.Checks multi-dataset queries.
363-382
: Single-day time series coverage.Good narrow range check.
386-408
: Time series test with no data.Verifies handling of empty intervals.
413-438
: Multiple keys in time series.Ensures correctness across different keys.
443-472
: Repeated streaming tile updates.Covers overwriting on same tile.
477-507
: Streaming tiled query (multiple days).Checks expanded date range.
511-545
: Multiple keys with streaming tiles.Tests parallel streams properly.
549-598
: Mixed batch end times.Verifies queries with different intervals.
602-632
: One day streaming tile query.Simple interval coverage check.
636-666
: Same-day streaming tile query.Focus on partial-day coverage.
669-701
: Streaming tiled query with no data.Correctly handles empty partition.
704-717
: Helper: writeGeneratedTimeSeriesData.Straightforward utility for tests.
719-728
: Helper: generateAndWriteTimeSeriesData.Well-structured for multi-key scenario.
730-739
: Blob value validation helper.Ensures payload is intact.
741-749
: Time series value validation helper.Checks timestamps and payload correctness.
751-757
: createConfigurations function.Neat table-driven approach for enabling/disabling bulk reads.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (8)
13-13
: New import for retry settings.Helps configure Bigtable timeouts.
21-21
: Added java.time.Duration.Needed for custom retry durations.
28-28
: Importing GcpApiImpl.Clean internal reference.
29-29
: Potential duplicate import?Ensure no duplication is introduced.
97-98
: Applying client settings.Customized retry/timeouts improve resiliency.
107-107
: Pass conf in constructor.Consistent with new config usage.
128-175
: setClientRetrySettings.Good handling of environment-based retry config.
199-229
: New config constants.Centralizes environment keys for clarity.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (5)
72-73
: Extended constructor.Allows usage of BigQuery and additional config.
92-93
: useBulkReadRows from config.Flexible switching between bulk and normal reads.
128-144
: multiGet logic update.Cleanly branches bulk vs read approach.
251-332
: readRowsMultiGet.Straight path for standard fetch.
363-390
: Time series filters set.Neat function for daily partitioning.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
Outdated
Show resolved
Hide resolved
val closeFuture = batcher.closeAsync() | ||
val closeCompletableFuture = ApiFutureUtils.toCompletableFuture(closeFuture) | ||
val closeScalaFuture = FutureConverters.toScala(closeCompletableFuture) | ||
|
||
// order matters - we need to ensure the close op is done as that triggers flushes of pending work which we | ||
// need as part of the final Future[List[Row]] | ||
val resultFuture: Future[util.List[Row]] = for { | ||
_ <- closeScalaFuture // close the batcher (which flushes pending work) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
according to the docs - we don't seem to need this close method, i notice that rivulet does it - but wonder what if we don't do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This triggers a flush of any outstanding batches - as part of batcher.add we are queueing up work which ideally for us gets sent immediately based on our batch size = 1 and initial delay being unset, but in case it is held on for a bit - we need the call to be flushed out which this method is doing for us.
Future.sequence(datasetFutures).map(_.flatten) | ||
private def readRowsMultiGet( | ||
requestsByDataset: Map[String, Seq[KVStore.GetRequest]]): Seq[Future[Seq[KVStore.GetResponse]]] = { | ||
requestsByDataset.map { case (dataset, datasetRequests) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: refactor out nested code as functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you have in mind here? pulling out the handling for the different table types or something else?
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
Outdated
Show resolved
Hide resolved
4ab71bc
to
3435b45
Compare
…ead / readRows choice (#630) ## Summary ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced bulk read support and refined time series data filtering to enhance data retrieval performance and accuracy. - Improved client retry mechanisms for more robust operations under varying conditions. - **Refactor** - Streamlined the handling of combined responses and updated latency metric reporting for clearer performance insights. - Centralized configuration access for environment variables and settings. - **Tests** - Expanded property-based testing across multiple configurations to ensure consistent and reliable functionality. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
…ead / readRows choice (#630) ## Summary ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced bulk read support and refined time series data filtering to enhance data retrieval performance and accuracy. - Improved client retry mechanisms for more robust operations under varying conditions. - **Refactor** - Streamlined the handling of combined responses and updated latency metric reporting for clearer performance insights. - Centralized configuration access for environment variables and settings. - **Tests** - Expanded property-based testing across multiple configurations to ensure consistent and reliable functionality. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
…ead / readRows choice (#630) ## Summary ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced bulk read support and refined time series data filtering to enhance data retrieval performance and accuracy. - Improved client retry mechanisms for more robust operations under varying conditions. - **Refactor** - Streamlined the handling of combined responses and updated latency metric reporting for clearer performance insights. - Centralized configuration access for environment variables and settings. - **Tests** - Expanded property-based testing across multiple configurations to ensure consistent and reliable functionality. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
…ead / readRows choice (#630) ## Summary ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced bulk read support and refined time series data filtering to enhance data retrieval performance and accuracy. - Improved client retry mechanisms for more robust operations under varying conditions. - **Refactor** - Streamlined the handling of combined responses and updated latency metric reporting for clearer performance insights. - Centralized configuration access for environment variables and settings. - **Tests** - Expanded property-based testing across multiple configurations to ensure consistent and reliable functionality. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
…ead / readRows choice (#630) ## Summary ## Cheour clientslist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced bulk read support and refined time series data filtering to enhance data retrieval performance and accuracy. - Improved client retry mechanisms for more robust operations under varying conditions. - **Refactor** - Streamlined the handling of combined responses and updated latency metric reporting for clearer performance insights. - Centralized configuration access for environment variables and settings. - **Tests** - Expanded property-based testing across multiple configurations to ensure consistent and reliable functionality. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
Summary
Checklist
Summary by CodeRabbit
New Features
Refactor
Tests