Skip to content

Commit fafcffb

Browse files
chore: pull metrics into its own target (#592)
## Summary

- Metrics is actually used in many places, so let's pull it into its own slim target first.
- Follow up in the spark offline side to further reduce the dep on online.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Launched a dedicated metrics library to enhance performance monitoring and reporting.
- **Refactor**
  - Consolidated and updated metrics imports and context handling across services for improved consistency.
- **Tests**
  - Aligned test configurations with the new metrics framework to ensure robust validation.

These improvements streamline performance tracking and dependency management internally while bolstering the overall reliability of the application.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
1 parent b1591cd commit fafcffb

32 files changed

+155
-104
lines changed

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import ai.chronon.online.KVStore.ListRequest
99
import ai.chronon.online.KVStore.ListResponse
1010
import ai.chronon.online.KVStore.ListValue
1111
import ai.chronon.online.KVStore.TimedValue
12-
import ai.chronon.online.Metrics
13-
import ai.chronon.online.Metrics.Context
12+
import ai.chronon.online.metrics.Metrics.Context
13+
import ai.chronon.online.metrics.Metrics
1414
import com.google.common.util.concurrent.RateLimiter
1515
import software.amazon.awssdk.core.SdkBytes
1616
import software.amazon.awssdk.services.dynamodb.DynamoDbClient

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import ai.chronon.online.KVStore
1313
import ai.chronon.online.KVStore.ListRequest
1414
import ai.chronon.online.KVStore.ListResponse
1515
import ai.chronon.online.KVStore.ListValue
16-
import ai.chronon.online.Metrics
16+
import ai.chronon.online.metrics.Metrics
1717
import com.google.cloud.RetryOption
1818
import com.google.cloud.bigquery.BigQuery
1919
import com.google.cloud.bigquery.BigQueryErrorMessages

online/BUILD.bazel

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
scala_library(
2+
name = "metrics_lib",
3+
srcs = glob(["src/main/scala/ai/chronon/online/metrics/*.scala"]),
4+
format = select({
5+
"//tools/config:scala_2_13": False, # Disable for 2.13
6+
"//conditions:default": True, # Enable for other versions
7+
}),
8+
visibility = ["//visibility:public"],
9+
deps = [
10+
"//api:lib",
11+
"//api:thrift_java",
12+
maven_artifact("com.datadoghq:java-dogstatsd-client"),
13+
maven_artifact("org.slf4j:slf4j-api"),
14+
maven_artifact("org.apache.logging.log4j:log4j-api"),
15+
maven_artifact("org.apache.logging.log4j:log4j-core"),
16+
],
17+
)
18+
119
scala_library(
220
name = "lib",
321
srcs = glob(["src/main/**/*.scala"]) + glob(["src/main/**/*.java"]),
@@ -7,6 +25,7 @@ scala_library(
725
}),
826
visibility = ["//visibility:public"],
927
deps = [
28+
":metrics_lib",
1029
"//aggregator:lib",
1130
"//api:lib",
1231
"//api:thrift_java",

online/src/main/java/ai/chronon/online/JavaFetcher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import scala.concurrent.Future;
2828
import scala.concurrent.ExecutionContext;
2929
import scala.util.Try;
30+
import ai.chronon.online.metrics.Metrics;
3031

3132
import java.util.ArrayList;
3233
import java.util.List;

online/src/main/scala/ai/chronon/online/Api.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ object KVStore {
6060
// used for streaming writes, batch bulk uploads & fetching
6161
trait KVStore {
6262
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
63-
implicit val executionContext: ExecutionContext = FlexibleExecutionContext.buildExecutionContext
63+
implicit val executionContext: ExecutionContext = metrics.FlexibleExecutionContext.buildExecutionContext
6464
def create(dataset: String): Unit
6565

6666
def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)
@@ -205,7 +205,7 @@ trait StreamBuilder {
205205
}
206206

207207
object ExternalSourceHandler {
208-
private[ExternalSourceHandler] val executor = FlexibleExecutionContext.buildExecutionContext
208+
private[ExternalSourceHandler] val executor = metrics.FlexibleExecutionContext.buildExecutionContext
209209
}
210210

211211
// user facing class that needs to be implemented for external sources defined in a join

online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@ package ai.chronon.online
1818

1919
import ai.chronon.api.Constants
2020
import ai.chronon.online.fetcher.Fetcher.{Request, Response}
21-
22-
import scala.collection.Seq
23-
import scala.collection.mutable
24-
import scala.concurrent.ExecutionContext
25-
import scala.concurrent.Future
26-
import scala.util.Failure
27-
import scala.util.Success
21+
import ai.chronon.online.metrics.Metrics.Context
22+
import scala.collection.{Seq, mutable}
23+
import scala.concurrent.{ExecutionContext, Future}
24+
import scala.util.{Failure, Success}
2825

2926
// users can simply register external endpoints with a lambda that can return the future of a response given keys
3027
// keys and values need to match schema in ExternalSource - chronon will validate automatically
@@ -53,8 +50,7 @@ class ExternalSourceRegistry extends Serializable {
5350
// 1. keys match
5451
// 2. report missing & extra values
5552
// 3. schema integrity of returned values
56-
def fetchRequests(requests: Seq[Request], context: Metrics.Context)(implicit
57-
ec: ExecutionContext): Future[Seq[Response]] = {
53+
def fetchRequests(requests: Seq[Request], context: Context)(implicit ec: ExecutionContext): Future[Seq[Response]] = {
5854
val startTime = System.currentTimeMillis()
5955
// we issue one batch request per external source and flatten it out later
6056
val responsesByNameF: List[Future[Seq[Response]]] = requests

online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package ai.chronon.online.fetcher
22
import ai.chronon.api.Constants.MetadataDataset
33
import ai.chronon.api.ScalaJavaConversions.JMapOps
4-
import ai.chronon.online.{FlagStore, FlagStoreConstants, FlexibleExecutionContext, KVStore}
4+
import ai.chronon.online.metrics.FlexibleExecutionContext
5+
import ai.chronon.online.{FlagStore, FlagStoreConstants, KVStore}
56

67
import scala.concurrent.ExecutionContext
78

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import ai.chronon.api
2121
import ai.chronon.api.Constants.UTF8
2222
import ai.chronon.api.Extensions.{ExternalPartOps, JoinOps, StringOps, ThrowableOps}
2323
import ai.chronon.api._
24-
import ai.chronon.online.Metrics.Environment
2524
import ai.chronon.online.OnlineDerivationUtil.applyDeriveFunc
25+
import ai.chronon.online._
2626
import ai.chronon.online.fetcher.Fetcher.{JoinSchemaResponse, Request, Response, ResponseWithContext}
27-
import ai.chronon.online.{serde, _}
27+
import ai.chronon.online.metrics.{Metrics, TTLCache}
2828
import com.google.gson.Gson
2929
import com.timgroup.statsd.Event
3030
import com.timgroup.statsd.Event.AlertType
@@ -41,10 +41,13 @@ import scala.concurrent.{ExecutionContext, Future}
4141
import scala.util.{Failure, Success, Try}
4242

4343
object Fetcher {
44+
45+
import ai.chronon.online.metrics
46+
4447
case class Request(name: String,
4548
keys: Map[String, AnyRef],
4649
atMillis: Option[Long] = None,
47-
context: Option[Metrics.Context] = None)
50+
context: Option[metrics.Metrics.Context] = None)
4851

4952
case class PrefixedRequest(prefix: String, request: Request)
5053
case class Response(request: Request, values: Try[Map[String, AnyRef]])
@@ -59,17 +62,18 @@ object Fetcher {
5962
prefix: Option[String],
6063
keyMapping: Option[Map[String, AnyRef]])
6164

62-
def logResponseStats(response: Response, context: Metrics.Context): Unit = {
65+
def logResponseStats(response: Response, context: metrics.Metrics.Context): Unit = {
66+
import ai.chronon.online.metrics
6367
val responseMap = response.values.get
6468
var exceptions = 0
6569
var nulls = 0
6670
responseMap.foreach { case (_, v) =>
6771
if (v == null) nulls += 1
6872
else if (v.isInstanceOf[Throwable]) exceptions += 1
6973
}
70-
context.distribution(Metrics.Name.FetchNulls, nulls)
71-
context.distribution(Metrics.Name.FetchExceptions, exceptions)
72-
context.distribution(Metrics.Name.FetchCount, responseMap.size)
74+
context.distribution(metrics.Metrics.Name.FetchNulls, nulls)
75+
context.distribution(metrics.Metrics.Name.FetchExceptions, exceptions)
76+
context.distribution(metrics.Metrics.Name.FetchCount, responseMap.size)
7377
}
7478

7579
/** Response for a join schema request
@@ -107,7 +111,7 @@ class Fetcher(val kvStore: KVStore,
107111
private def reportCallerNameFetcherVersion(): Unit = {
108112
val message =
109113
s"CallerName: ${Option(callerName).getOrElse("N/A")}, FetcherVersion: ${BuildInfo.version}"
110-
val ctx = Metrics.Context(Environment.Fetcher)
114+
val ctx = Metrics.Context(Metrics.Environment.Fetcher)
111115
val event = Event
112116
.builder()
113117
.withTitle("FetcherInitialization")
@@ -141,6 +145,7 @@ class Fetcher(val kvStore: KVStore,
141145
val combinedResponsesF =
142146
internalResponsesF.zip(externalResponsesF).map { case (internalResponses, externalResponses) =>
143147
internalResponses.zip(externalResponses).map { case (internalResponse, externalResponse) =>
148+
import ai.chronon.online.metrics
144149
if (debug) {
145150
logger.info(internalResponse.values.get.keys.toSeq.mkString(","))
146151
logger.info(externalResponse.values.get.keys.toSeq.mkString(","))
@@ -161,7 +166,7 @@ class Fetcher(val kvStore: KVStore,
161166
Map("external_part_fetch_exception" -> externalResponse.values.failed.get.traceString))
162167
val derivationStartTs = System.currentTimeMillis()
163168
val joinName = internalResponse.request.name
164-
val ctx = Metrics.Context(Environment.JoinFetching, join = joinName)
169+
val ctx = Metrics.Context(Metrics.Environment.JoinFetching, join = joinName)
165170
val joinCodecTry = joinCodecCache(internalResponse.request.name)
166171
joinCodecTry match {
167172
case Success(joinCodec) =>
@@ -323,6 +328,7 @@ class Fetcher(val kvStore: KVStore,
323328

324329
// Pulling external features in a batched fashion across services in parallel
325330
private def fetchExternal(joinRequests: Seq[Request]): Future[Seq[Response]] = {
331+
import ai.chronon.online.metrics
326332
val startTime = System.currentTimeMillis()
327333
val resultMap = new mutable.LinkedHashMap[Request, Try[mutable.HashMap[String, Any]]]
328334
var invalidCount = 0
@@ -374,7 +380,7 @@ class Fetcher(val kvStore: KVStore,
374380
.toMap
375381

376382
val context =
377-
Metrics.Context(environment = Environment.JoinFetching,
383+
Metrics.Context(environment = Metrics.Environment.JoinFetching,
378384
join = validRequests.iterator.map(_.name.sanitize).toSeq.distinct.mkString(","))
379385
context.distribution("response.external_pre_processing.latency", System.currentTimeMillis() - startTime)
380386
context.count("response.external_invalid_joins.count", invalidCount)
@@ -417,7 +423,7 @@ class Fetcher(val kvStore: KVStore,
417423
// step-4 convert the resultMap into Responses
418424
joinRequests.map { req =>
419425
Metrics
420-
.Context(Environment.JoinFetching, join = req.name)
426+
.Context(Metrics.Environment.JoinFetching, join = req.name)
421427
.distribution("external.latency.millis", System.currentTimeMillis() - startTime)
422428
Response(req, resultMap(req).map(_.mapValues(_.asInstanceOf[AnyRef]).toMap))
423429
}
@@ -426,7 +432,7 @@ class Fetcher(val kvStore: KVStore,
426432

427433
def fetchJoinSchema(joinName: String): Try[JoinSchemaResponse] = {
428434
val startTime = System.currentTimeMillis()
429-
val ctx = Metrics.Context(Environment.JoinSchemaFetching, join = joinName)
435+
val ctx = Metrics.Context(Metrics.Environment.JoinSchemaFetching, join = joinName)
430436

431437
val joinCodecTry = joinCodecCache(joinName)
432438

@@ -470,7 +476,8 @@ class Fetcher(val kvStore: KVStore,
470476
private case class ExternalToJoinRequest(externalRequest: Either[Request, KeyMissingException],
471477
joinRequest: Request,
472478
part: ExternalPart) {
479+
473480
lazy val context: Metrics.Context =
474-
Metrics.Context(Environment.JoinFetching, join = joinRequest.name, groupBy = part.fullName)
481+
Metrics.Context(Metrics.Environment.JoinFetching, join = joinRequest.name, groupBy = part.fullName)
475482
}
476483
}

online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import ai.chronon.aggregator.windowing.FinalBatchIr
44
import ai.chronon.api.GroupBy
55
import ai.chronon.online.KVStore.{GetRequest, TimedValue}
66
import ai.chronon.online.fetcher.FetcherCache._
7-
import ai.chronon.online.{GroupByServingInfoParsed, Metrics}
7+
import ai.chronon.online.GroupByServingInfoParsed
8+
import ai.chronon.online.metrics.Metrics
89
import com.github.benmanes.caffeine.cache.{Cache => CaffeineCache}
910
import org.slf4j.{Logger, LoggerFactory}
1011

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import ai.chronon.api.Extensions._
66
import ai.chronon.api._
77
import ai.chronon.online.KVStore.{GetRequest, GetResponse, TimedValue}
88
import ai.chronon.online.OnlineDerivationUtil.{applyDeriveFunc, buildRenameOnlyDerivationFunction}
9-
import ai.chronon.online._
9+
import ai.chronon.online.{metrics, _}
1010
import ai.chronon.online.fetcher.Fetcher.{ColumnSpec, PrefixedRequest, Request, Response}
1111
import ai.chronon.online.fetcher.FetcherCache.{BatchResponses, CachedBatchResponse}
1212
import org.slf4j.{Logger, LoggerFactory}
@@ -45,7 +45,8 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
4545
.getGroupByServingInfo(request.name)
4646
.map { groupByServingInfo =>
4747
val context =
48-
request.context.getOrElse(Metrics.Context(Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
48+
request.context.getOrElse(
49+
metrics.Metrics.Context(metrics.Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
4950
context.increment("group_by_request.count")
5051
var batchKeyBytes: Array[Byte] = null
5152
var streamingKeyBytes: Array[Byte] = null
@@ -338,4 +339,4 @@ case class LambdaKvRequest(groupByServingInfoParsed: GroupByServingInfoParsed,
338339
batchRequest: GetRequest,
339340
streamingRequestOpt: Option[GetRequest],
340341
endTs: Option[Long],
341-
context: Metrics.Context)
342+
context: metrics.Metrics.Context)

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ import ai.chronon.aggregator.windowing.{FinalBatchIr, SawtoothOnlineAggregator,
44
import ai.chronon.api.Extensions.WindowOps
55
import ai.chronon.api.ScalaJavaConversions.{IteratorOps, JMapOps}
66
import ai.chronon.api.{DataModel, Row, Window}
7-
import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed, Metrics}
7+
import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed}
88
import ai.chronon.online.KVStore.TimedValue
9-
import ai.chronon.online.Metrics.Name
9+
import ai.chronon.online.metrics.Metrics.Name
1010
import ai.chronon.online.fetcher.FetcherCache.{BatchResponses, CachedBatchResponse, KvStoreBatchResponse}
11+
import ai.chronon.online.metrics.Metrics
1112
import com.google.gson.Gson
1213
import org.slf4j.{Logger, LoggerFactory}
1314

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,21 @@ class JoinPartFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
6161
val joinDecomposed: Seq[(Request, Try[Seq[Either[PrefixedRequest, KeyMissingException]]])] =
6262
requests.map { request =>
6363
// use passed-in join or fetch one
64+
import ai.chronon.online.metrics
6465
val joinTry: Try[JoinOps] = joinConf
6566
.map(conf => Success(JoinOps(conf)))
6667
.getOrElse(metadataStore.getJoinConf(request.name))
6768

68-
var joinContext: Option[Metrics.Context] = None
69+
var joinContext: Option[metrics.Metrics.Context] = None
6970

7071
val decomposedTry = joinTry.map { join =>
71-
joinContext = Some(Metrics.Context(Metrics.Environment.JoinFetching, join.join))
72+
import ai.chronon.online.metrics
73+
joinContext = Some(metrics.Metrics.Context(metrics.Metrics.Environment.JoinFetching, join.join))
7274
joinContext.get.increment("join_request.count")
7375

7476
join.joinPartOps.map { part =>
75-
val joinContextInner = Metrics.Context(joinContext.get, part)
77+
import ai.chronon.online.metrics
78+
val joinContextInner = metrics.Metrics.Context(joinContext.get, part)
7679
val missingKeys = part.leftToRight.keys.filterNot(request.keys.contains)
7780

7881
if (missingKeys.nonEmpty) {

online/src/main/scala/ai/chronon/online/fetcher/LRUCache.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package ai.chronon.online.fetcher
22

3-
import ai.chronon.online.Metrics
3+
import ai.chronon.online.metrics.Metrics
44
import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CaffeineCache}
55
import org.slf4j.{Logger, LoggerFactory}
66

0 commit comments

Comments
 (0)