Skip to content

Commit 14cc871

Browse files
authored
Don't request from KVStore if ANY of key is null (#968)
* Don't request null keys from KVStore * add another val to avoid indentation * run scalafmt * change from ALL to ANY * run scalafmt * merge recent changes to fix unit tests * fix unit tests * rebase the latest changes
1 parent cb60be1 commit 14cc871

File tree

3 files changed

+94
-25
lines changed

3 files changed

+94
-25
lines changed

online/src/main/scala/ai/chronon/online/FetcherBase.scala

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,10 @@ class FetcherBase(kvStore: KVStore,
350350
// 4. Finally converted to outputSchema
351351
def fetchGroupBys(requests: scala.collection.Seq[Request]): Future[scala.collection.Seq[Response]] = {
352352
// split a groupBy level request into its kvStore level requests
353-
val groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])] = requests.iterator.map { request =>
353+
// don't send to KV store if ANY of key's value is null
354+
val validRequests =
355+
requests.filter(r => r.keys == null || r.keys.values == null || !r.keys.values.exists(_ == null))
356+
val groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])] = validRequests.iterator.map { request =>
354357
val groupByServingInfoTry = getGroupByServingInfo(request.name)
355358
.recover {
356359
case ex: Throwable =>
@@ -665,28 +668,8 @@ class FetcherBase(kvStore: KVStore,
665668
}
666669
Map(fetchException.getRequestName + "_exception" -> fetchException.getMessage)
667670
}
668-
case Left(PrefixedRequest(prefix, groupByRequest)) => {
669-
responseMap
670-
.getOrElse(groupByRequest,
671-
Failure(new IllegalStateException(
672-
s"Couldn't find a groupBy response for $groupByRequest in response map")))
673-
.map { valueMap =>
674-
if (valueMap != null) {
675-
valueMap.map { case (aggName, aggValue) => prefix + "_" + aggName -> aggValue }
676-
} else {
677-
Map.empty[String, AnyRef]
678-
}
679-
}
680-
// prefix feature names
681-
.recover { // capture exception as a key
682-
case ex: Throwable =>
683-
if (debug || Math.random() < 0.001) {
684-
logger.error(s"Failed to fetch $groupByRequest", ex)
685-
}
686-
Map(prefix + "_exception" -> ex.traceString)
687-
}
688-
.get
689-
}
671+
case Left(PrefixedRequest(prefix, groupByRequest)) =>
672+
parseGroupByResponse(prefix, groupByRequest, responseMap)
690673
}.toMap
691674
}
692675
joinValuesTry match {
@@ -706,6 +689,41 @@ class FetcherBase(kvStore: KVStore,
706689
}
707690
}
708691

692+
def parseGroupByResponse(prefix: String,
693+
groupByRequest: Request,
694+
responseMap: Map[Request, Try[Map[String, AnyRef]]]) = {
695+
696+
// Group bys with ANY null keys won't be requested from the KV store and we don't expect a response.
697+
val isRequiredRequest =
698+
(groupByRequest.keys.nonEmpty && !groupByRequest.keys.values.exists(_ == null)) || groupByRequest.keys.isEmpty
699+
700+
val response: Try[Map[String, AnyRef]] = responseMap.get(groupByRequest) match {
701+
case Some(value) => value
702+
case None =>
703+
if (isRequiredRequest)
704+
Failure(new IllegalStateException(s"Couldn't find a groupBy response for $groupByRequest in response map"))
705+
else Success(null)
706+
}
707+
708+
response
709+
.map { valueMap =>
710+
if (valueMap != null) {
711+
valueMap.map { case (aggName, aggValue) => prefix + "_" + aggName -> aggValue }
712+
} else {
713+
Map.empty[String, AnyRef]
714+
}
715+
}
716+
// prefix feature names
717+
.recover { // capture exception as a key
718+
case ex: Throwable =>
719+
if (debug || Math.random() < 0.001) {
720+
logger.error(s"Failed to fetch $groupByRequest", ex)
721+
}
722+
Map(prefix + "_exception" -> ex.traceString)
723+
}
724+
.get
725+
}
726+
709727
/**
710728
* Fetch method to simulate a random access interface for Chronon
711729
* by distributing requests to relevant GroupBys. This is a batch

online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,4 +269,50 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper {
269269
val result2 = fetcherBase.checkLateBatchData(1710896400000L, "myGroupBy", 1710633600000L, tailHops2d, shortWindows)
270270
assertSame(result2, 0L)
271271
}
272+
273+
@Test
274+
def testParsingGroupByResponse_HappyCase(): Unit = {
275+
val baseFetcher = new FetcherBase(mock[KVStore])
276+
val request = Request(name = "name", keys = Map("email" -> "email"), atMillis = None, context = None)
277+
val response: Map[Request, Try[Map[String, AnyRef]]] = Map(
278+
request -> Success(Map(
279+
"key" -> "value"
280+
))
281+
)
282+
283+
val result = baseFetcher.parseGroupByResponse("prefix", request, response)
284+
result shouldBe Map("prefix_key" -> "value")
285+
}
286+
287+
@Test
288+
def testParsingGroupByResponse_NullKey(): Unit = {
289+
val baseFetcher = new FetcherBase(mock[KVStore])
290+
val request = Request(name = "name", keys = Map("email" -> null), atMillis = None, context = None)
291+
val request2 = Request(name = "name2", keys = Map("email" -> null), atMillis = None, context = None)
292+
293+
val response: Map[Request, Try[Map[String, AnyRef]]] = Map(
294+
request2 -> Success(Map(
295+
"key" -> "value"
296+
))
297+
)
298+
299+
val result = baseFetcher.parseGroupByResponse("prefix", request, response)
300+
result shouldBe Map()
301+
}
302+
303+
@Test
304+
def testParsingGroupByResponse_MissingKey(): Unit = {
305+
val baseFetcher = new FetcherBase(mock[KVStore])
306+
val request = Request(name = "name", keys = Map("email" -> "email"), atMillis = None, context = None)
307+
val request2 = Request(name = "name2", keys = Map("email" -> "email"), atMillis = None, context = None)
308+
309+
val response: Map[Request, Try[Map[String, AnyRef]]] = Map(
310+
request2 -> Success(Map(
311+
"key" -> "value"
312+
))
313+
)
314+
315+
val result = baseFetcher.parseGroupByResponse("prefix", request, response)
316+
result.keySet shouldBe Set("prefix_exception")
317+
}
272318
}

spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,10 @@ class FetcherTest extends TestCase {
401401

402402
val listingEventData = Seq(
403403
Row(1L, toTs("2021-04-10 03:10:00"), "2021-04-10"),
404-
Row(2L, toTs("2021-04-10 03:10:00"), "2021-04-10")
404+
Row(2L, toTs("2021-04-10 03:10:00"), "2021-04-10"),
405+
Row(2L, toTs("2021-04-10 03:10:00"), "2021-04-10"),
406+
Row(2L, toTs("2021-04-10 03:10:00"), "2021-04-10"),
407+
Row(null, toTs("2021-04-10 03:10:00"), "2021-04-10")
405408
)
406409
val ratingEventData = Seq(
407410
// 1L listing id event data
@@ -422,7 +425,9 @@ class FetcherTest extends TestCase {
422425
Row(2L, toTs("2021-04-10 02:30:00"), 5, "2021-04-10"),
423426
Row(2L, toTs("2021-04-10 02:30:00"), 8, "2021-04-10"),
424427
Row(2L, toTs("2021-04-10 02:30:00"), 8, "2021-04-10"),
425-
Row(2L, toTs("2021-04-07 00:30:00"), 10, "2021-04-10") // dated 4/10 but excluded from avg agg based on ts
428+
Row(2L, toTs("2021-04-07 00:30:00"), 10, "2021-04-10"), // dated 4/10 but excluded from avg agg based on ts
429+
Row(2L, toTs("2021-04-07 00:30:00"), 10, "2021-04-10"), // dated 4/10 but excluded from avg agg based on ts
430+
Row(null, toTs("2021-04-10 02:30:00"), 8, "2021-04-10")
426431
)
427432
// Schemas
428433
// {..., event (generic event column), ...}

0 commit comments

Comments
 (0)