Connect drift metrics computation in Spark with Hub for serving to frontend #95

Merged: 37 commits, Nov 27, 2024
Changes from 34 commits

Commits (37)
45b4af3
Changes
nikhil-zlai Nov 3, 2024
e3d2115
changes so far
nikhil-zlai Nov 3, 2024
c0e5d8f
remove unused file
nikhil-zlai Nov 3, 2024
e718f91
fix
nikhil-zlai Nov 3, 2024
d3ff253
adding back color printer
nikhil-zlai Nov 3, 2024
bbf1ddd
scalafmt fix
nikhil-zlai Nov 4, 2024
d61c83a
assign intervals
nikhil-zlai Nov 7, 2024
ce39428
assign intervals
nikhil-zlai Nov 7, 2024
787b664
tile summary distance
nikhil-zlai Nov 7, 2024
e576dad
histogram drift
nikhil-zlai Nov 8, 2024
3ca60fa
tile drift
nikhil-zlai Nov 8, 2024
ea115e3
test wiring
nikhil-zlai Nov 12, 2024
1af7b76
Rename DynamoDB store to monitoring model store
piyush-zlai Nov 20, 2024
1228488
First cut wiring up with passing tests
piyush-zlai Nov 22, 2024
86b1cfd
Rip out mock data generation and corresponding endpoints
piyush-zlai Nov 22, 2024
3657bb7
Add joins endpoints and switch search to use joins
piyush-zlai Nov 22, 2024
622405a
Switch to correct metadata table
piyush-zlai Nov 25, 2024
db0619c
observability script for demo
nikhil-zlai Nov 24, 2024
12db7cd
running observability demo
nikhil-zlai Nov 25, 2024
0a8c8b3
Add support for in-memory controller + kv store module
piyush-zlai Nov 26, 2024
e7e2d16
Clean up scripts to load data and query via time series controller
piyush-zlai Nov 26, 2024
281fc7c
Address scalafix + fmt
piyush-zlai Nov 26, 2024
7fc0637
Update colPrefix pass through in drift store
piyush-zlai Nov 26, 2024
4d78fa5
Add details to join response + join get endpoint
piyush-zlai Nov 26, 2024
7e7d2a1
Rebase + comments
piyush-zlai Nov 27, 2024
470fd9e
Revert TableUtils for now
piyush-zlai Nov 27, 2024
18e260c
Swap to new uploader app and revert old code
piyush-zlai Nov 27, 2024
de044c4
Downgrade in mem controller log to debug
piyush-zlai Nov 27, 2024
dfc4e1f
style: Apply scalafix and scalafmt changes
piyush-zlai Nov 27, 2024
6ab315c
Handle empty responses
piyush-zlai Nov 27, 2024
62687b9
Remove redundant log4j props file
piyush-zlai Nov 27, 2024
5316680
Use thread locals for thrift serializers
piyush-zlai Nov 27, 2024
92f0d11
Rebase + comments
piyush-zlai Nov 27, 2024
4965c8a
style: Apply scalafix and scalafmt changes
piyush-zlai Nov 27, 2024
06ccac7
Add breaks method
piyush-zlai Nov 27, 2024
3089feb
style: Apply scalafix and scalafmt changes
piyush-zlai Nov 27, 2024
4930a98
Wrap team in optional
piyush-zlai Nov 27, 2024
7 changes: 6 additions & 1 deletion .github/workflows/test_scala_no_spark.yaml
@@ -60,4 +60,9 @@ jobs:
- name: Run api tests
run: |
sbt "++ 2.12.18 api/test"
sbt "++ 2.12.18 api/test"
- name: Run hub tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 hub/test"
20 changes: 15 additions & 5 deletions build.sbt
@@ -80,6 +80,13 @@ val jackson = Seq(
"com.fasterxml.jackson.module" %% "jackson-module-scala"
).map(_ % jackson_2_15)

// Circe is used to ser / deser case class payloads for the Hub Play webservice
val circe = Seq(
"io.circe" %% "circe-core",
"io.circe" %% "circe-generic",
"io.circe" %% "circe-parser",
).map(_ % circeVersion)

val flink_all = Seq(
"org.apache.flink" %% "flink-streaming-scala",
"org.apache.flink" % "flink-metrics-dropwizard",
@@ -129,6 +136,8 @@ lazy val online = project
"com.github.ben-manes.caffeine" % "caffeine" % "3.1.8"
),
libraryDependencies ++= jackson,
// dep needed for HTTPKvStore - yank when we rip this out
libraryDependencies += "com.softwaremill.sttp.client3" %% "core" % "3.9.7",
libraryDependencies ++= spark_all.map(_ % "provided"),
libraryDependencies ++= flink_all.map(_ % "provided")
)
@@ -236,20 +245,18 @@ lazy val frontend = (project in file("frontend"))
// build interop between one module solely on 2.13 and others on 2.12 is painful
lazy val hub = (project in file("hub"))
.enablePlugins(PlayScala)
.dependsOn(cloud_aws)
.dependsOn(cloud_aws, spark)
.settings(
name := "hub",
libraryDependencies ++= Seq(
guice,
"org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test,
"org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"org.scala-lang.modules" %% "scala-xml" % "2.1.0",
"org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
),
libraryDependencies ++= circe,
libraryDependencySchemes ++= Seq(
"org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always,
"org.scala-lang.modules" %% "scala-parser-combinators" % VersionScheme.Always,
@@ -258,7 +265,10 @@ lazy val hub = (project in file("hub"))
excludeDependencies ++= Seq(
ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
ExclusionRule(organization = "log4j", name = "log4j"),
ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-to-slf4j")
ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-to-slf4j"),
ExclusionRule("org.apache.logging.log4j", "log4j-slf4j-impl"),
ExclusionRule("org.apache.logging.log4j", "log4j-core"),
ExclusionRule("org.apache.logging.log4j", "log4j-api")
),
// Ensure consistent versions of logging libraries
dependencyOverrides ++= Seq(
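How the two new dependency groups above fit together: circe handles JSON ser/deser of case class payloads for the Hub Play service, and the sttp client gives the online module a way to ship those payloads over HTTP (per the HTTPKvStore comment). A rough sketch of that pattern follows; the payload type and endpoint path are assumptions for illustration, not the actual Chronon API.

```scala
import io.circe.generic.auto._
import io.circe.syntax._
import sttp.client3._

// Illustrative payload type; a real client would ship KVStore.PutRequest values instead.
case class DemoPayload(dataset: String, key: String, value: String)

object HttpPutSketch extends App {
  // Simple synchronous backend from sttp client3.
  val backend = HttpURLConnectionBackend()

  // circe turns the case class list into the JSON body.
  val json = List(DemoPayload("demo_dataset", "user-123", "0.42")).asJson.noSpaces

  // The endpoint path below is an assumption for illustration only.
  val response = basicRequest
    .post(uri"http://localhost:9000/api/v1/dataset/data")
    .contentType("application/json")
    .body(json)
    .send(backend)

  println(s"status=${response.code}")
}
```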
1 change: 1 addition & 0 deletions docker-init/Dockerfile
@@ -43,6 +43,7 @@ ENV CHRONON_DRIVER_JAR="/app/cli/spark.jar"
# Set up Spark dependencies to help with launching CLI
# Copy Spark JARs from the Bitnami image
COPY --from=spark-source /opt/bitnami/spark/jars /opt/spark/jars
COPY --from=spark-source /opt/bitnami/spark/bin /opt/spark/bin

# Add all Spark JARs to the classpath
ENV CLASSPATH=/opt/spark/jars/*
20 changes: 20 additions & 0 deletions docker-init/demo/README.md
@@ -1,5 +1,25 @@
# Populate Observability Demo Data
To populate the observability demo data:
* Launch the set of docker containers:
```bash
~/workspace/chronon $ docker-compose -f docker-init/compose.yaml up --build
...
app-1 | [info] 2024-11-26 05:10:45,758 [main] INFO play.api.Play - Application started (Prod) (no global state)
app-1 | [info] 2024-11-26 05:10:45,958 [main] INFO play.core.server.AkkaHttpServer - Listening for HTTP on /[0:0:0:0:0:0:0:0]:9000
```
(you can skip the --build if you don't wish to rebuild your code)
Comment on lines +1 to +10

🛠️ Refactor suggestion

Enhance Docker setup instructions

The setup instructions should include prerequisites and verification steps.

Add the following sections:

 # Populate Observability Demo Data
+
+## Prerequisites
+- Docker v20.10 or higher
+- Docker Compose v2.0 or higher
+- Minimum 4GB RAM available for containers
+
+## Setup
 To populate the observability demo data:
 * Launch the set of docker containers:
 ```bash
 ~/workspace/chronon $ docker-compose -f docker-init/compose.yaml up --build
 ...
 app-1           | [info] 2024-11-26 05:10:45,758 [main] INFO  play.api.Play - Application started (Prod) (no global state)
 app-1           | [info] 2024-11-26 05:10:45,958 [main] INFO  play.core.server.AkkaHttpServer - Listening for HTTP on /[0:0:0:0:0:0:0:0]:9000

(you can skip the --build if you don't wish to rebuild your code)
+
+Expected startup time: ~2-3 minutes
+
+### Verify Setup
+Ensure all containers are running:
+```bash
+~/workspace/chronon $ docker-compose -f docker-init/compose.yaml ps
+```
+
+All services should show status as "running".




Now you can trigger the script to load summary data:
```bash
~/workspace/chronon $ docker-init/demo/load_summaries.sh
...
Done uploading summaries! 🥳
```
Comment on lines +12 to +17

🛠️ Refactor suggestion

Enhance data loading documentation

The data loading instructions could be more informative about:

  1. The expected duration of the loading process
  2. What data is being loaded
  3. How to verify successful loading
  4. Troubleshooting steps

Consider expanding the documentation:

 Now you can trigger the script to load summary data:
 ```bash
 ~/workspace/chronon $ docker-init/demo/load_summaries.sh
 ...
 Done uploading summaries! 🥳

+The script will:
+- Load drift metrics data
+- Configure joins
+- Populate the in-memory KV store
+
+Expected duration: ~5 minutes
+
+## Verification
+
+You can verify the data loading by accessing the following endpoints:
+
+```bash
+# Check join drift metrics
+curl http://localhost:9000/api/v1/joins/drift
+
+# Check metadata
+curl http://localhost:9000/api/v1/metadata
+```
+
+## Troubleshooting
+
+If you encounter issues:
+1. Check Docker container logs: docker-compose logs app
+2. Ensure all containers are running: docker-compose ps
+3. Verify system resources (memory usage, disk space)



Comment on lines +2 to +17

🛠️ Refactor suggestion

Add cleanup and shutdown instructions

The documentation should include instructions for cleaning up resources and shutting down the environment properly.

Add a cleanup section:

+## Cleanup
+
+To stop and remove the containers:
+```bash
+~/workspace/chronon $ docker-compose -f docker-init/compose.yaml down
+```
+
+To clean up all data and start fresh:
+```bash
+~/workspace/chronon $ docker-compose -f docker-init/compose.yaml down -v
+```


# Streamlit local experimentation
run build.sh once, and you can repeatedly exec to quickly visualize

In first terminal: `sbt spark/assembly`
In second terminal: `./run.sh` to load the built jar and serve the data on localhost:8181
In third terminal: `streamlit run viz.py`

Comment on lines +19 to +25

🛠️ Refactor suggestion

Restructure and clarify Streamlit instructions

The Streamlit section needs better formatting, clearer instructions, and proper grammar.

Replace the section with:

-# Streamlit local experimentation
-run build.sh once, and you can repeatedly exec to quickly visualize 
-
-In first terminal: `sbt spark/assembly` 
-In second terminal: `./run.sh` to load the built jar and serve the data on localhost:8181
-In third terminal: `streamlit run viz.py`
+# Local Development with Streamlit
+
+For local experimentation with the Streamlit visualization:
+
+1. Build the Spark assembly (required once):
+   ```bash
+   ~/workspace/chronon $ sbt spark/assembly
+   ```
+
+2. Start the data server:
+   ```bash
+   ~/workspace/chronon $ ./run.sh
+   ```
+   This loads the built JAR and serves data at http://localhost:8181
+
+3. Launch the Streamlit application:
+   ```bash
+   ~/workspace/chronon $ streamlit run viz.py
+   ```
+   Access the visualization at http://localhost:8501
🧰 Tools
🪛 LanguageTool
Flagged grammar issues in the original Streamlit section: "exec" used as a verb (~20), and missing "the" before "first/second/third terminal" (~22-~24).
12 changes: 12 additions & 0 deletions docker-init/demo/load_summaries.sh
@@ -0,0 +1,12 @@
# Kick off the ObsDemo spark job in the app container

Comment on lines +1 to +2

🛠️ Refactor suggestion

Add shell script essentials

The script is missing crucial shell script components:

  • Shebang directive
  • Execution permissions
  • Usage instructions and requirements

Apply this diff to improve the script header:

+#!/bin/bash
+
+# Script: load_summaries.sh
+# Purpose: Initiates ObservabilityDemo Spark job in the app container
+#
+# Requirements:
+# - Docker and docker-compose must be running
+# - Containers must be up (see docker-init/demo/README.md)
+#
+# Usage: ./load_summaries.sh
+
 # Kick off the ObsDemo spark job in the app container
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Kick off the ObsDemo spark job in the app container
#!/bin/bash
# Script: load_summaries.sh
# Purpose: Initiates ObservabilityDemo Spark job in the app container
#
# Requirements:
# - Docker and docker-compose must be running
# - Containers must be up (see docker-init/demo/README.md)
#
# Usage: ./load_summaries.sh
# Kick off the ObsDemo spark job in the app container
🧰 Tools
🪛 Shellcheck (0.10.0)
[error] SC2148 (line 1): Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

docker-compose -f docker-init/compose.yaml exec app /opt/spark/bin/spark-submit \
--master "local[*]" \
--driver-memory 8g \
--conf "spark.driver.maxResultSize=6g" \
--conf "spark.driver.memory=8g" \
Comment on lines +4 to +7

💡 Codebase verification

Memory configuration in demo environment needs adjustment

The verification reveals inconsistency in memory configurations across the codebase. While the demo environment uses fixed high values (8GB driver memory, 6GB max result size), other scripts in the test environment use a more flexible approach with environment variables defaulting to 1GB, which is more suitable for demo purposes.

Suggested changes:

  • Consider adopting the same environment variable approach used in test scripts (DRIVER_MEMORY:-1G)
  • Reduce the default memory allocation to be more accommodating for demo environments
  • Remove the duplicate memory configuration (--driver-memory and spark.driver.memory)
🔗 Analysis chain

Review memory configuration for demo environment

The current configuration allocates 8GB of memory and 6GB for max result size, which might be excessive for a demo environment and could cause issues on machines with limited resources.

Let's check if these memory settings are documented or consistent across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for similar Spark memory configurations
rg -A 2 "driver-memory|maxResultSize" --type yaml --type sh

Length of output: 935

--driver-class-path "/opt/spark/jars/*:/app/cli/*" \
--conf "spark.driver.host=localhost" \
--conf "spark.driver.bindAddress=0.0.0.0" \
--class ai.chronon.spark.scripts.ObservabilityDemoDataLoader \
/app/cli/spark.jar
17 changes: 0 additions & 17 deletions docker-init/demo/log4j2.properties

This file was deleted.

17 changes: 0 additions & 17 deletions docker-init/start.sh
@@ -39,23 +39,6 @@ echo "DynamoDB Table created successfully!"

start_time=$(date +%s)

if ! java \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
-cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver summarize-and-upload \
--online-jar=$CLOUD_AWS_JAR \
--online-class=$ONLINE_CLASS \
--parquet-path="$(pwd)/drift_data" \
--conf-path=/chronon_sample/production/ \
--time-column=transaction_time; then
echo "Error: Failed to load summary data into DynamoDB" >&2
exit 1
else
end_time=$(date +%s)
elapsed_time=$((end_time - start_time))
echo "Summary load completed successfully! Took $elapsed_time seconds."
fi

# Add these java options as without them we hit the below error:
# throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @36328710
export JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED"
60 changes: 60 additions & 0 deletions hub/app/controllers/InMemKVStoreController.scala
@@ -0,0 +1,60 @@
package controllers

import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.PutRequest
import io.circe.Codec
import io.circe.Decoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveCodec
import io.circe.parser.decode
import play.api.Logger
import play.api.mvc
import play.api.mvc.BaseController
import play.api.mvc.ControllerComponents
import play.api.mvc.RawBuffer

import java.util.Base64
import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

class InMemKVStoreController @Inject() (val controllerComponents: ControllerComponents, kvStore: KVStore)(implicit
ec: ExecutionContext)
extends BaseController {

import PutRequestCodec._

val logger: Logger = Logger(this.getClass)

def bulkPut(): mvc.Action[RawBuffer] =
Action(parse.raw).async { request =>
request.body.asBytes() match {
case Some(bytes) =>
decode[Array[PutRequest]](bytes.utf8String) match {
case Right(putRequests) =>
logger.debug(s"Attempting a bulkPut with ${putRequests.length} items")
val resultFuture = kvStore.multiPut(putRequests)
resultFuture.map { responses =>
if (responses.contains(false)) {
logger.warn("Some write failures encountered")
}
Ok("Success")
}
case Left(error) => Future.successful(BadRequest(error.getMessage))
}
case None => Future.successful(BadRequest("Empty body"))
}
}
}

object PutRequestCodec {
// Custom codec for byte arrays using Base64
implicit val byteArrayEncoder: Encoder[Array[Byte]] =
Encoder.encodeString.contramap[Array[Byte]](Base64.getEncoder.encodeToString)

implicit val byteArrayDecoder: Decoder[Array[Byte]] =
Decoder.decodeString.map(Base64.getDecoder.decode)

// Derive codec for PutRequest
implicit val putRequestCodec: Codec[PutRequest] = deriveCodec[PutRequest]
}
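A note on the wire format bulkPut expects: byte fields travel as Base64 strings inside the JSON body, and the controller decodes an Array[PutRequest] from the raw bytes. Below is a minimal, self-contained sketch of the same round trip, using a hypothetical DemoPut case class in place of the real KVStore.PutRequest (whose exact fields aren't shown in this diff).

```scala
import io.circe.{Codec, Decoder, Encoder}
import io.circe.generic.semiauto.deriveCodec
import io.circe.parser.decode
import io.circe.syntax._

import java.util.Base64

// Hypothetical stand-in for KVStore.PutRequest; field names here are illustrative only.
case class DemoPut(keyBytes: Array[Byte], valueBytes: Array[Byte], dataset: String)

object DemoPutCodec {
  // Same technique as PutRequestCodec above: byte arrays travel as Base64 strings in JSON.
  implicit val byteArrayEncoder: Encoder[Array[Byte]] =
    Encoder.encodeString.contramap[Array[Byte]](Base64.getEncoder.encodeToString)
  implicit val byteArrayDecoder: Decoder[Array[Byte]] =
    Decoder.decodeString.map(Base64.getDecoder.decode)
  implicit val demoPutCodec: Codec[DemoPut] = deriveCodec[DemoPut]
}

object RoundTripSketch extends App {
  import DemoPutCodec._

  val put  = DemoPut("user-123".getBytes("UTF-8"), """{"score":0.42}""".getBytes("UTF-8"), "demo_dataset")
  val json = List(put).asJson.noSpaces    // the shape a client would POST to bulkPut
  val back = decode[Array[DemoPut]](json) // mirrors the controller's decode step

  println(json)
  println(back.map(_.head.dataset))       // Right(demo_dataset)
}
```

The Base64 detour keeps arbitrary key/value bytes safe inside a UTF-8 JSON document, at the cost of roughly 33% payload inflation.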
54 changes: 54 additions & 0 deletions hub/app/controllers/JoinController.scala
@@ -0,0 +1,54 @@
package controllers

import io.circe.generic.auto._
import io.circe.syntax._
import model.ListJoinResponse
import play.api.mvc._
import store.MonitoringModelStore

import javax.inject._

/**
* Controller for the Zipline Join entities
*/
@Singleton
class JoinController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
extends BaseController
with Paginate {

/**
* Powers the /api/v1/joins endpoint. Returns a list of joins
* @param offset - For pagination. We skip over offset entries before returning results
* @param limit - Number of elements to return
*/
def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
Action { implicit request: Request[AnyContent] =>
// Default values if the parameters are not provided
val offsetValue = offset.getOrElse(defaultOffset)
val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)

if (offsetValue < 0) {
BadRequest("Invalid offset - expect a positive number")
} else if (limitValue < 0) {
BadRequest("Invalid limit - expect a positive number")
} else {
val joins = monitoringStore.getJoins
val paginatedResults = paginateResults(joins, offsetValue, limitValue)
val json = ListJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
Ok(json)
}
}
Comment on lines +24 to +40

⚠️ Potential issue

Add error handling and improve response structure

The current implementation has several areas for improvement:

  1. No exception handling for store operations
  2. Missing total count in response for proper pagination
  3. No input sanitization beyond negative checks

Consider applying these improvements:

 def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
   Action { implicit request: Request[AnyContent] =>
+    Logger.debug(s"Fetching joins with offset=$offset, limit=$limit")
     val offsetValue = offset.getOrElse(defaultOffset)
     val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)

     if (offsetValue < 0) {
       BadRequest("Invalid offset - expect a positive number")
     } else if (limitValue < 0) {
       BadRequest("Invalid limit - expect a positive number")
     } else {
-      val joins = monitoringStore.getJoins
-      val paginatedResults = paginateResults(joins, offsetValue, limitValue)
-      val json = ListJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
-      Ok(json)
+      Try {
+        val joins = monitoringStore.getJoins
+        val totalCount = joins.size
+        val paginatedResults = paginateResults(joins, offsetValue, limitValue)
+        val response = ListJoinResponse(
+          offset = offsetValue,
+          total = totalCount,
+          results = paginatedResults
+        )
+        Ok(response.asJson.noSpaces)
+      } recover {
+        case ex: Exception =>
+          Logger.error("Failed to fetch joins", ex)
+          InternalServerError(s"Failed to fetch joins: ${ex.getMessage}")
+      } get
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
Action { implicit request: Request[AnyContent] =>
// Default values if the parameters are not provided
val offsetValue = offset.getOrElse(defaultOffset)
val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)
if (offsetValue < 0) {
BadRequest("Invalid offset - expect a positive number")
} else if (limitValue < 0) {
BadRequest("Invalid limit - expect a positive number")
} else {
val joins = monitoringStore.getJoins
val paginatedResults = paginateResults(joins, offsetValue, limitValue)
val json = ListJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
Ok(json)
}
}
def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
Action { implicit request: Request[AnyContent] =>
Logger.debug(s"Fetching joins with offset=$offset, limit=$limit")
val offsetValue = offset.getOrElse(defaultOffset)
val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)
if (offsetValue < 0) {
BadRequest("Invalid offset - expect a positive number")
} else if (limitValue < 0) {
BadRequest("Invalid limit - expect a positive number")
} else {
Try {
val joins = monitoringStore.getJoins
val totalCount = joins.size
val paginatedResults = paginateResults(joins, offsetValue, limitValue)
val response = ListJoinResponse(
offset = offsetValue,
total = totalCount,
results = paginatedResults
)
Ok(response.asJson.noSpaces)
} recover {
case ex: Exception =>
Logger.error("Failed to fetch joins", ex)
InternalServerError(s"Failed to fetch joins: ${ex.getMessage}")
} get
}
}


/**
* Returns a specific join by name
*/
def get(name: String): Action[AnyContent] = {
Action { implicit request: Request[AnyContent] =>
val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
maybeJoin match {
case None => NotFound(s"Join: $name wasn't found")
case Some(join) => Ok(join.asJson.noSpaces)
}
}
}
Comment on lines +45 to +53

⚠️ Potential issue

Improve error handling and input validation in get endpoint

The endpoint needs several improvements:

  1. Add error handling for store operations
  2. Validate the name parameter
  3. Make response format consistent with list endpoint

Apply these improvements:

 def get(name: String): Action[AnyContent] = {
   Action { implicit request: Request[AnyContent] =>
+    Logger.info(s"Received request to get join with name=$name")
+    
+    if (name.trim.isEmpty) {
+      BadRequest("Join name cannot be empty")
+    } else Try {
       val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
       maybeJoin match {
         case None       => NotFound(s"Join: $name wasn't found")
-        case Some(join) => Ok(join.asJson.noSpaces)
+        case Some(join) => Ok(Map("data" -> join).asJson.noSpaces)
       }
+    } recover {
+      case ex: Exception =>
+        Logger.error(s"Failed to fetch join: $name", ex)
+        InternalServerError(s"Failed to fetch join: ${ex.getMessage}")
+    } get
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def get(name: String): Action[AnyContent] = {
Action { implicit request: Request[AnyContent] =>
val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
maybeJoin match {
case None => NotFound(s"Join: $name wasn't found")
case Some(join) => Ok(join.asJson.noSpaces)
}
}
}
def get(name: String): Action[AnyContent] = {
Action { implicit request: Request[AnyContent] =>
Logger.info(s"Received request to get join with name=$name")
if (name.trim.isEmpty) {
BadRequest("Join name cannot be empty")
} else Try {
val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
maybeJoin match {
case None => NotFound(s"Join: $name wasn't found")
case Some(join) => Ok(Map("data" -> join).asJson.noSpaces)
}
} recover {
case ex: Exception =>
Logger.error(s"Failed to fetch join: $name", ex)
InternalServerError(s"Failed to fetch join: ${ex.getMessage}")
} get
}
}

}
5 changes: 2 additions & 3 deletions hub/app/controllers/ModelController.scala
@@ -4,16 +4,15 @@ import io.circe.generic.auto._
import io.circe.syntax._
import model.ListModelResponse
import play.api.mvc._
import store.DynamoDBMonitoringStore
import store.MonitoringModelStore

import javax.inject._

/**
* Controller for the Zipline models entities
*/
@Singleton
class ModelController @Inject() (val controllerComponents: ControllerComponents,
monitoringStore: DynamoDBMonitoringStore)
class ModelController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
extends BaseController
with Paginate {

4 changes: 1 addition & 3 deletions hub/app/controllers/Paginate.scala
@@ -1,13 +1,11 @@
package controllers

import model.Model

trait Paginate {
val defaultOffset = 0
val defaultLimit = 10
val maxLimit = 100

def paginateResults(results: Seq[Model], offset: Int, limit: Int): Seq[Model] = {
def paginateResults[T](results: Seq[T], offset: Int, limit: Int): Seq[T] = {
Contributor
nice!!

results.slice(offset, offset + limit)
}
}
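Making paginateResults generic over T is what lets the same trait back both the model and the new join endpoints. A small sketch of the slicing behaviour, assuming the defaults shown above:

```scala
// Minimal sketch of the generic pagination; defaults mirror the trait above.
trait Paginate {
  val defaultOffset = 0
  val defaultLimit  = 10
  val maxLimit      = 100

  def paginateResults[T](results: Seq[T], offset: Int, limit: Int): Seq[T] =
    results.slice(offset, offset + limit)
}

object PaginateSketch extends App with Paginate {
  val joins = (1 to 25).map(i => s"join_$i")

  println(paginateResults(joins, offset = 0, limit = defaultLimit))  // join_1 .. join_10
  println(paginateResults(joins, offset = 20, limit = defaultLimit)) // join_21 .. join_25 (short last page)
}
```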
23 changes: 11 additions & 12 deletions hub/app/controllers/SearchController.scala
@@ -2,24 +2,23 @@ package controllers

import io.circe.generic.auto._
import io.circe.syntax._
import model.Model
import model.SearchModelResponse
import model.Join
import model.SearchJoinResponse
import play.api.mvc._
import store.DynamoDBMonitoringStore
import store.MonitoringModelStore

import javax.inject._

/**
* Controller to power search related APIs
*/
class SearchController @Inject() (val controllerComponents: ControllerComponents,
monitoringStore: DynamoDBMonitoringStore)
class SearchController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
extends BaseController
with Paginate {

/**
* Powers the /api/v1/search endpoint. Returns a list of models
* @param term - Search term to search for (currently we only support searching model names)
* Powers the /api/v1/search endpoint. Returns a list of joins
* @param term - Search term to search for (currently we only support searching join names)
* @param offset - For pagination. We skip over offset entries before returning results
* @param limit - Number of elements to return
*/
@@ -36,14 +35,14 @@ class SearchController @Inject() (val controllerComponents: ControllerComponents
} else {
val searchResults = searchRegistry(term)
val paginatedResults = paginateResults(searchResults, offsetValue, limitValue)
val json = SearchModelResponse(offsetValue, paginatedResults).asJson.noSpaces
val json = SearchJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
Ok(json)
}
}

// a trivial search where we check the model name for similarity with the search term
private def searchRegistry(term: String): Seq[Model] = {
val models = monitoringStore.getModels
models.filter(m => m.name.contains(term))
// a trivial search where we check the join name for similarity with the search term
private def searchRegistry(term: String): Seq[Join] = {
val joins = monitoringStore.getJoins
joins.filter(j => j.name.contains(term))
Comment on lines +43 to +46

🛠️ Refactor suggestion

Consider enhancing search functionality and error handling

The current implementation has several areas for improvement:

  1. Case-sensitive search might not be user-friendly
  2. No error handling for monitoringStore.getJoins
  3. Potential performance impact of filtering all joins in memory

Consider this improvement:

  private def searchRegistry(term: String): Seq[Join] = {
-   val joins = monitoringStore.getJoins
-   joins.filter(j => j.name.contains(term))
+   try {
+     val normalizedTerm = term.toLowerCase
+     monitoringStore.getJoins.filter(j => 
+       j.name.toLowerCase.contains(normalizedTerm)
+     )
+   } catch {
+     case e: Exception =>
+       logger.error(s"Failed to search joins: ${e.getMessage}", e)
+       Seq.empty
+   }
  }

Committable suggestion skipped: line range outside the PR's diff.

}
}