
Summary upload #50

Status: Merged (60 commits, Nov 7, 2024)

Commits
fba5c42
[WIP] Anomaly detection prototype with generated data
nikhil-zlai Sep 11, 2024
9b0c82a
discord -> slack + drift stuff + avro decoding benchmark
nikhil-zlai Sep 21, 2024
0aba15d
changes
nikhil-zlai Sep 21, 2024
dbe401a
overwatch
nikhil-zlai Oct 7, 2024
fbb7ffd
compile
nikhil-zlai Oct 8, 2024
c503295
pctile to pmf
nikhil-zlai Oct 14, 2024
2f11cb0
test v0
nikhil-zlai Oct 15, 2024
2712d19
more test fixes
nikhil-zlai Oct 15, 2024
8eb6042
adding incremental compute
nikhil-zlai Oct 15, 2024
8b0d460
printing queries in color
nikhil-zlai Oct 15, 2024
f7ff3c3
Partition Runner
nikhil-zlai Oct 17, 2024
c50c873
summary metrics thrift
nikhil-zlai Oct 18, 2024
5d59bbf
packing data into keybytes and value bytes
nikhil-zlai Oct 18, 2024
0956227
wiring up with partition runner + porting fraud data
nikhil-zlai Oct 21, 2024
697afbc
Merge branch 'main' of https://github.com/zipline-ai/chronon into sum…
chewys1024 Oct 22, 2024
4848221
Fix merge
chewys1024 Oct 22, 2024
cb20bb1
[WIP] Summary Uploader
chewys1024 Oct 22, 2024
425adb9
Added a mockKVStore
chewys1024 Oct 22, 2024
9b231ee
scalafixAll
chewys1024 Oct 22, 2024
90c157d
Add a check to only upload each partition once.
chewys1024 Oct 23, 2024
54cda0c
coderabbit changes
chewy-zlai Oct 23, 2024
5f1b30b
Summarizer (#17)
nikhil-zlai Oct 29, 2024
db22935
Restoring aggregator files to main
chewy-zlai Oct 29, 2024
4f9882a
Merge branch 'main' of https://github.com/zipline-ai/chronon into sum…
chewy-zlai Oct 29, 2024
24375e6
Merged with main to simplify change
chewy-zlai Oct 29, 2024
f15ab1f
revert build.sbt
chewy-zlai Oct 29, 2024
b8c0d8c
Merge branch 'main' of https://github.com/zipline-ai/chronon into sum…
chewy-zlai Oct 31, 2024
c1bbe1f
Add create to MockKVStore
chewy-zlai Oct 31, 2024
835ab23
Create map correctly
chewy-zlai Oct 31, 2024
673cb1e
Attempt to make it serializable
chewy-zlai Oct 31, 2024
f966868
Make MockKVStore and SummaryUploader serializable.
chewy-zlai Oct 31, 2024
3c50b70
Fix serialization by passing in a function to create the KVStore
chewy-zlai Oct 31, 2024
bc494fc
sbt scalafmt
chewy-zlai Oct 31, 2024
1b9f275
coderabbit suggested use of a variable
chewy-zlai Oct 31, 2024
9c46dbc
Clean up unintended changes to DriftTest
chewy-zlai Oct 31, 2024
27fdc18
Clean up unintended changes to DriftTest
chewy-zlai Oct 31, 2024
1405872
Batch putrequest defualting at 100 at a time
chewy-zlai Oct 31, 2024
3fe25fe
Store putRequests as an ArrayBuffer for better performance
chewy-zlai Oct 31, 2024
df6ccc1
Add a semaphore to limit puts
chewy-zlai Nov 4, 2024
2e4d2df
Add backoff to DynamoDBKVStoreImpl in case of ProvisionedThroughputEx…
chewy-zlai Nov 4, 2024
57c0291
Simplify KVStoreSemaphore via coderabbit suggestions
chewy-zlai Nov 4, 2024
a6447c8
Fixes error handling issue coderabbit noticed in DynamoDBKVStoreImpl
chewy-zlai Nov 4, 2024
d7860ee
Merge branch 'main' into summary-upload
chewy-zlai Nov 4, 2024
4e18f18
Switch semaphore to a ratelimiter per dataset for DynamoDBKVStoreImpl
chewy-zlai Nov 4, 2024
b5d08b8
Add ratelimiter to multiget as well for DDBKVSImpl
chewy-zlai Nov 4, 2024
23e710f
Merge branch 'summary-upload' of https://github.com/zipline-ai/chrono…
chewy-zlai Nov 4, 2024
f4d27fc
Remove parameter from creation of KVStore
chewy-zlai Nov 4, 2024
254e6c4
remove parameter from SummaryUploader
chewy-zlai Nov 4, 2024
9a9ac67
scalafixAll
chewy-zlai Nov 4, 2024
cd7cd8e
Handle partial errors from multiput
chewy-zlai Nov 4, 2024
ba9ea0c
Specificity in error handling
chewy-zlai Nov 4, 2024
6e75bcd
Fix guava dependency so RateLimiter will work.
chewy-zlai Nov 5, 2024
013c7d6
Update DynamoDBKVStore rateLimiters to use a TrieMap
chewy-zlai Nov 5, 2024
fb74aba
Switch to ConcurrentHashMap for RateLimiters
chewy-zlai Nov 5, 2024
f920ef2
remove semaphor
chewy-zlai Nov 5, 2024
436a063
remove use of semaphore
chewy-zlai Nov 5, 2024
1acf3b5
remove manual retry logic as AWS handles that for us
chewy-zlai Nov 5, 2024
4a8e5f9
more idiomatic use of concurrent maps
chewy-zlai Nov 5, 2024
40c52df
Move stats table name to Constants
chewy-zlai Nov 5, 2024
591c5ae
clean up redundant code
chewy-zlai Nov 7, 2024
1 change: 1 addition & 0 deletions api/src/main/scala/ai/chronon/api/Constants.scala
@@ -62,4 +62,5 @@ object Constants {
val LabelViewPropertyFeatureTable: String = "feature_table"
val LabelViewPropertyKeyLabelTable: String = "label_table"
val ChrononRunDs: String = "CHRONON_RUN_DS"
val DriftStatsTable: String = "drift_statistics"
}
6 changes: 4 additions & 2 deletions build.sbt
@@ -157,8 +157,9 @@ lazy val spark = project
crossScalaVersions := supportedVersions,
libraryDependencies ++= spark_all_provided,
libraryDependencies ++= spark_all.map(_ % "test"),
libraryDependencies += "jakarta.servlet" % "jakarta.servlet-api" % "4.0.3"
)
libraryDependencies += "jakarta.servlet" % "jakarta.servlet-api" % "4.0.3",
libraryDependencies += "com.google.guava" % "guava" % "33.3.1-jre"
A contributor commented:
@nikhil-zlai - do you think this warrants more tests, or would the CI Spark tests cover any breaks?

)

lazy val flink = project
.dependsOn(aggregator.%("compile->compile;test->test"), online)
@@ -189,6 +190,7 @@ lazy val cloud_aws = project
libraryDependencies += "io.circe" %% "circe-core" % circeVersion % "test",
libraryDependencies += "io.circe" %% "circe-generic" % circeVersion % "test",
libraryDependencies += "io.circe" %% "circe-parser" % circeVersion % "test",
libraryDependencies += "com.google.guava" % "guava" % "33.3.1-jre",
libraryDependencies ++= spark_all
)
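
The Guava pin in both modules exists so that the DynamoDB store changes below can use com.google.common.util.concurrent.RateLimiter. As a minimal illustration of that API (the rate here is made up for the sketch, not taken from this PR):

import com.google.common.util.concurrent.RateLimiter

object RateLimiterSketch {
  def main(args: Array[String]): Unit = {
    val limiter = RateLimiter.create(10.0) // roughly 10 permits per second

    (1 to 25).foreach { i =>
      limiter.acquire() // blocks until a permit is available
      println(s"write $i allowed")
    }
  }
}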

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala
@@ -9,6 +9,7 @@ import ai.chronon.online.KVStore.ListValue
import ai.chronon.online.KVStore.TimedValue
import ai.chronon.online.Metrics
import ai.chronon.online.Metrics.Context
import com.google.common.util.concurrent.RateLimiter
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
@@ -30,6 +31,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse

import java.time.Instant
import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.util.Success
@@ -64,6 +66,8 @@ object DynamoDBKVStoreConstants {

class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
import DynamoDBKVStoreConstants._
private val readRateLimiters = new ConcurrentHashMap[String, RateLimiter]()
private val writeRateLimiters = new ConcurrentHashMap[String, RateLimiter]()

protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.KVStore).withSuffix("dynamodb")

@@ -88,6 +92,9 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
val rcu = getCapacityUnits(props, readCapacityUnits, defaultReadCapacityUnits)
val wcu = getCapacityUnits(props, writeCapacityUnits, defaultWriteCapacityUnits)

readRateLimiters.put(dataset, RateLimiter.create(rcu))
writeRateLimiters.put(dataset, RateLimiter.create(wcu))
Comment on lines +95 to +96
coderabbitai bot commented on Nov 7, 2024:
⚠️ Potential issue

Fix rate limiter initialization in create method.

The rate limiters are initialized with the correct capacity units, but there's a potential issue:

  1. If the create method fails, the rate limiters will remain in the maps.
  2. Subsequent create attempts might have different capacity units.

Consider moving the rate limiter initialization after successful table creation:

-    readRateLimiters.put(dataset, RateLimiter.create(rcu))
-    writeRateLimiters.put(dataset, RateLimiter.create(wcu))

     val request =
       CreateTableRequest.builder
         .attributeDefinitions(keyAttributes.toList.asJava)
         .keySchema(keySchema.toList.asJava)
         .provisionedThroughput(ProvisionedThroughput.builder.readCapacityUnits(rcu).writeCapacityUnits(wcu).build)
         .tableName(dataset)
         .build
 
     logger.info(s"Triggering creation of DynamoDb table: $dataset")
     try {
       val _ = dynamoDbClient.createTable(request)
       val tableRequest = DescribeTableRequest.builder.tableName(dataset).build
       // Wait until the Amazon DynamoDB table is created.
       val waiterResponse = dbWaiter.waitUntilTableExists(tableRequest)
       if (waiterResponse.matched.exception().isPresent)
         throw waiterResponse.matched.exception().get()
 
       val tableDescription = waiterResponse.matched().response().get().table()
       logger.info(s"Table created successfully! Details: \n${tableDescription.toString}")
+      readRateLimiters.put(dataset, RateLimiter.create(rcu))
+      writeRateLimiters.put(dataset, RateLimiter.create(wcu))
       metricsContext.increment("create.successes")

chewy-zlai (Collaborator, Author) replied:
In case it fails because the table already exists, I still want to create the rate limiters.

coderabbitai bot replied:
@chewy-zlai, understood. Thank you for the clarification. Keeping the rate limiter initialization outside the try block ensures they are created even when the table already exists. I'll make a note of this for future reviews.


✏️ Learnings added
Learnt from: chewy-zlai
PR: zipline-ai/chronon#50
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:95-96
Timestamp: 2024-11-07T00:49:32.110Z
Learning: In `DynamoDBKVStoreImpl.scala`, the rate limiters should be initialized outside the `try` block in the `create` method to ensure they are set up even if the table already exists.



🧠 Learnings used
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:167-229
Timestamp: 2024-10-14T14:05:23.413Z
Learning: Leveraging DynamoDB batch writes for performance optimization in `multiPut` will be addressed in a future review.


val request =
CreateTableRequest.builder
.attributeDefinitions(keyAttributes.toList.asJava)
@@ -137,6 +144,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
val getItemResults = getItemRequestPairs.map {
case (req, getItemReq) =>
Future {
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
val item: Try[util.Map[String, AttributeValue]] =
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
dynamoDbClient.getItem(getItemReq).item()
@@ -151,6 +159,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
val queryResults = queryRequestPairs.map {
case (req, queryRequest) =>
Future {
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
dynamoDbClient.query(queryRequest).items()
}
@@ -218,12 +227,10 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
val futureResponses = datasetToWriteRequests.map {
case (dataset, putItemRequest) =>
Future {
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
dynamoDbClient.putItem(putItemRequest)
}.recover {
case _: Exception => false
}
true
}.isSuccess
}
}
Future.sequence(futureResponses)
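
For readers skimming the diff, here is a small, self-contained sketch (not code from this PR) of the throttling pattern the changes above apply in multiGet and multiPut: one RateLimiter per dataset, created lazily via computeIfAbsent when create() was never called in this process, with acquire() gating each DynamoDB call. The default rate below is an illustrative placeholder, not the PR's constant.

import java.util.concurrent.ConcurrentHashMap

import com.google.common.util.concurrent.RateLimiter

import scala.util.Try

object PerDatasetThrottlingSketch {
  private val defaultWriteCapacityUnits = 10.0 // placeholder value for illustration
  private val writeRateLimiters = new ConcurrentHashMap[String, RateLimiter]()

  // Acquire a permit from the dataset's limiter, creating it on first use, then
  // run the write and report success as a Boolean (the shape multiPut returns).
  def throttledWrite(dataset: String)(write: => Unit): Boolean = {
    writeRateLimiters
      .computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits))
      .acquire()
    Try(write).isSuccess
  }
}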
1 change: 0 additions & 1 deletion online/src/main/scala/ai/chronon/online/Api.scala
@@ -64,7 +64,6 @@ object KVStore {
trait KVStore {
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
implicit val executionContext: ExecutionContext = FlexibleExecutionContext.buildExecutionContext

def create(dataset: String): Unit

def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)
60 changes: 60 additions & 0 deletions SummaryUploader.scala (new file, package ai.chronon.spark.stats.drift)
@@ -0,0 +1,60 @@
package ai.chronon.spark.stats.drift
import ai.chronon.api.Constants
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.PutRequest
import ai.chronon.spark.TableUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class SummaryUploader(summaryDF: DataFrame, kvStoreFunc: () => KVStore, putsPerRequest: Int = 100)(implicit
    tu: TableUtils)
    extends Serializable {
  val completed_schema: types.StructType = types.StructType(
    Seq(
      types.StructField(tu.partitionColumn, types.StringType, nullable = false)
    )
  )
  private val statsTableName = Constants.DriftStatsTable

  def run(): Unit = {
    // Validate schema
    val requiredColumns = Seq("keyBytes", "valueBytes", "timestamp")
    val missingColumns = requiredColumns.filterNot(summaryDF.columns.contains)
    require(missingColumns.isEmpty, s"Missing required columns: ${missingColumns.mkString(", ")}")

    summaryDF.rdd.foreachPartition(rows => {
      val kvStore: KVStore = kvStoreFunc()
      val putRequests = new scala.collection.mutable.ArrayBuffer[PutRequest]
      for (row <- rows) {
        putRequests += PutRequest(
          Option(row.getAs[Array[Byte]]("keyBytes")).getOrElse(Array.empty[Byte]),
          Option(row.getAs[Array[Byte]]("valueBytes")).getOrElse(Array.empty[Byte]),
          statsTableName,
          Option(row.getAs[Long]("timestamp"))
        )
      }

      val futureResults = putRequests.grouped(putsPerRequest).map { batch =>
        kvStore.multiPut(batch.toList).map { result =>
          if (!result.forall(identity)) {
            throw new RuntimeException(s"Failed to put ${result.count(!_)} records")
          }
        }
      }

      val aggregatedFuture = Future.sequence(futureResults.toSeq)
      aggregatedFuture.onComplete {
        case scala.util.Success(_) => // All operations completed successfully
        case scala.util.Failure(e: IllegalArgumentException) =>
          throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
        case scala.util.Failure(e: java.io.IOException) =>
          throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
        case scala.util.Failure(e) =>
Comment on lines +51 to +55
Review comment:
🛠️ Refactor suggestion

Add handling for DynamoDB-specific exceptions.

Consider adding specific handling for DynamoDB's ProvisionedThroughputExceededException:

         case scala.util.Failure(e: IllegalArgumentException) =>
           throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
         case scala.util.Failure(e: java.io.IOException) =>
           throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
+        case scala.util.Failure(e: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException) =>
+          throw new RuntimeException(s"DynamoDB throughput exceeded: ${e.getMessage}", e)
         case scala.util.Failure(e) =>

throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e)
}
})
Comment on lines +28 to +58
Review comment:
⚠️ Potential issue

Ensure futures complete before partition processing ends.

The current implementation might allow the partition processing to complete before all futures are resolved. Consider using Await.ready with a timeout to ensure all operations complete:

       val aggregatedFuture = Future.sequence(futureResults.toSeq)
-      aggregatedFuture.onComplete {
-        case scala.util.Success(_) => // All operations completed successfully
-        case scala.util.Failure(e: IllegalArgumentException) =>
-          throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
-        case scala.util.Failure(e: java.io.IOException) =>
-          throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
-        case scala.util.Failure(e) =>
-          throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e)
-      }
+      import scala.concurrent.duration._
+      try {
+        scala.concurrent.Await.ready(aggregatedFuture, 5.minutes).value.get match {
+          case scala.util.Success(_) => // All operations completed successfully
+          case scala.util.Failure(e: IllegalArgumentException) =>
+            throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
+          case scala.util.Failure(e: java.io.IOException) =>
+            throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
+          case scala.util.Failure(e) =>
+            throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e)
+        }
+      } catch {
+        case e: scala.concurrent.TimeoutException =>
+          throw new RuntimeException("Upload timed out after 5 minutes", e)
+      }

While this uses Await.ready, it's acceptable here as we need to ensure all operations complete before the partition processing ends. The timeout ensures we don't block indefinitely.


  }
}
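
A hypothetical wiring sketch (not part of this PR) to show why the constructor takes a () => KVStore factory instead of a KVStore instance: the factory keeps the closure shipped to Spark executors serializable, and each executor builds its own non-serializable DynamoDB client. The client construction below assumes default AWS credential and region resolution.

import ai.chronon.integrations.aws.DynamoDBKVStoreImpl
import ai.chronon.online.KVStore
import ai.chronon.spark.TableUtils
import ai.chronon.spark.stats.drift.SummaryUploader
import org.apache.spark.sql.DataFrame
import software.amazon.awssdk.services.dynamodb.DynamoDbClient

object SummaryUploadSketch {
  def upload(packed: DataFrame)(implicit tu: TableUtils): Unit = {
    // Built lazily on each executor, so the DynamoDbClient never has to be serialized.
    val kvStoreFunc: () => KVStore = () => new DynamoDBKVStoreImpl(DynamoDbClient.create())
    new SummaryUploader(packed, kvStoreFunc, putsPerRequest = 100).run()
  }
}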
34 changes: 34 additions & 0 deletions spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala
@@ -0,0 +1,34 @@
package ai.chronon.spark.test

import ai.chronon.online.KVStore

import scala.collection.mutable
import scala.concurrent.Future

class MockKVStore() extends KVStore with Serializable {
  val num_puts: mutable.Map[String, Int] = collection.mutable.Map[String, Int]()

  def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit =
    throw new UnsupportedOperationException("Not implemented in mock")
  def create(dataset: String): Unit = {
    num_puts(dataset) = 0
  }
  def multiGet(requests: Seq[ai.chronon.online.KVStore.GetRequest]): scala.concurrent.Future[Seq[ai.chronon.online.KVStore.GetResponse]] =
    throw new UnsupportedOperationException("Not implemented in mock")
  def multiPut(keyValueDatasets: Seq[ai.chronon.online.KVStore.PutRequest]): scala.concurrent.Future[Seq[Boolean]] = {
    logger.info(s"Triggering multiput for ${keyValueDatasets.size} rows")
    for (req <- keyValueDatasets if !req.keyBytes.isEmpty && !req.valueBytes.isEmpty) num_puts(req.dataset) += 1

    val futureResponses = keyValueDatasets.map { req =>
      if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty) Future { true }
      else Future { false }
    }
    Future.sequence(futureResponses)
  }

  def show(): Unit = {
    num_puts.foreach(x => logger.info(s"Ran ${x._2} non-empty put actions for dataset ${x._1}"))
  }
}
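
A hypothetical test fragment (not in this PR) showing how the mock's bookkeeping can be asserted when the instance itself is kept in scope, rather than hidden behind a factory as in DriftTest below:

package ai.chronon.spark.test

import ai.chronon.online.KVStore.PutRequest

import scala.concurrent.Await
import scala.concurrent.duration._

object MockKVStoreUsageSketch {
  def main(args: Array[String]): Unit = {
    val store = new MockKVStore()
    store.create("drift_statistics")

    // One non-empty put should be counted against the dataset.
    val req = PutRequest("key".getBytes, "value".getBytes, "drift_statistics", Some(1700000000000L))
    val ok = Await.result(store.multiPut(Seq(req)), 10.seconds)

    assert(ok.forall(identity))
    assert(store.num_puts("drift_statistics") == 1)
    store.show()
  }
}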
DriftTest.scala (package ai.chronon.spark.test.stats.drift)
@@ -3,13 +3,17 @@ package ai.chronon.spark.test.stats.drift
import ai.chronon.aggregator.test.Column
import ai.chronon.api
import ai.chronon.api.ColorPrinter.ColorString
import ai.chronon.api.Constants
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.online.KVStore
import ai.chronon.spark.Extensions._
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.stats.drift.Summarizer
import ai.chronon.spark.stats.drift.SummaryPacker
import ai.chronon.spark.stats.drift.SummaryUploader
import ai.chronon.spark.test.DataFrameGen
import ai.chronon.spark.test.MockKVStore
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.scalatest.flatspec.AnyFlatSpec
@@ -36,6 +40,18 @@ class DriftTest extends AnyFlatSpec with Matchers {
    val packer = new SummaryPacker("drift_test_basic", summaryExprs, summarizer.tileSize, summarizer.sliceColumns)
    val (packed, _) = packer.packSummaryDf(result)
    packed.show()

    val props = Map("is-time-sorted" -> "true")

    val kvStore: () => KVStore = () => {
      val result = new MockKVStore()
      result.create(Constants.DriftStatsTable, props)
      result
    }

    val uploader = new SummaryUploader(packed, kvStore)
    uploader.run()
    // kvStore.show()
}
}
