Migrate from Play to Vert.x for Hub backend service #118

Merged
merged 15 commits into from
Dec 13, 2024

Conversation

piyush-zlai
Contributor

@piyush-zlai piyush-zlai commented Dec 9, 2024

Summary

Migrate from Play to Vert.x for our Hub service to settle on one backend.

Most of the code is a copy / fork of the existing Hub code. Code is copied and kept identical where possible (e.g. the core logic that backs the Play controllers / Vert.x handlers). Pieces such as the Play controllers -> Vert.x handlers differ only in the web framework interfacing logic.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced WebServiceVerticle for managing the web service entry point.
    • Added FeaturesHandler, FeaturesRouter, and TimeSeriesHandler for handling feature-related requests and time series data.
    • Implemented InMemKVStoreHandler for managing in-memory key-value store operations.
  • Improvements

    • Enhanced error handling and response management across various handlers, including pagination and validation.
    • Updated logging configuration for better flexibility and readability.
  • Bug Fixes

    • Improved validation and error responses for pagination parameters across handlers.
  • Chores

    • Removed obsolete controllers and tests to streamline the codebase.
    • Restructured package organization for better clarity and maintainability.
    • Updated project dependencies and SBT plugins for improved build management.

coderabbitai bot commented Dec 9, 2024

Walkthrough

The pull request introduces significant updates across multiple files, primarily focusing on enhancing the structure and functionality of the Scala application. Key changes include the addition of new modules, such as hub and service_commons, along with modifications to the build configuration and Docker setup. The workflow for testing Scala modules has been updated to include new job steps, and various controllers have been removed or refactored. Additionally, new handlers and services have been implemented to improve API interactions, particularly for managing joins and models.

Changes

File Change Summary
.github/workflows/test_scala_no_spark.yaml Added a new job step for running tests related to the hub module before service tests; minor formatting change.
build.sbt Introduced new dependencies for Vert.x and restructured projects: renamed hub to service_commons, created a new hub project, and updated project dependencies and aggregates.
docker-init/Dockerfile Updated to copy a JAR file instead of unzipping a ZIP file; streamlined Python dependency installation; added a new configuration file.
docker-init/build.sh Removed references to the Hub module; updated usage message and build logic.
docker-init/compose.yaml Added a new statsd service; updated app service to depend on statsd and set environment variable STATSD_HOST.
docker-init/hub/config.json Introduced a new configuration file with key-value pairs for online application settings.
docker-init/start.sh Enhanced error handling and updated command for executing the hub component from a script to a JAR file.
service/src/main/java/ai/chronon/service/WebServiceVerticle.java Added a new Vert.x-based web service entry point with HTTP server initialization and routing.
service/src/main/java/ai/chronon/service/handlers/FeaturesHandler.java Implemented a handler for bulk retrieval of features with error handling and response formatting.
service/src/main/java/ai/chronon/service/handlers/FeaturesRouter.java Created routing configuration for handling feature-related HTTP requests.
service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerJsonSerDeTest.java Introduced unit tests for JSON serialization/deserialization of JavaRequest objects.
service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerTest.java Added unit tests for the FeaturesHandler class, covering various scenarios.
hub/app/controllers/*.scala Removed multiple controller classes (e.g., ApplicationController, InMemKVStoreController, JoinController, etc.) that handled various API requests.
hub/app/module/*.scala Removed several Guice modules related to the DriftStore, InMemoryKVStore, and ModelStore.
hub/conf/application.conf Removed the main configuration file for the application.
hub/conf/routes Deleted route definitions for various API endpoints.
project/plugins.sbt Added several SBT plugins and removed one.
hub/src/main/scala/ai/chronon/hub/HubVerticle.java Introduced a new entry point for the Chronon Hub HTTP service with route management.
hub/src/main/scala/ai/chronon/hub/handlers/*.scala Added multiple new handler classes for managing joins, models, and time series data.
hub/src/test/scala/ai/chronon/hub/handlers/*.scala Introduced unit tests for new handler classes, ensuring proper functionality and error handling.

Possibly related PRs

Suggested reviewers

  • nikhil-zlai
  • chewy-zlai

🐰 In the meadow, changes bloom,
New modules rise, dispelling gloom.
With handlers fresh and tests anew,
Our code hops forward, bright and true!
Let's celebrate this vibrant spree,
For every line, a victory! 🌼


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b02d589 and 0c58f35.

📒 Files selected for processing (1)
  • docker-init/build.sh (0 hunks)
💤 Files with no reviewable changes (1)
  • docker-init/build.sh


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 35

🧹 Outside diff range and nitpick comments (39)
hub_vertx/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala (5)

312-324: Provide informative error for unsupported granularity in drift metrics

When a client requests a raw granularity with a drift metric, the current implementation returns a generic bad request. Providing a more informative error message can help clients understand the issue.

Update the error message to explicitly state that raw granularity is unsupported for drift metrics:

 if (granularity == Raw) {
-  ctx.BadRequest("We don't support Raw granularity for drift metric types")
+  ctx.BadRequest("Granularity 'raw' is not supported for drift metrics. Please use 'percentile' or 'aggregates'.")
 } else {
   // Existing logic...
 }

299-308: Parameterize error messages for flexibility

In the transformScalaFutureToVertxResponse method, the error message is hardcoded as "Error fetching join drift", which may not be appropriate for all uses of this method.

Modify the method to accept an additional errorMessage parameter:

-def transformScalaFutureToVertxResponse(ctx: RoutingContext, scalaJsonResultFuture: ScalaFuture[String]): Unit = {
+def transformScalaFutureToVertxResponse(ctx: RoutingContext, scalaJsonResultFuture: ScalaFuture[String], errorMessage: String): Unit = {
   val vertxResultFuture =
     Future.fromCompletionStage(FutureConverters.toJava(scalaJsonResultFuture).toCompletableFuture)

   vertxResultFuture.onSuccess { tsData =>
     ctx.Ok(tsData)
   }
   vertxResultFuture.onFailure { err =>
-    ctx.InternalServerError(s"Error fetching join drift: ${err.getMessage}")
+    ctx.InternalServerError(s"$errorMessage: ${err.getMessage}")
   }
 }

When invoking the method, provide a contextual error message:

transformScalaFutureToVertxResponse(ctx, scalaJsonResultFuture, "Error fetching feature drift")

328-335: Extend support for additional drift algorithms

The parseAlgorithm method currently supports only "psi", "hellinger", and "jsd". If new algorithms are added in the future, the method will require modification.

Consider refactoring to make it easier to add new algorithms, perhaps by using a map or an enumeration that can be easily extended:

val algorithmMap = Map(
  "psi"       -> DriftMetric.PSI,
  "hellinger" -> DriftMetric.HELLINGER,
  "jsd"       -> DriftMetric.JENSEN_SHANNON
)

def parseAlgorithm(algorithm: Option[String]): Option[DriftMetric] = {
  algorithm.flatMap(a => algorithmMap.get(a.toLowerCase))
}

39-40: Remove unused imports to clean up code

The import statements import scala.util.ScalaJavaConversions.ListOps and import scala.util.ScalaJavaConversions.MapOps appear to be unused.

Removing unused imports can improve code readability.


205-238: Optimize numeric check in convertTileDriftSeriesInfoToTimeSeries

The method uses a check to determine if a feature is numeric by inspecting if the percentileDriftSeries contains non-null values. This approach might not be reliable if the data contains null values.

Consider setting the isNumeric flag based on metadata about the feature type rather than inspecting the data at runtime.
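A metadata-driven check could look like the sketch below. The `FEATURE_TYPES` map and the type names are hypothetical stand-ins for whatever feature metadata the store actually exposes; the point is only that numeric-ness comes from the declared type, not from scanning the series at runtime.

```java
import java.util.Map;

// Hypothetical sketch: decide whether a feature is numeric from its declared
// metadata type rather than by inspecting percentileDriftSeries for non-nulls.
public class FeatureTypeCheck {
    // Illustrative metadata: feature name -> declared type.
    static final Map<String, String> FEATURE_TYPES = Map.of(
            "txn_amount", "DOUBLE",
            "merchant_id", "STRING");

    static boolean isNumeric(String feature) {
        String type = FEATURE_TYPES.getOrDefault(feature, "UNKNOWN");
        return type.equals("DOUBLE") || type.equals("LONG") || type.equals("INT");
    }
}
```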

hub_vertx/src/test/scala/ai/chronon/hub/handlers/TimeSeriesHandlerTest.scala (3)

175-211: Add assertions for response content in valid result tests

In test_joinTsLookup_SendValidResultsOnCorrectlyFormedRequest, the test checks that the response is successful but does not assert the correctness of the response content in detail.

Enhance the test by adding more assertions:

  • Verify that the JSON response contains the expected data.
  • Check that the time series points have correct values and timestamps.
  • Assert that the grouping and feature items match the input parameters.

84-106: Reduce code duplication in test setup

The test methods have repetitive setup code for initializing mock behaviors and building query parameters.

Consider refactoring the setup code:

  • Create helper methods for setting up common mock behaviors.
  • Use test fixtures or a @Before method to initialize common configurations.
  • Streamline the creation of query parameters and path parameters.

463-547: Add type annotations for better code clarity

In helper methods like generateDriftSeries and generateSummarySeries, the return types can be explicitly stated to improve readability and maintainability.

For example:

private def generateDriftSeries(...): Try[Future[Seq[TileDriftSeries]]] = { ... }

While the return type is currently specified, ensuring all methods have explicit return types is a good practice.

service/src/main/java/ai/chronon/service/handlers/FeaturesHandler.java (3)

39-44: Consider restricting the visibility of the EntityType enum

The EntityType enum is declared as public, but if it's only used within the FeaturesHandler class, consider reducing its visibility to private or package-private to encapsulate it and adhere to the principle of least privilege.


47-55: Rename EntityKeyToValues for better clarity

The nested static class EntityKeyToValues could have a more descriptive name, such as FeatureResult or EntityFeatureResult, to better convey its purpose and improve code readability.


72-81: Enhance error response with detailed information

When parsing the request body fails, consider providing more specific error messages to the client. Including details about the expected format or the nature of the parsing error can aid in debugging and improve user experience.

build.sbt (2)

316-338: Remove Duplicate Assembly Jar Name Declarations in service Project

The assembly / assemblyJarName setting is declared twice in the service project's settings (lines 316 and 338). This redundancy can lead to confusion and unnecessary complexity in your build configuration. Please remove the duplicate declaration to streamline the settings.

Apply this diff to fix the issue:

- assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

// Remove one of the duplicate declarations.

360-392: Remove Duplicate Assembly Jar Name Declarations in hub_vertx Project

Similarly, in the hub_vertx project, the assembly / assemblyJarName setting is declared twice. Removing the duplicate will simplify the build configuration.

Apply this diff to fix the issue:

- assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

// Remove one of the duplicate declarations.
hub_vertx/src/main/scala/ai/chronon/hub/handlers/ModelsHandler.scala (2)

10-14: Enhance API documentation

While the endpoint's purpose is documented, it would be helpful to include:

  1. Response format and status codes
  2. Default values for pagination parameters
  3. Maximum limit constraints

20-21: Document pagination constants

The defaultOffset, defaultLimit, and maxLimit values from the Paginate trait are used but their values aren't immediately clear. Consider adding these details in the class documentation.
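For illustration, documented defaults might be surfaced and applied like this sketch. The constant values and method names here are assumptions, not the actual `Paginate` trait:

```java
// Hypothetical sketch of pagination defaults and clamping. The concrete
// values (0 / 10 / 100) are illustrative, not the PR's actual constants.
public class Paginate {
    static final int DEFAULT_OFFSET = 0;
    static final int DEFAULT_LIMIT = 10;
    static final int MAX_LIMIT = 100;

    // Clamp a requested limit into [1, MAX_LIMIT]; fall back to the default
    // when the parameter is absent.
    static int resolveLimit(Integer requested) {
        if (requested == null) return DEFAULT_LIMIT;
        return Math.max(1, Math.min(requested, MAX_LIMIT));
    }

    static int resolveOffset(Integer requested) {
        if (requested == null || requested < 0) return DEFAULT_OFFSET;
        return requested;
    }
}
```

Documenting these in the class ScalaDoc keeps the behavior discoverable without reading the trait.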

.github/workflows/test_scala_no_spark.yaml (1)

75-78: Optimize SBT configuration

The SBT_OPTS environment variable is already set at the workflow level. The explicit export here is redundant and could be removed for consistency with other test steps.

      - name: Run hub_vertx tests
        run: |
-         export SBT_OPTS="-Xmx8G -Xms2G"
          sbt "++ 2.12.18 hub_vertx/test"
hub_vertx/src/test/resources/group_bys/transaction_events.txn_group_by_user (1)

2-15: Consider adding schema validation constraints

The metadata section could benefit from schema validation constraints to ensure data quality and prevent configuration errors.

Consider adding:

  1. Allowed values for online field
  2. JSON schema validation for customJson
  3. Pattern validation for name and outputNamespace
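A minimal sketch of such validation. The allowed values and the name pattern below are illustrative assumptions, not derived from the actual generated schema:

```java
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch of the suggested constraints: an allowed-values check
// for the online field and a pattern check for names. The rules are
// assumptions for illustration only.
public class GroupByMetadataValidator {
    static final Set<Integer> ALLOWED_ONLINE = Set.of(0, 1);
    static final Pattern NAME_PATTERN =
            Pattern.compile("^[a-z][a-z0-9_]*\\.[a-z][a-z0-9_]*$");

    static boolean isValid(int online, String name) {
        return ALLOWED_ONLINE.contains(online)
                && NAME_PATTERN.matcher(name).matches();
    }
}
```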
hub_vertx/src/test/resources/group_bys/transaction_events.txn_group_by_merchant (2)

38-39: Document the meaning of operation codes

The operation codes (6 and 7) lack documentation about their purpose. Consider adding comments or documentation to explain what these operations represent.

Also applies to: 60-61


42-44: Replace magic numbers in timeUnit with meaningful constants

The timeUnit values (0 and 1) are magic numbers. Consider using named constants or an enum to clearly indicate what these values represent (e.g., DAYS, WEEKS).

Also applies to: 46-48, 49-52, 53-56, 64-67
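One way to name the codes is an enum with an explicit code field. The 0 -> HOURS / 1 -> DAYS mapping below is an assumption for illustration; confirm it against the actual serialization before adopting names:

```java
// Hypothetical sketch: give the raw timeUnit codes symbolic names.
public enum TimeUnitCode {
    HOURS(0), DAYS(1);

    final int code;
    TimeUnitCode(int code) { this.code = code; }

    // Resolve a raw code back to its named constant, rejecting unknowns.
    static TimeUnitCode fromCode(int code) {
        for (TimeUnitCode t : values()) {
            if (t.code == code) return t;
        }
        throw new IllegalArgumentException("Unknown timeUnit code: " + code);
    }
}
```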

docker-init/start.sh (1)

47-47: Consider parameterizing hardcoded values

The command contains several hardcoded values:

  • JAR version (0.1.0-SNAPSHOT)
  • Port number (9000)
  • Config file path (config.json)

Consider using environment variables for these values to make the script more configurable and maintainable.

-exec java -jar hub_vertx-0.1.0-SNAPSHOT.jar run ai.chronon.hub.HubVerticle -Dserver.port=9000 -Dai.chronon.metrics.host=$STATSD_HOST -conf config.json
+exec java -jar ${HUB_VERTX_JAR:-hub_vertx-0.1.0-SNAPSHOT.jar} run ai.chronon.hub.HubVerticle -Dserver.port=${SERVER_PORT:-9000} -Dai.chronon.metrics.host=$STATSD_HOST -conf ${CONFIG_PATH:-config.json}
hub_vertx/src/main/scala/ai/chronon/hub/handlers/JoinsHandler.scala (2)

31-34: Consider adding request tracing and metrics

For better observability during the migration period, consider adding request tracing and metrics collection.

Add timing metrics around the store operation:

+      val startTime = System.currentTimeMillis()
       val joins = monitoringStore.getJoins
       val paginatedResults = paginateResults(joins, offset, limit)
+      logger.debug(s"Join list retrieval took ${System.currentTimeMillis() - startTime}ms")
       val json = ListJoinResponse(offset, paginatedResults).asJson.noSpaces

47-48: Fix typo in error message

There's a typo in the error message for NotFound response.

-      case None       => ctx.NotFound(s"Unable to retrive $entityName")
+      case None       => ctx.NotFound(s"Unable to retrieve $entityName")
hub_vertx/src/main/resources/logback.xml (1)

46-49: Add package-specific logging levels

Consider adding package-specific logging levels for better granularity during the migration period.

Add before the root logger:

+    <!-- Package-specific logging levels -->
+    <logger name="ai.chronon.hub" level="${HUB_LOG_LEVEL:-${LOG_LEVEL}}"/>
+    <logger name="io.vertx" level="${VERTX_LOG_LEVEL:-INFO}"/>
     <root level="${LOG_LEVEL}">
service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerJsonSerDeTest.java (2)

1-57: Good test coverage, consider adding more edge cases.

The test suite provides good coverage of basic scenarios and error cases. Consider adding tests for:

  • Empty request body
  • Null values in JSON
  • Different data types (float, boolean, etc.)
  • Special characters in strings
  • Array fields

28-30: Consider using more specific assertions for type checking.

Instead of using generic type checking with getClass().equals(), consider using more specific assertions:

-assertTrue(req.keys.containsKey("user") && req.keys.get("user").getClass().equals(String.class));
-assertTrue(req.keys.containsKey("zip") && req.keys.get("zip").getClass().equals(Integer.class));
+assertTrue(req.keys.containsKey("user"));
+assertTrue(req.keys.get("user") instanceof String);
+assertTrue(req.keys.containsKey("zip"));
+assertTrue(req.keys.get("zip") instanceof Integer);
spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala (2)

Line range hint 32-37: Improve exception handling for dataset creation

The current implementation swallows all exceptions. Consider handling specific exceptions:

-    try {
-      api.genKvStore.create(datasetName)
-    } catch {
-      // swallows all exceptions right now
-      // TODO: swallow only already existing exception - move this into the kvstore.createIfNotExists
-      case e: Exception => e.printStackTrace()
-    }
+    try {
+      api.genKvStore.create(datasetName)
+    } catch {
+      case e: KVStore.DatasetAlreadyExistsException => 
+        // Expected case, dataset already exists
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to create dataset $datasetName", e)
+    }

Line range hint 25-25: Implement rate limiting as indicated by TODO

The TODO comment indicates rate limiting should be implemented. This is important for production stability.

Would you like me to help implement the rate limiting logic or create a GitHub issue to track this task?

hub_vertx/src/test/scala/ai/chronon/hub/store/MonitoringModelStoreTest.scala (1)

70-75: Consider using test fixtures instead of hardcoded paths

Move test data to fixtures for better maintainability:

+  private val TestPaths = Seq(
+    "joins/user_transactions.txn_join",
+    "group_bys/transaction_events.txn_group_by_merchant",
+    "group_bys/transaction_events.txn_group_by_user",
+    "models/transaction_model.v1"
+  )

   private def generateListResponse(): Future[ListResponse] = {
-    val paths = Seq(
-      "joins/user_transactions.txn_join",
-      "group_bys/transaction_events.txn_group_by_merchant",
-      "group_bys/transaction_events.txn_group_by_user",
-      "models/transaction_model.v1"
-    )
+    val paths = TestPaths
hub_vertx/src/test/scala/ai/chronon/hub/handlers/ModelHandlerTest.scala (3)

63-63: Remove duplicate comments in test file.

The comment "// Trigger call" appears twice on multiple lines.

-    // Trigger call// Trigger call
+    // Trigger call

Also applies to: 79-79, 97-97, 129-129


55-69: Enhance error case tests with response body verification.

The error tests verify the status code but not the error message content.

Consider capturing and verifying the error response body:

     handler.handle(routingContext)
     vertx.setTimer(1000, _ => {
       verify(response).setStatusCode(400)
+      val responseCaptor = ArgumentCaptor.forClass(classOf[String])
+      verify(response).end(responseCaptor.capture)
+      val errorResponse = decode[ErrorResponse](responseCaptor.getValue)
+      assertEquals("Invalid offset/limit parameter", errorResponse.right.value.message)
       async.complete()
     })

Also applies to: 71-85


147-155: Improve mock data generation flexibility.

The mock data generation uses hardcoded values which could make tests brittle.

Consider parameterizing the mock data generation:

 object MockDataService {
-  def generateMockModel(id: String): Model = {
+  def generateMockModel(
+    id: String,
+    groupBys: Seq[GroupBy] = Seq(GroupBy("my_groupBy", Seq("g1", "g2"))),
+    team: String = "my team",
+    modelType: String = "XGBoost"
+  ): Model = {
-    val groupBys = Seq(GroupBy("my_groupBy", Seq("g1", "g2")))
     val join = Join("my_join", Seq("ext_f1", "ext_f2", "d_1", "d2"), groupBys, true, true, Some("my_team"))
-    Model(id, join, online = true, production = true, "my team", "XGBoost")
+    Model(id, join, online = true, production = true, team, modelType)
   }
hub_vertx/src/test/scala/ai/chronon/hub/handlers/JoinHandlerTest.scala (4)

43-54: Add cleanup after tests

Consider adding a cleanup method to release resources after tests complete.

+  @After
+  def tearDown(): Unit = {
+    vertx.close()
+  }

64-65: Remove duplicate comment

The comment "// Trigger call" appears twice.

-    // Trigger call// Trigger call
+    // Trigger call

56-70: Extract magic numbers into constants

Consider extracting the timer delay into a constant for better maintainability.

+  private val TEST_TIMEOUT_MS = 1000
+
   @Test
   def testSend400BadOffset(context: TestContext) : Unit = {
     // ... existing code ...
-    vertx.setTimer(1000, _ => {
+    vertx.setTimer(TEST_TIMEOUT_MS, _ => {

106-131: Improve test data setup and assertions

Consider the following improvements:

  1. Extract test data setup into helper methods
  2. Add more specific assertions for error cases
  3. Consider testing edge cases (empty results, null values)
+  private def createTestData(): List[Join] = {
+    // Move test data creation here
+  }
+
+  private def assertResponseContent(response: String, expected: List[Join]): Unit = {
+    // Move assertions here
+  }
service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerTest.java (2)

51-65: Enhance test setup with cleanup and constants

Consider the following improvements:

  1. Add cleanup method
  2. Extract more magic values into constants
+    private static final long TEST_TIMEOUT_MS = 1000;
+    private static final String CONTENT_TYPE = "content-type";
+    private static final String APPLICATION_JSON = "application/json";
+
+    @After
+    public void tearDown() {
+        vertx.close();
+    }

284-285: Fix double semicolon

Remove the extra semicolon at the end of the line.

-            async.complete();;
+            async.complete();
hub_vertx/src/main/scala/ai/chronon/hub/model/Model.scala (1)

60-60: Ensure consistent comment styles throughout the code

At line 60, the comment uses /* ... */ style, whereas previous comments use ScalaDoc-style /** ... */. For consistency and to facilitate documentation generation, consider using ScalaDoc-style comments for all documentation comments.

hub_vertx/src/main/java/ai/chronon/hub/HubVerticle.java (1)

114-125: Externalize CORS configuration for flexibility across environments

Currently, the allowed origins in the CORS configuration are hardcoded to http://localhost:5173 and http://localhost:3000. For better flexibility and to ease deployment across different environments, consider externalizing the CORS settings to a configuration file or environment variables.

This change allows you to modify the CORS settings without altering the codebase, facilitating easier transitions between development, staging, and production environments.
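A sketch of reading the origins from the environment with the current hardcoded values as a fallback. The `CORS_ALLOWED_ORIGINS` variable name is an assumption; the parsing is shown as a pure function so it can be tested without touching the process environment:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: parse a comma-separated allowed-origins value, e.g.
// System.getenv("CORS_ALLOWED_ORIGINS"), defaulting to the current hardcoded
// localhost origins when the variable is unset or blank.
public class CorsConfig {
    static final String DEFAULT_ORIGINS =
            "http://localhost:5173,http://localhost:3000";

    static List<String> allowedOrigins(String envValue) {
        String raw = (envValue == null || envValue.trim().isEmpty())
                ? DEFAULT_ORIGINS : envValue;
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }
}
```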

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 5bfa967 and 2f50469.

📒 Files selected for processing (35)
  • .github/workflows/test_scala_no_spark.yaml (1 hunks)
  • build.sbt (4 hunks)
  • docker-init/Dockerfile (1 hunks)
  • docker-init/build.sh (3 hunks)
  • docker-init/compose.yaml (2 hunks)
  • docker-init/hub/config.json (1 hunks)
  • docker-init/start.sh (1 hunks)
  • hub_vertx/src/main/java/ai/chronon/hub/HubVerticle.java (1 hunks)
  • hub_vertx/src/main/resources/example_config.json (1 hunks)
  • hub_vertx/src/main/resources/logback.xml (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/InMemKVStoreHandler.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/JoinsHandler.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/ModelsHandler.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/Paginate.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/SearchHandler.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/handlers/VertxExtensions.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/model/Model.scala (1 hunks)
  • hub_vertx/src/main/scala/ai/chronon/hub/store/MonitoringModelStore.scala (1 hunks)
  • hub_vertx/src/test/resources/group_bys/transaction_events.txn_group_by_merchant (1 hunks)
  • hub_vertx/src/test/resources/group_bys/transaction_events.txn_group_by_user (1 hunks)
  • hub_vertx/src/test/resources/joins/user_transactions.txn_join (1 hunks)
  • hub_vertx/src/test/resources/models/transaction_model.v1 (1 hunks)
  • hub_vertx/src/test/scala/ai/chronon/hub/handlers/JoinHandlerTest.scala (1 hunks)
  • hub_vertx/src/test/scala/ai/chronon/hub/handlers/ModelHandlerTest.scala (1 hunks)
  • hub_vertx/src/test/scala/ai/chronon/hub/handlers/SearchHandlerTest.scala (1 hunks)
  • hub_vertx/src/test/scala/ai/chronon/hub/handlers/TimeSeriesHandlerTest.scala (1 hunks)
  • hub_vertx/src/test/scala/ai/chronon/hub/store/MonitoringModelStoreTest.scala (1 hunks)
  • service/src/main/java/ai/chronon/service/WebServiceVerticle.java (1 hunks)
  • service/src/main/java/ai/chronon/service/handlers/FeaturesHandler.java (1 hunks)
  • service/src/main/java/ai/chronon/service/handlers/FeaturesRouter.java (1 hunks)
  • service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerJsonSerDeTest.java (1 hunks)
  • service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerTest.java (1 hunks)
  • service_commons/src/main/java/ai/chronon/service/ConfigStore.java (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala (3 hunks)
✅ Files skipped from review due to trivial changes (4)
  • service_commons/src/main/java/ai/chronon/service/ConfigStore.java
  • docker-init/hub/config.json
  • hub_vertx/src/main/resources/example_config.json
  • docker-init/build.sh
🧰 Additional context used
📓 Learnings (4)
hub_vertx/src/test/resources/group_bys/transaction_events.txn_group_by_merchant (1)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#44
File: hub/test/resources/group_bys/transaction_events.txn_group_by_merchant:2-15
Timestamp: 2024-11-12T09:38:33.532Z
Learning: Files like 'transaction_events.txn_group_by_merchant' are generated JSON files, and modifications to their structure may not be feasible.
spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala (3)
Learnt from: chewy-zlai
PR: zipline-ai/chronon#50
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:37-40
Timestamp: 2024-11-12T09:38:33.532Z
Learning: Avoid using `Await.result` in production code; prefer handling `Future`s asynchronously when possible to prevent blocking.
Learnt from: chewy-zlai
PR: zipline-ai/chronon#62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-12T09:38:33.532Z
Learning: In Spark applications, when defining serializable classes, passing an implicit `ExecutionContext` parameter can cause serialization issues. In such cases, it's acceptable to use `scala.concurrent.ExecutionContext.Implicits.global`.
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#50
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:19-47
Timestamp: 2024-11-12T09:38:33.532Z
Learning: In Scala, the `grouped` method on collections returns an iterator, allowing for efficient batch processing without accumulating all records in memory.
hub_vertx/src/test/resources/joins/user_transactions.txn_join (1)
Learnt from: chewy-zlai
PR: zipline-ai/chronon#30
File: api/py/test/sample/production/joins/risk/user_transactions.txn_join:217-218
Timestamp: 2024-11-12T09:38:33.532Z
Learning: The JSON files in this project are automatically generated and should not be manually modified or refactored.
hub_vertx/src/test/scala/ai/chronon/hub/store/MonitoringModelStoreTest.scala (1)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-11-12T09:38:33.532Z
Learning: In `hub/test/store/DynamoDBMonitoringStoreTest.scala`, the current implementation of the `generateListResponse` method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
🔇 Additional comments (9)
service/src/main/java/ai/chronon/service/handlers/FeaturesHandler.java (1)

58-59: Ensure thread safety of the static ObjectMapper instance

While ObjectMapper instances from Jackson are thread-safe after version 2.9, ensure that any custom configurations or modules added do not compromise thread safety. If there are concerns, consider using a thread-local ObjectMapper or synchronizing access.

hub_vertx/src/main/scala/ai/chronon/hub/store/MonitoringModelStore.scala (1)

48-51: Ensure thread-safe access to the configRegistryCache

The cache is initialized lazily, and concurrent access might lead to race conditions. Consider initializing the cache eagerly or implementing synchronization to ensure thread safety during initialization.
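One way to get the race-free initialization this comment asks for is a ConcurrentHashMap-backed cache; the sketch below is generic (the key and value types are stand-ins, not the store's actual API). computeIfAbsent gives per-key atomic loading without explicit locks.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConfigRegistryCache {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger loaderRuns = new AtomicInteger();

    // computeIfAbsent runs the loader atomically per key, so concurrent
    // first-time readers cannot race and load the same entry twice.
    public String get(String key) {
        return cache.computeIfAbsent(key, k -> {
            loaderRuns.incrementAndGet();
            return "registry-entry-for-" + k;
        });
    }

    public int loaderRuns() { return loaderRuns.get(); }
}
```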

hub_vertx/src/test/resources/joins/user_transactions.txn_join (1)

1-248: Avoid manual modifications to auto-generated JSON files

According to project learnings, JSON files like user_transactions.txn_join are auto-generated and should not be manually edited. Modifying them can lead to inconsistencies and potential errors.

Please confirm if this file should be auto-generated. If so, revert manual changes and regenerate the file using the appropriate tools.

hub_vertx/src/test/resources/models/transaction_model.v1 (1)

1-268: Avoid manual modifications to auto-generated JSON files

As with other configuration files, transaction_model.v1 appears to be auto-generated. Manual edits may introduce inconsistencies. Please use the designated generation tools for modifications.

Would you like me to verify if this file is auto-generated and guide on proper procedures for updates?

service/src/main/java/ai/chronon/service/handlers/FeaturesRouter.java (1)

14-23: LGTM

The routing configuration correctly sets up the endpoints for handling feature requests. The use of BodyHandler and proper handler initialization is appropriate.

hub_vertx/src/main/resources/logback.xml (1)

31-36: Verify async appender configuration for production load

The current queue size might need adjustment based on production load.

Let's analyze the logging patterns in the existing Play implementation:

✅ Verification successful

Based on the analysis of both implementations and the logging patterns, I can now provide a final response:

AsyncAppender configuration is appropriate for the current load

The current queue size of 512 is well-suited for the Vertx implementation's logging volume. The codebase shows minimal logging, primarily for server lifecycle events (start/stop) and error conditions. The logging pattern indicates:

  • Low frequency logging (only server lifecycle events)
  • No high-volume transaction or request-level logging
  • Simple error logging with stack traces
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check logging patterns in the Play implementation
echo "Analyzing Play logs for queue size requirements..."
rg -A 5 "AsyncAppender|queueSize" hub/conf/

Length of output: 1026


Script:

#!/bin/bash
# Let's check the logging volume by analyzing log statements in both implementations
echo "Analyzing logging patterns in Play implementation..."
rg "logger|LOG\." hub/app/ -tc -C 2

echo -e "\nAnalyzing logging patterns in Vertx implementation..."
rg "logger|LOG\." hub_vertx/src/main/java/ -tc -C 2

Length of output: 329


Script:

#!/bin/bash
# Let's try a different approach to analyze logging patterns
echo "Analyzing logging patterns in Play implementation..."
rg -t java "Logger|LOG\." hub/app/

echo -e "\nAnalyzing logging patterns in Vertx implementation..."
rg -t java "Logger|LOG\." hub_vertx/src/main/java/

# Also check for any performance-related documentation
echo -e "\nChecking documentation for performance considerations..."
fd README -t f -X cat {}

Length of output: 44980


Script:

#!/bin/bash
# Let's analyze the logging configuration in both implementations to understand the logging volume and patterns
echo "Checking logging configuration and patterns in Vertx implementation..."
rg -A 10 "logger|LOG\." hub_vertx/src/main/java/ -t java

echo -e "\nChecking logging usage in the code..."
ast-grep --pattern 'logger.$_($$$)' hub_vertx/src/main/java/

Length of output: 4911

docker-init/compose.yaml (1)

56-56: LGTM: Proper service dependency and configuration.

The app service configuration correctly:

  • Depends on the statsd service
  • Configures the STATSD_HOST environment variable

Also applies to: 67-67

hub_vertx/src/main/scala/ai/chronon/hub/model/Model.scala (1)

1-66: Well-structured data models with clear definitions

The case classes and sealed traits are well-defined, making good use of Scala's type system to model domain entities. This enhances type safety and allows for robust pattern matching.

service/src/main/java/ai/chronon/service/WebServiceVerticle.java (1)

24-84: Well-structured HTTP server with proper route configurations

The WebServiceVerticle class effectively initializes the HTTP server, sets up the necessary routes, and handles server lifecycle events gracefully. The use of Vert.x features aligns with best practices for reactive web applications.

Comment on lines 88 to 115
(parseOffset(offset), parseAlgorithm(algorithm)) match {
case (None, _) => ctx.BadRequest(s"Unable to parse offset - $offset")
case (_, None) => ctx.BadRequest("Invalid drift algorithm. Expect JSD, PSI or Hellinger")
case (Some(o), Some(driftMetric)) =>
val window = new Window(o.toMinutes.toInt, TimeUnit.MINUTES)
val joinPath = name.replaceFirst("\\.", "/") // we need to look up in the drift store with this transformed name
val maybeDriftSeries = driftStore.getDriftSeries(joinPath, driftMetric, window, startTs, endTs)
maybeDriftSeries match {
case Failure(exception) =>
ctx.InternalServerError(s"Error computing join drift - ${exception.getMessage}")
case Success(driftSeriesFuture) =>
val scalaJsonResultFuture = driftSeriesFuture.map { driftSeries =>
// pull up a list of drift series objects for all the features in a group
val grpToDriftSeriesList: Map[String, Seq[TileDriftSeries]] = driftSeries.groupBy(_.key.groupName)
val groupByTimeSeries = grpToDriftSeriesList.map {
case (name, featureDriftSeriesInfoSeq) =>
GroupByTimeSeries(
name,
featureDriftSeriesInfoSeq.map(series => convertTileDriftSeriesInfoToTimeSeries(series, metric)))
}.toSeq

val tsData = JoinTimeSeriesResponse(name, groupByTimeSeries)
tsData.asJson.noSpaces
}(driftStore.executionContext)

transformScalaFutureToVertxResponse(ctx, scalaJsonResultFuture)
}
}

⚠️ Potential issue

Add error handling for exceptions during drift series retrieval

In the doFetchJoinDrift method, when calling driftStore.getDriftSeries, if an exception occurs (e.g., due to network issues or data store errors), it could lead to unhandled exceptions or incomplete error messages being sent to the client.

Consider wrapping the call to driftStore.getDriftSeries in a Try block to catch any exceptions and provide a meaningful error response to the client:

val maybeDriftSeries = Try {
  driftStore.getDriftSeries(joinPath, driftMetric, window, startTs, endTs)
}

maybeDriftSeries match {
  case Failure(exception) =>
    ctx.InternalServerError(s"Error retrieving drift series: ${exception.getMessage}")
  case Success(driftSeriesAttempt) =>
    driftSeriesAttempt match {
      case Failure(exception) =>
        ctx.InternalServerError(s"Error computing join drift: ${exception.getMessage}")
      case Success(driftSeriesFuture) =>
        val scalaJsonResultFuture = driftSeriesFuture.map { driftSeries =>
          // Existing logic...
        }(driftStore.executionContext)
        transformScalaFutureToVertxResponse(ctx, scalaJsonResultFuture, "Error fetching join drift")
    }
}
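The same defensive pattern translates to the Java handlers in this PR: catch the synchronous throw at the call site and fold it into the future, so there is exactly one failure path to report on. This is a hypothetical stand-in, not the drift store's real API.

```java
import java.util.concurrent.CompletableFuture;

public class DriftLookup {
    // Hypothetical stand-in for a store call that may throw synchronously.
    static String readSeries(boolean storeDown) {
        if (storeDown) throw new IllegalStateException("drift store unavailable");
        return "series-data";
    }

    // Synchronous failures become a failed future, so the handler sees a
    // single failure channel instead of an escaping exception.
    static CompletableFuture<String> fetchSeries(boolean storeDown) {
        try {
            return CompletableFuture.completedFuture(readSeries(storeDown));
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}
```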

Comment on lines 224 to 246
ScalaFuture.sequence(Seq(currentSummarySeriesFuture, baselineSummarySeriesFuture)).map { merged =>
val currentSummarySeries = merged.head
val baselineSummarySeries = merged.last

val isCurrentNumeric = currentSummarySeries.headOption.forall(checkIfNumeric)
val isBaselineNumeric = baselineSummarySeries.headOption.forall(checkIfNumeric)

val currentFeatureTs = {
if (currentSummarySeries.isEmpty) Seq.empty
else convertTileSummarySeriesToTimeSeries(currentSummarySeries.head, isCurrentNumeric, metric)
}
val baselineFeatureTs = {
if (baselineSummarySeries.isEmpty) Seq.empty
else convertTileSummarySeriesToTimeSeries(baselineSummarySeries.head, isBaselineNumeric, metric)
}
val comparedTsData =
ComparedFeatureTimeSeries(name, isCurrentNumeric, baselineFeatureTs, currentFeatureTs)
comparedTsData.asJson.noSpaces
}
}

transformScalaFutureToVertxResponse(ctx, scalaJsonFuture)
}

⚠️ Potential issue

Handle failed futures when fetching summary series

In the doFetchFeaturePercentileDrift method, you are using ScalaFuture.sequence to combine two futures. If either currentSummarySeriesFuture or baselineSummarySeriesFuture fails, the entire future will fail, and the exception isn't being handled, potentially resulting in an unhandled exception.

Add error handling to manage failed futures and ensure an appropriate error response is sent to the client:

val scalaJsonFuture = {
  ScalaFuture.sequence(Seq(currentSummarySeriesFuture, baselineSummarySeriesFuture)).map { merged =>
    // Existing logic...
  }(driftStore.executionContext).recover {
    case exception =>
      ctx.InternalServerError(s"Error fetching summary series: ${exception.getMessage}")
      ""
  }
}
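The recover idea maps directly onto CompletableFuture in the Java parts of the service; a minimal sketch (the payload shapes are made up): combine the two lookups, and degrade to an error payload if either one fails.

```java
import java.util.concurrent.CompletableFuture;

public class ComparedFetch {
    // Combine the current and baseline lookups; if either fails, degrade to
    // an error payload instead of letting the combined future fail unhandled.
    static String fetchBoth(CompletableFuture<String> current, CompletableFuture<String> baseline) {
        return current
            .thenCombine(baseline, (c, b) -> "{\"current\":\"" + c + "\",\"baseline\":\"" + b + "\"}")
            .exceptionally(err -> "{\"error\":\"Error fetching summary series\"}")
            .join();
    }
}
```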

Comment on lines 314 to 316
def getMandatoryQueryParam(ctx: RoutingContext, name: String): String = {
Option(ctx.queryParams.get(name)).getOrElse(throw new IllegalArgumentException(s"Missing $name"))
}

🛠️ Refactor suggestion

Avoid throwing exceptions for missing query parameters

The getMandatoryQueryParam method throws an IllegalArgumentException if a required query parameter is missing. Throwing exceptions in this context may not be the best practice and could lead to unhandled exceptions.

Modify the method to return an Option[String] and handle missing parameters gracefully in the handlers:

-def getMandatoryQueryParam(ctx: RoutingContext, name: String): String = {
-  Option(ctx.queryParams.get(name)).getOrElse(throw new IllegalArgumentException(s"Missing $name"))
+def getMandatoryQueryParam(ctx: RoutingContext, name: String): Option[String] = {
+  Option(ctx.queryParams.get(name))
 }

Then, in the handler:

val maybeStartTs = getMandatoryQueryParam(ctx, "startTs").map(_.toLong)
val maybeEndTs = getMandatoryQueryParam(ctx, "endTs").map(_.toLong)

(maybeStartTs, maybeEndTs) match {
  case (Some(startTs), Some(endTs)) =>
    // Proceed with processing
  case _ =>
    ctx.BadRequest("Missing required query parameters: 'startTs' and/or 'endTs'.")
}
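The same Optional-instead-of-throw shape works in the Java handlers; a self-contained sketch with a plain Map standing in for the query parameters (names and messages are illustrative):

```java
import java.util.Map;
import java.util.Optional;

public class QueryParams {
    // Return Optional instead of throwing, so a missing parameter becomes a
    // normal value the handler can turn into a 400 response.
    static Optional<Long> mandatoryLongParam(Map<String, String> query, String name) {
        return Optional.ofNullable(query.get(name)).map(Long::parseLong);
    }

    // Illustrative validation step: both timestamps must be present.
    static String validateRange(Map<String, String> query) {
        Optional<Long> startTs = mandatoryLongParam(query, "startTs");
        Optional<Long> endTs = mandatoryLongParam(query, "endTs");
        if (startTs.isEmpty() || endTs.isEmpty()) {
            return "400 Missing required query parameters: 'startTs' and/or 'endTs'.";
        }
        return "200 range=" + (endTs.get() - startTs.get());
    }
}
```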

Comment on lines 73 to 77
(metricChoice, metricRollup) match {
case (None, _) => ctx.BadRequest("Invalid metric choice. Expect drift")
case (_, None) => ctx.BadRequest("Invalid metric rollup. Expect null / value")
case (Some(Drift), Some(rollup)) => doFetchJoinDrift(ctx, name, startTs, endTs, rollup, offset, algorithm)
}

⚠️ Potential issue

Handle unsupported metric types gracefully

In the joinDriftHandler, the pattern matching on (metricChoice, metricRollup) does not account for unsupported metric types beyond drift. If a client provides an unsupported metricType, the request will not be handled properly, potentially leaving the client without a meaningful response.

Apply this diff to ensure that unsupported metric types return a proper error response:

 (metricChoice, metricRollup) match {
   case (None, _)                   => ctx.BadRequest("Invalid metric choice. Expected 'drift'.")
   case (_, None)                   => ctx.BadRequest("Invalid metric rollup. Expected 'null' or 'value'.")
   case (Some(Drift), Some(rollup)) => doFetchJoinDrift(ctx, name, startTs, endTs, rollup, offset, algorithm)
+  case _                           => ctx.BadRequest("Unsupported metric type.")
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 86 to 89
vertx.setTimer(1000, _ => {
verify(response).setStatusCode(400)
async.complete()
})

🛠️ Refactor suggestion

Replace fixed timers with proper asynchronous handling

In the tests, fixed delays are used with vertx.setTimer(1000, _ => { ... }) to wait for the handler to process the request. This approach can lead to flaky tests and unnecessarily extends test execution time.

Use Async objects and callbacks to properly handle asynchronous operations:

@Test
def test_joinTsLookup_Send400BadMetricChoice(context: TestContext): Unit = {
  val async = context.async()
  // Setup code...
  
  // Trigger the handler
  joinDriftHandler.handle(routingContext)
  
  vertx.runOnContext(_ => {
    verify(response).setStatusCode(400)
    async.complete()
  })
}

Additional methods like response.endHandler can also be utilized to detect when the response is completed.

Comment on lines 168 to 171
val multiMap = MultiMap.caseInsensitiveMultiMap
multiMap.add("offset", "10")
multiMap.add("limit", "-1")
when(routingContext.pathParam("name")).thenReturn("10")

⚠️ Potential issue

Fix inconsistent test data

The test uses invalid values (offset: 10, limit: -1) which should trigger a 400 response, but the test expects a 200 response. Consider using valid values for this positive test case.

-    multiMap.add("offset", "10")
-    multiMap.add("limit", "-1")
+    // Remove offset and limit as they're not needed for this test

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 288 to 320
private void validateFailureResponse(String jsonResponse, TestContext context) {
JsonObject actualResponse = new JsonObject(jsonResponse);
context.assertTrue(actualResponse.containsKey("errors"));

String failureString = actualResponse.getJsonArray("errors").getString(0);
context.assertNotNull(failureString);
}

private void validateSuccessfulResponse(JsonObject actualResponse, List<GetFeaturesResponse.Result> expectedResults, TestContext context) {
context.assertTrue(actualResponse.containsKey("results"));
context.assertEquals(actualResponse.getJsonArray("results").size(), expectedResults.size());

JsonArray results = actualResponse.getJsonArray("results");
for (int i = 0; i < expectedResults.size(); i++) {
Map<String, Object> resultMap = results.getJsonObject(i).getMap();
context.assertTrue(resultMap.containsKey("status"));
context.assertEquals(resultMap.get("status"), expectedResults.get(i).getStatus().name());

context.assertTrue(resultMap.containsKey("entityKeys"));
Map<String, Object> returnedKeys = (Map<String, Object>) resultMap.get("entityKeys");
context.assertEquals(expectedResults.get(i).getEntityKeys(), returnedKeys);

if (expectedResults.get(i).getStatus().equals(Success)) {
context.assertTrue(resultMap.containsKey("features"));
Map<String, Object> returnedFeatureMap = (Map<String, Object>) resultMap.get("features");
context.assertEquals(expectedResults.get(i).getFeatures(), returnedFeatureMap);
} else {
context.assertTrue(resultMap.containsKey("error"));
String returnedErrorMsg = (String) resultMap.get("error");
context.assertEquals(expectedResults.get(i).getError(), returnedErrorMsg);
}
}
}

🛠️ Refactor suggestion

Improve validation methods safety and clarity

The validation methods have several areas for improvement:

  1. Use type-safe casting with instanceof checks
  2. Add more descriptive assertion messages
  3. Consider extracting common validation logic

     private void validateSuccessfulResponse(JsonObject actualResponse, List<GetFeaturesResponse.Result> expectedResults, TestContext context) {
-        context.assertTrue(actualResponse.containsKey("results"));
+        context.assertTrue(actualResponse.containsKey("results"), "Response should contain 'results' field");
         // ... rest of the method ...
+        if (!(resultMap.get("entityKeys") instanceof Map)) {
+            context.fail("entityKeys should be a Map");
+        }
         Map<String, Object> returnedKeys = (Map<String, Object>) resultMap.get("entityKeys");

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 41 to 53
ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Success")
}
vertxResultFuture.onFailure { err =>
ctx.response
.setStatusCode(500)
.putHeader("content-type", "application/json")
.end(s"Write failed - ${err.getMessage}")
}
case Left(error) =>
ctx.response
.setStatusCode(400)
.putHeader("content-type", "application/json")
.end(s"Unable to parse - ${error.getMessage}")

⚠️ Potential issue

Return valid JSON responses when setting application/json content-type

Lines 41, 47, and 53 set the Content-Type header to application/json but return plain text responses, which is not valid JSON. This can lead to parsing errors on the client side expecting JSON-formatted responses.

Apply the following changes to return valid JSON responses:

- ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Success")
+ ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("{\"status\":\"Success\"}")

- ctx.response
-   .setStatusCode(500)
-   .putHeader("content-type", "application/json")
-   .end(s"Write failed - ${err.getMessage}")
+ ctx.response
+   .setStatusCode(500)
+   .putHeader("content-type", "application/json")
+   .end(s"""{"error":"Write failed - ${err.getMessage}"}""")

- ctx.response
-   .setStatusCode(400)
-   .putHeader("content-type", "application/json")
-   .end(s"Unable to parse - ${error.getMessage}")
+ ctx.response
+   .setStatusCode(400)
+   .putHeader("content-type", "application/json")
+   .end(s"""{"error":"Unable to parse - ${error.getMessage}"}""")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Success")
}
vertxResultFuture.onFailure { err =>
ctx.response
.setStatusCode(500)
.putHeader("content-type", "application/json")
.end(s"Write failed - ${err.getMessage}")
}
case Left(error) =>
ctx.response
.setStatusCode(400)
.putHeader("content-type", "application/json")
.end(s"Unable to parse - ${error.getMessage}")
ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("{\"status\":\"Success\"}")
}
vertxResultFuture.onFailure { err =>
ctx.response
.setStatusCode(500)
.putHeader("content-type", "application/json")
.end(s"""{"error":"Write failed - ${err.getMessage}"}""")
}
case Left(error) =>
ctx.response
.setStatusCode(400)
.putHeader("content-type", "application/json")
.end(s"""{"error":"Unable to parse - ${error.getMessage}"}""")

.putHeader("content-type", "application/json")
.end(s"Unable to parse - ${error.getMessage}")
}
case None => ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Empty body")

🛠️ Refactor suggestion

Return a 400 Bad Request for empty request bodies

When the request body is empty (None), the handler returns a 200 OK status with the message "Empty body". Since a valid request body is expected, it's more appropriate to return a 400 Bad Request status to indicate that the client did not provide the necessary data.

Apply this change:

- case None => ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Empty body")
+ case None => ctx.response.setStatusCode(400).putHeader("content-type", "application/json").end("{\"error\":\"Empty request body\"}")
📝 Committable suggestion


Suggested change
case None => ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Empty body")
case None => ctx.response.setStatusCode(400).putHeader("content-type", "application/json").end("{\"error\":\"Empty request body\"}")

Comment on lines 33 to 35
.map { responses =>
if (responses.contains(false)) logger.warn("Some write failures encountered")
}(kvStore.executionContext)

⚠️ Potential issue

Inform the client when write operations fail

Currently, if any write operation fails during multiPut, the handler logs a warning but still returns a 200 OK response. It's important to inform the client about partial failures so appropriate actions can be taken.

Consider modifying the response to reflect write failures:

if (responses.contains(false)) {
  logger.warn("Some write failures encountered")
+ ctx.response
+   .setStatusCode(207)
+   .putHeader("content-type", "application/json")
+   .end("{\"status\":\"Partial Success\", \"message\":\"Some write failures encountered\"}")
} else {
  ctx.response
    .setStatusCode(200)
    .putHeader("content-type", "application/json")
    .end("{\"status\":\"Success\"}")
}

Alternatively, you can collect the indices or identifiers of the failed writes and include them in the response for more detailed client feedback.
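That alternative can be sketched as follows (plain Java, with a List<Boolean> standing in for the multiPut result; the 207 Multi-Status choice mirrors the suggestion above):

```java
import java.util.ArrayList;
import java.util.List;

public class MultiPutReport {
    // Collect the positions of failed writes so the response can name the
    // records that need a retry instead of returning a blanket 200.
    static List<Integer> failedIndices(List<Boolean> writeResults) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < writeResults.size(); i++) {
            if (!writeResults.get(i)) failed.add(i);
        }
        return failed;
    }

    // 207 Multi-Status when only some writes succeeded, 200 otherwise.
    static int statusCode(List<Boolean> writeResults) {
        return failedIndices(writeResults).isEmpty() ? 200 : 207;
    }
}
```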

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
build.sbt (1)

302-322: Reduce duplication in assembly configurations

The assembly configuration is duplicated between service and hub_vertx projects.

Consider extracting common assembly settings into a val:

lazy val commonAssemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
    case PathList("META-INF", xs @ _*) => MergeStrategy.first
    // ... rest of the merge strategy
  },
  assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
  // ... other common settings
)

// Then in projects:
.settings(commonAssemblySettings)

Also applies to: 356-376

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2f50469 and 6866a8b.

📒 Files selected for processing (20)
  • build.sbt (4 hunks)
  • docker-init/Dockerfile (1 hunks)
  • docker-init/build.sh (1 hunks)
  • hub/app/controllers/ApplicationController.scala (0 hunks)
  • hub/app/controllers/InMemKVStoreController.scala (0 hunks)
  • hub/app/controllers/JoinController.scala (0 hunks)
  • hub/app/controllers/ModelController.scala (0 hunks)
  • hub/app/controllers/SearchController.scala (0 hunks)
  • hub/app/controllers/TimeSeriesController.scala (0 hunks)
  • hub/app/module/DriftStoreModule.scala (0 hunks)
  • hub/app/module/InMemoryKVStoreModule.scala (0 hunks)
  • hub/app/module/ModelStoreModule.scala (0 hunks)
  • hub/conf/application.conf (0 hunks)
  • hub/conf/application.test.conf (0 hunks)
  • hub/conf/routes (0 hunks)
  • hub/test/controllers/JoinControllerSpec.scala (0 hunks)
  • hub/test/controllers/ModelControllerSpec.scala (0 hunks)
  • hub/test/controllers/SearchControllerSpec.scala (0 hunks)
  • hub/test/controllers/TimeSeriesControllerSpec.scala (0 hunks)
  • project/plugins.sbt (0 hunks)
💤 Files with no reviewable changes (17)
  • hub/app/controllers/ApplicationController.scala
  • hub/conf/application.test.conf
  • hub/app/module/InMemoryKVStoreModule.scala
  • hub/app/module/ModelStoreModule.scala
  • hub/conf/application.conf
  • hub/app/controllers/SearchController.scala
  • hub/app/module/DriftStoreModule.scala
  • hub/test/controllers/SearchControllerSpec.scala
  • hub/test/controllers/ModelControllerSpec.scala
  • hub/app/controllers/ModelController.scala
  • hub/app/controllers/InMemKVStoreController.scala
  • hub/app/controllers/JoinController.scala
  • hub/test/controllers/JoinControllerSpec.scala
  • hub/test/controllers/TimeSeriesControllerSpec.scala
  • hub/app/controllers/TimeSeriesController.scala
  • hub/conf/routes
  • project/plugins.sbt
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-init/Dockerfile
🔇 Additional comments (4)
build.sbt (4)

59-59: LGTM: Project structure changes align with migration objectives

The restructuring of projects (service_commons, hub_vertx) and their dependencies aligns well with the PR objectives of migrating from Play to Vert.x.

Also applies to: 261-262, 324-326


35-35: LGTM: Vert.x dependencies are properly configured

The Vert.x version is appropriately defined and all necessary modules (core, web, config, metrics) are included.

Also applies to: 104-110


345-346: Use stable version for json4s-core dependency


264-274: Verify dependency version compatibility

The project includes multiple dependencies that interact with each other (netty, vertx, micrometer). Please verify their compatibility.

Also applies to: 333-353

✅ Verification successful

Let me gather more information about the Vert.x version to ensure compatibility with the dependencies.


Dependencies versions are compatible

The dependency versions are compatible with each other:

  • Vert.x 4.5.10 is using Netty 4.1.111.Final internally, which matches the enforced version
  • Micrometer 1.13.6 is compatible with both Vert.x 4.5.x and the vertx-micrometer-metrics module
  • The explicit Netty version override is properly documented to resolve conflicts with Spark dependencies
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for potential version conflicts in dependencies

# Search for all netty version references
echo "Checking netty versions:"
rg "netty.*Version|netty.*Final" -A 2

# Search for all vertx dependencies to ensure consistent versioning
echo "Checking vertx dependencies:"
rg "io.vertx.*%.*vertx-[a-z-]+" -A 2

# Search for potential version conflicts in micrometer
echo "Checking micrometer dependencies:"
rg "micrometer.*Version|micrometer.*[0-9]+\.[0-9]+\.[0-9]+" -A 2

Length of output: 2034


Script:

#!/bin/bash
# Check Vert.x version and its compatibility requirements

# Find vertx version definition
echo "Checking Vert.x version:"
rg "vertxVersion.*=" -A 1

# Look for any version compatibility notes in README or documentation
echo "Checking documentation for version requirements:"
rg -i "compatibility|version requirement" README.md || true

Length of output: 308

@@ -63,7 +62,7 @@ fi
if [ "$BUILD_ALL" = true ]; then
echo "Building all modules..."
sbt assembly
sbt dist
# sbt dist

🛠️ Refactor suggestion

Remove obsolete hub build configuration

The sbt dist command is commented out, but the script still accepts the --hub option. This could confuse users.

Apply these changes to clean up the hub-related code:

- echo "  --hub      Build Hub modules only"  # In print_usage function
- --hub)
-     BUILD_HUB=true
-     shift
-     ;;  # In case statement
- # sbt dist

Committable suggestion skipped: line range outside the PR's diff.

Contributor

@ken-zlai ken-zlai left a comment

I've pulled this into my latest and the endpoints seem to be working as they were before

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 14

🧹 Outside diff range and nitpick comments (16)
hub/src/test/scala/ai/chronon/hub/handlers/TimeSeriesHandlerTest.scala (2)

65-66: Use MockitoAnnotations.initMocks(this) for compatibility

While MockitoAnnotations.openMocks(this) initializes mocks in recent versions of Mockito, using initMocks(this) ensures compatibility with older versions.

Consider updating the mock initialization for broader compatibility:

- MockitoAnnotations.openMocks(this)
+ MockitoAnnotations.initMocks(this)

529-544: Ensure unit tests cover the NullMetric case

The generateSummarySeries method handles the NullMetric case, but it's unclear if tests are verifying this scenario. To improve test coverage, add test cases that specifically test the behavior when metric == NullMetric.

Add a test method to cover the NullMetric scenario:

@Test
def test_featureTsLookup_HandleNullMetric(context: TestContext): Unit = {
  // Setup code with metric set to NullMetric
  // Invoke the handler
  // Verify the response
}
hub/src/main/scala/ai/chronon/hub/handlers/InMemKVStoreHandler.scala (1)

19-57: Use VertxExtensions for consistent response handling

Consider importing VertxExtensions and utilizing its methods (ctx.Ok, ctx.BadRequest, ctx.InternalServerError) to standardize HTTP responses across the application.

Apply this diff:

+ import ai.chronon.hub.handlers.VertxExtensions._

...

- ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end(...)
+ ctx.Ok(...)

- ctx.response.setStatusCode(500).putHeader("content-type", "application/json").end(...)
+ ctx.InternalServerError(...)

- ctx.response.setStatusCode(400).putHeader("content-type", "application/json").end(...)
+ ctx.BadRequest(...)

- ctx.response.setStatusCode(400).putHeader("content-type", "application/json").end(...)
+ ctx.BadRequest(...)
hub/src/main/java/ai/chronon/hub/HubVerticle.java (1)

115-116: Avoid hardcoding CORS origins in code

Currently, the CORS configuration includes hardcoded localhost origins. Consider externalizing the CORS origins to a configuration file or environment variable to allow flexibility and secure handling in different environments (development, staging, production).

This change enhances security and makes the application more adaptable to different deployment scenarios.

hub/src/main/scala/ai/chronon/hub/handlers/JoinsHandler.scala (2)

45-49: Optimize case-insensitive name lookup

The current implementation performs case-insensitive comparison on every join, which could be inefficient for large datasets.

Consider maintaining a case-insensitive index or using a more efficient lookup strategy:

-    val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(entityName))
+    val searchName = entityName.toLowerCase
+    val maybeJoin = monitoringStore.getJoins.find(j => j.name.toLowerCase == searchName)

47-47: Fix typo in error message

There's a typo in "retrive" in the error message.

-      case None       => ctx.NotFound(s"Unable to retrive $entityName")
+      case None       => ctx.NotFound(s"Unable to retrieve $entityName")
hub/src/test/scala/ai/chronon/hub/handlers/ModelHandlerTest.scala (3)

63-63: Fix duplicate comments.

The comment "// Trigger call" is duplicated on these lines.

-    // Trigger call// Trigger call
+    // Trigger call

Also applies to: 79-79, 97-97


65-65: Consider extracting the timer delay as a constant.

The hardcoded delay of 1000ms appears multiple times. Consider extracting it as a constant for better maintainability.

+  private val TestTimeout = 1000L  // milliseconds
+
   @Test
   def testSend400BadOffset(context: TestContext) : Unit = {
     // ...
-    vertx.setTimer(1000, _ => {
+    vertx.setTimer(TestTimeout, _ => {

Also applies to: 81-81, 99-99, 131-131


148-152: Consider enhancing mock data to be more realistic.

The current mock data uses very simple values. Consider making it more realistic by:

  • Using meaningful team names
  • Adding varied feature names
  • Including different model types
   def generateMockModel(id: String): Model = {
-    val groupBys = Seq(GroupBy("my_groupBy", Seq("g1", "g2")))
-    val join = Join("my_join", Seq("ext_f1", "ext_f2", "d_1", "d2"), groupBys, true, true, Some("my_team"))
-    Model(id, join, online = true, production = true, "my team", "XGBoost")
+    val groupBys = Seq(GroupBy("daily_stats", Seq("date", "user_id")))
+    val join = Join(
+      "user_activity",
+      Seq("clicks", "views", "conversion_rate", "session_duration"),
+      groupBys,
+      online = true,
+      production = true,
+      Some("data_science_team")
+    )
+    Model(id, join, online = true, production = true, "data_science_team", "RandomForest")
hub/src/test/scala/ai/chronon/hub/handlers/SearchHandlerTest.scala (2)

129-131: Improve test data documentation.

The current comments about test data could be clearer. Consider using a more structured format.

-    // we have names: 0, 1, 2, .. 99
-    // our result should give us: 1, 10, 11, 12, .. 19, 21, 31, .. 91
-    val expected = Set(12, 13, 14, 15, 16, 17)
+    // Test data structure:
+    // - Full dataset: IDs from 0 to 99
+    // - Search term "1" matches: 1, 10-19, 21, 31, ..., 91
+    // - With offset=3 and limit=6, expect: 12-17
+    val expected = Set(12, 13, 14, 15, 16, 17)

154-161: Consider sharing mock data generation logic with ModelHandlerTest.

The mock data generation is very similar to ModelHandlerTest. Consider creating a shared test utility class.

Create a new file TestUtils.scala:

object TestUtils {
  def generateMockGroupBys(prefix: String): Seq[GroupBy] = {
    Seq(GroupBy(s"${prefix}_stats", Seq("date", "user_id")))
  }
  
  def generateMockJoin(id: String): Join = {
    val groupBys = generateMockGroupBys("daily")
    Join(
      id,
      Seq("clicks", "views", "conversion_rate", "session_duration"),
      groupBys,
      online = true,
      production = true,
      Some("data_science_team")
    )
  }
}
hub/src/main/scala/ai/chronon/hub/store/MonitoringModelStore.scala (2)

98-99: Use constants for pagination parameters.

The string literals "limit" and "continuation-key" are used in multiple places. Consider extracting these as constants to improve maintainability and prevent typos.

+ private val LIMIT_KEY = "limit"
+ private val CONTINUATION_KEY = "continuation-key"
  val propsMap = {
    paginationKey match {
-     case Some(key) => Map("limit" -> defaultListLookupLimit, "continuation-key" -> key)
-     case None      => Map("limit" -> defaultListLookupLimit)
+     case Some(key) => Map(LIMIT_KEY -> defaultListLookupLimit, CONTINUATION_KEY -> key)
+     case None      => Map(LIMIT_KEY -> defaultListLookupLimit)
    }
  }

112-113: Consistent key usage in pagination check.

The code checks for "continuation-key" which matches the key used earlier. However, consider using the same constant to ensure consistency.

- if (response.resultProps.contains("continuation-key")) {
-   retrieveAllListConfs(newAcc, response.resultProps.get("continuation-key"))
+ if (response.resultProps.contains(CONTINUATION_KEY)) {
+   retrieveAllListConfs(newAcc, response.resultProps.get(CONTINUATION_KEY))
hub/src/test/scala/ai/chronon/hub/handlers/JoinHandlerTest.scala (2)

64-64: Remove duplicate comments.

The comment "// Trigger call" appears multiple times, and in some cases, it's even duplicated on the same line. Remove these duplicates to improve code readability.

- // Trigger call// Trigger call
+ // Trigger call

Also applies to: 80-80, 98-98, 116-116, 148-148, 178-178


56-191: Enhance test coverage with additional scenarios.

While the current tests cover basic functionality, consider adding tests for:

  1. Edge cases with empty join registry
  2. Cases with malformed query parameters
  3. Error handling for failed store operations
  4. Concurrent request handling

Would you like me to help generate additional test cases for these scenarios?

build.sbt (1)

302-321: Reduce duplication in assembly settings.

The assembly settings are duplicated between the service and hub projects. Consider extracting these common settings into a shared configuration.

+ lazy val commonAssemblySettings = Seq(
+   assembly / assemblyMergeStrategy := {
+     case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
+     case PathList("META-INF", xs @ _*) => MergeStrategy.first
+     case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
+     case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
+     case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
+     case "application.conf" => MergeStrategy.concat
+     case "reference.conf" => MergeStrategy.concat
+     case x =>
+       val oldStrategy = (assembly / assemblyMergeStrategy).value
+       oldStrategy(x)
+   }
+ )

  lazy val service = (project in file("service"))
    .dependsOn(online, service_commons)
    .settings(
+     commonAssemblySettings,
-     assembly / assemblyMergeStrategy := {
-       // ... duplicate settings ...
-     }
    )

  lazy val hub = (project in file("hub"))
    .dependsOn(online, service_commons, spark)
    .settings(
+     commonAssemblySettings,
-     assembly / assemblyMergeStrategy := {
-       // ... duplicate settings ...
-     }
    )

Also applies to: 356-375

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 6866a8b and b02d589.

📒 Files selected for processing (21)
  • .github/workflows/test_scala_no_spark.yaml (1 hunks)
  • build.sbt (4 hunks)
  • docker-init/Dockerfile (1 hunks)
  • docker-init/start.sh (1 hunks)
  • hub/src/main/java/ai/chronon/hub/HubVerticle.java (1 hunks)
  • hub/src/main/resources/example_config.json (1 hunks)
  • hub/src/main/resources/logback.xml (2 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/InMemKVStoreHandler.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/JoinsHandler.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/ModelsHandler.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/Paginate.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/SearchHandler.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/handlers/VertxExtensions.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/model/Model.scala (1 hunks)
  • hub/src/main/scala/ai/chronon/hub/store/MonitoringModelStore.scala (3 hunks)
  • hub/src/test/scala/ai/chronon/hub/handlers/JoinHandlerTest.scala (1 hunks)
  • hub/src/test/scala/ai/chronon/hub/handlers/ModelHandlerTest.scala (1 hunks)
  • hub/src/test/scala/ai/chronon/hub/handlers/SearchHandlerTest.scala (1 hunks)
  • hub/src/test/scala/ai/chronon/hub/handlers/TimeSeriesHandlerTest.scala (1 hunks)
  • hub/src/test/scala/ai/chronon/hub/store/MonitoringModelStoreTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (4)
  • hub/src/main/scala/ai/chronon/hub/handlers/Paginate.scala
  • hub/src/main/scala/ai/chronon/hub/model/Model.scala
  • hub/src/main/resources/example_config.json
  • hub/src/test/scala/ai/chronon/hub/store/MonitoringModelStoreTest.scala
🚧 Files skipped from review as they are similar to previous changes (2)
  • .github/workflows/test_scala_no_spark.yaml
  • docker-init/start.sh
🧰 Additional context used
📓 Learnings (1)
hub/src/main/scala/ai/chronon/hub/store/MonitoringModelStore.scala (1)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:29-30
Timestamp: 2024-11-12T09:38:33.532Z
Learning: In the codebase, the `KVStore` implementation provides an implicit `ExecutionContext` in scope, so it's unnecessary to import another.
🔇 Additional comments (6)
hub/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala (1)

61-78: ⚠️ Potential issue

Validate and sanitize user input when constructing error messages

In joinDriftHandler, user-provided input like offset is included in error messages without sanitization. This could expose the application to injection attacks or reveal sensitive information. Ensure that any user input included in responses is properly validated and sanitized.

Apply this diff to fix the issue:

case (None, _) =>
- ctx.BadRequest(s"Unable to parse offset - $offset")
+ ctx.BadRequest("Unable to parse offset.")

case (_, None) =>
- ctx.BadRequest("Invalid drift algorithm. Expect JSD, PSI or Hellinger")
+ ctx.BadRequest("Invalid drift algorithm. Expected one of: JSD, PSI, or Hellinger.")

Similarly, update other error messages that include user inputs.

Likely invalid or redundant comment.

docker-init/Dockerfile (1)

41-42: Config.json structure is incompatible with Vert.x codebase

The config.json structure needs to be updated to match Vert.x requirements.

hub/src/main/resources/logback.xml (2)

3-5: LGTM! Well-structured environment variable configuration.

The use of environment variables with default values provides good flexibility for different deployment environments while ensuring safe defaults.


14-27: LGTM! Robust file rotation and retention policy.

The file appender configuration implements best practices:

  • Size-based rotation (100MB per file)
  • Total size cap (10GB)
  • Retention period (30 days)
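For reference, a rolling policy matching those numbers would look roughly like this in logback; the appender name, log path, and pattern below are placeholders rather than the file's actual contents:

```xml
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>${LOG_PATH:-logs}/hub.log</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
    <fileNamePattern>${LOG_PATH:-logs}/hub.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
    <maxFileSize>100MB</maxFileSize>   <!-- rotate each file at 100MB -->
    <maxHistory>30</maxHistory>        <!-- keep 30 days of history -->
    <totalSizeCap>10GB</totalSizeCap>  <!-- cap total disk usage at 10GB -->
  </rollingPolicy>
  <encoder>
    <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
  </encoder>
</appender>
```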
hub/src/test/scala/ai/chronon/hub/handlers/SearchHandlerTest.scala (1)

63-63: Fix duplicate comments (similar to ModelHandlerTest).

The comment "// Trigger call" is duplicated on these lines.

See the fix suggested in ModelHandlerTest.

Also applies to: 80-80, 99-99, 136-136

build.sbt (1)

345-346: Use stable version for json4s-core dependency.

The current version "3.7.0-M11" is a milestone release which may not be stable.

val maybeDriftSeries = driftStore.getDriftSeries(joinPath, driftMetric, window, startTs, endTs)
maybeDriftSeries match {
case Failure(exception) =>
ctx.InternalServerError(s"Error computing join drift - ${exception.getMessage}")

⚠️ Potential issue

Avoid exposing internal exception messages in HTTP responses

Including internal exception messages in HTTP error responses can expose sensitive information and potentially lead to security vulnerabilities. It's recommended to return a generic error message to the client and log the detailed exception on the server side.

Apply this diff to fix the issue:

- ctx.InternalServerError(s"Error computing join drift - ${exception.getMessage}")
+ ctx.InternalServerError("Error computing join drift")

- ctx.InternalServerError(
-   s"Error computing feature percentiles for current + offset time window.\nCurrent window error: ${exceptionA.getMessage}\nOffset window error: ${exceptionB.getMessage}")
+ ctx.InternalServerError("Error computing feature percentiles for current and offset time windows.")

- ctx.InternalServerError(s"Error computing feature percentiles for offset time window - ${exception.getMessage}")
+ ctx.InternalServerError("Error computing feature percentiles for offset time window.")

- ctx.InternalServerError(s"Error computing feature percentiles for current time window - ${exception.getMessage}")
+ ctx.InternalServerError("Error computing feature percentiles for current time window.")

- ctx.InternalServerError(s"Error fetching join drift: ${err.getMessage}")
+ ctx.InternalServerError("Error fetching join drift.")

Also applies to: 215-221, 262-262, 306-306

Comment on lines +318 to +326
def parseOffset(offset: Option[String]): Option[Duration] = {
val hourPattern = """(\d+)h""".r
val dayPattern = """(\d+)d""".r
offset.map(_.toLowerCase) match {
case Some(hourPattern(num)) => Some(num.toInt.hours)
case Some(dayPattern(num)) => Some(num.toInt.days)
case _ => None
}
}

⚠️ Potential issue

Handle potential NumberFormatException in parseOffset

The method parseOffset uses num.toInt, which can throw a NumberFormatException if the numeric value exceeds the range of Int. To prevent the application from crashing, consider handling this exception and returning a BadRequest when the offset value is invalid.

Apply this diff to fix the issue:

def parseOffset(offset: Option[String]): Option[Duration] = {
  val hourPattern = """(\d+)h""".r
  val dayPattern = """(\d+)d""".r
  offset.map(_.toLowerCase) match {
    case Some(hourPattern(num)) =>
-     Some(num.toInt.hours)
+     Try(num.toInt).toOption.map(_.hours)
    case Some(dayPattern(num))  =>
-     Some(num.toInt.days)
+     Try(num.toInt).toOption.map(_.days)
    case _                      => None
  }
}

Don't forget to import scala.util.Try at the beginning of the file:

+ import scala.util.Try
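The same overflow-safe pattern is easy to check in isolation. A standalone port of the parser, written as a sketch rather than the handler's actual code (class and method names are illustrative):

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OffsetParser {
    private static final Pattern HOURS = Pattern.compile("(\\d+)h");
    private static final Pattern DAYS = Pattern.compile("(\\d+)d");

    // Mirrors the Scala parseOffset above: returns empty on any malformed input,
    // including numbers too large for an int, so no exception escapes.
    static Optional<Duration> parseOffset(String offset) {
        if (offset == null) return Optional.empty();
        String s = offset.toLowerCase();
        Matcher h = HOURS.matcher(s);
        if (h.matches()) return parseInt(h.group(1)).map(Duration::ofHours);
        Matcher d = DAYS.matcher(s);
        if (d.matches()) return parseInt(d.group(1)).map(Duration::ofDays);
        return Optional.empty();
    }

    private static Optional<Integer> parseInt(String num) {
        try {
            return Optional.of(Integer.parseInt(num));
        } catch (NumberFormatException e) {
            return Optional.empty(); // e.g. "99999999999h" overflows Int
        }
    }
}
```

An empty result maps naturally to the handler's BadRequest branch.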

Comment on lines +86 to +89
vertx.setTimer(1000, _ => {
verify(response).setStatusCode(400)
async.complete()
})

🛠️ Refactor suggestion

Improve asynchronous testing by using Async objects instead of fixed delays

Using vertx.setTimer with a fixed delay can lead to flaky tests and unnecessarily long test durations. Vert.x Unit provides Async objects to coordinate asynchronous operations more effectively.

Refactor your test methods to use Async as shown below:

@Test
def test_joinTsLookup_Send400BadMetricChoice(context: TestContext): Unit = {
- val async = context.async
+ val async = context.async()

  // Test setup code...

- joinDriftHandler.handle(routingContext)
- vertx.setTimer(1000, _ => {
-   verify(response).setStatusCode(400)
-   async.complete()
- })
+ vertx.runOnContext { _ =>
+   joinDriftHandler.handle(routingContext)
+   verify(response).setStatusCode(400)
+   async.complete()
+ }
}

Apply similar changes to other test methods to replace fixed delays with proper asynchronous handling.

Also applies to: 102-105, 117-120, 131-135, 146-150, 166-170, 190-210, 222-226, 239-243, 255-259, 271-275, 287-291, 303-307, 319-323, 345-361, 385-409

.putHeader("content-type", "application/json")
.end(s"Unable to parse - ${error.getMessage}")
}
case None => ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Empty body")

🛠️ Refactor suggestion

Return 400 Bad Request when request body is missing

If the request expects a body and none is provided, it's appropriate to return a 400 Bad Request status code to indicate client error.

Apply this diff:

- case None => ctx.response.setStatusCode(200).putHeader("content-type", "application/json").end("Empty body")
+ case None => ctx.response.setStatusCode(400).putHeader("content-type", "application/json").end("{\"error\": \"Empty body\"}")

ctx.response
.setStatusCode(400)
.putHeader("content-type", "application/json")
.end(s"Unable to parse - ${error.getMessage}")

🛠️ Refactor suggestion

Ensure error responses are valid JSON when content-type is 'application/json'

The response for JSON decoding errors sends a plain string while the content-type is set to "application/json". Adjust the response to send valid JSON.

Apply this diff:

- .end(s"Unable to parse - ${error.getMessage}")
+ .end(s"""{"error": "Unable to parse - ${error.getMessage}"}""")

Comment on lines +20 to +21
val offset = Option(ctx.queryParams.get("offset")).map(_.toInt).getOrElse(defaultOffset)
val limit = Option(ctx.queryParams.get("limit")).map(l => math.min(l.toInt, maxLimit)).getOrElse(defaultLimit)

⚠️ Potential issue

Handle potential NumberFormatException when parsing query parameters

Using .toInt on query parameters may throw a NumberFormatException if the provided value is not a valid integer. Wrap the parsing in a try-catch block or use a safe conversion method to handle invalid input gracefully.

Apply this diff to handle parsing exceptions:

- val offset = Option(ctx.queryParams.get("offset")).map(_.toInt).getOrElse(defaultOffset)
- val limit = Option(ctx.queryParams.get("limit")).map(l => math.min(l.toInt, maxLimit)).getOrElse(defaultLimit)
+ val offsetResult = scala.util.Try {
+   Option(ctx.queryParams.get("offset")).map(_.toInt).getOrElse(defaultOffset)
+ }
+ val limitResult = scala.util.Try {
+   Option(ctx.queryParams.get("limit")).map(l => math.min(l.toInt, maxLimit)).getOrElse(defaultLimit)
+ }
+ if (offsetResult.isFailure) {
+   ctx.BadRequest("Invalid offset - expect a positive integer")
+   return
+ }
+ if (limitResult.isFailure) {
+   ctx.BadRequest("Invalid limit - expect a positive integer")
+   return
+ }
+ val offset = offsetResult.get
+ val limit = limitResult.get
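Once offset and limit are validated, the slice itself is simple. A self-contained sketch of the combined flow (plain Java for a runnable illustration; the constants mirror the Scala defaults but are assumptions):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Pagination {
    static final int DEFAULT_OFFSET = 0;
    static final int DEFAULT_LIMIT = 10;
    static final int MAX_LIMIT = 100;

    // Safe integer parse: absent param falls back to the default,
    // malformed input yields empty so the caller can return 400.
    static Optional<Integer> parseParam(String raw, int fallback) {
        if (raw == null) return Optional.of(fallback);
        try {
            return Optional.of(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Skips `offset` items and takes at most min(limit, MAX_LIMIT).
    static <T> List<T> paginate(List<T> items, int offset, int limit) {
        int capped = Math.min(limit, MAX_LIMIT);
        return items.stream().skip(offset).limit(capped).collect(Collectors.toList());
    }
}
```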

Comment on lines +21 to +25
override def handle(ctx: RoutingContext): Unit = {
val term = Option(ctx.queryParams.get("term")).getOrElse("")
val offset = Option(ctx.queryParams.get("offset")).map(_.toInt).getOrElse(defaultOffset)
val limit = Option(ctx.queryParams.get("limit")).map(l => math.min(l.toInt, maxLimit)).getOrElse(defaultLimit)


⚠️ Potential issue

Add input validation for search term

The search term is not sanitized and could contain arbitrarily long or malicious input.

Add input validation:

-    val term = Option(ctx.queryParams.get("term")).getOrElse("")
+    val term = Option(ctx.queryParams.get("term"))
+      .map(_.trim)
+      .filter(_.length <= 100) // Add reasonable length limit
+      .getOrElse("")

Comment on lines +26 to +30
if (offset < 0) {
ctx.BadRequest("Invalid offset - expect a positive number")
} else if (limit < 0) {
ctx.BadRequest("Invalid limit - expect a positive number")
} else {

⚠️ Potential issue

Add upper bound validation for offset

While negative values are checked, there's no upper bound validation for the offset parameter.

Add upper bound check:

     if (offset < 0) {
       ctx.BadRequest("Invalid offset - expect a positive number")
+    } else if (offset > 10000) { // Add reasonable upper bound
+      ctx.BadRequest("Invalid offset - value too large")
     } else if (limit < 0) {

Comment on lines +39 to +42
private def searchRegistry(term: String): Seq[Join] = {
val joins = monitoringStore.getJoins
joins.filter(j => j.name.contains(term))
}

🛠️ Refactor suggestion

Improve search implementation

The current search implementation has several limitations:

  1. Case-sensitive matching may miss relevant results
  2. Direct string contains check could be inefficient for large datasets

Consider this improvement:

   private def searchRegistry(term: String): Seq[Join] = {
     val joins = monitoringStore.getJoins
-    joins.filter(j => j.name.contains(term))
+    val searchTerm = term.toLowerCase
+    joins.filter(j => j.name.toLowerCase.contains(searchTerm))
   }

@piyush-zlai
Copy link
Contributor Author

@nikhil-zlai - fixed the rebase snafu you caught, should be good now.

@piyush-zlai piyush-zlai merged commit 1c2205c into main Dec 13, 2024
9 checks passed
@piyush-zlai piyush-zlai deleted the piyush/play_to_vertx branch December 13, 2024 15:49
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
## Summary
Migrate from Play to Vert.x for our Hub service to settle on one
backend.

Most of the code is a copy / fork of the existing Hub code. Code is
copied / identical where possible (e.g. the core underlying stuff that
backs the play controllers / vert.x handlers). Some pieces like the play
controllers -> vertx handlers are different in the web framework
interfacing logic.

## Checklist
- [X] Added Unit Tests
- [X] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced `WebServiceVerticle` for managing the web service entry
point.
- Added `FeaturesHandler`, `FeaturesRouter`, and `TimeSeriesHandler` for
handling feature-related requests and time series data.
- Implemented `InMemKVStoreHandler` for managing in-memory key-value
store operations.

- **Improvements**
- Enhanced error handling and response management across various
handlers, including pagination and validation.
- Updated logging configuration for better flexibility and readability.

- **Bug Fixes**
- Improved validation and error responses for pagination parameters
across handlers.

- **Chores**
  - Removed obsolete controllers and tests to streamline the codebase.
- Restructured package organization for better clarity and
maintainability.
- Updated project dependencies and SBT plugins for improved build
management.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025