Add value info struct to join schema response to pull feature -> key mapping #728
Conversation
Walkthrough

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Service
    participant MetadataStore
    participant JoinCodec
    Client->>Service: Request join schema
    Service->>MetadataStore: Fetch join config
    MetadataStore->>JoinCodec: Build join codec (collect valueInfos)
    JoinCodec-->>MetadataStore: Return codec with valueInfos
    MetadataStore-->>Service: Return join schema with valueInfos
    Service-->>Client: Respond with join schema (includes valueInfos)
```
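The flow above ends with a join schema response that carries valueInfos. As a rough sketch of what such a struct might look like (the field names here are illustrative assumptions drawn from the release notes, not the actual Chronon API):

```scala
// Hypothetical sketch of the value-info struct described in the release notes.
// Field names are illustrative assumptions, not the actual Chronon API.
case class ValueInfo(
    fullName: String,      // fully qualified feature name
    groupName: String,     // owning GroupBy name
    prefix: String,        // optional prefix applied in the join
    leftKeys: Seq[String], // join keys on the left side this feature maps to
    schemaString: String   // serialized schema description of the value
)

val info = ValueInfo("payments.purchase_count", "payments", "", Seq("user_id"), "long")
// every feature should map back to at least one left key
assert(info.leftKeys.nonEmpty)
println(s"${info.fullName} -> ${info.leftKeys.mkString(",")}")
```

A client can then walk the valueInfos list to recover which left keys feed each feature.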
Force-pushed from c42ee3b to ebf12b7
Actionable comments posted: 1
🔭 Outside diff range comments (1)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1)
Lines 88-90 — ⚠️ Potential issue: Wait timeout is 10 000 seconds

`Duration(10000, SECONDS)` ≈ 2.8 h; the comment says "change back to millis". Use a timeout of single-digit seconds or milliseconds.

```diff
-val result = Await.result(future, Duration(10000, SECONDS)) // todo: change back to millis
+val result = Await.result(future, Duration("10s"))
```
🧹 Nitpick comments (4)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherMetadataTest.scala (1)

Lines 28-30: Executor never shut down

The `ExecutionContext` backed by a fixed pool is left running; tests may leak threads.

```diff
-implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
+val exec = Executors.newFixedThreadPool(1)
+implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(exec)
+try {
+  // test body …
+} finally {
+  exec.shutdownNow()
+}
```

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherFailureTest.scala (1)
Lines 39-41: Thread leak in test

The same fixed-thread pool is not closed. Wrap in `try/finally` and call `shutdownNow()`.

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1)
Lines 67-70: Close executor

`newFixedThreadPool(1)` is created per test run and never shut down. Add `finally { exec.shutdownNow() }`.

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1)
Lines 122-134: Large string ops

Repeated `logger.info` calls with a multiline string inside a loop may flood logs. Consider debug level or a guard.
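The point of the suggested guard is that the multiline string is never built when the level is disabled. A minimal sketch of the idea using a by-name parameter; `debugEnabled` here is a stand-in for SLF4J's `logger.isDebugEnabled`, not the real logger API:

```scala
// `msg` is by-name: it is only evaluated if the level is enabled.
var debugEnabled = false
var buildCount = 0 // counts how many times the message was actually built

def debug(msg: => String): Unit =
  if (debugEnabled) println(msg)

def expensiveMessage(): String = {
  buildCount += 1
  (1 to 3).mkString("\n") // stands in for the large multiline string
}

debug(expensiveMessage()) // level off: string never built
assert(buildCount == 0)
debugEnabled = true
debug(expensiveMessage()) // level on: string built once
assert(buildCount == 1)
```

SLF4J's parameterized logging (`logger.debug("path: {}", path)`) achieves the same deferral without a by-name wrapper.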
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherFailureTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherMetadataTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (8)
- aggregator/src/test/scala/ai/chronon/aggregator/test/DataGen.scala (1): Column (177-205)
- api/src/main/scala/ai/chronon/api/DataType.scala (7): BooleanType (148-148), DoubleType (142-142), IntType (138-138), ListType (158-158), LongType (140-140), StringType (152-152), StructField (163-163)
- api/src/main/scala/ai/chronon/api/Builders.scala (4): Builders (27-371), Source (106-140), Selects (29-39), MetaData (261-315)
- api/src/main/scala/ai/chronon/api/TsUtils.scala (2): TsUtils (23-42), datetimeToTs (36-38)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (3): Extensions (37-299), save (141-151), withTimeBasedColumn (227-232)
- online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (4): TableUtils (43-613), TableUtils (615-617), sql (289-317), createDatabase (125-138)
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (4): DataFrameGen (39-158), entities (65-72), events (52-62), mutations (83-157)
⏰ Context from checks skipped due to timeout of 90000ms (31)
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: analyzer_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: flink_tests
- GitHub Check: spark_tests
- GitHub Check: service_commons_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: online_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: flink_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: api_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: fetcher_tests
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: analyzer_tests
- GitHub Check: aggregator_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
```scala
val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { case (_, kvMap) =>
  singleFileMetadataStore.put(kvMap, singleFileDataSet)
}
singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
```
Unbounded Await

`Await.result(..., Duration.Inf)` can hang forever, hiding failures.

```diff
-singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
+singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration("30s")))
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { case (_, kvMap) =>
  singleFileMetadataStore.put(kvMap, singleFileDataSet)
}
singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration("30s")))
```
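The bounded-timeout pattern in the suggestion can be exercised in isolation: a future that never completes then surfaces as a `TimeoutException` instead of an indefinite hang. The 100 ms figure below is arbitrary, chosen only to keep the demo fast:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// A future that never completes, like a put request against a hung store.
val stuck = Promise[Boolean]().future

val timedOut =
  try { Await.result(stuck, Duration("100ms")); false }
  catch { case _: TimeoutException => true }

assert(timedOut) // bounded wait surfaces the failure instead of hanging
println("timed out as expected")
```

With `Duration.Inf` the same call would block the test forever; the bounded form turns a hang into a diagnosable test failure.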
Force-pushed from e6a4d27 to 429edb2
Actionable comments posted: 3
♻️ Duplicate comments (1)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherMetadataTest.scala (1)

Line 54: Fix unbounded Await durations.

Multiple `Await.result` calls with `Duration.Inf` can lead to hangs if failures occur. Apply this diff:

```diff
-singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
+singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration("30s")))
```
🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherMetadataTest.scala (2)

Line 34: Replace println with proper logging.

Use an SLF4J logger instead of `println` for consistent logging.

```diff
-println(s"conf resource path for dir walker: ${confResource.getPath}")
+// Add logger definition at class level: private val logger = LoggerFactory.getLogger(getClass)
+logger.debug(s"conf resource path for dir walker: ${confResource.getPath}")
```

Lines 47-50: Consider removing or clarifying the configuration comment.

The developer note about IntelliJ configuration could be misleading.

```diff
-// set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing
+// Note: If tests fail in IntelliJ, set working directory to /chronon instead of $MODULE_DIR
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (9)
- online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java (1 hunks)
- online/src/main/scala/ai/chronon/online/JoinCodec.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (3 hunks)
- service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherFailureTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherMetadataTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java
- online/src/main/scala/ai/chronon/online/JoinCodec.scala
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherFailureTest.scala
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala
- online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
⏰ Context from checks skipped due to timeout of 90000ms (31)
- GitHub Check: streaming_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: spark_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: online_tests
- GitHub Check: online_tests
- GitHub Check: groupby_tests
- GitHub Check: flink_tests
- GitHub Check: spark_tests
- GitHub Check: service_commons_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: api_tests
- GitHub Check: groupby_tests
- GitHub Check: batch_tests
- GitHub Check: api_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: fetcher_tests
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: aggregator_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherMetadataTest.scala (1)

Lines 125-131: Verify valueInfos implementation matches requirements.

The PR aims to add valueInfos to map features to keys, so this test is critical. The test correctly validates that:
- All left keys are present in valueInfos
- Value schema fields match value info fields
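The two properties the test checks can be stated as plain assertions. A schematic version over stand-in data; the real test works on the join schema response types, and names like `valueInfos` and `valueSchemaFields` are local stand-ins here:

```scala
// Stand-in shape for the value-info entries in the join schema response.
case class ValueInfo(fullName: String, leftKeys: Seq[String])

val leftKeys = Set("user_id", "listing_id")
val valueSchemaFields = Seq("purchase_count", "view_count")
val valueInfos = Seq(
  ValueInfo("purchase_count", Seq("user_id")),
  ValueInfo("view_count", Seq("listing_id"))
)

// every left key referenced by a valueInfo is a known join key
assert(valueInfos.flatMap(_.leftKeys).forall(leftKeys.contains))
// value schema fields line up one-to-one with value info names
assert(valueInfos.map(_.fullName) == valueSchemaFields)
println("value info checks pass")
```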
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (2)

Line 200: Code refactoring improves maintainability.

Moving data generation to FetcherTestUtil simplifies this test class; centralizing test data generation makes the tests more maintainable. Also applies to lines 206 and 216.

Line 130: ✅ Verification successful

Verify the FetcherTestUtil implementation handles all test cases; make sure the utility methods support all previous test scenarios. Also applies to lines 151-152 and 157.

🏁 Script executed:

```shell
#!/bin/bash
# Check if FetcherTestUtil implements all necessary methods
# First check if the file exists
if fd -t f "FetcherTestUtil.scala" --exec cat {} \; | grep -q "object FetcherTestUtil"; then
  echo "FetcherTestUtil exists"
  # Check for required methods
  fd -t f "FetcherTestUtil.scala" --exec cat {} \; | grep -E "def generate(Random|Mutation|EventOnly)Data"
  fd -t f "FetcherTestUtil.scala" --exec cat {} \; | grep "def joinResponses"
  echo "Checking references to FetcherTestUtil methods:"
  rg "FetcherTestUtil\." --type scala
else
  echo "FetcherTestUtil.scala not found!"
fi
```

Length of output: 2898
FetcherTestUtil verification complete

Verified that `FetcherTestUtil` defines all utility methods used across tests—`generateRandomData`, `generateMutationData`, `generateEventOnlyData`, and `joinResponses`—and that each invocation in `ChainingFetcherTest`, `FetcherMetadataTest`, `FetcherFailureTest`, and `FetcherTest.scala` is supported. No missing methods; no further action needed.
```scala
val joinKeyName = "joins/team.example_join.v1"

val response = inMemoryKvStore.get(GetRequest(joinKeyName.getBytes(), singleFileDataSet))
val res = Await.result(response, Duration.Inf)
```
🛠️ Refactor suggestion: Replace all unbounded Await durations.

These additional `Await.result` calls also use `Duration.Inf`.

```diff
-val res = Await.result(response, Duration.Inf)
+val res = Await.result(response, Duration("30s"))

-directoryPut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
+directoryPut.flatMap(putRequests => Await.result(putRequests, Duration("30s")))

-val dirRes = Await.result(dirResponse, Duration.Inf)
+val dirRes = Await.result(dirResponse, Duration("30s"))

-val emptyRes = Await.result(emptyResponse, Duration.Inf)
+val emptyRes = Await.result(emptyResponse, Duration("30s"))
```

Also applies to lines 78, 82, and 92.
```scala
val spark: SparkSession = submission.SparkSessionBuilder.build(sessionName, local = true)

it should "test metadata store" in {
  implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
```
🛠️ Refactor suggestion: Add resource cleanup for ExecutionContext.

The thread pool is never shut down.

```diff
-implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
+val executor = Executors.newFixedThreadPool(1)
+implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
+// Add in "after" or "tearDown" block: executor.shutdown()
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```scala
// allocate a dedicated thread pool and make it the implicit context
val executor = Executors.newFixedThreadPool(1)
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
// TODO: in your "after"/"tearDown" (e.g. afterAll) block, add:
// executor.shutdown()
```
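For a single test body, the teardown can also be inlined with `try/finally`, which guarantees the pool is released even when the test throws. A self-contained sketch of that shape (in a ScalaTest suite the shutdown would typically live in `afterAll` instead):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val exec = Executors.newFixedThreadPool(1)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(exec)
try {
  // test body would run here, using the implicit ExecutionContext
} finally {
  exec.shutdownNow() // release the pool so the test run does not leak threads
}
assert(exec.isShutdown)
println("executor shut down")
```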
```scala
val namespace = "join_schema"
val tableUtils: TableUtils = TableUtils(spark)
val joinConf = FetcherTestUtil.generateEventOnlyData(namespace, tableUtils, spark)
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
```
🛠️ Refactor suggestion: Add resource cleanup for second ExecutionContext.

The thread pool in the second test method is also never shut down.

```diff
-implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
+val executor = Executors.newFixedThreadPool(1)
+implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
+// Remember to shut down in cleanup/teardown: executor.shutdown()
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```scala
val executor = Executors.newFixedThreadPool(1)
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
// Remember to shut down in cleanup/teardown: executor.shutdown()
```
LGTM
…mapping (#728)

## Summary
Updating the JoinSchemaResponse to include a mapping from feature -> listing key. This PR updates our JoinSchemaResponse to include a value info case class with these details.

## Checklist
- [X] Added Unit Tests
- [X] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Added detailed metadata for join value fields, including feature names, group names, prefixes, left keys, and schema descriptions, now available in join schema responses.
- **Bug Fixes**
  - Improved consistency and validation between join configuration keys and value field metadata.
- **Tests**
  - Enhanced and added tests to validate the presence and correctness of value field metadata in join schema responses.
  - Introduced new test suites covering fetcher failure scenarios and metadata store functionality.
  - Refactored existing fetcher tests to use external utility methods for data generation.
  - Added utility methods for generating deterministic, random, and event-only test data configurations.
Summary
Updating the JoinSchemaResponse to include a mapping from feature -> listing key. This PR updates our JoinSchemaResponse to include a value info case class with these details.
Checklist
Summary by CodeRabbit