Skip to content

Commit 302b496

Browse files
authored
fix: refresh ttlcache upon failures (#964)
1 parent 7ec719b commit 302b496

File tree

9 files changed

+189
-58
lines changed

9 files changed

+189
-58
lines changed

online/src/main/scala/ai/chronon/online/Api.scala

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,27 +58,29 @@ trait KVStore {
5858

5959
// helper method to blocking read a string - used for fetching metadata & not in hotpath.
6060
def getString(key: String, dataset: String, timeoutMillis: Long): Try[String] = {
61-
val response = getResponse(key, dataset, timeoutMillis)
62-
if (response.values.isFailure) {
63-
Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
64-
} else {
65-
Success(new String(response.latest.get.bytes, Constants.UTF8))
66-
}
61+
val bytesTry = getResponse(key, dataset, timeoutMillis)
62+
bytesTry.map(bytes => new String(bytes, Constants.UTF8))
6763
}
6864

6965
def getStringArray(key: String, dataset: String, timeoutMillis: Long): Try[Seq[String]] = {
70-
val response = getResponse(key, dataset, timeoutMillis)
71-
if (response.values.isFailure) {
72-
Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
73-
} else {
74-
Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes))
75-
}
66+
val bytesTry = getResponse(key, dataset, timeoutMillis)
67+
bytesTry.map(bytes => StringArrayConverter.bytesToStrings(bytes))
7668
}
7769

78-
private def getResponse(key: String, dataset: String, timeoutMillis: Long): GetResponse = {
70+
private def getResponse(key: String, dataset: String, timeoutMillis: Long): Try[Array[Byte]] = {
7971
val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
8072
val responseFutureOpt = get(fetchRequest)
81-
Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))
73+
def buildException(e: Throwable) = new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", e)
74+
Try(Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))) match {
75+
case Failure(e) =>
76+
Failure(buildException(e))
77+
case Success(resp) =>
78+
if (resp.values.isFailure) {
79+
Failure(buildException(resp.values.failed.get))
80+
} else {
81+
Success(resp.latest.get.bytes)
82+
}
83+
}
8284
}
8385
def get(request: GetRequest): Future[GetResponse] = {
8486
multiGet(Seq(request))

online/src/main/scala/ai/chronon/online/Fetcher.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,14 @@ class Fetcher(val kvStore: KVStore,
203203
lazy val getJoinCodecs = new TTLCache[String, Try[(JoinCodec, Boolean)]](
204204
{ joinName: String =>
205205
val startTimeMs = System.currentTimeMillis()
206-
val result: Try[(JoinCodec, Boolean)] = getJoinConf(joinName)
207-
.map(_.join)
208-
.map(join => buildJoinCodec(join, refreshOnFail = true))
209-
.recoverWith {
206+
val result: Try[(JoinCodec, Boolean)] =
207+
try {
208+
getJoinConf(joinName)
209+
.map(_.join)
210+
.map(join => buildJoinCodec(join, refreshOnFail = true))
211+
} catch {
210212
case th: Throwable =>
213+
getJoinConf.refresh(joinName)
211214
Failure(
212215
new RuntimeException(
213216
s"Couldn't fetch joinName = ${joinName} or build join codec due to ${th.traceString}",
@@ -317,6 +320,7 @@ class Fetcher(val kvStore: KVStore,
317320
response
318321
case Failure(exception) =>
319322
// more validation logic will be covered in compile.py to avoid this case
323+
getJoinCodecs.refresh(joinName)
320324
ctx.incrementException(exception)
321325
ResponseWithContext(internalResponse.request,
322326
Map("join_codec_fetch_exception" -> exception.traceString),
@@ -490,6 +494,7 @@ class Fetcher(val kvStore: KVStore,
490494
val joinName = request.name
491495
val joinConfTry: Try[JoinOps] = getJoinConf(request.name)
492496
if (joinConfTry.isFailure) {
497+
getJoinConf.refresh(request.name)
493498
resultMap.update(
494499
request,
495500
Failure(
@@ -509,8 +514,11 @@ class Fetcher(val kvStore: KVStore,
509514
// step-2 dedup external requests across joins
510515
val externalToJoinRequests: Seq[ExternalToJoinRequest] = validRequests
511516
.flatMap { joinRequest =>
512-
val parts =
513-
getJoinConf(joinRequest.name).get.join.onlineExternalParts // cheap since it is cached, valid since step-1
517+
val joinConf = getJoinConf(joinRequest.name)
518+
if (joinConf.isFailure) {
519+
getJoinConf.refresh(joinRequest.name)
520+
}
521+
val parts = joinConf.get.join.onlineExternalParts // cheap since it is cached, valid since step-1
514522
parts.iterator().asScala.map { part =>
515523
val externalRequest = Try(part.applyMapping(joinRequest.keys)) match {
516524
case Success(mappedKeys) => Left(Request(part.source.metadata.name, mappedKeys))

online/src/main/scala/ai/chronon/online/FetcherBase.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,15 @@ class FetcherBase(kvStore: KVStore,
351351
def fetchGroupBys(requests: scala.collection.Seq[Request]): Future[scala.collection.Seq[Response]] = {
352352
// split a groupBy level request into its kvStore level requests
353353
val groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])] = requests.iterator.map { request =>
354-
val groupByRequestMetaTry: Try[GroupByRequestMeta] = getGroupByServingInfo(request.name)
354+
val groupByServingInfoTry = getGroupByServingInfo(request.name)
355+
.recover {
356+
case ex: Throwable =>
357+
getGroupByServingInfo.refresh(request.name)
358+
logger.error(s"Couldn't fetch GroupByServingInfo for ${request.name}", ex)
359+
request.context.foreach(_.incrementException(ex))
360+
throw ex
361+
}
362+
val groupByRequestMetaTry: Try[GroupByRequestMeta] = groupByServingInfoTry
355363
.map { groupByServingInfo =>
356364
val context =
357365
request.context.getOrElse(Metrics.Context(Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
@@ -583,7 +591,11 @@ class FetcherBase(kvStore: KVStore,
583591
requests.map { request =>
584592
// get join conf from metadata store if not passed in
585593
val joinTry: Try[JoinOps] = if (joinConf.isEmpty) {
586-
getJoinConf(request.name)
594+
val joinConfTry = getJoinConf(request.name)
595+
if (joinConfTry.isFailure) {
596+
getJoinConf.refresh(request.name)
597+
}
598+
joinConfTry
587599
} else {
588600
logger.debug(s"Using passed in join configuration: ${joinConf.get.metaData.getName}")
589601
Success(JoinOps(joinConf.get))
@@ -671,7 +683,7 @@ class FetcherBase(kvStore: KVStore,
671683
if (debug || Math.random() < 0.001) {
672684
logger.error(s"Failed to fetch $groupByRequest", ex)
673685
}
674-
Map(groupByRequest.name + "_exception" -> ex.traceString)
686+
Map(prefix + "_exception" -> ex.traceString)
675687
}
676688
.get
677689
}

online/src/main/scala/ai/chronon/online/FlexibleExecutionContext.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
2222
object FlexibleExecutionContext {
2323
def buildExecutor: ThreadPoolExecutor = buildExecutor()
2424
def buildExecutionContext: ExecutionContextExecutor = buildExecutionContext()
25+
2526
// users can also provide a custom execution context override in [MetadataStore]
2627
def buildExecutor(corePoolSize: Int = 20,
2728
maxPoolSize: Int = 1000,

online/src/main/scala/ai/chronon/online/MetadataStore.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class MetadataStore(kvStore: KVStore,
134134
if (result.isSuccess) Metrics.Context(Metrics.Environment.MetaDataFetching, result.get.join)
135135
else Metrics.Context(Metrics.Environment.MetaDataFetching, join = name)
136136
// Throw exception after metrics. No join metadata is bound to be a critical failure.
137+
// This will ensure that a Failure is never cached in the getJoinConf TTLCache
137138
if (result.isFailure) {
138139
context.withSuffix("join").incrementException(result.failed.get)
139140
throw result.failed.get
@@ -147,6 +148,7 @@ class MetadataStore(kvStore: KVStore,
147148
def validateJoinExist(team: String, name: String): Boolean = {
148149
val activeJoinList: Try[Seq[String]] = getJoinListByTeam(team)
149150
if (activeJoinList.isFailure) {
151+
getJoinListByTeam.refresh(team)
150152
logger.error(s"Failed to fetch active join list for team $team")
151153
false
152154
} else {
@@ -164,6 +166,7 @@ class MetadataStore(kvStore: KVStore,
164166
def validateGroupByExist(team: String, name: String): Boolean = {
165167
val activeGroupByList: Try[Seq[String]] = getGroupByListByTeam(team)
166168
if (activeGroupByList.isFailure) {
169+
getGroupByListByTeam.refresh(team)
167170
logger.error(s"Failed to fetch active group_by list for team $team")
168171
false
169172
} else {

0 commit comments

Comments
 (0)