Skip to content

Commit 0fe9f53

Browse files
authored
Remove DDBKVStore rate limits (#741)
## Summary Pull out the DDB KV store rate limits as it causes docker startup errors due to class / jar version issues. ## Checklist - [ ] Added Unit Tests - [X] Covered by existing CI - [X] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Removed rate limiting functionality from DynamoDB operations. - Eliminated dependency on Guava library from the build configuration. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent f08593b commit 0fe9f53

File tree

2 files changed

+0
-11
lines changed

2 files changed

+0
-11
lines changed

cloud_aws/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ shared_libs = [
55
maven_artifact("software.amazon.awssdk:sdk-core"),
66
maven_artifact("software.amazon.awssdk:utils"),
77
maven_artifact("software.amazon.awssdk:emr"),
8-
maven_artifact("com.google.guava:guava"),
98
maven_artifact("org.slf4j:slf4j-api"),
109
maven_artifact("com.fasterxml.jackson.module:jackson-module-afterburner"),
1110
maven_artifact_with_suffix("org.apache.hudi:hudi-spark3.5-bundle"),

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import ai.chronon.online.KVStore.ListValue
1111
import ai.chronon.online.KVStore.TimedValue
1212
import ai.chronon.online.metrics.Metrics.Context
1313
import ai.chronon.online.metrics.Metrics
14-
import com.google.common.util.concurrent.RateLimiter
1514
import software.amazon.awssdk.core.SdkBytes
1615
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
1716
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
@@ -33,7 +32,6 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse
3332

3433
import java.time.Instant
3534
import java.util
36-
import java.util.concurrent.ConcurrentHashMap
3735
import scala.concurrent.Future
3836
import scala.util.Success
3937
import scala.util.Try
@@ -62,8 +60,6 @@ object DynamoDBKVStoreConstants {
6260

6361
class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
6462
import DynamoDBKVStoreConstants._
65-
private val readRateLimiters = new ConcurrentHashMap[String, RateLimiter]()
66-
private val writeRateLimiters = new ConcurrentHashMap[String, RateLimiter]()
6763

6864
protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.KVStore).withSuffix("dynamodb")
6965

@@ -88,9 +84,6 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
8884
val rcu = getCapacityUnits(props, readCapacityUnits, defaultReadCapacityUnits)
8985
val wcu = getCapacityUnits(props, writeCapacityUnits, defaultWriteCapacityUnits)
9086

91-
readRateLimiters.put(dataset, RateLimiter.create(rcu))
92-
writeRateLimiters.put(dataset, RateLimiter.create(wcu))
93-
9487
val request =
9588
CreateTableRequest.builder
9689
.attributeDefinitions(keyAttributes.toList.toJava)
@@ -139,7 +132,6 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
139132

140133
val getItemResults = getItemRequestPairs.map { case (req, getItemReq) =>
141134
Future {
142-
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
143135
val item: Try[util.Map[String, AttributeValue]] =
144136
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
145137
dynamoDbClient.getItem(getItemReq).item()
@@ -153,7 +145,6 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
153145

154146
val queryResults = queryRequestPairs.map { case (req, queryRequest) =>
155147
Future {
156-
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
157148
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
158149
dynamoDbClient.query(queryRequest).items()
159150
}
@@ -220,7 +211,6 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
220211

221212
val futureResponses = datasetToWriteRequests.map { case (dataset, putItemRequest) =>
222213
Future {
223-
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
224214
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
225215
dynamoDbClient.putItem(putItemRequest)
226216
}.isSuccess

0 commit comments

Comments
 (0)