Commit 96eb4d1

Authored by chewy-zlai, nikhil-zlai, chewys1024, piyush-zlai, and coderabbitai[bot]
Summary upload (#50)
## Summary
Creates a SummaryUploader which uploads summary data to a KVStore.

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Introduced `SummaryUploader` for uploading summary statistics to a key-value store.
  - Added `MockKVStore` for testing key-value store operations.
  - Implemented `KVStoreSemaphore` for managing concurrent access to resources.
- **Enhancements**
  - Increased data volume in tests to improve testing scenarios.
  - Integrated `SummaryUploader` in `DriftTest` for uploading summary data during tests.
  - Enhanced control over concurrent reads and writes to DynamoDB with updated `DynamoDBKVStoreImpl`.
  - Refined error handling and flow in `multiPut` operations for better robustness.
  - Updated Spark dependency from `3.5.0` to `3.5.1` for improved stability.
  - Added a new constant `DriftStatsTable` for drift statistics.
- **Bug Fixes**
  - Improved error handling for upload failures in `SummaryUploader`.

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: nikhil-zlai <[email protected]>
Co-authored-by: Chewy Shaw <[email protected]>
Co-authored-by: Piyush Narang <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: ken-zlai <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ezvz <[email protected]>
1 parent 9ffb264 commit 96eb4d1

File tree: 7 files changed (+126, -7 lines)

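Before the per-file diffs, a minimal wiring sketch of how the pieces fit together. It mirrors the `DriftTest` change at the bottom of this commit; `packedDf` stands in for the packed summary DataFrame and an implicit `TableUtils` is assumed to be in scope — both are assumptions, not part of this snippet.

```scala
import ai.chronon.api.Constants
import ai.chronon.online.KVStore
import ai.chronon.spark.stats.drift.SummaryUploader
import ai.chronon.spark.test.MockKVStore

// A factory rather than an instance: SummaryUploader builds one store per Spark partition.
val kvStoreFunc: () => KVStore = () => {
  val store = new MockKVStore()
  store.create(Constants.DriftStatsTable, Map("is-time-sorted" -> "true"))
  store
}

// packedDf: DataFrame with keyBytes/valueBytes/timestamp columns (assumed in scope),
// as is the implicit TableUtils that the SummaryUploader constructor requires.
val uploader = new SummaryUploader(packedDf, kvStoreFunc)
uploader.run() // groups rows into multiPut batches of 100 by default
```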

api/src/main/scala/ai/chronon/api/Constants.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -62,4 +62,5 @@ object Constants {
   val LabelViewPropertyFeatureTable: String = "feature_table"
   val LabelViewPropertyKeyLabelTable: String = "label_table"
   val ChrononRunDs: String = "CHRONON_RUN_DS"
+  val DriftStatsTable: String = "drift_statistics"
 }
```

build.sbt

Lines changed: 4 additions & 2 deletions

```diff
@@ -157,8 +157,9 @@ lazy val spark = project
     crossScalaVersions := supportedVersions,
     libraryDependencies ++= spark_all_provided,
     libraryDependencies ++= spark_all.map(_ % "test"),
-    libraryDependencies += "jakarta.servlet" % "jakarta.servlet-api" % "4.0.3"
-  )
+    libraryDependencies += "jakarta.servlet" % "jakarta.servlet-api" % "4.0.3",
+    libraryDependencies += "com.google.guava" % "guava" % "33.3.1-jre"
+  )

 lazy val flink = project
   .dependsOn(aggregator.%("compile->compile;test->test"), online)
@@ -189,6 +190,7 @@ lazy val cloud_aws = project
     libraryDependencies += "io.circe" %% "circe-core" % circeVersion % "test",
     libraryDependencies += "io.circe" %% "circe-generic" % circeVersion % "test",
     libraryDependencies += "io.circe" %% "circe-parser" % circeVersion % "test",
+    libraryDependencies += "com.google.guava" % "guava" % "33.3.1-jre",
     libraryDependencies ++= spark_all
   )
```

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala

Lines changed: 11 additions & 4 deletions

```diff
@@ -9,6 +9,7 @@ import ai.chronon.online.KVStore.ListValue
 import ai.chronon.online.KVStore.TimedValue
 import ai.chronon.online.Metrics
 import ai.chronon.online.Metrics.Context
+import com.google.common.util.concurrent.RateLimiter
 import software.amazon.awssdk.core.SdkBytes
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient
 import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
@@ -30,6 +31,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse

 import java.time.Instant
 import java.util
+import java.util.concurrent.ConcurrentHashMap
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 import scala.util.Success
@@ -64,6 +66,8 @@ object DynamoDBKVStoreConstants {

 class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
   import DynamoDBKVStoreConstants._
+  private val readRateLimiters = new ConcurrentHashMap[String, RateLimiter]()
+  private val writeRateLimiters = new ConcurrentHashMap[String, RateLimiter]()

   protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.KVStore).withSuffix("dynamodb")

@@ -88,6 +92,9 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
     val rcu = getCapacityUnits(props, readCapacityUnits, defaultReadCapacityUnits)
     val wcu = getCapacityUnits(props, writeCapacityUnits, defaultWriteCapacityUnits)

+    readRateLimiters.put(dataset, RateLimiter.create(rcu))
+    writeRateLimiters.put(dataset, RateLimiter.create(wcu))
+
     val request =
       CreateTableRequest.builder
         .attributeDefinitions(keyAttributes.toList.asJava)
@@ -137,6 +144,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
     val getItemResults = getItemRequestPairs.map {
       case (req, getItemReq) =>
         Future {
+          readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
           val item: Try[util.Map[String, AttributeValue]] =
             handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
               dynamoDbClient.getItem(getItemReq).item()
@@ -151,6 +159,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
     val queryResults = queryRequestPairs.map {
       case (req, queryRequest) =>
         Future {
+          readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
           val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
             dynamoDbClient.query(queryRequest).items()
           }
@@ -218,12 +227,10 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
     val futureResponses = datasetToWriteRequests.map {
       case (dataset, putItemRequest) =>
         Future {
+          writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
           handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
             dynamoDbClient.putItem(putItemRequest)
-          }.recover {
-            case _: Exception => false
-          }
-          true
+          }.isSuccess
         }
     }
     Future.sequence(futureResponses)
```
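For readers new to Guava's throttling API, here is the per-dataset rate-limiting pattern from the diff above in isolation. This is an illustrative sketch, not code from the commit: `RateLimiter.create(n)` issues roughly `n` permits per second, `acquire()` blocks until a permit is free, and `computeIfAbsent` lazily registers one limiter per dataset so requests against unregistered datasets fall back to a default rate.

```scala
import java.util.concurrent.ConcurrentHashMap
import com.google.common.util.concurrent.RateLimiter

object RateLimiterSketch {
  // One limiter per dataset; capacity-units-per-second doubles as permits-per-second.
  private val limiters = new ConcurrentHashMap[String, RateLimiter]()
  private val defaultPermitsPerSecond = 100.0 // stand-in for the default capacity units

  def throttled[T](dataset: String)(op: => T): T = {
    // acquire() blocks until a permit is available, smoothing traffic to the configured rate.
    limiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultPermitsPerSecond)).acquire()
    op
  }
}

// Usage: RateLimiterSketch.throttled("drift_statistics") { /* DynamoDB call */ }
```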

online/src/main/scala/ai/chronon/online/Api.scala

Lines changed: 0 additions & 1 deletion

```diff
@@ -64,7 +64,6 @@ object KVStore {
 trait KVStore {
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
   implicit val executionContext: ExecutionContext = FlexibleExecutionContext.buildExecutionContext
-
   def create(dataset: String): Unit

   def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)
```
spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala (new file; path inferred from the package declaration)

Lines changed: 60 additions & 0 deletions

```scala
package ai.chronon.spark.stats.drift

import ai.chronon.api.Constants
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.PutRequest
import ai.chronon.spark.TableUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class SummaryUploader(summaryDF: DataFrame, kvStoreFunc: () => KVStore, putsPerRequest: Int = 100)(implicit
    tu: TableUtils)
    extends Serializable {
  val completed_schema: types.StructType = types.StructType(
    Seq(
      types.StructField(tu.partitionColumn, types.StringType, nullable = false)
    )
  )
  private val statsTableName = Constants.DriftStatsTable

  def run(): Unit = {
    // Validate schema
    val requiredColumns = Seq("keyBytes", "valueBytes", "timestamp")
    val missingColumns = requiredColumns.filterNot(summaryDF.columns.contains)
    require(missingColumns.isEmpty, s"Missing required columns: ${missingColumns.mkString(", ")}")

    summaryDF.rdd.foreachPartition(rows => {
      val kvStore: KVStore = kvStoreFunc()
      val putRequests = new scala.collection.mutable.ArrayBuffer[PutRequest]
      for (row <- rows) {
        putRequests += PutRequest(
          Option(row.getAs[Array[Byte]]("keyBytes")).getOrElse(Array.empty[Byte]),
          Option(row.getAs[Array[Byte]]("valueBytes")).getOrElse(Array.empty[Byte]),
          statsTableName,
          Option(row.getAs[Long]("timestamp"))
        )
      }

      val futureResults = putRequests.grouped(putsPerRequest).map { batch =>
        kvStore.multiPut(batch.toList).map { result =>
          if (!result.forall(identity)) {
            throw new RuntimeException(s"Failed to put ${result.count(!_)} records")
          }
        }
      }

      val aggregatedFuture = Future.sequence(futureResults.toSeq)
      aggregatedFuture.onComplete {
        case scala.util.Success(_) => // All operations completed successfully
        case scala.util.Failure(e: IllegalArgumentException) =>
          throw new IllegalArgumentException(s"Invalid request data: ${e.getMessage}", e)
        case scala.util.Failure(e: java.io.IOException) =>
          throw new RuntimeException(s"KVStore I/O error: ${e.getMessage}", e)
        case scala.util.Failure(e) =>
          throw new RuntimeException(s"Failed to upload summary statistics: ${e.getMessage}", e)
      }
    })
  }
}
```
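A sketch of the input contract implied by the validation in `run()`. The column names come from the code above; the Spark types are an assumption inferred from how the rows are read (binary key/value blobs plus a `Long` timestamp):

```scala
import org.apache.spark.sql.types._

// Assumed shape of summaryDF; run() only checks that the column names exist.
val expectedSchema: StructType = StructType(Seq(
  StructField("keyBytes", BinaryType, nullable = true),   // null keys are coerced to empty arrays
  StructField("valueBytes", BinaryType, nullable = true), // null values are coerced to empty arrays
  StructField("timestamp", LongType, nullable = false)    // read via row.getAs[Long]
))
```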
spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala (new file; path inferred from the package declaration)

Lines changed: 34 additions & 0 deletions

```scala
package ai.chronon.spark.test

import ai.chronon.online.KVStore

import scala.collection.mutable
import scala.concurrent.Future

class MockKVStore() extends KVStore with Serializable {
  val num_puts: mutable.Map[String, Int] = mutable.Map[String, Int]()

  def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit =
    throw new UnsupportedOperationException("Not implemented in mock")

  def create(dataset: String): Unit = {
    num_puts(dataset) = 0
  }

  def multiGet(requests: Seq[KVStore.GetRequest]): Future[Seq[KVStore.GetResponse]] =
    throw new UnsupportedOperationException("Not implemented in mock")

  def multiPut(keyValueDatasets: Seq[KVStore.PutRequest]): Future[Seq[Boolean]] = {
    logger.info(s"Triggering multiput for ${keyValueDatasets.size} rows")
    for (req <- keyValueDatasets if !req.keyBytes.isEmpty && !req.valueBytes.isEmpty) num_puts(req.dataset) += 1

    val futureResponses = keyValueDatasets.map { req =>
      if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty) Future(true)
      else Future(false)
    }
    Future.sequence(futureResponses)
  }

  def show(): Unit = {
    num_puts.foreach(x => logger.info(s"Ran ${x._2} non-empty put actions for dataset ${x._1}"))
  }
}
```
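A minimal test sketch (assumed usage, not from the commit) showing what the mock tracks: only puts with non-empty key and value bytes increment the per-dataset counter, and each request resolves to a `Boolean` success flag.

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

import ai.chronon.online.KVStore.PutRequest
import ai.chronon.spark.test.MockKVStore

val mock = new MockKVStore()
mock.create("drift_statistics") // registers the dataset with a zeroed put counter

val results = Await.result(
  mock.multiPut(Seq(
    PutRequest(Array[Byte](1), Array[Byte](2), "drift_statistics", Some(1L)),
    PutRequest(Array.empty[Byte], Array[Byte](2), "drift_statistics", None) // empty key -> false
  )),
  Duration.Inf
)
// results == Seq(true, false)
mock.show() // logs: "Ran 1 non-empty put actions for dataset drift_statistics"
```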

spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala

Lines changed: 16 additions & 0 deletions

```diff
@@ -3,13 +3,17 @@ package ai.chronon.spark.test.stats.drift
 import ai.chronon.aggregator.test.Column
 import ai.chronon.api
 import ai.chronon.api.ColorPrinter.ColorString
+import ai.chronon.api.Constants
 import ai.chronon.api.Extensions.MetadataOps
+import ai.chronon.online.KVStore
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.SparkSessionBuilder
 import ai.chronon.spark.TableUtils
 import ai.chronon.spark.stats.drift.Summarizer
 import ai.chronon.spark.stats.drift.SummaryPacker
+import ai.chronon.spark.stats.drift.SummaryUploader
 import ai.chronon.spark.test.DataFrameGen
+import ai.chronon.spark.test.MockKVStore
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 import org.scalatest.flatspec.AnyFlatSpec
@@ -36,6 +40,18 @@ class DriftTest extends AnyFlatSpec with Matchers {
     val packer = new SummaryPacker("drift_test_basic", summaryExprs, summarizer.tileSize, summarizer.sliceColumns)
     val (packed, _) = packer.packSummaryDf(result)
     packed.show()
+
+    val props = Map("is-time-sorted" -> "true")
+
+    val kvStore: () => KVStore = () => {
+      val result = new MockKVStore()
+      result.create(Constants.DriftStatsTable, props)
+      result
+    }
+
+    val uploader = new SummaryUploader(packed, kvStore)
+    uploader.run()
+    // kvStore.show()
   }
 }
```