feat: tiling support api changes part 1 #288
Note: Review skipped because a draft was detected; this status message can be disabled in the CodeRabbit settings.

Walkthrough: The pull request modifies multiple files to enhance code clarity and functionality. Changes include improvements in import statements for better readability, updates to the …
Force-pushed from 87f9800 to f46bcbb.
Actionable comments posted: 0
🔭 Outside diff range comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)
Lines 56-64: Add assertions to validate job submission. The test lacks verification steps.

```diff
 val submitter = DataprocSubmitter()
-submitter.submit(spark.FlinkJob,
+val jobId = submitter.submit(spark.FlinkJob,
   Map(MainClass -> "ai.chronon.flink.FlinkJob",
       FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
       JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
   List.empty,
   "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
   "--groupby-name=e2e-count",
   "-ZGCP_PROJECT_ID=bigtable-project-id",
   "-ZGCP_INSTANCE_ID=bigtable-instance-id")
+assertNotNull("Job ID should not be null", jobId)
```
🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2)
Line 56: Remove the "DUMMY COMMIT" comment. This comment doesn't provide any value.

```diff
-// DUMMY COMMIT
```

Lines 56-64: Document the test's purpose. Add a descriptive comment explaining what this test verifies.

```diff
+// Verifies Flink job submission with BigTable configuration
 val submitter = DataprocSubmitter()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: scala_compile_fmt_fix
Force-pushed from f46bcbb to 7bc42b4.
Actionable comments posted: 0
🧹 Nitpick comments (1)
online/src/main/scala/ai/chronon/online/FetcherBase.scala (1)
Lines 365-380: Consider parameterizing tile configuration. The tile key configuration uses hardcoded values (-1) for tileSizeMillis and tileStartTimestampMillis. Consider making these configurable:

```diff
-new TileKey()
-  .setKeyBytes(streamingKeyBytes.toList.map((b) => java.lang.Byte.valueOf(b)).asJava)
-  .setDataset(groupByServingInfo.groupByOps.streamingDataset)
-  .setTileSizeMillis(-1)
-  .setTileStartTimestampMillis(-1))
+new TileKey()
+  .setKeyBytes(streamingKeyBytes.toList.map((b) => java.lang.Byte.valueOf(b)).asJava)
+  .setDataset(groupByServingInfo.groupByOps.streamingDataset)
+  .setTileSizeMillis(groupByServingInfo.tileConfig.tileSizeMillis)
+  .setTileStartTimestampMillis(groupByServingInfo.tileConfig.startTimestampMillis))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/FetcherBase.scala (8 hunks)
✅ Files skipped from review due to trivial changes (1)
- online/src/main/scala/ai/chronon/online/Api.scala
🚧 Files skipped from review as they are similar to previous changes (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
online/src/main/scala/ai/chronon/online/FetcherBase.scala (5)
Line 52: LGTM: Import addition. Clean import of TileKey for tile-based streaming.

Line 74: LGTM: Method signature update. Changed from Option[Seq[TimedValue]] to Seq[TimedValue] for simpler handling.

Line 103: LGTM: Simplified null check. Direct empty check on sequence instead of option handling.

Lines 452-453: LGTM: Streamlined streaming response handling. Simplified streaming response handling by flattening the responses. Also applies to: 462.

Line 724: LGTM: Case class field update. Updated GroupByRequestMeta to use Seq[GetRequest] for streaming requests.
Force-pushed from cfe39e9 to 944f2a6.
Actionable comments posted: 1
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
Line 251: Use Some(ts) instead of Option(ts). Option(ts) can handle null values, but since ts is a primitive Long, Some(ts) is more idiomatic.

```diff
-buildRowKey(request.keyBytes, request.dataset, Option(ts))
+buildRowKey(request.keyBytes, request.dataset, Some(ts))
```
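The suggestion follows from what Option.apply is for: it guards possibly-null references (typically from Java APIs) by returning None for null. A primitive Long has no null state, so the null check inside Option(ts) is dead code and Some(ts) states the intent directly. A quick illustration:

```scala
// Option(x) is the null-safe constructor: it yields None when x is null.
val fromNull: Option[String] = Option(null)

// A primitive Long can never be null, so Some is the idiomatic wrapper.
val ts: Long = 1700000000000L
val someTs: Option[Long] = Some(ts)
```

For reference types coming out of Java interop, Option(x) remains the right call; the review's point is only about primitives.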
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (7)
- cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (5 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (1 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/FetcherBase.scala (1 hunks)
✅ Files skipped from review due to trivial changes (2)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
- cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala
🚧 Files skipped from review as they are similar to previous changes (2)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- online/src/main/scala/ai/chronon/online/Api.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
Lines 98-101: LGTM! Good logging practice. The logging statement provides clear runtime information about the job configuration.

online/src/main/scala/ai/chronon/online/FetcherBase.scala (3)

Line 21: Neat aggregator import usage. No issues observed.

Lines 23-29: Consolidated imports look good. They align well with streaming logic.

Lines 35-36: Concurrency imports are correct. Use of Futures and Try is appropriate.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (3)
Lines 3-7: LGTM: Import organization. Imports are now better organized, grouping related imports together.

Lines 381-397: Add validation for tileSizeMs. buildTiledRowKey should validate that tileSizeMs is positive and a multiple of a reasonable time unit.
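A sketch of the guard this comment asks for. The signature and key layout below are guesses — only the method name buildTiledRowKey appears in the review — and reading "a reasonable time unit" as whole seconds is likewise an assumption:

```scala
// Hypothetical signature and key layout; the real ones in
// BigTableKVStoreImpl.scala may differ.
def buildTiledRowKey(keyBytes: Array[Byte],
                     dataset: String,
                     tileStartTs: Long,
                     tileSizeMs: Long): Array[Byte] = {
  require(tileSizeMs > 0, s"tileSizeMs must be positive, got $tileSizeMs")
  require(tileSizeMs % 1000 == 0,
          s"tileSizeMs must be a whole number of seconds, got $tileSizeMs")
  // Placeholder layout: key bytes, dataset, and tile start joined with '#'.
  keyBytes ++ s"#$dataset#$tileStartTs".getBytes("UTF-8")
}
```

Since require throws IllegalArgumentException, a tileSizeMs of -1 (the hardcoded default flagged earlier in this review) would fail fast instead of silently producing a malformed row key.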
Lines 450-454: ⚠️ Potential issue: Incomplete implementation. computeTiles returns a dummy tile instead of actual computation.
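For contrast with the dummy tile, here is what an actual tile computation commonly looks like: align the query range to fixed-size tile boundaries and enumerate the tiles that overlap it. This is a generic sketch of the technique under that assumption, not the project's intended implementation:

```scala
// Hypothetical helper: start timestamps of every tile of width tileSizeMs
// that overlaps [startTs, endTs), floored to tile boundaries.
def computeTileStarts(startTs: Long, endTs: Long, tileSizeMs: Long): Seq[Long] = {
  require(tileSizeMs > 0, "tileSizeMs must be positive")
  val firstTile = startTs - (startTs % tileSizeMs) // floor to the tile boundary
  firstTile.until(endTs, tileSizeMs)               // one start per covered tile
}
```

For example, computeTileStarts(2500L, 7000L, 3000L) yields 0, 3000 and 6000: three tiles whose union covers the requested range.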
```scala
def tileQueryColumnBased(query: Query, startTs: Long, endTs: Long) = {}

def tileQueryCellBased(query: Query, startTs: Long, endTs: Long) = {}
```

Empty method implementations: tileQueryColumnBased and tileQueryCellBased are empty. Either implement them or mark as TODO.
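One way to act on the "mark as TODO" option is Scala's ??? stub, which keeps the signatures compiling but throws NotImplementedError if called, so the gap cannot pass silently. A sketch — Query here is a placeholder standing in for the store's actual query class:

```scala
case class Query(dataset: String) // placeholder for the real query type

// TODO(tiling): implement column-based tile reads
def tileQueryColumnBased(query: Query, startTs: Long, endTs: Long): Nothing = ???

// TODO(tiling): implement cell-based tile reads
def tileQueryCellBased(query: Query, startTs: Long, endTs: Long): Nothing = ???
```

Unlike an empty body that silently returns Unit, calling either stub fails loudly, which tends to surface in tests long before production.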
Co-authored-by: Thomas Chow <[email protected]>
Force-pushed from 944f2a6 to 73df588.
Summary
Checklist
Summary by CodeRabbit

- BigTableKVStoreTest class for consistency.
- Api.scala file for improved readability.
- DynamoDBKVStoreImpl and BigTableKVStoreImpl files.
- FetcherBase class, enhancing the flexibility of the fetching mechanism.
- mapDatasetToTable method for better structure in the BigTableKVStore.