Skip to content

Commit 11a7dcf

Browse files
piyush-zlaikumar-zlai
authored andcommitted
Enable batch IR caching by default + fix vertx bt startup issue (#435)
## Summary Enable batch IR caching by default & fix an issue where our Vertx init code tries to connect to BT at startup and takes a second or two on the worker threads (and results in the warning - 'Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2976 ms, time limit is 2000 ms'). ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Streamlined caching configuration and logic with a consistent default setting for improved behavior. - Enhanced service startup by shifting to asynchronous initialization with better error handling for a more robust launch. - **Tests** - Removed an outdated test case that validated previous caching behavior. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent d86b08a commit 11a7dcf

File tree

5 files changed

+35
-51
lines changed

5 files changed

+35
-51
lines changed

online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ case class FetchContext(kvStore: KVStore,
1919
.exists(_.asInstanceOf[Boolean])
2020
}
2121

22-
def isCachingEnabled(groupByName: String): Boolean = {
23-
Option(flagStore)
24-
.exists(_.isSet("enable_fetcher_batch_ir_cache", Map("group_by_streaming_dataset" -> groupByName).toJava))
25-
}
26-
2722
def shouldStreamingDecodeThrow(groupByName: String): Boolean = {
2823
Option(flagStore)
2924
.exists(

online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,19 @@ trait FetcherCache {
2626
@transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass)
2727

2828
val batchIrCacheName = "batch_cache"
29-
val maybeBatchIrCache: Option[BatchIrCache] =
29+
val defaultBatchIrCacheSize = "10000"
30+
31+
val configuredBatchIrCacheSize: Option[Int] =
3032
Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements"))
31-
.map(size => new BatchIrCache(batchIrCacheName, size.toInt))
32-
.orElse(None)
33+
.orElse(Some(defaultBatchIrCacheSize))
34+
.map(_.toInt)
35+
.filter(_ > 0)
36+
37+
val maybeBatchIrCache: Option[BatchIrCache] =
38+
configuredBatchIrCacheSize
39+
.map(size => new BatchIrCache(batchIrCacheName, size))
3340

34-
// Caching needs to be configured globally
41+
// Caching needs to be configured globally with a cache size > 0
3542
def isCacheSizeConfigured: Boolean = maybeBatchIrCache.isDefined
3643

3744
// Caching needs to be enabled for the specific groupBy

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
2727
@transient private implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
2828

2929
override def isCachingEnabled(groupBy: GroupBy): Boolean = {
30-
if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false
31-
32-
val gbName = groupBy.getMetaData.getName
33-
34-
val isCachingFlagEnabled = fetchContext.isCachingEnabled(gbName)
35-
36-
if (fetchContext.debug)
37-
logger.info(s"Online IR caching is ${if (isCachingFlagEnabled) "enabled" else "disabled"} for $gbName")
30+
if (fetchContext.debug) {
31+
configuredBatchIrCacheSize match {
32+
case Some(cacheSize) =>
33+
logger.info(s"Online IR caching is enabled with cache size = $cacheSize")
34+
case None =>
35+
logger.info("Online IR caching is disabled")
36+
}
37+
}
3838

39-
isCachingFlagEnabled
39+
isCacheSizeConfigured
4040
}
4141

4242
/** Convert a groupBy request into a batch kv request and optionally a streaming kv request

online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -203,35 +203,6 @@ class FetcherBaseTest extends AnyFlatSpec with MockitoSugar with Matchers with M
203203
verify(ttlCache, never()).apply(any())
204204
}
205205

206-
it should "determine if caching is enabled correctly" in {
207-
val flagStore: FlagStore = (flagName: String, attributes: java.util.Map[String, String]) => {
208-
flagName match {
209-
case "enable_fetcher_batch_ir_cache" =>
210-
attributes.get("group_by_streaming_dataset") match {
211-
case "test_groupby_2" => false
212-
case "test_groupby_3" => true
213-
case other @ _ =>
214-
fail(s"Unexpected group_by_streaming_dataset: $other")
215-
false
216-
}
217-
case _ => false
218-
}
219-
}
220-
221-
kvStore = mock[KVStore](Answers.RETURNS_DEEP_STUBS)
222-
when(kvStore.executionContext).thenReturn(ExecutionContext.global)
223-
224-
val fetchContext = FetchContext(kvStore, flagStore = flagStore)
225-
226-
val fetcherBaseWithFlagStore =
227-
spy[fetcher.JoinPartFetcher](new fetcher.JoinPartFetcher(fetchContext, new MetadataStore(fetchContext)))
228-
when(fetcherBaseWithFlagStore.isCacheSizeConfigured).thenReturn(true)
229-
230-
// no name set
231-
assertFalse(fetchContext.isCachingEnabled("test_groupby_2"))
232-
assertTrue(fetchContext.isCachingEnabled("test_groupby_3"))
233-
}
234-
235206
it should "fetch in the happy case" in {
236207
val fetchContext = mock[FetchContext]
237208
val baseFetcher = new fetcher.JoinPartFetcher(fetchContext, mock[MetadataStore])

service/src/main/java/ai/chronon/service/FetcherVerticle.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,27 @@ public class FetcherVerticle extends AbstractVerticle {
2626
@Override
2727
public void start(Promise<Void> startPromise) throws Exception {
2828
ConfigStore cfgStore = new ConfigStore(vertx);
29-
startHttpServer(cfgStore.getServerPort(), cfgStore.encodeConfig(), ApiProvider.buildApi(cfgStore), startPromise);
29+
30+
Api api = ApiProvider.buildApi(cfgStore);
31+
32+
// Execute the blocking Bigtable initialization in a separate worker thread
33+
vertx.executeBlocking(() -> api.buildJavaFetcher("feature-service", false))
34+
.onSuccess(fetcher -> {
35+
try {
36+
// This code runs back on the event loop when the blocking operation completes
37+
startHttpServer(cfgStore.getServerPort(), cfgStore.encodeConfig(), fetcher, startPromise);
38+
} catch (Exception e) {
39+
startPromise.fail(e);
40+
}
41+
})
42+
.onFailure(startPromise::fail);
3043
}
3144

32-
protected void startHttpServer(int port, String configJsonString, Api api, Promise<Void> startPromise) throws Exception {
45+
protected void startHttpServer(int port, String configJsonString, JavaFetcher fetcher, Promise<Void> startPromise) throws Exception {
3346
Router router = Router.router(vertx);
3447

3548
// Define routes
3649

37-
JavaFetcher fetcher = api.buildJavaFetcher("feature-service", false);
38-
3950
// Set up sub-routes for the various feature retrieval apis
4051
router.route("/v1/fetch/*").subRouter(FetchRouter.createFetchRoutes(vertx, fetcher));
4152

0 commit comments

Comments
 (0)