Skip to content

Commit 0c3e610

Browse files
committed
style: Apply scalafix and scalafmt changes
1 parent 4051550 commit 0c3e610

File tree

3 files changed

+45
-44
lines changed

3 files changed

+45
-44
lines changed

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,12 @@ object Fetcher {
7272
context.distribution(Metrics.Name.FetchCount, responseMap.size)
7373
}
7474

75-
/**
76-
* Response for a join schema request
77-
* @param joinName - Name of the join
78-
* @param keySchema - Avro schema string for the key
79-
* @param valueSchema - Avro schema string for the value
80-
* @param schemaHash - Hash of the join schema payload (used to track updates to key / value schema fields or types)
81-
*/
75+
/** Response for a join schema request
76+
* @param joinName - Name of the join
77+
* @param keySchema - Avro schema string for the key
78+
* @param valueSchema - Avro schema string for the value
79+
* @param schemaHash - Hash of the join schema payload (used to track updates to key / value schema fields or types)
80+
*/
8281
case class JoinSchemaResponse(joinName: String, keySchema: String, valueSchema: String, schemaHash: String)
8382
}
8483

@@ -431,14 +430,18 @@ class Fetcher(val kvStore: KVStore,
431430

432431
val joinCodecTry = joinCodecCache(joinName)
433432

434-
val joinSchemaResponse = joinCodecTry.map { joinCodec =>
435-
JoinSchemaResponse(joinName, joinCodec.keyCodec.schemaStr, joinCodec.valueCodec.schemaStr, joinCodec.loggingSchemaHash)
436-
}.recover {
437-
case exception: Throwable =>
433+
val joinSchemaResponse = joinCodecTry
434+
.map { joinCodec =>
435+
JoinSchemaResponse(joinName,
436+
joinCodec.keyCodec.schemaStr,
437+
joinCodec.valueCodec.schemaStr,
438+
joinCodec.loggingSchemaHash)
439+
}
440+
.recover { case exception: Throwable =>
438441
logger.error(s"Failed to fetch join schema for $joinName", exception)
439442
ctx.incrementException(exception)
440443
throw exception
441-
}
444+
}
442445

443446
joinSchemaResponse.foreach(_ => ctx.distribution("response.latency.millis", System.currentTimeMillis() - startTime))
444447
joinSchemaResponse

online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -181,47 +181,44 @@ class MetadataStore(fetchContext: FetchContext) {
181181

182182
def parseJoins(response: ListResponse): Seq[String] = {
183183
val result = response.values
184-
.map {
185-
seqListValues =>
186-
187-
seqListValues
188-
.map(kv => new String(kv.valueBytes, StandardCharsets.UTF_8))
189-
.map(v => ThriftJsonCodec.fromJsonStr[Join](v, check = false, classOf[Join]))
190-
.filter(_.join.metaData.online == isOnline)
191-
.map(_.metaData.name)
192-
193-
}.recover {
194-
case e: Exception =>
195-
logger.error("Failed to list & parse joins from list response", e)
196-
context.withSuffix("join_list").increment(Metrics.Name.Exception)
197-
throw e
184+
.map { seqListValues =>
185+
seqListValues
186+
.map(kv => new String(kv.valueBytes, StandardCharsets.UTF_8))
187+
.map(v => ThriftJsonCodec.fromJsonStr[Join](v, check = false, classOf[Join]))
188+
.filter(_.join.metaData.online == isOnline)
189+
.map(_.metaData.name)
190+
191+
}
192+
.recover { case e: Exception =>
193+
logger.error("Failed to list & parse joins from list response", e)
194+
context.withSuffix("join_list").increment(Metrics.Name.Exception)
195+
throw e
198196
}
199197

200198
result.get
201199
}
202200

203-
def doRetrieveAllListConfs(acc: mutable.ArrayBuffer[String], paginationKey: Option[Any] = None): Future[Seq[String]] = {
201+
def doRetrieveAllListConfs(acc: mutable.ArrayBuffer[String],
202+
paginationKey: Option[Any] = None): Future[Seq[String]] = {
204203
val propsMap = {
205204
paginationKey match {
206205
case Some(key) => Map(ListEntityType -> JoinKeyword, ContinuationKey -> key)
207-
case None => Map(ListEntityType -> JoinKeyword)
206+
case None => Map(ListEntityType -> JoinKeyword)
208207
}
209208
}
210209

211210
val listRequest = ListRequest(fetchContext.metadataDataset, propsMap)
212-
fetchContext.kvStore.list(listRequest).flatMap {
213-
response =>
214-
215-
val joinSeq: Seq[String] = parseJoins(response)
216-
val newAcc = acc ++ joinSeq
217-
if (response.resultProps.contains(ContinuationKey)) {
218-
doRetrieveAllListConfs(newAcc, response.resultProps.get(ContinuationKey))
219-
} else {
220-
context
221-
.withSuffix("join_list")
222-
.distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs)
223-
Future.successful(newAcc)
224-
}
211+
fetchContext.kvStore.list(listRequest).flatMap { response =>
212+
val joinSeq: Seq[String] = parseJoins(response)
213+
val newAcc = acc ++ joinSeq
214+
if (response.resultProps.contains(ContinuationKey)) {
215+
doRetrieveAllListConfs(newAcc, response.resultProps.get(ContinuationKey))
216+
} else {
217+
context
218+
.withSuffix("join_list")
219+
.distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs)
220+
Future.successful(newAcc)
221+
}
225222
}
226223
}
227224

online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter wi
4949
it should "fail the call on internal issues" in {
5050
val metadataStore = new MetadataStore(FetchContext(kvStore))
5151
when(kvStore.list(any())).thenReturn(generateBrokenListResponse())
52-
an [Exception] should be thrownBy Await.result(metadataStore.listJoins(), 10.seconds)
52+
an[Exception] should be thrownBy Await.result(metadataStore.listJoins(), 10.seconds)
5353
}
5454

5555
it should "paginate list calls" in {
@@ -61,7 +61,8 @@ class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter wi
6161
val listResponseValues: Seq[Try[Seq[ListValue]]] = responses.map(v => Success(Seq(v)))
6262

6363
// first response will have a continuation key
64-
val first = Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.head, Map(ContinuationKey -> "1")))
64+
val first = Future(
65+
ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.head, Map(ContinuationKey -> "1")))
6566
// second response will not have a continuation key
6667
val second = Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.last, Map.empty))
6768

@@ -77,7 +78,7 @@ class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter wi
7778
// first is online = false
7879
"joins/user_transactions.txn_join_a",
7980
// this one is online = true
80-
"joins/user_transactions.txn_join_d",
81+
"joins/user_transactions.txn_join_d"
8182
)
8283

8384
paths.map { path =>

0 commit comments

Comments
 (0)