Enable tiling support in Gcp Flink jobs #345


Merged

merged 12 commits into main from piyush/bt_tiling_support on Feb 10, 2025
Conversation

piyush-zlai
Contributor

@piyush-zlai piyush-zlai commented Feb 7, 2025

Summary

This PR wires up tiling support. Covers a few aspects:

  • BigTable KV store changes to support tiling - gets and puts against the '_STREAMING' table now go through the TileKey thrift interface and map to the corresponding BT RowKey + time-range lookups. We've yanked out event-based support in the BT KV store. We're writing out data in the row + tile format documented here - [Option 1 - Tiles as Timestamped Rows](https://docs.google.com/document/d/1wgzJVAkl5K1bBCr98WCZFiFeTTWqILdA3FTE7cz9Li4/edit?tab=t.0#bookmark=id.j54a5g8gj2m9).
  • Add a flag in the FlagStore to indicate whether tiling is enabled. Switched the fetcher checks to use this instead of the prior GroupByServingInfo.isTilingEnabled flag. Flink leverages the same flag to choose between the tiled and untiled jobs, and the GcpApi sets it to true so tiling is always on.
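The flag-driven dispatch described above can be sketched in a few lines. This is a simplified, self-contained sketch: the real `FlagStore` is a Java interface taking a `java.util.Map` and returning `java.lang.Boolean`, the string value of `TILING_ENABLED` below is an assumption, and the job names come from this PR's sequence diagram.

```scala
// Sketch only: FlagStoreConstants.TILING_ENABLED exists in the PR, but its
// string value here ("tiling_enabled") is an assumption.
trait FlagStore {
  def isSet(flagName: String, attributes: Map[String, String]): Boolean
}

object FlagStoreConstants {
  val TILING_ENABLED = "tiling_enabled"
}

// GcpApi-style store that always reports tiling enabled, per the PR description.
val alwaysTiled: FlagStore = (flag: String, _: Map[String, String]) => flag == FlagStoreConstants.TILING_ENABLED
val neverTiled: FlagStore = (_: String, _: Map[String, String]) => false

// FlinkJob-style dispatch between the tiled and untiled group-by jobs.
def chooseJob(flagStore: FlagStore): String =
  if (flagStore.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty)) "runTiledGroupByJob"
  else "runGroupByJob"
```

Centralizing the check behind a constant keeps the fetcher, Flink, and GcpApi consistent instead of each consulting `GroupByServingInfo` separately.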

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested - tested on the Etsy side by running the job and hitting some fetcher CLI endpoints.
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced dynamic tiling capabilities for time series and streaming data processing. This enhancement enables a configurable tiled data mode that improves data retrieval granularity, processing consistency, and overall query performance, resulting in more efficient and predictable operations for end-users.
    • Added new methods for constructing tile keys and row keys, enhancing data management capabilities.
    • Implemented flag-based control for enabling or disabling tiling in various components, allowing for more flexible configurations.
  • Bug Fixes

    • Corrected minor documentation errors in the FlagStore interface.
  • Tests

    • Expanded test coverage to validate new tiling functionalities and ensure robustness in handling time series data.


coderabbitai bot commented Feb 7, 2025

Walkthrough

This update integrates tiling functionality across modules. A new .gitignore entry was added, and the TilingUtils now provides a buildTileKey method to create tile keys. Updates in cloud, Flink, online, and Spark components use these tile keys for refined row key construction and streaming requests. A flag store for tiling is introduced in multiple services, modifying flows based on the TILING_ENABLED flag. Minor documentation, import, and parameter renamings are also included.

Changes

Files Change Summary
.gitignore Added entry MODULE.bazel*.
api/.../TilingUtils.scala Added buildTileKey method and conversion import.
cloud_gcp/.../BigTableKVStoreImpl.scala
GcpApiImpl.scala
Updated tiling support: added buildTiledRowKey, getTableType; changed buildRowKey signature; integrated tiling flag store.
cloud_gcp/.../BigTableKVStoreTest.scala Changed key type from String to Array[Byte] and added tiled streaming test cases.
flink/.../AvroCodecFn.scala
FlinkJob.scala
Refactored tile key creation using TilingUtils; added flag-based conditional job selection in Flink.
online/.../(FlagStore.java, FlagStoreConstants.java, Api.scala, FetcherBase.scala) Corrected doc comments; introduced FlagStoreConstants with TILING_ENABLED; renamed afterTsMillis to startTsMillis; exposed and centralized tiling flag logic.
spark/.../(FetcherTest.scala, OnlineUtils.scala) Updated test methods to support tiling and modified method signatures for streaming, using TilingUtils.buildTileKey.

Sequence Diagram(s)

sequenceDiagram
    participant FJ as FlinkJob
    participant FS as FlagStore
    participant JB as GroupByJob
    FJ->>FS: Check TILING_ENABLED flag
    alt Tiling Enabled
        FJ->>JB: runTiledGroupByJob(env)
    else Not Enabled
        FJ->>JB: runGroupByJob(env)
    end
sequenceDiagram
    participant Caller as Module (AvroCodecFn/OnlineUtils)
    participant TU as TilingUtils
    participant TK as TileKey
    Caller->>TU: buildTileKey(dataset, keyBytes, tileSize, tileStart)
    TU-->>Caller: Return TileKey
    Caller->>TK: Use TileKey in request
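The second diagram can be made concrete with a small sketch. The `TileKey` field names below are assumptions (the real `TileKey` is a thrift struct in `ai.chronon.api`), and the floor-division tile alignment is the usual convention rather than a detail confirmed by this PR.

```scala
// Assumed field names; the real TileKey is a thrift struct.
case class TileKey(dataset: String,
                   keyBytes: Seq[Byte],
                   tileSizeMillis: Option[Long],
                   tileStartTimestampMillis: Option[Long])

// Mirrors the buildTileKey(dataset, keyBytes, tileSize, tileStart) call shape
// shown in the walkthrough; optional fields are applied only when present.
def buildTileKey(dataset: String,
                 keyBytes: Array[Byte],
                 tileSizeMs: Option[Long],
                 tileStartTs: Option[Long]): TileKey =
  TileKey(dataset, keyBytes.toSeq, tileSizeMs, tileStartTs)

// Align an event timestamp down to its tile boundary (assumed convention).
def tileStart(tsMillis: Long, tileSizeMs: Long): Long = tsMillis - (tsMillis % tileSizeMs)
```

With a 5-minute tile (300000 ms), an event at t=1000000 falls into the tile starting at 900000.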

Suggested reviewers

  • nikhil-zlai
  • tchow-zlai

Poem

In lines of code our changes gleam,
Tiling magic fuels the dream.
Keys now crafted with care so fine,
Flags decide the processing line.
Tests and docs join the playful team! 🎉





@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)

167-167: LGTM! Consider adding documentation.

The use of TilingUtils.buildTileKey improves code maintainability.

Add a comment explaining the tile key construction parameters and their significance.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

383-389: Extract empty map to a constant.

Consider defining private val EmptyProperties = Map.empty[String, String].asJava as a class constant.

+private val EmptyProperties = Map.empty[String, String].asJava
-    val jobDatastream = if (api.flagStore.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty[String, String].asJava)) {
+    val jobDatastream = if (api.flagStore.isSet(FlagStoreConstants.TILING_ENABLED, EmptyProperties)) {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 54f3f97 and b800513.

📒 Files selected for processing (15)
  • .gitignore (1 hunks)
  • api/py/ai/chronon/repo/run.py (1 hunks)
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala (2 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (10 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • distribution/build_and_upload_gcp_artifacts.sh (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
  • online/src/main/java/ai/chronon/online/FlagStore.java (1 hunks)
  • online/src/main/java/ai/chronon/online/FlagStoreConstants.java (1 hunks)
  • online/src/main/scala/ai/chronon/online/Api.scala (3 hunks)
  • online/src/main/scala/ai/chronon/online/FetcherBase.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (5 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (5 hunks)
✅ Files skipped from review due to trivial changes (2)
  • online/src/main/java/ai/chronon/online/FlagStoreConstants.java
  • online/src/main/java/ai/chronon/online/FlagStore.java
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (25)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (7)

10-10: Import
TilingUtils import is appropriate for tile-based row key logic.


31-31: TimestampRange
Import correctly used for microsecond-level filtering.


51-64: Documentation shift
References tiles instead of events—clearly indicates new approach.


129-143: MultiGet tiling
Switch on table type is consistent. Tiling logic is properly isolated.


188-190: Seq[Byte] usage
Changing from Array to Seq is a clean Scala practice.


278-303: MultiPut row keys
Deleting old cells by timestamp avoids duplication before setting new data.


414-473: TableType & row key
Sealed trait is neat. Tiled row key builder is concise and flexible.

api/src/main/scala/ai/chronon/api/TilingUtils.scala (2)

10-10: CollectionConverters
Needed for keyBytes to Java list conversion.


37-48: buildTileKey
Method nicely applies optional fields on TileKey.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (3)

5-6: FlagStore imports
Necessary for tiling flag usage.


20-21: java.util import
Used by FlagStore function signature—fine.


24-35: tilingEnabledFlagStore
Hardcoded to true keeps tiling active. Straightforward approach.

spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1)

97-99: LGTM! Consistent use of tiling utilities.

The code correctly uses ResolutionUtils for window resolution and TilingUtils for tile key creation.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (3)

3-3: LGTM!

Import is required for the new tiling functionality.


507-511: LGTM!

Method signature change aligns with byte array key handling in BigTableKVStoreImpl.


338-505: LGTM!

Comprehensive test coverage for streaming tile updates and tiled queries across different time ranges.

online/src/main/scala/ai/chronon/online/FetcherBase.scala (3)

22-22: LGTM!

Import is required for resolution-related functionality.


370-381: LGTM!

Clean implementation of conditional tile key generation with proper fallback.


736-740: LGTM!

Clean implementation of tiling flag check with proper null handling.
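The null handling praised here can be sketched as follows. The real FetcherBase code isn't reproduced in this review, so the guard below is an assumed shape around the Java `FlagStore` signature (which returns `java.lang.Boolean`, so both the store and its answer may be null); the flag's string value is illustrative.

```scala
// Java-interface shape per the FlagStore.java signature shown in this PR.
trait FlagStore {
  def isSet(flagName: String, attributes: java.util.Map[String, String]): java.lang.Boolean
}

// Null-safe tiling check: a missing store or a null answer means "not enabled".
def isTilingEnabled(flagStore: FlagStore): Boolean =
  Option(flagStore)
    .flatMap(fs => Option(fs.isSet("tiling_enabled", java.util.Collections.emptyMap[String, String]())))
    .exists(_.booleanValue)
```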

spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (2)

30-31: LGTM!

Required imports for flag store functionality in tests.


599-614: LGTM!

Clean integration of tiling flag store and test case for tiling functionality.

Also applies to: 759-760

api/py/ai/chronon/repo/run.py (1)

135-135: LGTM!

Updated jar name aligns with new Bazel build process.

.gitignore (1)

98-101: Bazel ignores added.
Bazel temp and module files now excluded.

distribution/build_and_upload_gcp_artifacts.sh (2)

46-46: Use Bazel for Flink jar.
Flink assembly now built via Bazel.


50-50: Update FLINK_JAR path.
Path revised to Bazel output.

-    dropDsOnWrite: Boolean): Unit = {
-    val isTiled = groupByConf.isTilingEnabled
+    dropDsOnWrite: Boolean,
+    isTiled: Boolean): Unit = {

💡 Verification agent

🧩 Analysis chain

Breaking change: New parameter added.

The addition of isTiled parameter requires updates to all callers.


🏁 Script executed:

#!/bin/bash
# Find all callers of putStreaming method
rg -A 2 "putStreaming\(" 

Length of output: 674


Breaking Change: Update putStreaming Call Sites

  • The putStreaming method now requires an extra Boolean parameter (isTiled) at its declaration.
  • At least one call site (in spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala) hasn’t been updated to supply the new parameter.

Please ensure that all callers are updated to provide an appropriate Boolean value.

@@ -228,7 +224,7 @@ abstract class Api(userConf: Map[String, String]) extends Serializable {

private var timeoutMillis: Long = 10000

-private var flagStore: FlagStore = null
+var flagStore: FlagStore = null

🛠️ Refactor suggestion

Consider keeping flagStore private.

The variable is already accessible via setFlagStore. Making it public could lead to thread safety issues.

-var flagStore: FlagStore = null
+private var flagStore: FlagStore = null
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var flagStore: FlagStore = null
private var flagStore: FlagStore = null

val endTime = request.endTsMillis.getOrElse(System.currentTimeMillis())
setQueryTimeSeriesFilters(query, startTs, endTime, request.keyBytes, request.dataset)

case (Some(startTs), StreamingTable) =>
// we need to generate a rowkey corresponding to each tile from the startTs to now
Collaborator


is this a todo? Looks like there's only one tileSize coming in here.

Contributor Author


maybe I can clarify the comment a bit better - we do expect only 1 tileSize here so we need to generate a RowKey for that given tile from startTs to now
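The clarified behavior — a single tileSize, with a row key generated for each tile from startTs to now — amounts to enumerating tile-start timestamps. A hedged sketch, assuming floor-aligned tile boundaries:

```scala
// Sketch: with one tileSize, enumerate the tile-start timestamps whose tiles
// cover [startTs, endTs]; one row key would then be built per tile start.
// Floor-aligned boundaries are an assumption, not confirmed by this PR.
def tileStartsBetween(startTs: Long, endTs: Long, tileSizeMs: Long): Seq[Long] = {
  val firstTileStart = startTs - (startTs % tileSizeMs)
  firstTileStart to endTs by tileSizeMs
}
```

For example, startTs=250 and endTs=1000 with a 300 ms tile yields tile starts at 0, 300, 600, and 900.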

* row key (without tiling) convention:
* <dataset>#<entity_key>#<start_date>
*/
def buildTiledRowKey(baseKeyBytes: Seq[Byte], dataset: String, ts: Long, tileSizeMs: Long): Array[Byte] = {
Collaborator


should this be part of the API?

Contributor Author


The tileKey is part of the api now in a way (for the _STREAMING tables). We could make it explicit when we use the tileKey across the board (we're not using it for the batch table lookups & the tile summary lookups).
The tiledRowKey isn't an explicit part of the API as that is something internal to the BT kv store (we don't expose that structure and our clients to it)
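Since the tiled row key is internal to the BT KV store, a hypothetical sketch following the documented `<dataset>#<entity_key>#<start_date>` convention is enough to illustrate it; the real byte layout may differ.

```scala
// Hypothetical layout: <dataset>#<entity_key>#<tile_start>. The actual
// encoding is internal to the BigTable KV store and not exposed to clients.
def buildTiledRowKey(baseKeyBytes: Seq[Byte], dataset: String, ts: Long, tileSizeMs: Long): Array[Byte] = {
  val tileStartTs = ts - (ts % tileSizeMs)              // floor-align to tile boundary
  val prefix = s"$dataset#".getBytes("UTF-8").toSeq
  val suffix = s"#$tileStartTs".getBytes("UTF-8").toSeq
  (prefix ++ baseKeyBytes ++ suffix).toArray
}
```

Keeping the row key internal means the on-disk layout can change (e.g. to range scans over tile boundaries) without touching clients, which only ever see TileKeys.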

maybeTileSize
.map(tileSize => buildTiledRowKey(keyBytes, dataset, dayTs, tileSize))
.getOrElse(buildRowKey(keyBytes, dataset, Some(dayTs)))

query.rowKey(ByteString.copyFrom(rowKey))
Collaborator


btw does this have performance difference with a range query? eg.

query
.range(
        ByteStringRange.unbounded().startClosed(ByteString.copyFrom(startRow)).endOpen(ByteString.copyFrom(endRow)))

Contributor Author


Under the hood I think they should map down similarly given we have 1 row with the time range we are querying. The current code ends up being similar / identical to the tile summary tiles so I went with that approach as we've already wired it up.


def buildTileKey(dataset: String,
keyBytes: Array[Byte],
tileSizeMs: Option[Long],
Contributor


didnt we say there should be a bucket somewhere here?

Contributor Author


isn't that the tile size? Or did you have something else in mind?

@piyush-zlai force-pushed the piyush/bt_tiling_support branch from b800513 to 49331d4 on February 10, 2025 14:51

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1)

53-54: Breaking Change: Update putStreaming Call Sites

The addition of isTiled parameter requires updates to all callers.

🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1)

128-128: Address the TODO comment about deprecating putStreaming.

Consider creating a new issue to track the deprecation of putStreaming.

Would you like me to create a new issue to track this deprecation task?

online/src/main/scala/ai/chronon/online/FetcherBase.scala (1)

370-381: Add documentation for future layering support.

The tiling key construction logic is correct, but the TODO comment about layering support should be expanded into proper documentation.

Add a detailed comment explaining:

  • Current single-layer implementation
  • Future multi-layer expansion plans
  • Impact on tile key construction
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between b800513 and 49331d4.

📒 Files selected for processing (13)
  • .gitignore (1 hunks)
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala (2 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (10 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
  • online/src/main/java/ai/chronon/online/FlagStore.java (1 hunks)
  • online/src/main/java/ai/chronon/online/FlagStoreConstants.java (1 hunks)
  • online/src/main/scala/ai/chronon/online/Api.scala (3 hunks)
  • online/src/main/scala/ai/chronon/online/FetcherBase.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (5 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • online/src/main/java/ai/chronon/online/FlagStore.java
  • .gitignore
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala
  • online/src/main/java/ai/chronon/online/FlagStoreConstants.java
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
  • online/src/main/scala/ai/chronon/online/Api.scala
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (17)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (3)

19-19: LGTM!

The new import for TilingUtils is correctly added.

Also applies to: 27-27


81-100: LGTM!

The tiling validation logic is correctly implemented.


97-99: LGTM!

The tile key construction using TilingUtils is correct.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

19-19: LGTM!

The new imports are correctly added.

Also applies to: 49-49


383-389: LGTM!

The tiling flag check is correctly implemented.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (4)

10-11: LGTM!

The new imports are correctly added.

Also applies to: 31-31


426-430: LGTM!

The TableType abstraction and implementation are well-designed.

Also applies to: 467-474


438-442: LGTM!

The tiled row key construction is correctly implemented.


51-63: LGTM!

The documentation is clear and accurate.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (4)

3-3: LGTM!

The TilingUtils import is correctly added.


339-367: LGTM!

The test case for repeated streaming tile updates is well-designed.


370-505: LGTM!

The test cases for tiled queries are comprehensive.


507-516: LGTM!

The method signature change is consistent with the new tiling functionality.

online/src/main/scala/ai/chronon/online/FetcherBase.scala (2)

22-22: LGTM!

The import is required for getting the smallest window resolution in tiling key construction.


736-740: LGTM!

The flag store implementation is robust with proper null handling and type casting.

spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (2)

605-613: LGTM!

The test flag store implementation correctly controls tiling behavior in tests.


757-761: LGTM!

The test case properly validates tiling functionality.

@piyush-zlai force-pushed the piyush/bt_tiling_support branch from 49331d4 to 9e96614 on February 10, 2025 15:10

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1)

97-99: Consider extracting tile key construction to a helper method.

The tile key construction logic could be reused elsewhere.

+  private def buildStreamingTileKey(groupByConf: api.GroupBy, keyBytes: Array[Byte], groupByServingInfo: api.GroupByServingInfo): api.TileKey = {
+    TilingUtils.buildTileKey(
+      groupByConf.streamingDataset,
+      keyBytes,
+      Some(ResolutionUtils.getSmallestWindowResolutionInMillis(groupByServingInfo.groupBy)),
+      None)
+  }
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)

426-430: Consider using sealed trait for TableType.

This would help catch all cases in pattern matching at compile time.

-  sealed trait TableType
+  sealed trait TableType
spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (1)

605-613: Consider extracting FlagStore implementation.

The anonymous FlagStore implementation could be reused in other tests.

+  private class TestFlagStore(enableTiling: Boolean) extends FlagStore {
+    override def isSet(flagName: String, attributes: util.Map[String, String]): lang.Boolean = {
+      if (flagName == FlagStoreConstants.TILING_ENABLED) {
+        enableTiling
+      } else {
+        false
+      }
+    }
+  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 49331d4 and 9e96614.

📒 Files selected for processing (13)
  • .gitignore (1 hunks)
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala (2 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (10 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
  • online/src/main/java/ai/chronon/online/FlagStore.java (1 hunks)
  • online/src/main/java/ai/chronon/online/FlagStoreConstants.java (1 hunks)
  • online/src/main/scala/ai/chronon/online/Api.scala (3 hunks)
  • online/src/main/scala/ai/chronon/online/FetcherBase.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (5 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
  • online/src/main/java/ai/chronon/online/FlagStoreConstants.java
  • online/src/main/java/ai/chronon/online/FlagStore.java
  • .gitignore
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
  • api/src/main/scala/ai/chronon/api/TilingUtils.scala
  • online/src/main/scala/ai/chronon/online/Api.scala
  • online/src/main/scala/ai/chronon/online/FetcherBase.scala
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (6)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

19-19: LGTM! Required imports added.

The imports are necessary for flag store access and Scala-Java map conversion.

Also applies to: 49-49


383-389: Consider improving flag check readability.

Extract the flag check into a descriptive variable and verify if an empty context map is appropriate.

-    val jobDatastream = if (api.flagStore.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty[String, String].asJava)) {
+    val isTilingEnabled = api.flagStore.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty[String, String].asJava)
+    val jobDatastream = if (isTilingEnabled) {
✅ Verification successful

🏁 Script executed:

#!/bin/bash
# Check if flagStore.isSet is called with context in other files
rg -A 2 "flagStore\.isSet"

Length of output: 355


Refactor for better readability is valid.

  • Only usage of flagStore.isSet is in FlinkJob.scala (lines 383-389).
  • The empty map context is consistently implemented.
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2)

54-54: Breaking Change: Update putStreaming Call Sites

The putStreaming method now requires an extra Boolean parameter (isTiled).


81-82: LGTM! Good validation for Entity groupBy's.

The assertion prevents tiling for Entity groupBy's which is not yet supported.

Also applies to: 97-98

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)

438-442: LGTM! Clear row key convention documentation.

The documentation clearly explains the row key structure for both tiled and non-tiled cases.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (1)

338-505: LGTM! Comprehensive test coverage for tiling.

The test cases cover:

  • Repeated streaming tile updates
  • Multiple day queries
  • Single day queries
  • Same day queries
  • Days without data

@piyush-zlai piyush-zlai merged commit 649883c into main Feb 10, 2025
7 checks passed
@piyush-zlai piyush-zlai deleted the piyush/bt_tiling_support branch February 10, 2025 20:26
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
## Summary
This PR wires up tiling support. Covers a few aspects:
* BigTable KV store changes to support tiling - we take requests for the
'_STREAMING' table for gets and puts using the TileKey thrift interface
and map to corresponding BT RowKey + timerange lookups. We've yanked out
event based support in the BT kv store. We're writing out data in the
Row + tile format documented here - [Option 1 - Tiles as Timestamped
Rows](https://docs.google.com/document/d/1wgzJVAkl5K1bBCr98WCZFiFeTTWqILdA3FTE7cz9Li4/edit?tab=t.0#bookmark=id.j54a5g8gj2m9).
* Add a Flag in the FlagStore to indicate if we're using Tiling / not.
Switched over the fetcher checks to use this instead of the prior
GrpByServingInfo.isTilingEnabled flag. Leverage this flag in Flink to
choose tiling / not. Set this flag to true in the GcpApi to always use
tiling.


## Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested - Tested on the Etsy side by running the job,
hitting some fetcher cli endpoints.
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced dynamic tiling capabilities for time series and streaming
data processing. This enhancement enables a configurable tiled data mode
that improves data retrieval granularity, processing consistency, and
overall query performance, resulting in more efficient and predictable
operations for end-users.
- Added new methods for constructing tile keys and row keys, enhancing
data management capabilities.
- Implemented flag-based control for enabling or disabling tiling in
various components, allowing for more flexible configurations.

- **Bug Fixes**
  - Corrected minor documentation errors in the FlagStore interface.

- **Tests**
- Expanded test coverage to validate new tiling functionalities and
ensure robustness in handling time series data.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025