Commit e0e83d8
Connect drift metrics computation in Spark with Hub for serving to frontend (#95)
## Summary

Builds on a couple of the summary computation PRs and data generation to wire things up so that Hub can serve them.

* Yanked out the mock-data-based endpoints (model perf / drift, join & feature skew) - decided it would be confusing to have a mix of mock and generated data, so we just serve the generated data.
* Dropped a few of the scripts introduced in #87. We bring up our containers the usual way, and we have a `load_summaries.sh` script we can trigger that leverages the existing app container to load data.
* DDB ingestion was taking too long and we were dropping a lot of data due to rejected execution exceptions. To unblock for now, we've gone with an approach of making a bulk put HTTP call from the ObservabilityDemo app -> Hub, with Hub utilizing an InMemoryKV store to persist and serve up features.
* Added an endpoint to serve the joins that are configured, as we've switched away from the model-based world.

There's still an issue to resolve around fetching individual feature series data. Once that is resolved, we can switch this PR out of WIP mode.

To test / run, start up our docker containers:
```
$ docker-compose -f docker-init/compose.yaml up --build
...
```

In a different terminal, load data:
```
$ ./docker-init/demo/load_summaries.sh
Done uploading summaries! 🥳
```

You can now curl join & feature time series data.

Join drift (null ratios):
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=null&offset=10h&algorithm=psi'
```

Join drift (value drift):
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=10h&algorithm=psi'
```

Feature drift:
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/feature/dim_user_account_type/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=1D&algorithm=psi&granularity=aggregates'
```

Feature summaries:
```
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/feature/dim_user_account_type/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=1D&algorithm=psi&granularity=percentile'
```

Join metadata:
```
curl -X GET 'http://localhost:9000/api/v1/joins'
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join'
```

## Checklist

- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

## Release Notes

- **New Features**
  - Introduced a new `JoinController` for managing joins with pagination support.
  - Added functionality for an in-memory key-value store with bulk data upload capabilities.
  - Implemented observability demo data loading within a Spark application.
  - Added a new `HTTPKVStore` class for remote key-value store interactions over HTTP.

- **Improvements**
  - Enhanced the `ModelController` and `SearchController` to align with the new join data structure.
  - Updated the `TimeSeriesController` to support asynchronous operations and improved error handling.
  - Refined dependency management in the build configuration for better clarity and maintainability.
  - Updated API routes to include new endpoints for listing and retrieving joins.
  - Updated configuration to replace the `DynamoDBModule` with `ModelStoreModule`, adding `InMemoryKVStoreModule` and `DriftStoreModule`.

- **Documentation**
  - Revised README instructions for Docker container setup and demo data loading.
  - Updated API routes documentation to reflect new endpoints for joins and in-memory data operations.

- **Bug Fixes**
  - Resolved issues related to error handling in various controllers and improved logging for better traceability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: nikhil-zlai <[email protected]>
1 parent 594a65a commit e0e83d8

File tree

build.sbt
online/src/main/scala/ai/chronon/online/HTTPKVStore.scala
online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala
online/src/main/scala/ai/chronon/online/stats/DriftStore.scala
online/src/main/scala/ai/chronon/online/stats/TileDriftCalculator.scala
spark/src/main/scala/ai/chronon/spark/Driver.scala
spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala
spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala

9 files changed: +217, -26 lines changed

build.sbt

Lines changed: 15 additions & 5 deletions
```diff
@@ -80,6 +80,13 @@ val jackson = Seq(
   "com.fasterxml.jackson.module" %% "jackson-module-scala"
 ).map(_ % jackson_2_15)
 
+// Circe is used to ser / deser case class payloads for the Hub Play webservice
+val circe = Seq(
+  "io.circe" %% "circe-core",
+  "io.circe" %% "circe-generic",
+  "io.circe" %% "circe-parser",
+).map(_ % circeVersion)
+
 val flink_all = Seq(
   "org.apache.flink" %% "flink-streaming-scala",
   "org.apache.flink" % "flink-metrics-dropwizard",
@@ -129,6 +136,8 @@ lazy val online = project
       "com.github.ben-manes.caffeine" % "caffeine" % "3.1.8"
     ),
     libraryDependencies ++= jackson,
+    // dep needed for HTTPKvStore - yank when we rip this out
+    libraryDependencies += "com.softwaremill.sttp.client3" %% "core" % "3.9.7",
     libraryDependencies ++= spark_all.map(_ % "provided"),
     libraryDependencies ++= flink_all.map(_ % "provided")
   )
@@ -236,20 +245,18 @@ lazy val frontend = (project in file("frontend"))
 // build interop between one module solely on 2.13 and others on 2.12 is painful
 lazy val hub = (project in file("hub"))
   .enablePlugins(PlayScala)
-  .dependsOn(cloud_aws)
+  .dependsOn(cloud_aws, spark)
   .settings(
     name := "hub",
     libraryDependencies ++= Seq(
       guice,
       "org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test,
       "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
-      "io.circe" %% "circe-core" % circeVersion,
-      "io.circe" %% "circe-generic" % circeVersion,
-      "io.circe" %% "circe-parser" % circeVersion,
       "org.scala-lang.modules" %% "scala-xml" % "2.1.0",
       "org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0",
       "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
     ),
+    libraryDependencies ++= circe,
    libraryDependencySchemes ++= Seq(
      "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always,
      "org.scala-lang.modules" %% "scala-parser-combinators" % VersionScheme.Always,
@@ -258,7 +265,10 @@ lazy val hub = (project in file("hub"))
    excludeDependencies ++= Seq(
      ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
      ExclusionRule(organization = "log4j", name = "log4j"),
-      ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-to-slf4j")
+      ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-to-slf4j"),
+      ExclusionRule("org.apache.logging.log4j", "log4j-slf4j-impl"),
+      ExclusionRule("org.apache.logging.log4j", "log4j-core"),
+      ExclusionRule("org.apache.logging.log4j", "log4j-api")
    ),
    // Ensure consistent versions of logging libraries
    dependencyOverrides ++= Seq(
```
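The shared `circe` Seq added above is what the hub Play service uses to ser / deser its case class payloads. As a rough sketch of what that buys, generic derivation gives encoders and decoders for free; the `JoinSummary` case class below is hypothetical and only for illustration, not a type introduced by this PR:

```scala
import io.circe.generic.auto._ // derives Encoder/Decoder instances for case classes
import io.circe.parser.decode
import io.circe.syntax._

// Hypothetical payload shape; the real Hub response models live in the hub module.
case class JoinSummary(name: String, features: Seq[String])

object CirceRoundTrip extends App {
  val payload = JoinSummary("risk.user_transactions.txn_join", Seq("dim_user_account_type"))

  val json: String = payload.asJson.noSpaces // case class -> JSON string
  val back = decode[JoinSummary](json)       // JSON string -> Either[io.circe.Error, JoinSummary]

  println(json) // {"name":"risk.user_transactions.txn_join","features":["dim_user_account_type"]}
  println(back) // Right(JoinSummary(...))
}
```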
online/src/main/scala/ai/chronon/online/HTTPKVStore.scala

Lines changed: 57 additions & 0 deletions
```diff
@@ -0,0 +1,57 @@
+package ai.chronon.online
+
+import ai.chronon.online.KVStore.PutRequest
+import sttp.client3._
+import sttp.model.StatusCode
+
+import java.util.Base64
+import scala.concurrent.Future
+
+// Hacky test kv store that we use to send objects to the in-memory KV store that lives in a different JVM (e.g. spark -> hub)
+class HTTPKVStore(host: String = "localhost", port: Int = 9000) extends KVStore with Serializable {
+
+  val backend: SttpBackend[Identity, Any] = HttpClientSyncBackend()
+  val baseUrl: String = s"http://$host:$port/api/v1/dataset"
+
+  override def multiGet(requests: collection.Seq[KVStore.GetRequest]): Future[collection.Seq[KVStore.GetResponse]] = ???
+
+  override def multiPut(putRequests: collection.Seq[KVStore.PutRequest]): Future[collection.Seq[Boolean]] = {
+    if (putRequests.isEmpty) {
+      Future.successful(Seq.empty)
+    } else {
+      Future {
+        basicRequest
+          .post(uri"$baseUrl/data")
+          .header("Content-Type", "application/json")
+          .body(jsonList(putRequests))
+          .send(backend)
+      }.map { response =>
+        response.code match {
+          case StatusCode.Ok => Seq(true)
+          case _ =>
+            logger.error(s"HTTP multiPut failed with status ${response.code}: ${response.body}")
+            Seq(false)
+        }
+      }
+    }
+  }
+
+  override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = ???
+
+  override def create(dataset: String): Unit = {
+    logger.warn(s"Skipping creation of $dataset in HTTP kv store implementation")
+  }
+
+  // wire up json conversion manually to side step serialization issues in spark executors
+  def jsonString(request: PutRequest): String = {
+    val keyBase64 = Base64.getEncoder.encodeToString(request.keyBytes)
+    val valueBase64 = Base64.getEncoder.encodeToString(request.valueBytes)
+    s"""{ "keyBytes": "${keyBase64}", "valueBytes": "${valueBase64}", "dataset": "${request.dataset}", "tsMillis": ${request.tsMillis.orNull}}"""
+  }
+
+  def jsonList(requests: Seq[PutRequest]): String = {
+    val requestsJson = requests.map(jsonString(_)).mkString(", ")
+
+    s"[ $requestsJson ]"
+  }
+}
```
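On the receiving end, Hub's bulk-put endpoint only needs to reverse this encoding before handing entries to the in-memory KV store. A minimal sketch of that decoding follows; `PutPayload` and `parseBulkPut` are hypothetical names, the actual Hub controller is not part of this diff:

```scala
import io.circe.generic.auto._
import io.circe.parser.decode

import java.util.Base64

// Hypothetical mirror of the JSON emitted by HTTPKVStore.jsonString.
case class PutPayload(keyBytes: String, valueBytes: String, dataset: String, tsMillis: Option[Long])

object BulkPutDecoding {
  // Decode the JSON array body into (key, value, dataset, ts) tuples ready for an in-memory store.
  def parseBulkPut(body: String): Either[io.circe.Error, Seq[(Array[Byte], Array[Byte], String, Option[Long])]] =
    decode[Seq[PutPayload]](body).map(_.map { p =>
      (Base64.getDecoder.decode(p.keyBytes), Base64.getDecoder.decode(p.valueBytes), p.dataset, p.tsMillis)
    })
}
```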

online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -22,7 +22,7 @@ case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](
 object MetadataEndPoint {
   @transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
 
-  val ConfByKeyEndPointName = "ZIPLINE_METADATA"
+  val ConfByKeyEndPointName = "CHRONON_METADATA"
   val NameByTeamEndPointName = "CHRONON_ENTITY_BY_TEAM"
 
   private def getTeamFromMetadata(metaData: MetaData): String = {
```

online/src/main/scala/ai/chronon/online/stats/DriftStore.scala

Lines changed: 17 additions & 8 deletions
```diff
@@ -12,8 +12,8 @@ import ai.chronon.api.thrift.protocol.TProtocolFactory
 import ai.chronon.online.KVStore
 import ai.chronon.online.KVStore.GetRequest
 import ai.chronon.online.MetadataStore
-import ai.chronon.online.stats.DriftStore.compactDeserializer
-import ai.chronon.online.stats.DriftStore.compactSerializer
+import ai.chronon.online.stats.DriftStore.binaryDeserializer
+import ai.chronon.online.stats.DriftStore.binarySerializer
 
 import java.io.Serializable
 import scala.concurrent.Future
@@ -52,8 +52,6 @@ class DriftStore(kvStore: KVStore,
     }
   }
 
-  private val deserializer: TDeserializer = compactDeserializer
-
   private case class SummaryRequestContext(request: GetRequest, tileKey: TileKey, groupName: String)
   private case class SummaryResponseContext(summaries: Array[(TileSummary, Long)], tileKey: TileKey, groupName: String)
 
@@ -76,8 +74,8 @@ class DriftStore(kvStore: KVStore,
                    endMs: Option[Long],
                    columnPrefix: Option[String]): Future[Seq[TileSummaryInfo]] = {
 
-    val serializer: TSerializer = compactSerializer
-    val tileKeyMap = tileKeysForJoin(joinConf, columnPrefix)
+    val serializer: TSerializer = binarySerializer.get()
+    val tileKeyMap = tileKeysForJoin(joinConf, None, columnPrefix)
     val requestContextMap: Map[GetRequest, SummaryRequestContext] = tileKeyMap.flatMap {
       case (group, keys) =>
         keys.map { key =>
@@ -90,6 +88,7 @@ class DriftStore(kvStore: KVStore,
     val responseFuture = kvStore.multiGet(requestContextMap.keys.toSeq)
 
     responseFuture.map { responses =>
+      val deserializer = binaryDeserializer.get()
       // deserialize the responses and surround with context
       val responseContextTries: Seq[Try[SummaryResponseContext]] = responses.map { response =>
        val valuesTry = response.values
@@ -200,7 +199,17 @@ object DriftStore {
   class SerializableSerializer(factory: TProtocolFactory) extends TSerializer(factory) with Serializable
 
   // crazy bug in compact protocol - do not change to compact
-  def compactSerializer: SerializableSerializer = new SerializableSerializer(new TBinaryProtocol.Factory())
 
-  def compactDeserializer: TDeserializer = new TDeserializer(new TBinaryProtocol.Factory())
+  @transient
+  lazy val binarySerializer: ThreadLocal[TSerializer] = new ThreadLocal[TSerializer] {
+    override def initialValue(): TSerializer = new TSerializer(new TBinaryProtocol.Factory())
+  }
+
+  @transient
+  lazy val binaryDeserializer: ThreadLocal[TDeserializer] = new ThreadLocal[TDeserializer] {
+    override def initialValue(): TDeserializer = new TDeserializer(new TBinaryProtocol.Factory())
+  }
+
+  // todo - drop this hard-coded list in favor of a well known list or exposing as part of summaries
+  def breaks(count: Int): Seq[String] = (0 to count).map(_ * (100 / count)).map("p" + _.toString)
 }
```
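Thrift's `TSerializer` and `TDeserializer` are not thread-safe, so the `ThreadLocal` wrappers above give each calling thread its own instance, and being `@transient lazy` they are rebuilt rather than shipped when the enclosing object is serialized. A small sketch of how the new helpers behave, assuming `DriftStore` is on the classpath as shown in this diff:

```scala
import ai.chronon.online.stats.DriftStore

object DriftStoreHelpersDemo extends App {
  // Same thread -> same TSerializer instance; a different thread would get its own.
  val s1 = DriftStore.binarySerializer.get()
  val s2 = DriftStore.binarySerializer.get()
  println(s1 eq s2) // true

  // breaks(n) produces the hard-coded percentile labels,
  // e.g. breaks(20) == Seq("p0", "p5", ..., "p100")
  println(DriftStore.breaks(20).mkString(", "))
}
```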

online/src/main/scala/ai/chronon/online/stats/TileDriftCalculator.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -81,7 +81,7 @@ object TileDriftCalculator {
     result
   }
 
-  // for each summary with ts >= startMs, use spec.lookBack to find the previous summary and calculate dirft
+  // for each summary with ts >= startMs, use spec.lookBack to find the previous summary and calculate drift
   // we do this by first creating a map of summaries by timestamp
   def toTileDrifts(summariesWithTimestamps: Array[(TileSummary, Long)],
                    metric: DriftMetric,
```

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@ package ai.chronon.spark
 
 import ai.chronon.api
 import ai.chronon.api.Constants
+import ai.chronon.api.Constants.MetadataDataset
 import ai.chronon.api.Extensions.GroupByOps
 import ai.chronon.api.Extensions.MetadataOps
 import ai.chronon.api.Extensions.SourceOps
@@ -565,7 +566,7 @@ object Driver {
     lazy val api: Api = impl(serializableProps)
 
     def metaDataStore =
-      new MetadataStore(impl(serializableProps).genKvStore, "ZIPLINE_METADATA", timeoutMillis = 10000)
+      new MetadataStore(impl(serializableProps).genKvStore, MetadataDataset, timeoutMillis = 10000)
 
     def impl(props: Map[String, String]): Api = {
       val urls = Array(new File(onlineJar()).toURI.toURL)
```

spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala

Lines changed: 1 addition & 8 deletions
```diff
@@ -5,7 +5,6 @@ import ai.chronon.api.TileSeriesKey
 import ai.chronon.api.TileSummarySeries
 import ai.chronon.api.thrift.TBase
 import ai.chronon.online.stats.DriftStore
-import ai.chronon.online.stats.DriftStore.SerializableSerializer
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.databind.SerializationFeature
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -19,7 +18,6 @@ import io.netty.handler.codec.http._
 import io.netty.util.CharsetUtil
 
 import java.util.Base64
-import java.util.function.Supplier
 import scala.reflect.ClassTag
 
 class DataServer(driftSeries: Seq[TileDriftSeries], summarySeries: Seq[TileSummarySeries], port: Int = 8181) {
@@ -35,15 +33,10 @@ class DataServer(driftSeries: Seq[TileDriftSeries], summarySeries: Seq[TileSumma
     ctx.flush()
   }
 
-  private val serializer: ThreadLocal[SerializableSerializer] =
-    ThreadLocal.withInitial(new Supplier[SerializableSerializer] {
-      override def get(): SerializableSerializer = DriftStore.compactSerializer
-    })
-
   private def convertToBytesMap[T <: TBase[_, _]: Manifest: ClassTag](
       series: T,
       keyF: T => TileSeriesKey): Map[String, String] = {
-    val serializerInstance = serializer.get()
+    val serializerInstance = DriftStore.binarySerializer.get()
     val encoder = Base64.getEncoder
     val keyBytes = serializerInstance.serialize(keyF(series))
     val valueBytes = serializerInstance.serialize(series)
```
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemoDataLoader.scala

Lines changed: 120 additions & 0 deletions
```diff
@@ -0,0 +1,120 @@
+package ai.chronon.spark.scripts
+
+import ai.chronon.api.ColorPrinter.ColorString
+import ai.chronon.api.Constants
+import ai.chronon.api.Extensions.MetadataOps
+import ai.chronon.online.HTTPKVStore
+import ai.chronon.online.KVStore
+import ai.chronon.spark.SparkSessionBuilder
+import ai.chronon.spark.TableUtils
+import ai.chronon.spark.stats.drift.Summarizer
+import ai.chronon.spark.stats.drift.SummaryUploader
+import ai.chronon.spark.stats.drift.scripts.PrepareData
+import ai.chronon.spark.utils.InMemoryKvStore
+import ai.chronon.spark.utils.MockApi
+import org.rogach.scallop.ScallopConf
+import org.rogach.scallop.ScallopOption
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+object ObservabilityDemoDataLoader {
+  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+
+  def time(message: String)(block: => Unit): Unit = {
+    logger.info(s"$message..".yellow)
+    val start = System.currentTimeMillis()
+    block
+    val end = System.currentTimeMillis()
+    logger.info(s"$message took ${end - start} ms".green)
+  }
+
+  class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+    val startDs: ScallopOption[String] = opt[String](
+      name = "start-ds",
+      default = Some("2023-01-01"),
+      descr = "Start date in YYYY-MM-DD format"
+    )
+
+    val endDs: ScallopOption[String] = opt[String](
+      name = "end-ds",
+      default = Some("2023-02-30"),
+      descr = "End date in YYYY-MM-DD format"
+    )
+
+    val rowCount: ScallopOption[Int] = opt[Int](
+      name = "row-count",
+      default = Some(700000),
+      descr = "Number of rows to generate"
+    )
+
+    val namespace: ScallopOption[String] = opt[String](
+      name = "namespace",
+      default = Some("observability_demo"),
+      descr = "Namespace for the demo"
+    )
+
+    verify()
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val config = new Conf(args)
+    val startDs = config.startDs()
+    val endDs = config.endDs()
+    val rowCount = config.rowCount()
+    val namespace = config.namespace()
+
+    val spark = SparkSessionBuilder.build(namespace, local = true)
+    implicit val tableUtils: TableUtils = TableUtils(spark)
+    tableUtils.createDatabase(namespace)
+
+    // generate anomalous data (join output)
+    val prepareData = PrepareData(namespace)
+    val join = prepareData.generateAnomalousFraudJoin
+
+    time("Preparing data") {
+      val df = prepareData.generateFraudSampleData(rowCount, startDs, endDs, join.metaData.loggedTable)
+      df.show(10, truncate = false)
+    }
+
+    // mock api impl for online fetching and uploading
+    val inMemKvStoreFunc: () => KVStore = () => {
+      // cannot reuse the variable - or serialization error
+      val result = InMemoryKvStore.build(namespace, () => null)
+      result
+    }
+    val inMemoryApi = new MockApi(inMemKvStoreFunc, namespace)
+
+    time("Summarizing data") {
+      // compute summary table and packed table (for uploading)
+      Summarizer.compute(inMemoryApi, join.metaData, ds = endDs, useLogs = true)
+    }
+
+    val packedTable = join.metaData.packedSummaryTable
+
+    // create necessary tables in kvstore - we now publish to the HTTP KV store as we need this available to the Hub
+    val httpKvStoreFunc: () => KVStore = () => {
+      // cannot reuse the variable - or serialization error
+      val result = new HTTPKVStore()
+      result
+    }
+    val hubApi = new MockApi(httpKvStoreFunc, namespace)
+
+    val kvStore = hubApi.genKvStore
+    kvStore.create(Constants.MetadataDataset)
+    kvStore.create(Constants.TiledSummaryDataset)
+
+    // upload join conf
+    hubApi.buildFetcher().putJoinConf(join)
+
+    time("Uploading summaries") {
+      val uploader = new SummaryUploader(tableUtils.loadTable(packedTable), hubApi)
+      uploader.run()
+    }
+
+    println("Done uploading summaries! \uD83E\uDD73".green)
+    // clean up spark session and force jvm exit
+    spark.stop()
+    System.exit(0)
+  }
+}
```
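In the docker setup this loader is driven by `docker-init/demo/load_summaries.sh`. The Scallop flags declared above can also be exercised directly from a small wrapper; the argument values here are illustrative and match the declared defaults:

```scala
// Hypothetical wrapper that invokes the loader with explicit flags; the option
// names come from the Scallop Conf above (start-ds, end-ds, row-count, namespace).
object RunObservabilityDemoDataLoader {
  def main(args: Array[String]): Unit =
    ai.chronon.spark.scripts.ObservabilityDemoDataLoader.main(
      Array(
        "--start-ds", "2023-01-01",
        "--end-ds", "2023-01-31",
        "--row-count", "100000",
        "--namespace", "observability_demo"
      )
    ) // note: the loader calls System.exit(0) when it finishes uploading
}
```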

spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -6,7 +6,7 @@ import ai.chronon.api._
 import ai.chronon.online.Api
 import ai.chronon.online.KVStore.GetRequest
 import ai.chronon.online.KVStore.PutRequest
-import ai.chronon.online.stats.DriftStore.compactSerializer
+import ai.chronon.online.stats.DriftStore.binarySerializer
 import ai.chronon.spark.TableUtils
 import ai.chronon.spark.stats.drift.Expressions.CardinalityExpression
 import ai.chronon.spark.stats.drift.Expressions.SummaryExpression
@@ -322,9 +322,10 @@ class SummaryPacker(confPath: String,
     val func: sql.Row => Seq[TileRow] =
       Expressions.summaryPopulatorFunc(summaryExpressions, df.schema, keyBuilder, tu.partitionColumn)
 
-    val serializer = compactSerializer
     val packedRdd: RDD[sql.Row] = df.rdd.flatMap(func).map { tileRow =>
       // pack into bytes
+      val serializer = binarySerializer.get()
+
       val partition = tileRow.partition
       val timestamp = tileRow.tileTs
       val summaries = tileRow.summaries
```