Skip to content

Commit 697e78b

Browse files
format
Co-authored-by: Thomas Chow <[email protected]>
1 parent 835f50c commit 697e78b

File tree

2 files changed

+32
-40
lines changed

2 files changed

+32
-40
lines changed

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ import software.amazon.awssdk.core.SdkClient
1212

1313
import java.net.URI
1414

15-
/**
16-
* Implementation of Chronon's API interface for AWS. This is a work in progress and currently just covers the
15+
/** Implementation of Chronon's API interface for AWS. This is a work in progress and currently just covers the
1716
* DynamoDB based KV store implementation.
1817
*/
1918
class AwsApiImpl(conf: Map[String, String]) extends Api(conf) {
@@ -45,21 +44,18 @@ class AwsApiImpl(conf: Map[String, String]) extends Api(conf) {
4544
new DynamoDBKVStoreImpl(ddbClient)
4645
}
4746

48-
/**
49-
* The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before
47+
/** The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before
5048
* we can spin up the Aws streaming Chronon stack
5149
*/
5250
override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): Serde = ???
5351

54-
/**
55-
* The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up
52+
/** The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up
5653
* a fully functional Chronon serving stack in Aws
5754
* @return
5855
*/
5956
override def externalRegistry: ExternalSourceRegistry = ???
6057

61-
/**
62-
* The logResponse method is currently unimplemented. We'll need to implement this prior to bringing up the
58+
/** The logResponse method is currently unimplemented. We'll need to implement this prior to bringing up the
6359
* fully functional serving stack in Aws which includes logging feature responses to a stream for OOC
6460
*/
6561
override def logResponse(resp: LoggableResponse): Unit = ???

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -141,31 +141,29 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
141141
// timestamp to use for all get responses when the underlying tables don't have a ts field
142142
val defaultTimestamp = Instant.now().toEpochMilli
143143

144-
val getItemResults = getItemRequestPairs.map {
145-
case (req, getItemReq) =>
146-
Future {
147-
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
148-
val item: Try[util.Map[String, AttributeValue]] =
149-
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
150-
dynamoDbClient.getItem(getItemReq).item()
151-
}
152-
153-
val response = item.map(i => List(i).asJava)
154-
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(response, defaultTimestamp)
155-
GetResponse(req, resultValue)
156-
}
144+
val getItemResults = getItemRequestPairs.map { case (req, getItemReq) =>
145+
Future {
146+
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
147+
val item: Try[util.Map[String, AttributeValue]] =
148+
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
149+
dynamoDbClient.getItem(getItemReq).item()
150+
}
151+
152+
val response = item.map(i => List(i).asJava)
153+
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(response, defaultTimestamp)
154+
GetResponse(req, resultValue)
155+
}
157156
}
158157

159-
val queryResults = queryRequestPairs.map {
160-
case (req, queryRequest) =>
161-
Future {
162-
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
163-
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
164-
dynamoDbClient.query(queryRequest).items()
165-
}
166-
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(responses, defaultTimestamp)
167-
GetResponse(req, resultValue)
158+
val queryResults = queryRequestPairs.map { case (req, queryRequest) =>
159+
Future {
160+
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
161+
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
162+
dynamoDbClient.query(queryRequest).items()
168163
}
164+
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(responses, defaultTimestamp)
165+
GetResponse(req, resultValue)
166+
}
169167
}
170168

171169
Future.sequence(getItemResults ++ queryResults)
@@ -224,20 +222,18 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
224222
(req.dataset, putItemReq)
225223
}
226224

227-
val futureResponses = datasetToWriteRequests.map {
228-
case (dataset, putItemRequest) =>
229-
Future {
230-
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
231-
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
232-
dynamoDbClient.putItem(putItemRequest)
233-
}.isSuccess
234-
}
225+
val futureResponses = datasetToWriteRequests.map { case (dataset, putItemRequest) =>
226+
Future {
227+
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
228+
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
229+
dynamoDbClient.putItem(putItemRequest)
230+
}.isSuccess
231+
}
235232
}
236233
Future.sequence(futureResponses)
237234
}
238235

239-
/**
240-
* Implementation of bulkPut is currently a TODO for the DynamoDB store. This involves transforming the underlying
236+
/** Implementation of bulkPut is currently a TODO for the DynamoDB store. This involves transforming the underlying
241237
* Parquet data to Amazon's Ion format + swapping out old table for new (as bulkLoad only writes to new tables)
242238
*/
243239
override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = ???

0 commit comments

Comments (0)