
Commit 8cd16dd

Connect drift metrics computation in Spark with Hub for serving to frontend (#95)
## Summary

Builds on a couple of the summary computation PRs and data generation to wire things up so that Hub can serve them.

* Yanked out the mock-data based endpoints (model perf / drift, join & feature skew) - decided it would be confusing to have a mix of mock and generated data, so we only serve the generated data.
* Dropped a few of the scripts introduced in #87. We bring up our containers the usual way, and we have a script `load_summaries.sh` that we can trigger to leverage the existing app container to load data.
* DDB ingestion was taking too long and we were dropping a lot of data due to rejected execution exceptions. To unblock for now, we've gone with an approach of making a bulk put HTTP call from the ObservabilityDemo app -> Hub, with Hub utilizing an InMemoryKV store to persist and serve up features.
* Added an endpoint to serve the joins that are configured, as we've switched away from the model-based world.

There's still an issue to resolve around fetching individual feature series data. Once that is resolved, we can switch this PR out of WIP mode.

To test / run, start up our docker containers:
```
$ docker-compose -f docker-init/compose.yaml up --build
...
```

In a different terminal, load data:
```
$ ./docker-init/demo/load_summaries.sh
Done uploading summaries! 🥳
```

You can now curl join & feature time series data.

Join drift (null ratios):
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=null&offset=10h&algorithm=psi'
```

Join drift (value drift):
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=10h&algorithm=psi'
```

Feature drift:
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/feature/dim_user_account_type/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=1D&algorithm=psi&granularity=aggregates'
```

Feature summaries:
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/feature/dim_user_account_type/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=1D&algorithm=psi&granularity=percentile'
```

Join metadata:
```
curl -X GET 'http://localhost:9000/api/v1/joins'
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join'
```

## Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

## Release Notes

- **New Features**
  - Introduced a new `JoinController` for managing joins with pagination support.
  - Added functionality for an in-memory key-value store with bulk data upload capabilities.
  - Implemented observability demo data loading within a Spark application.
  - Added a new `HTTPKVStore` class for remote key-value store interactions over HTTP.
- **Improvements**
  - Enhanced the `ModelController` and `SearchController` to align with the new join data structure.
  - Updated the `TimeSeriesController` to support asynchronous operations and improved error handling.
  - Refined dependency management in the build configuration for better clarity and maintainability.
  - Updated API routes to include new endpoints for listing and retrieving joins.
  - Updated configuration to replace the `DynamoDBModule` with `ModelStoreModule`, adding `InMemoryKVStoreModule` and `DriftStoreModule`.
- **Documentation**
  - Revised README instructions for Docker container setup and demo data loading.
  - Updated API routes documentation to reflect new endpoints for joins and in-memory data operations.
- **Bug Fixes**
  - Resolved issues related to error handling in various controllers and improved logging for better traceability.

---------

Co-authored-by: nikhil-zlai <[email protected]>
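
As context for the bulk-put path described in the summary (ObservabilityDemo app -> Hub -> in-memory KV store), here is a minimal, hedged sketch of what the client side of that HTTP call can look like: it serializes a batch of `PutRequest`s to JSON (byte arrays Base64-encoded, reusing the `PutRequestCodec` that Hub defines later in this commit) and POSTs them to Hub via the sttp client added to the `online` module. The `BulkPutSketch` name, the `hubBaseUrl` parameter, and the `/api/v1/dataset/data` route are illustrative assumptions; the actual `HTTPKVStore` in this PR may differ.

```scala
import ai.chronon.online.KVStore.PutRequest
import controllers.PutRequestCodec._ // Base64-encodes the key/value byte arrays
import io.circe.syntax._
import sttp.client3._

object BulkPutSketch {
  private val backend = HttpURLConnectionBackend()

  // POST a batch of PutRequests to Hub as a single JSON array.
  // Hub's InMemKVStoreController decodes the same shape via decode[Array[PutRequest]].
  def bulkPut(hubBaseUrl: String, requests: Seq[PutRequest]): Boolean = {
    val response = basicRequest
      .post(uri"$hubBaseUrl/api/v1/dataset/data") // hypothetical route
      .contentType("application/json")
      .body(requests.asJson.noSpaces)
      .send(backend)
    response.code.isSuccess
  }
}
```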
1 parent 0fb2e3d commit 8cd16dd

34 files changed (+908, -457 lines)

.github/workflows/test_scala_no_spark.yaml

Lines changed: 6 additions & 1 deletion

```diff
@@ -60,4 +60,9 @@ jobs:
 
       - name: Run api tests
         run: |
-          sbt "++ 2.12.18 api/test"
+          sbt "++ 2.12.18 api/test"
+
+      - name: Run hub tests
+        run: |
+          export SBT_OPTS="-Xmx8G -Xms2G"
+          sbt "++ 2.12.18 hub/test"
```

build.sbt

Lines changed: 15 additions & 5 deletions

```diff
@@ -80,6 +80,13 @@ val jackson = Seq(
   "com.fasterxml.jackson.module" %% "jackson-module-scala"
 ).map(_ % jackson_2_15)
 
+// Circe is used to ser / deser case class payloads for the Hub Play webservice
+val circe = Seq(
+  "io.circe" %% "circe-core",
+  "io.circe" %% "circe-generic",
+  "io.circe" %% "circe-parser",
+).map(_ % circeVersion)
+
 val flink_all = Seq(
   "org.apache.flink" %% "flink-streaming-scala",
   "org.apache.flink" % "flink-metrics-dropwizard",
@@ -129,6 +136,8 @@ lazy val online = project
       "com.github.ben-manes.caffeine" % "caffeine" % "3.1.8"
     ),
     libraryDependencies ++= jackson,
+    // dep needed for HTTPKvStore - yank when we rip this out
+    libraryDependencies += "com.softwaremill.sttp.client3" %% "core" % "3.9.7",
     libraryDependencies ++= spark_all.map(_ % "provided"),
     libraryDependencies ++= flink_all.map(_ % "provided")
   )
@@ -236,20 +245,18 @@ lazy val frontend = (project in file("frontend"))
 // build interop between one module solely on 2.13 and others on 2.12 is painful
 lazy val hub = (project in file("hub"))
   .enablePlugins(PlayScala)
-  .dependsOn(cloud_aws)
+  .dependsOn(cloud_aws, spark)
   .settings(
     name := "hub",
     libraryDependencies ++= Seq(
       guice,
       "org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test,
       "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
-      "io.circe" %% "circe-core" % circeVersion,
-      "io.circe" %% "circe-generic" % circeVersion,
-      "io.circe" %% "circe-parser" % circeVersion,
       "org.scala-lang.modules" %% "scala-xml" % "2.1.0",
       "org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0",
       "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
     ),
+    libraryDependencies ++= circe,
     libraryDependencySchemes ++= Seq(
       "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always,
       "org.scala-lang.modules" %% "scala-parser-combinators" % VersionScheme.Always,
@@ -258,7 +265,10 @@ lazy val hub = (project in file("hub"))
     excludeDependencies ++= Seq(
       ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
       ExclusionRule(organization = "log4j", name = "log4j"),
-      ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-to-slf4j")
+      ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-to-slf4j"),
+      ExclusionRule("org.apache.logging.log4j", "log4j-slf4j-impl"),
+      ExclusionRule("org.apache.logging.log4j", "log4j-core"),
+      ExclusionRule("org.apache.logging.log4j", "log4j-api")
     ),
     // Ensure consistent versions of logging libraries
     dependencyOverrides ++= Seq(
```

docker-init/Dockerfile

Lines changed: 1 addition & 0 deletions

```diff
@@ -43,6 +43,7 @@ ENV CHRONON_DRIVER_JAR="/app/cli/spark.jar"
 # Set up Spark dependencies to help with launching CLI
 # Copy Spark JARs from the Bitnami image
 COPY --from=spark-source /opt/bitnami/spark/jars /opt/spark/jars
+COPY --from=spark-source /opt/bitnami/spark/bin /opt/spark/bin
 
 # Add all Spark JARs to the classpath
 ENV CLASSPATH=/opt/spark/jars/*
```

docker-init/demo/README.md

Lines changed: 20 additions & 0 deletions

```diff
@@ -1,5 +1,25 @@
+# Populate Observability Demo Data
+To populate the observability demo data:
+* Launch the set of docker containers:
+```bash
+~/workspace/chronon $ docker-compose -f docker-init/compose.yaml up --build
+...
+app-1  | [info] 2024-11-26 05:10:45,758 [main] INFO play.api.Play - Application started (Prod) (no global state)
+app-1  | [info] 2024-11-26 05:10:45,958 [main] INFO play.core.server.AkkaHttpServer - Listening for HTTP on /[0:0:0:0:0:0:0:0]:9000
+```
+(you can skip the --build if you don't wish to rebuild your code)
+
+Now you can trigger the script to load summary data:
+```bash
+~/workspace/chronon $ docker-init/demo/load_summaries.sh
+...
+Done uploading summaries! 🥳
+```
+
+# Streamlit local experimentation
 run build.sh once, and you can repeatedly exec to quickly visualize
 
 In first terminal: `sbt spark/assembly`
 In second terminal: `./run.sh` to load the built jar and serve the data on localhost:8181
 In third terminal: `streamlit run viz.py`
+
```

docker-init/demo/load_summaries.sh

Lines changed: 12 additions & 0 deletions

```diff
@@ -0,0 +1,12 @@
+# Kick off the ObsDemo spark job in the app container
+
+docker-compose -f docker-init/compose.yaml exec app /opt/spark/bin/spark-submit \
+  --master "local[*]" \
+  --driver-memory 8g \
+  --conf "spark.driver.maxResultSize=6g" \
+  --conf "spark.driver.memory=8g" \
+  --driver-class-path "/opt/spark/jars/*:/app/cli/*" \
+  --conf "spark.driver.host=localhost" \
+  --conf "spark.driver.bindAddress=0.0.0.0" \
+  --class ai.chronon.spark.scripts.ObservabilityDemoDataLoader \
+  /app/cli/spark.jar
```

docker-init/demo/log4j2.properties

Lines changed: 0 additions & 17 deletions
This file was deleted.

docker-init/start.sh

Lines changed: 0 additions & 17 deletions

```diff
@@ -39,23 +39,6 @@ echo "DynamoDB Table created successfully!"
 
 start_time=$(date +%s)
 
-if ! java \
-    --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
-    --add-opens=java.base/sun.security.action=ALL-UNNAMED \
-    -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver summarize-and-upload \
-    --online-jar=$CLOUD_AWS_JAR \
-    --online-class=$ONLINE_CLASS \
-    --parquet-path="$(pwd)/drift_data" \
-    --conf-path=/chronon_sample/production/ \
-    --time-column=transaction_time; then
-  echo "Error: Failed to load summary data into DynamoDB" >&2
-  exit 1
-else
-  end_time=$(date +%s)
-  elapsed_time=$((end_time - start_time))
-  echo "Summary load completed successfully! Took $elapsed_time seconds."
-fi
-
 # Add these java options as without them we hit the below error:
 # throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @36328710
 export JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED"
```

hub/app/controllers/InMemKVStoreController.scala

Lines changed: 60 additions & 0 deletions

```scala
package controllers

import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.PutRequest
import io.circe.Codec
import io.circe.Decoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveCodec
import io.circe.parser.decode
import play.api.Logger
import play.api.mvc
import play.api.mvc.BaseController
import play.api.mvc.ControllerComponents
import play.api.mvc.RawBuffer

import java.util.Base64
import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

class InMemKVStoreController @Inject() (val controllerComponents: ControllerComponents, kvStore: KVStore)(implicit
    ec: ExecutionContext)
    extends BaseController {

  import PutRequestCodec._

  val logger: Logger = Logger(this.getClass)

  def bulkPut(): mvc.Action[RawBuffer] =
    Action(parse.raw).async { request =>
      request.body.asBytes() match {
        case Some(bytes) =>
          decode[Array[PutRequest]](bytes.utf8String) match {
            case Right(putRequests) =>
              logger.debug(s"Attempting a bulkPut with ${putRequests.length} items")
              val resultFuture = kvStore.multiPut(putRequests)
              resultFuture.map { responses =>
                if (responses.contains(false)) {
                  logger.warn("Some write failures encountered")
                }
                Ok("Success")
              }
            case Left(error) => Future.successful(BadRequest(error.getMessage))
          }
        case None => Future.successful(BadRequest("Empty body"))
      }
    }
}

object PutRequestCodec {
  // Custom codec for byte arrays using Base64
  implicit val byteArrayEncoder: Encoder[Array[Byte]] =
    Encoder.encodeString.contramap[Array[Byte]](Base64.getEncoder.encodeToString)

  implicit val byteArrayDecoder: Decoder[Array[Byte]] =
    Decoder.decodeString.map(Base64.getDecoder.decode)

  // Derive codec for PutRequest
  implicit val putRequestCodec: Codec[PutRequest] = deriveCodec[PutRequest]
}
```
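
A quick sanity check on the Base64 codec above, as an illustrative round-trip that is not part of the commit: encoding an `Array[Byte]` yields a Base64 JSON string, and decoding recovers the original bytes.

```scala
import controllers.PutRequestCodec._
import io.circe.parser.decode
import io.circe.syntax._

import java.nio.charset.StandardCharsets

object PutRequestCodecRoundTrip extends App {
  val original = "hello".getBytes(StandardCharsets.UTF_8)

  // byteArrayEncoder turns the raw bytes into a Base64 JSON string: "aGVsbG8="
  val json = original.asJson.noSpaces

  // byteArrayDecoder reverses the Base64 step, recovering the original bytes
  val decoded = decode[Array[Byte]](json).toOption.get
  assert(decoded.sameElements(original))
}
```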

hub/app/controllers/JoinController.scala

Lines changed: 54 additions & 0 deletions

```scala
package controllers

import io.circe.generic.auto._
import io.circe.syntax._
import model.ListJoinResponse
import play.api.mvc._
import store.MonitoringModelStore

import javax.inject._

/**
 * Controller for the Zipline Join entities
 */
@Singleton
class JoinController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
    extends BaseController
    with Paginate {

  /**
   * Powers the /api/v1/joins endpoint. Returns a list of joins
   * @param offset - For pagination. We skip over offset entries before returning results
   * @param limit - Number of elements to return
   */
  def list(offset: Option[Int], limit: Option[Int]): Action[AnyContent] =
    Action { implicit request: Request[AnyContent] =>
      // Default values if the parameters are not provided
      val offsetValue = offset.getOrElse(defaultOffset)
      val limitValue = limit.map(l => math.min(l, maxLimit)).getOrElse(defaultLimit)

      if (offsetValue < 0) {
        BadRequest("Invalid offset - expect a positive number")
      } else if (limitValue < 0) {
        BadRequest("Invalid limit - expect a positive number")
      } else {
        val joins = monitoringStore.getJoins
        val paginatedResults = paginateResults(joins, offsetValue, limitValue)
        val json = ListJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
        Ok(json)
      }
    }

  /**
   * Returns a specific join by name
   */
  def get(name: String): Action[AnyContent] = {
    Action { implicit request: Request[AnyContent] =>
      val maybeJoin = monitoringStore.getJoins.find(j => j.name.equalsIgnoreCase(name))
      maybeJoin match {
        case None       => NotFound(s"Join: $name wasn't found")
        case Some(join) => Ok(join.asJson.noSpaces)
      }
    }
  }
}
```

hub/app/controllers/ModelController.scala

Lines changed: 2 additions & 3 deletions

```diff
@@ -4,16 +4,15 @@ import io.circe.generic.auto._
 import io.circe.syntax._
 import model.ListModelResponse
 import play.api.mvc._
-import store.DynamoDBMonitoringStore
+import store.MonitoringModelStore
 
 import javax.inject._
 
 /**
  * Controller for the Zipline models entities
  */
 @Singleton
-class ModelController @Inject() (val controllerComponents: ControllerComponents,
-                                 monitoringStore: DynamoDBMonitoringStore)
+class ModelController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
     extends BaseController
     with Paginate {
```

hub/app/controllers/Paginate.scala

Lines changed: 1 addition & 3 deletions

```diff
@@ -1,13 +1,11 @@
 package controllers
 
-import model.Model
-
 trait Paginate {
   val defaultOffset = 0
   val defaultLimit = 10
   val maxLimit = 100
 
-  def paginateResults(results: Seq[Model], offset: Int, limit: Int): Seq[Model] = {
+  def paginateResults[T](results: Seq[T], offset: Int, limit: Int): Seq[T] = {
     results.slice(offset, offset + limit)
   }
 }
```
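
Since `paginateResults` is now generic over `T`, the same slicing applies to joins, models, or anything else. A small illustrative sketch (the values are made up):

```scala
import controllers.Paginate

object PaginateSketch extends Paginate {
  def main(args: Array[String]): Unit = {
    val names = (1 to 25).map(i => s"join_$i")

    // Skip the first 10 entries, then return up to `limit` results: join_11 .. join_20
    println(paginateResults(names, offset = 10, limit = 10))

    // Slicing past the end simply yields an empty sequence
    println(paginateResults(names, offset = 30, limit = 10))
  }
}
```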

hub/app/controllers/SearchController.scala

Lines changed: 11 additions & 12 deletions

```diff
@@ -2,24 +2,23 @@ package controllers
 
 import io.circe.generic.auto._
 import io.circe.syntax._
-import model.Model
-import model.SearchModelResponse
+import model.Join
+import model.SearchJoinResponse
 import play.api.mvc._
-import store.DynamoDBMonitoringStore
+import store.MonitoringModelStore
 
 import javax.inject._
 
 /**
  * Controller to power search related APIs
  */
-class SearchController @Inject() (val controllerComponents: ControllerComponents,
-                                  monitoringStore: DynamoDBMonitoringStore)
+class SearchController @Inject() (val controllerComponents: ControllerComponents, monitoringStore: MonitoringModelStore)
     extends BaseController
     with Paginate {
 
   /**
-   * Powers the /api/v1/search endpoint. Returns a list of models
-   * @param term - Search term to search for (currently we only support searching model names)
+   * Powers the /api/v1/search endpoint. Returns a list of joins
+   * @param term - Search term to search for (currently we only support searching join names)
    * @param offset - For pagination. We skip over offset entries before returning results
    * @param limit - Number of elements to return
    */
@@ -36,14 +35,14 @@ class SearchController @Inject() (val controllerComponents: ControllerComponents
     } else {
       val searchResults = searchRegistry(term)
       val paginatedResults = paginateResults(searchResults, offsetValue, limitValue)
-      val json = SearchModelResponse(offsetValue, paginatedResults).asJson.noSpaces
+      val json = SearchJoinResponse(offsetValue, paginatedResults).asJson.noSpaces
       Ok(json)
     }
   }
 
-  // a trivial search where we check the model name for similarity with the search term
-  private def searchRegistry(term: String): Seq[Model] = {
-    val models = monitoringStore.getModels
-    models.filter(m => m.name.contains(term))
+  // a trivial search where we check the join name for similarity with the search term
+  private def searchRegistry(term: String): Seq[Join] = {
+    val joins = monitoringStore.getJoins
+    joins.filter(j => j.name.contains(term))
   }
 }
```
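
For clarity, the search itself is a plain case-sensitive substring match on join names. A tiny illustrative snippet (all names except the one used in the curl examples above are made up):

```scala
object SearchSketch extends App {
  val joins = Seq(
    "risk.user_transactions.txn_join",     // appears in the curl examples above
    "risk.merchant_transactions.txn_join", // made up
    "growth.signups.signup_join"           // made up
  )

  // "txn" matches the first two names; the signup join is filtered out
  println(joins.filter(_.contains("txn")))
}
```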
