Skip to content

Commit 9aeb4d2

Browse files
fmt
Co-authored-by: Thomas Chow <[email protected]>
1 parent d3ed8e5 commit 9aeb4d2

File tree

5 files changed

+25
-24
lines changed

5 files changed

+25
-24
lines changed

online/src/main/scala/ai/chronon/online/ExternalSourceRegistry.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ class ExternalSourceRegistry extends Serializable {
5050
// 1. keys match
5151
// 2. report missing & extra values
5252
// 3. schema integrity of returned values
53-
def fetchRequests(requests: Seq[Request], context: Context)(implicit
54-
ec: ExecutionContext): Future[Seq[Response]] = {
53+
def fetchRequests(requests: Seq[Request], context: Context)(implicit ec: ExecutionContext): Future[Seq[Response]] = {
5554
val startTime = System.currentTimeMillis()
5655
// we issue one batch request per external source and flatten it out later
5756
val responsesByNameF: List[Future[Seq[Response]]] = requests

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ class Fetcher(val kvStore: KVStore,
433433

434434
def fetchJoinSchema(joinName: String): Try[JoinSchemaResponse] = {
435435
val startTime = System.currentTimeMillis()
436-
val ctx =Metrics.Context(Metrics.Environment.JoinSchemaFetching, join = joinName)
436+
val ctx = Metrics.Context(Metrics.Environment.JoinSchemaFetching, join = joinName)
437437

438438
val joinCodecTry = joinCodecCache(joinName)
439439

@@ -478,7 +478,6 @@ class Fetcher(val kvStore: KVStore,
478478
joinRequest: Request,
479479
part: ExternalPart) {
480480

481-
482481
lazy val context: Metrics.Context =
483482
Metrics.Context(Metrics.Environment.JoinFetching, join = joinRequest.name, groupBy = part.fullName)
484483
}

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
4646
.map { groupByServingInfo =>
4747
import ai.chronon.online.metrics
4848
val context =
49-
request.context.getOrElse(metrics.Metrics.Context(metrics.Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
49+
request.context.getOrElse(
50+
metrics.Metrics.Context(metrics.Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
5051
context.increment("group_by_request.count")
5152
var batchKeyBytes: Array[Byte] = null
5253
var streamingKeyBytes: Array[Byte] = null

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ class MetadataStore(fetchContext: FetchContext) {
123123
}
124124
},
125125
{ team =>
126-
Metrics.Context(environment = "group_by.list.fetch", groupBy = team) }
126+
Metrics.Context(environment = "group_by.list.fetch", groupBy = team)
127+
}
127128
)
128129
}
129130

@@ -137,8 +138,10 @@ class MetadataStore(fetchContext: FetchContext) {
137138
throw e
138139
}
139140
},
140-
{ team => import ai.chronon.online.metrics
141-
metrics.Metrics.Context(environment = "join.list.fetch", groupBy = team) }
141+
{ team =>
142+
import ai.chronon.online.metrics
143+
metrics.Metrics.Context(environment = "join.list.fetch", groupBy = team)
144+
}
142145
)
143146
}
144147

@@ -167,8 +170,10 @@ class MetadataStore(fetchContext: FetchContext) {
167170
.distribution(metrics.Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs)
168171
result
169172
},
170-
{ join => import ai.chronon.online.metrics
171-
metrics.Metrics.Context(environment = "join.meta.fetch", join = join) }
173+
{ join =>
174+
import ai.chronon.online.metrics
175+
metrics.Metrics.Context(environment = "join.meta.fetch", join = join)
176+
}
172177
)
173178

174179
def putJoinConf(join: Join): Unit = {
@@ -252,8 +257,10 @@ class MetadataStore(fetchContext: FetchContext) {
252257

253258
new TTLCache[String, Try[JoinCodec]](
254259
codecBuilder,
255-
{ join: String => import ai.chronon.online.metrics
256-
metrics.Metrics.Context(environment = "join.codec.fetch", join = join) },
260+
{ join: String =>
261+
import ai.chronon.online.metrics
262+
metrics.Metrics.Context(environment = "join.codec.fetch", join = join)
263+
},
257264
onCreateFunc = onCreateFunc
258265
)
259266
}
@@ -374,8 +381,10 @@ class MetadataStore(fetchContext: FetchContext) {
374381
Success(new GroupByServingInfoParsed(groupByServingInfo, partitionSpec))
375382
}
376383
},
377-
{ gb => import ai.chronon.online.metrics
378-
metrics.Metrics.Context(environment = "group_by.serving_info.fetch", groupBy = gb) }
384+
{ gb =>
385+
import ai.chronon.online.metrics
386+
metrics.Metrics.Context(environment = "group_by.serving_info.fetch", groupBy = gb)
387+
}
379388
)
380389

381390
def put(

spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSe
3030
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
3131
import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
3232
import org.slf4j.{Logger, LoggerFactory}
33+
import ai.chronon.online.metrics
3334

3435
import java.time.{Instant, ZoneId, ZoneOffset}
3536
import java.time.format.DateTimeFormatter
@@ -64,11 +65,10 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
6465
apiImpl: Api)
6566
extends Serializable {
6667

67-
import ai.chronon.online.metrics
68-
6968
@transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
7069

71-
val context: metrics.Metrics.Context = metrics.Metrics.Context(metrics.Metrics.Environment.GroupByStreaming, groupByConf)
70+
val context: metrics.Metrics.Context =
71+
metrics.Metrics.Context(metrics.Metrics.Environment.GroupByStreaming, groupByConf)
7272

7373
private case class Schemas(leftStreamSchema: StructType,
7474
leftSourceSchema: StructType,
@@ -123,7 +123,6 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
123123
private val streamingDataset: String = groupByConf.streamingDataset
124124

125125
def toPutRequest(input: Row): KVStore.PutRequest = {
126-
import ai.chronon.online.metrics
127126
val keys = keyIndices.map(input.get)
128127
val values = valueIndices.map(input.get)
129128

@@ -230,7 +229,6 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
230229
val deserialized: Dataset[Mutation] = df
231230
.as[Array[Byte]]
232231
.map { arr =>
233-
import ai.chronon.online.metrics
234232
ingressContext.increment(metrics.Metrics.Name.RowCount)
235233
ingressContext.count(metrics.Metrics.Name.Bytes, arr.length)
236234
try {
@@ -289,7 +287,6 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
289287
}
290288

291289
def chainedStreamingQuery: DataStreamWriter[Row] = {
292-
import ai.chronon.online.metrics
293290
val joinSource = groupByConf.streamingSource.get.getJoinSource
294291
val left = joinSource.join.left
295292
val topic = TopicInfo.parse(left.topic)
@@ -357,7 +354,6 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
357354

358355
val rowsScala = rows.toScala.toArray
359356
val requests = rowsScala.map { row =>
360-
import ai.chronon.online.metrics
361357
val keyMap = row.getValuesMap[AnyRef](leftColumns)
362358
val eventTs = row.get(leftTimeIndex).asInstanceOf[Long]
363359
context.distribution(metrics.Metrics.Name.LagMillis, System.currentTimeMillis() - eventTs)
@@ -368,12 +364,10 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
368364
val microBatchTimestamp =
369365
percentile(rowsScala.map(_.get(leftTimeIndex).asInstanceOf[Long]), timePercentile)
370366
if (microBatchTimestamp.isDefined) {
371-
import ai.chronon.online.metrics
372367
val microBatchLag = System.currentTimeMillis() - microBatchTimestamp.get
373368
context.distribution(metrics.Metrics.Name.BatchLagMillis, microBatchLag)
374369

375370
if (minimumQueryDelayMs > 0 && microBatchLag >= 0 && microBatchLag < minimumQueryDelayMs) {
376-
import ai.chronon.online.metrics
377371
val sleepMillis = minimumQueryDelayMs - microBatchLag
378372
Thread.sleep(sleepMillis)
379373
context.distribution(metrics.Metrics.Name.QueryDelaySleepMillis, sleepMillis)
@@ -416,7 +410,6 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
416410

417411
def emitRequestMetric(request: PutRequest, context: metrics.Metrics.Context): Unit = {
418412
request.tsMillis.foreach { ts: Long =>
419-
import ai.chronon.online.metrics
420413
context.distribution(metrics.Metrics.Name.FreshnessMillis, System.currentTimeMillis() - ts)
421414
context.increment(metrics.Metrics.Name.RowCount)
422415
context.distribution(metrics.Metrics.Name.ValueBytes, request.valueBytes.length)

0 commit comments

Comments
 (0)