Connect drift metrics computation in Spark with Hub for serving to frontend #95
Changes from 34 commits
@@ -1,5 +1,25 @@

# Populate Observability Demo Data

To populate the observability demo data:

* Launch the set of docker containers:

  ```bash
  ~/workspace/chronon $ docker-compose -f docker-init/compose.yaml up --build
  ...
  app-1  | [info] 2024-11-26 05:10:45,758 [main] INFO  play.api.Play - Application started (Prod) (no global state)
  app-1  | [info] 2024-11-26 05:10:45,958 [main] INFO  play.core.server.AkkaHttpServer - Listening for HTTP on /[0:0:0:0:0:0:0:0]:9000
  ```

  (You can skip `--build` if you don't wish to rebuild your code.)
Comment on lines +1 to +10
🛠️ Refactor suggestion — Enhance Docker setup instructions: the setup instructions should include prerequisites and verification steps. Add the following sections:

# Populate Observability Demo Data
+
+## Prerequisites
+- Docker v20.10 or higher
+- Docker Compose v2.0 or higher
+- Minimum 4GB RAM available for containers
+
+## Setup
To populate the observability demo data:
* Launch the set of docker containers:
```bash
~/workspace/chronon $ docker-compose -f docker-init/compose.yaml up --build
...
app-1 | [info] 2024-11-26 05:10:45,758 [main] INFO play.api.Play - Application started (Prod) (no global state)
app-1 | [info] 2024-11-26 05:10:45,958 [main] INFO play.core.server.AkkaHttpServer - Listening for HTTP on /[0:0:0:0:0:0:0:0]:9000 (you can skip the --build if you don't wish to rebuild your code)
Now you can trigger the script to load summary data:

```bash
~/workspace/chronon $ docker-init/demo/load_summaries.sh
...
Done uploading summaries! 🥳
```
Comment on lines +12 to +17
🛠️ Refactor suggestion — Enhance data loading documentation: the data loading instructions could be more informative. Consider expanding the documentation:

Now you can trigger the script to load summary data:
```bash
~/workspace/chronon $ docker-init/demo/load_summaries.sh
...
Done uploading summaries! 🥳
```
+The script will:
Comment on lines +2 to +17
🛠️ Refactor suggestion — Add cleanup and shutdown instructions: the documentation should include instructions for cleaning up resources and shutting down the environment properly. Add a cleanup section:

+## Cleanup
+
+To stop and remove the containers:
+```bash
+~/workspace/chronon $ docker-compose -f docker-init/compose.yaml down
+```
+
+To clean up all data and start fresh:
+```bash
+~/workspace/chronon $ docker-compose -f docker-init/compose.yaml down -v
+```
piyush-zlai marked this conversation as resolved.
# Streamlit local experimentation

run build.sh once, and you can repeatedly exec to quickly visualize

In first terminal: `sbt spark/assembly`
In second terminal: `./run.sh` to load the built jar and serve the data on localhost:8181
In third terminal: `streamlit run viz.py`
Comment on lines +19 to +25
🛠️ Refactor suggestion — Restructure and clarify Streamlit instructions: the Streamlit section needs better formatting, clearer instructions, and proper grammar. Replace the section with:

-# Streamlit local experimentation
-run build.sh once, and you can repeatedly exec to quickly visualize
-
-In first terminal: `sbt spark/assembly`
-In second terminal: `./run.sh` to load the built jar and serve the data on localhost:8181
-In third terminal: `streamlit run viz.py`
+# Local Development with Streamlit
+
+For local experimentation with the Streamlit visualization:
+
+1. Build the Spark assembly (required once):
+ ```bash
+ ~/workspace/chronon $ sbt spark/assembly
+ ```
+
+2. Start the data server:
+ ```bash
+ ~/workspace/chronon $ ./run.sh
+ ```
+ This loads the built JAR and serves data at http://localhost:8181
+
+3. Launch the Streamlit application:
+ ```bash
+ ~/workspace/chronon $ streamlit run viz.py
+ ```
+   Access the visualization at http://localhost:8501
docker-init/demo/load_summaries.sh (new file):

@@ -0,0 +1,12 @@
# Kick off the ObsDemo spark job in the app container
Comment on lines +1 to +2
🛠️ Refactor suggestion — Add shell script essentials: the script is missing crucial components (a shebang, a header comment, and usage notes). Apply this diff to improve the script header:

+#!/bin/bash
Apply this diff to improve the script header: +#!/bin/bash
+
+# Script: load_summaries.sh
+# Purpose: Initiates ObservabilityDemo Spark job in the app container
+#
+# Requirements:
+# - Docker and docker-compose must be running
+# - Containers must be up (see docker-init/demo/README.md)
+#
+# Usage: ./load_summaries.sh
+
# Kick off the ObsDemo spark job in the app container
🪛 Shellcheck (0.10.0) — [error] SC2148: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.
```bash
docker-compose -f docker-init/compose.yaml exec app /opt/spark/bin/spark-submit \
  --master "local[*]" \
  --driver-memory 8g \
  --conf "spark.driver.maxResultSize=6g" \
  --conf "spark.driver.memory=8g" \
```
Comment on lines +4 to +7
💡 Codebase verification — Memory configuration in demo environment needs adjustment: the verification reveals inconsistent memory configurations across the codebase. While the demo environment uses fixed high values (8GB driver memory, 6GB max result size), other scripts in the test environment use a more flexible approach, with environment variables defaulting to 1GB, which is more suitable for demo purposes.

🔗 Analysis chain — Review memory configuration for demo environment: the current configuration allocates 8GB of memory and 6GB for max result size, which might be excessive for a demo environment and could cause issues on machines with limited resources. The following script was run to check whether these settings are documented or consistent across the codebase:

```bash
#!/bin/bash
# Search for similar Spark memory configurations
rg -A 2 "driver-memory|maxResultSize" --type yaml --type sh
```

Length of output: 935
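The flexible pattern the review points to — an environment-variable override with a conservative default — can be sketched as follows. Note that `SPARK_DRIVER_MEMORY` is a hypothetical variable name chosen for illustration, not necessarily the one the repo's test scripts use:

```python
import os

def driver_memory(default: str = "1g") -> str:
    # Resolve the Spark driver memory from the environment, falling back
    # to a demo-friendly default. SPARK_DRIVER_MEMORY is a hypothetical
    # variable name used only for illustration.
    return os.environ.get("SPARK_DRIVER_MEMORY", default)
```

A launch script would then pass the resolved value to `--driver-memory` instead of hard-coding 8g.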
```bash
  --driver-class-path "/opt/spark/jars/*:/app/cli/*" \
  --conf "spark.driver.host=localhost" \
  --conf "spark.driver.bindAddress=0.0.0.0" \
  --class ai.chronon.spark.scripts.ObservabilityDemoDataLoader \
  /app/cli/spark.jar
```
This file was deleted.
InMemKVStoreController.scala (new file):

@@ -0,0 +1,60 @@
```scala
package controllers

import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.PutRequest
import io.circe.Codec
import io.circe.Decoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveCodec
import io.circe.parser.decode
import play.api.Logger
import play.api.mvc
import play.api.mvc.BaseController
import play.api.mvc.ControllerComponents
import play.api.mvc.RawBuffer

import java.util.Base64
import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

class InMemKVStoreController @Inject() (val controllerComponents: ControllerComponents, kvStore: KVStore)(implicit
    ec: ExecutionContext)
    extends BaseController {

  import PutRequestCodec._

  val logger: Logger = Logger(this.getClass)

  def bulkPut(): mvc.Action[RawBuffer] =
    Action(parse.raw).async { request =>
      request.body.asBytes() match {
        case Some(bytes) =>
          decode[Array[PutRequest]](bytes.utf8String) match {
            case Right(putRequests) =>
              logger.debug(s"Attempting a bulkPut with ${putRequests.length} items")
              val resultFuture = kvStore.multiPut(putRequests)
              resultFuture.map { responses =>
                if (responses.contains(false)) {
                  logger.warn("Some write failures encountered")
                }
                Ok("Success")
              }
            case Left(error) => Future.successful(BadRequest(error.getMessage))
          }
        case None => Future.successful(BadRequest("Empty body"))
      }
    }
}

object PutRequestCodec {
  // Custom codec for byte arrays using Base64
  implicit val byteArrayEncoder: Encoder[Array[Byte]] =
    Encoder.encodeString.contramap[Array[Byte]](Base64.getEncoder.encodeToString)

  implicit val byteArrayDecoder: Decoder[Array[Byte]] =
    Decoder.decodeString.map(Base64.getDecoder.decode)

  // Derive codec for PutRequest
  implicit val putRequestCodec: Codec[PutRequest] = deriveCodec[PutRequest]
}
```

nikhil-zlai marked this conversation as resolved.
JoinController.scala (new file):

@@ -0,0 +1,54 @@
```scala
package controllers

import io.circe.generic.auto._
import io.circe.syntax._
import model.ListJoinResponse
import play.api.mvc._
import store.MonitoringModelStore

import javax.inject._

/**
 * Controller for the Zipline Join entities
 */
@Singleton
class JoinController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
    extends BaseController
    with Paginate {

  /**
   * Powers the /api/v1/joins endpoint. Returns a list of joins
   * @param offset - For pagination. We skip over offset entries before returning results
   * @param limit - Number of elements to return
   */
  def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
    Action { implicit request: Request[AnyContent] =>
      // Default values if the parameters are not provided
      val offsetValue = offset.getOrElse(defaultOffset)
      val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)

      if (offsetValue < 0) {
        BadRequest("Invalid offset - expect a positive number")
      } else if (limitValue < 0) {
        BadRequest("Invalid limit - expect a positive number")
      } else {
        val joins = monitoringStore.getJoins
        val paginatedResults = paginateResults(joins, offsetValue, limitValue)
        val json = ListJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
        Ok(json)
      }
    }
```
Comment on lines +24 to +40
Add error handling and improve response structure — the current implementation has several areas for improvement. Consider applying these improvements:

def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
Consider applying these improvements: def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
Action { implicit request: Request[AnyContent] =>
+ Logger.debug(s"Fetching joins with offset=$offset, limit=$limit")
val offsetValue = offset.getOrElse(defaultOffset)
val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)
if (offsetValue < 0) {
BadRequest("Invalid offset - expect a positive number")
} else if (limitValue < 0) {
BadRequest("Invalid limit - expect a positive number")
} else {
- val joins = monitoringStore.getJoins
- val paginatedResults = paginateResults(joins, offsetValue, limitValue)
- val json = ListJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
- Ok(json)
+ Try {
+ val joins = monitoringStore.getJoins
+ val totalCount = joins.size
+ val paginatedResults = paginateResults(joins, offsetValue, limitValue)
+ val response = ListJoinResponse(
+ offset = offsetValue,
+ total = totalCount,
+ results = paginatedResults
+ )
+ Ok(response.asJson.noSpaces)
+ } recover {
+ case ex: Exception =>
+ Logger.error("Failed to fetch joins", ex)
+ InternalServerError(s"Failed to fetch joins: ${ex.getMessage}")
+ } get
}
}
```scala
  /**
   * Returns a specific join by name
   */
  def get(name: String): Action[AnyContent] = {
    Action { implicit request: Request[AnyContent] =>
      val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
      maybeJoin match {
        case None       => NotFound(s"Join: $name wasn't found")
        case Some(join) => Ok(join.asJson.noSpaces)
      }
    }
  }
```
+45
to
+53
Improve error handling and input validation in get endpoint — the endpoint needs several improvements. Apply these improvements:

def get(name: String): Action[AnyContent] = {
Apply these improvements: def get(name: String): Action[AnyContent] = {
Action { implicit request: Request[AnyContent] =>
+ Logger.info(s"Received request to get join with name=$name")
+
+ if (name.trim.isEmpty) {
+ BadRequest("Join name cannot be empty")
+ } else Try {
val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
maybeJoin match {
case None => NotFound(s"Join: $name wasn't found")
- case Some(join) => Ok(join.asJson.noSpaces)
+ case Some(join) => Ok(Map("data" -> join).asJson.noSpaces)
}
+ } recover {
+ case ex: Exception =>
+ Logger.error(s"Failed to fetch join: $name", ex)
+ InternalServerError(s"Failed to fetch join: ${ex.getMessage}")
+ } get
}
}
```scala
}
```
Paginate.scala:

```diff
@@ -1,13 +1,11 @@
 package controllers

-import model.Model
-
 trait Paginate {
   val defaultOffset = 0
   val defaultLimit = 10
   val maxLimit = 100

-  def paginateResults(results: Seq[Model], offset: Int, limit: Int): Seq[Model] = {
+  def paginateResults[T](results: Seq[T], offset: Int, limit: Int): Seq[T] = {
     results.slice(offset, offset + limit)
   }
 }
```

Reviewer comment on the generic signature: nice!!
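The generic `paginateResults` relies on `Seq.slice`, which clamps out-of-range indices instead of throwing, and the controllers additionally cap the requested limit at `maxLimit`. A sketch of the same semantics:

```python
DEFAULT_OFFSET, DEFAULT_LIMIT, MAX_LIMIT = 0, 10, 100

def effective_limit(limit):
    # Mirrors limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)
    return min(limit, MAX_LIMIT) if limit is not None else DEFAULT_LIMIT

def paginate_results(results, offset, limit):
    # Like Seq.slice, Python slicing clamps: an offset past the end
    # yields an empty list rather than an error.
    return results[offset:offset + limit]
```

For example, `paginate_results(list(range(25)), 20, 10)` returns only the last five elements rather than failing.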
SearchController.scala:

```diff
@@ -2,24 +2,23 @@ package controllers

 import io.circe.generic.auto._
 import io.circe.syntax._
-import model.Model
-import model.SearchModelResponse
+import model.Join
+import model.SearchJoinResponse
 import play.api.mvc._
-import store.DynamoDBMonitoringStore
+import store.MonitoringModelStore

 import javax.inject._

 /**
  * Controller to power search related APIs
  */
-class SearchController @Inject() (val controllerComponents: ControllerComponents,
-                                  monitoringStore: DynamoDBMonitoringStore)
+class SearchController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
   extends BaseController
   with Paginate {

   /**
-   * Powers the /api/v1/search endpoint. Returns a list of models
-   * @param term - Search term to search for (currently we only support searching model names)
+   * Powers the /api/v1/search endpoint. Returns a list of joins
+   * @param term - Search term to search for (currently we only support searching join names)
    * @param offset - For pagination. We skip over offset entries before returning results
    * @param limit - Number of elements to return
    */
@@ -36,14 +35,14 @@ class SearchController @Inject() (val controllerComponents: ControllerComponents
     } else {
       val searchResults = searchRegistry(term)
       val paginatedResults = paginateResults(searchResults, offsetValue, limitValue)
-      val json = SearchModelResponse(offsetValue, paginatedResults).asJson.noSpaces
+      val json = SearchJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
       Ok(json)
     }
   }

-  // a trivial search where we check the model name for similarity with the search term
-  private def searchRegistry(term: String): Seq[Model] = {
-    val models = monitoringStore.getModels
-    models.filter(m => m.name.contains(term))
+  // a trivial search where we check the join name for similarity with the search term
+  private def searchRegistry(term: String): Seq[Join] = {
+    val joins = monitoringStore.getJoins
+    joins.filter(j => j.name.contains(term))
```
Comment on lines +43 to +46
🛠️ Refactor suggestion — Consider enhancing search functionality and error handling: the current implementation has several areas for improvement (for instance, the match is case-sensitive and store failures are unhandled). Consider this improvement:

private def searchRegistry(term: String): Seq[Join] = {
- val joins = monitoringStore.getJoins
- joins.filter(j => j.name.contains(term))
+ try {
+ val normalizedTerm = term.toLowerCase
+ monitoringStore.getJoins.filter(j =>
+ j.name.toLowerCase.contains(normalizedTerm)
+ )
+ } catch {
+ case e: Exception =>
+ logger.error(s"Failed to search joins: ${e.getMessage}", e)
+ Seq.empty
+ }
}
  }
}
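The reviewer's case-insensitive variant boils down to normalizing both sides before the substring check. A minimal sketch of that matching logic, operating on plain name strings for illustration:

```python
def search_registry(join_names, term):
    # Normalize once, then do a case-insensitive substring match,
    # as the review's suggested searchRegistry does.
    t = term.lower()
    return [name for name in join_names if t in name.lower()]
```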