-
Notifications
You must be signed in to change notification settings - Fork 0
Summary upload #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Summary upload #50
Changes from all commits
fba5c42
9b0c82a
0aba15d
dbe401a
fbb7ffd
c503295
2f11cb0
2712d19
8eb6042
8b0d460
f7ff3c3
c50c873
5d59bbf
0956227
697afbc
4848221
cb20bb1
425adb9
9b231ee
90c157d
54cda0c
5f1b30b
db22935
4f9882a
24375e6
f15ab1f
b8c0d8c
c1bbe1f
835ab23
673cb1e
f966868
3c50b70
bc494fc
1b9f275
9c46dbc
27fdc18
1405872
3fe25fe
df6ccc1
2e4d2df
57c0291
a6447c8
d7860ee
4e18f18
b5d08b8
23e710f
f4d27fc
254e6c4
9a9ac67
cd7cd8e
ba9ea0c
6e75bcd
013c7d6
fb74aba
f920ef2
436a063
1acf3b5
4a8e5f9
40c52df
591c5ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ import ai.chronon.online.KVStore.ListValue | |||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.KVStore.TimedValue | ||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.Metrics | ||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.Metrics.Context | ||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.common.util.concurrent.RateLimiter | ||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.awssdk.core.SdkBytes | ||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient | ||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -30,6 +31,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse | |||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
import java.time.Instant | ||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util | ||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.ConcurrentHashMap | ||||||||||||||||||||||||||||||||||||||||||||||||||
import scala.concurrent.Future | ||||||||||||||||||||||||||||||||||||||||||||||||||
import scala.jdk.CollectionConverters._ | ||||||||||||||||||||||||||||||||||||||||||||||||||
import scala.util.Success | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -64,6 +66,8 @@ object DynamoDBKVStoreConstants { | |||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | ||||||||||||||||||||||||||||||||||||||||||||||||||
import DynamoDBKVStoreConstants._ | ||||||||||||||||||||||||||||||||||||||||||||||||||
private val readRateLimiters = new ConcurrentHashMap[String, RateLimiter]() | ||||||||||||||||||||||||||||||||||||||||||||||||||
private val writeRateLimiters = new ConcurrentHashMap[String, RateLimiter]() | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.KVStore).withSuffix("dynamodb") | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -88,6 +92,9 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | |||||||||||||||||||||||||||||||||||||||||||||||||
val rcu = getCapacityUnits(props, readCapacityUnits, defaultReadCapacityUnits) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val wcu = getCapacityUnits(props, writeCapacityUnits, defaultWriteCapacityUnits) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
readRateLimiters.put(dataset, RateLimiter.create(rcu)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
writeRateLimiters.put(dataset, RateLimiter.create(wcu)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+95
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix rate limiter initialization in create method. The rate limiters are initialized with the correct capacity units, but there's a potential issue:
Consider moving the rate limiter initialization after successful table creation: - readRateLimiters.put(dataset, RateLimiter.create(rcu))
- writeRateLimiters.put(dataset, RateLimiter.create(wcu))
val request =
CreateTableRequest.builder
.attributeDefinitions(keyAttributes.toList.asJava)
.keySchema(keySchema.toList.asJava)
.provisionedThroughput(ProvisionedThroughput.builder.readCapacityUnits(rcu).writeCapacityUnits(wcu).build)
.tableName(dataset)
.build
logger.info(s"Triggering creation of DynamoDb table: $dataset")
try {
val _ = dynamoDbClient.createTable(request)
val tableRequest = DescribeTableRequest.builder.tableName(dataset).build
// Wait until the Amazon DynamoDB table is created.
val waiterResponse = dbWaiter.waitUntilTableExists(tableRequest)
if (waiterResponse.matched.exception().isPresent)
throw waiterResponse.matched.exception().get()
val tableDescription = waiterResponse.matched().response().get().table()
logger.info(s"Table created successfully! Details: \n${tableDescription.toString}")
+ readRateLimiters.put(dataset, RateLimiter.create(rcu))
+ writeRateLimiters.put(dataset, RateLimiter.create(wcu))
metricsContext.increment("create.successes") 📝 Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case it fails because the table already exists, I still want to create the ratelimiters. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @chewy-zlai, understood. Thank you for the clarification. Keeping the rate limiter initialization outside the try block ensures they are created even when the table already exists. I'll make a note of this for future reviews. ✏️ Learnings added
🧠 Learnings used
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
val request = | ||||||||||||||||||||||||||||||||||||||||||||||||||
CreateTableRequest.builder | ||||||||||||||||||||||||||||||||||||||||||||||||||
.attributeDefinitions(keyAttributes.toList.asJava) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -137,6 +144,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | |||||||||||||||||||||||||||||||||||||||||||||||||
val getItemResults = getItemRequestPairs.map { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case (req, getItemReq) => | ||||||||||||||||||||||||||||||||||||||||||||||||||
Future { | ||||||||||||||||||||||||||||||||||||||||||||||||||
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire() | ||||||||||||||||||||||||||||||||||||||||||||||||||
val item: Try[util.Map[String, AttributeValue]] = | ||||||||||||||||||||||||||||||||||||||||||||||||||
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDbClient.getItem(getItemReq).item() | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -151,6 +159,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | |||||||||||||||||||||||||||||||||||||||||||||||||
val queryResults = queryRequestPairs.map { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case (req, queryRequest) => | ||||||||||||||||||||||||||||||||||||||||||||||||||
Future { | ||||||||||||||||||||||||||||||||||||||||||||||||||
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire() | ||||||||||||||||||||||||||||||||||||||||||||||||||
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDbClient.query(queryRequest).items() | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -218,12 +227,10 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | |||||||||||||||||||||||||||||||||||||||||||||||||
val futureResponses = datasetToWriteRequests.map { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case (dataset, putItemRequest) => | ||||||||||||||||||||||||||||||||||||||||||||||||||
Future { | ||||||||||||||||||||||||||||||||||||||||||||||||||
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire() | ||||||||||||||||||||||||||||||||||||||||||||||||||
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDbClient.putItem(putItemRequest) | ||||||||||||||||||||||||||||||||||||||||||||||||||
}.recover { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case _: Exception => false | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
true | ||||||||||||||||||||||||||||||||||||||||||||||||||
}.isSuccess | ||||||||||||||||||||||||||||||||||||||||||||||||||
chewy-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Future.sequence(futureResponses) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,60 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
package ai.chronon.spark.stats.drift | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.api.Constants | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.KVStore | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.online.KVStore.PutRequest | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ai.chronon.spark.TableUtils | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.sql.DataFrame | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.sql.types | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import scala.concurrent.ExecutionContext.Implicits.global | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import scala.concurrent.Future | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class SummaryUploader(summaryDF: DataFrame, kvStoreFunc: () => KVStore, putsPerRequest: Int = 100)(implicit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tu: TableUtils) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
extends Serializable { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val completed_schema: types.StructType = types.StructType( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Seq( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
types.StructField(tu.partitionColumn, types.StringType, nullable = false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private val statsTableName = Constants.DriftStatsTable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def run(): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Validate schema | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val requiredColumns = Seq("keyBytes", "valueBytes", "timestamp") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val missingColumns = requiredColumns.filterNot(summaryDF.columns.contains) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
require(missingColumns.isEmpty, s"Missing required columns: ${missingColumns.mkString(", ")}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
summaryDF.rdd.foreachPartition(rows => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val kvStore: KVStore = kvStoreFunc() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val putRequests = new scala.collection.mutable.ArrayBuffer[PutRequest] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (row <- rows) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
putRequests += PutRequest( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
chewy-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(row.getAs[Array[Byte]]("keyBytes")).getOrElse(Array.empty[Byte]), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(row.getAs[Array[Byte]]("valueBytes")).getOrElse(Array.empty[Byte]), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
statsTableName, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(row.getAs[Long]("timestamp")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val futureResults = putRequests.grouped(putsPerRequest).map { batch => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kvStore.multiPut(batch.toList).map { result => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!result.forall(identity)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(s"Failed to put ${result.count(!_)} records") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val aggregatedFuture = Future.sequence(futureResults.toSeq) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
aggregatedFuture.onComplete { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case scala.util.Success(_) => // All operations completed successfully | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case scala.util.Failure(e: IllegalArgumentException) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case scala.util.Failure(e: java.io.IOException) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case scala.util.Failure(e) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+51
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add handling for DynamoDB specific exceptions. Consider adding specific handling for DynamoDB's ProvisionedThroughputExceededException: case scala.util.Failure(e: IllegalArgumentException) =>
throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
case scala.util.Failure(e: java.io.IOException) =>
throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
+ case scala.util.Failure(e: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException) =>
+ throw new RuntimeException(s"DynamoDB throughput exceeded: ${e.getMessage}", e)
case scala.util.Failure(e) => 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+28
to
+58
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure futures complete before partition processing ends. The current implementation might allow the partition processing to complete before all futures are resolved. Consider using val aggregatedFuture = Future.sequence(futureResults.toSeq)
- aggregatedFuture.onComplete {
- case scala.util.Success(_) => // All operations completed successfully
- case scala.util.Failure(e: IllegalArgumentException) =>
- throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
- case scala.util.Failure(e: java.io.IOException) =>
- throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
- case scala.util.Failure(e) =>
- throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e)
- }
+ import scala.concurrent.duration._
+ try {
+ scala.concurrent.Await.ready(aggregatedFuture, 5.minutes).value.get match {
+ case scala.util.Success(_) => // All operations completed successfully
+ case scala.util.Failure(e: IllegalArgumentException) =>
+ throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
+ case scala.util.Failure(e: java.io.IOException) =>
+ throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
+ case scala.util.Failure(e) =>
+ throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e)
+ }
+ } catch {
+ case e: scala.concurrent.TimeoutException =>
+ throw new RuntimeException("Upload timed out after 5 minutes", e)
+ } While this uses 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package ai.chronon.spark.test | ||
|
||
import ai.chronon.online.KVStore | ||
|
||
import scala.collection.mutable | ||
import scala.concurrent.Future | ||
|
||
class MockKVStore() extends KVStore with Serializable { | ||
val num_puts: mutable.Map[String,Int] = collection.mutable.Map[String, Int]() | ||
|
||
def bulkPut(sourceOfflineTable: String,destinationOnlineDataSet: String,partition: String): Unit = | ||
throw new UnsupportedOperationException("Not implemented in mock") | ||
def create(dataset: String): Unit = | ||
{ | ||
num_puts(dataset) = 0 | ||
} | ||
chewy-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def multiGet(requests: Seq[ai.chronon.online.KVStore.GetRequest]): scala.concurrent.Future[Seq[ai.chronon.online.KVStore.GetResponse]] = | ||
throw new UnsupportedOperationException("Not implemented in mock") | ||
def multiPut(keyValueDatasets: Seq[ai.chronon.online.KVStore.PutRequest]): scala.concurrent.Future[Seq[Boolean]] = { | ||
logger.info(s"Triggering multiput for ${keyValueDatasets.size}: rows") | ||
for (req <- keyValueDatasets if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty)) num_puts(req.dataset) += 1 | ||
|
||
val futureResponses = keyValueDatasets.map { req => | ||
if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty) Future{true} | ||
else Future{false} | ||
} | ||
Future.sequence(futureResponses) | ||
} | ||
chewy-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def show(): Unit = { | ||
num_puts.foreach(x => logger.info(s"Ran ${x._2} non-empty put actions for dataset ${x._1}")) | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nikhil-zlai - do you think this would warrant a need for more tests or would CI spark tests cover any breaks.