Skip to content

Commit 5469052

Browse files
chore: add bazel build file for cloud_aws (#343)
## Summary - Adding new BUILD for cloud_aws - Adding the above to the CI/CD ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced AWS integration with a new library for AWS functionality and a framework for job submission. - Introduced a new utility for managing job submissions, statuses, and terminations. - Added dedicated triggers for cloud modules to improve workflow automation. - **Tests** - Improved testing coverage with additional utilities for validating cloud functionalities and increased timeout settings for asynchronous operations. - **Chores** - Updated dependency configurations to incorporate essential AWS SDK components. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
1 parent 782b0e1 commit 5469052

File tree

7 files changed

+148
-78
lines changed

7 files changed

+148
-78
lines changed

.bazelrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ build --java_language_version=11
44
build --java_runtime_version=11
55
build --remote_cache=https://storage.googleapis.com/zipline-bazel-cache
66
test --test_output=errors
7-
test --test_timeout=900
7+
test --test_timeout=900

cloud_aws/BUILD.bazel

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
scala_library(
2+
name = "cloud_aws_lib",
3+
srcs = glob(["src/main/**/*.scala"]),
4+
visibility = ["//visibility:public"],
5+
format = True,
6+
deps = [
7+
maven_artifact("software.amazon.awssdk:dynamodb"),
8+
maven_artifact("software.amazon.awssdk:regions"),
9+
maven_artifact("software.amazon.awssdk:aws-core"),
10+
maven_artifact("software.amazon.awssdk:sdk-core"),
11+
maven_artifact("software.amazon.awssdk:utils"),
12+
maven_artifact("com.google.guava:guava"),
13+
maven_artifact("org.slf4j:slf4j-api"),
14+
maven_scala_artifact("org.scala-lang.modules:scala-collection-compat"),
15+
"//spark:lib",
16+
"//online:lib",
17+
"//api:lib",
18+
"//api:thrift_java",
19+
],
20+
)
21+
22+
test_deps = [
23+
":cloud_aws_lib",
24+
"//online:lib",
25+
maven_artifact("software.amazon.awssdk:dynamodb"),
26+
maven_artifact("software.amazon.awssdk:regions"),
27+
maven_artifact("software.amazon.awssdk:aws-core"),
28+
maven_artifact("software.amazon.awssdk:sdk-core"),
29+
maven_artifact("software.amazon.awssdk:utils"),
30+
maven_artifact("software.amazon.awssdk:auth"),
31+
maven_artifact("software.amazon.awssdk:identity-spi"),
32+
maven_scala_artifact("org.typelevel:cats-core"),
33+
maven_artifact("com.amazonaws:DynamoDBLocal"),
34+
maven_scala_artifact("com.chuusai:shapeless"),
35+
] + _CIRCE_DEPS + _SCALA_TEST_DEPS
36+
37+
scala_library(
38+
name = "test_lib",
39+
srcs = glob(["src/test/**/*.scala"]),
40+
format = True,
41+
visibility = ["//visibility:public"],
42+
deps = test_deps,
43+
)
44+
45+
46+
scala_test_suite(
47+
name = "tests",
48+
srcs = glob(["src/test/**/*.scala"]),
49+
# defined in prelude_bazel file
50+
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
51+
visibility = ["//visibility:public"],
52+
deps = test_deps + [":test_lib"],
53+
)

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,22 @@
11
package ai.chronon.integrations.aws
22

3-
import ai.chronon.online.Api
4-
import ai.chronon.online.ExternalSourceRegistry
5-
import ai.chronon.online.GroupByServingInfoParsed
6-
import ai.chronon.online.KVStore
7-
import ai.chronon.online.LoggableResponse
8-
import ai.chronon.online.Serde
3+
import ai.chronon.online._
94
import software.amazon.awssdk.regions.Region
105
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
116

127
import java.net.URI
138

14-
/**
15-
* Implementation of Chronon's API interface for AWS. This is a work in progress and currently just covers the
9+
/** Implementation of Chronon's API interface for AWS. This is a work in progress and currently just covers the
1610
* DynamoDB based KV store implementation.
1711
*/
1812
class AwsApiImpl(conf: Map[String, String]) extends Api(conf) {
1913
@transient lazy val ddbClient: DynamoDbClient = {
2014
var builder = DynamoDbClient
2115
.builder()
16+
2217
sys.env.get("AWS_DEFAULT_REGION").foreach { region =>
2318
try {
24-
builder = builder.region(Region.of(region))
19+
builder.region(Region.of(region))
2520
} catch {
2621
case e: IllegalArgumentException =>
2722
throw new IllegalArgumentException(s"Invalid AWS region format: $region", e)
@@ -43,21 +38,18 @@ class AwsApiImpl(conf: Map[String, String]) extends Api(conf) {
4338
new DynamoDBKVStoreImpl(ddbClient)
4439
}
4540

46-
/**
47-
* The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before
41+
/** The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before
4842
* we can spin up the Aws streaming Chronon stack
4943
*/
5044
override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): Serde = ???
5145

52-
/**
53-
* The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up
46+
/** The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up
5447
* a fully functional Chronon serving stack in Aws
5548
* @return
5649
*/
5750
override def externalRegistry: ExternalSourceRegistry = ???
5851

59-
/**
60-
* The logResponse method is currently unimplemented. We'll need to implement this prior to bringing up the
52+
/** The logResponse method is currently unimplemented. We'll need to implement this prior to bringing up the
6153
* fully functional serving stack in Aws which includes logging feature responses to a stream for OOC
6254
*/
6355
override def logResponse(resp: LoggableResponse): Unit = ???

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -141,31 +141,29 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
141141
// timestamp to use for all get responses when the underlying tables don't have a ts field
142142
val defaultTimestamp = Instant.now().toEpochMilli
143143

144-
val getItemResults = getItemRequestPairs.map {
145-
case (req, getItemReq) =>
146-
Future {
147-
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
148-
val item: Try[util.Map[String, AttributeValue]] =
149-
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
150-
dynamoDbClient.getItem(getItemReq).item()
151-
}
152-
153-
val response = item.map(i => List(i).asJava)
154-
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(response, defaultTimestamp)
155-
GetResponse(req, resultValue)
156-
}
144+
val getItemResults = getItemRequestPairs.map { case (req, getItemReq) =>
145+
Future {
146+
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
147+
val item: Try[util.Map[String, AttributeValue]] =
148+
handleDynamoDbOperation(metricsContext.withSuffix("multiget"), req.dataset) {
149+
dynamoDbClient.getItem(getItemReq).item()
150+
}
151+
152+
val response = item.map(i => List(i).asJava)
153+
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(response, defaultTimestamp)
154+
GetResponse(req, resultValue)
155+
}
157156
}
158157

159-
val queryResults = queryRequestPairs.map {
160-
case (req, queryRequest) =>
161-
Future {
162-
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
163-
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
164-
dynamoDbClient.query(queryRequest).items()
165-
}
166-
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(responses, defaultTimestamp)
167-
GetResponse(req, resultValue)
158+
val queryResults = queryRequestPairs.map { case (req, queryRequest) =>
159+
Future {
160+
readRateLimiters.computeIfAbsent(req.dataset, _ => RateLimiter.create(defaultReadCapacityUnits)).acquire()
161+
val responses = handleDynamoDbOperation(metricsContext.withSuffix("query"), req.dataset) {
162+
dynamoDbClient.query(queryRequest).items()
168163
}
164+
val resultValue: Try[Seq[TimedValue]] = extractTimedValues(responses, defaultTimestamp)
165+
GetResponse(req, resultValue)
166+
}
169167
}
170168

171169
Future.sequence(getItemResults ++ queryResults)
@@ -224,20 +222,18 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
224222
(req.dataset, putItemReq)
225223
}
226224

227-
val futureResponses = datasetToWriteRequests.map {
228-
case (dataset, putItemRequest) =>
229-
Future {
230-
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
231-
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
232-
dynamoDbClient.putItem(putItemRequest)
233-
}.isSuccess
234-
}
225+
val futureResponses = datasetToWriteRequests.map { case (dataset, putItemRequest) =>
226+
Future {
227+
writeRateLimiters.computeIfAbsent(dataset, _ => RateLimiter.create(defaultWriteCapacityUnits)).acquire()
228+
handleDynamoDbOperation(metricsContext.withSuffix("multiput"), dataset) {
229+
dynamoDbClient.putItem(putItemRequest)
230+
}.isSuccess
231+
}
235232
}
236233
Future.sequence(futureResponses)
237234
}
238235

239-
/**
240-
* Implementation of bulkPut is currently a TODO for the DynamoDB store. This involves transforming the underlying
236+
/** Implementation of bulkPut is currently a TODO for the DynamoDB store. This involves transforming the underlying
241237
* Parquet data to Amazon's Ion format + swapping out old table for new (as bulkLoad only writes to new tables)
242238
*/
243239
override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = ???
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package ai.chronon.integrations.aws
2+
3+
import ai.chronon.spark.{JobSubmitter, JobType}
4+
5+
class LivySubmitter extends JobSubmitter {
6+
7+
override def submit(jobType: JobType,
8+
jobProperties: Map[String, String],
9+
files: List[String],
10+
args: String*): String = ???
11+
12+
override def status(jobId: String): Unit = ???
13+
14+
override def kill(jobId: String): Unit = ???
15+
}

cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,43 @@
11
package ai.chronon.integrations.aws
22

3-
import ai.chronon.online.KVStore.GetRequest
4-
import ai.chronon.online.KVStore.GetResponse
5-
import ai.chronon.online.KVStore.ListRequest
6-
import ai.chronon.online.KVStore.ListValue
7-
import ai.chronon.online.KVStore.PutRequest
3+
import ai.chronon.online.KVStore._
84
import com.amazonaws.services.dynamodbv2.local.main.ServerRunner
95
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer
106
import io.circe.generic.auto._
7+
import io.circe.generic.semiauto._
118
import io.circe.parser._
129
import io.circe.syntax._
13-
import org.scalatest.BeforeAndAfter
10+
import io.circe.{Decoder, Encoder}
11+
import org.scalatest.BeforeAndAfterAll
1412
import org.scalatest.flatspec.AnyFlatSpec
15-
import org.scalatest.matchers.must.Matchers.be
1613
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
17-
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
18-
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
14+
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
1915
import software.amazon.awssdk.regions.Region
2016
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
2117

2218
import java.net.URI
2319
import java.nio.charset.StandardCharsets
2420
import scala.concurrent.Await
2521
import scala.concurrent.duration.DurationInt
26-
import scala.util.Failure
27-
import scala.util.Success
28-
import scala.util.Try
22+
import scala.util.{Failure, Success, Try}
2923

30-
// different types of tables to store
31-
case class Model(modelId: String, modelName: String, online: Boolean)
32-
case class TimeSeries(joinName: String, featureName: String, tileTs: Long, metric: String, summary: Array[Double])
24+
object DDBTestUtils {
3325

34-
class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
26+
// different types of tables to store
27+
case class Model(modelId: String, modelName: String, online: Boolean)
28+
case class TimeSeries(joinName: String, featureName: String, tileTs: Long, metric: String, summary: Array[Double])
3529

30+
}
31+
class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfterAll {
32+
33+
import DDBTestUtils._
3634
import DynamoDBKVStoreConstants._
3735

36+
implicit val modelEncoder: Encoder[Model] = deriveEncoder[Model]
37+
implicit val modelDecoder: Decoder[Model] = deriveDecoder[Model]
38+
implicit val tsEncoder: Encoder[TimeSeries] = deriveEncoder[TimeSeries]
39+
implicit val tsDecoder: Decoder[TimeSeries] = deriveDecoder[TimeSeries]
40+
3841
var server: DynamoDBProxyServer = _
3942
var client: DynamoDbClient = _
4043
var kvStoreImpl: DynamoDBKVStoreImpl = _
@@ -55,7 +58,7 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
5558
series.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)
5659
}
5760

58-
before {
61+
override def beforeAll(): Unit = {
5962
// Start the local DynamoDB instance
6063
server = ServerRunner.createServerFromCommandLineArgs(Array("-inMemory", "-port", "8000"))
6164
server.start()
@@ -72,9 +75,9 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
7275
.build()
7376
}
7477

75-
after {
76-
client.close()
77-
server.stop()
78+
override def afterAll(): Unit = {
79+
// client.close()
80+
// server.stop()
7881
}
7982

8083
// Test creation of a table with primary keys only (e.g. model)
@@ -115,20 +118,20 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
115118
buildModelPutRequest(model, dataset)
116119
}
117120

118-
val putResults = Await.result(kvStore.multiPut(putReqs), 1.second)
121+
val putResults = Await.result(kvStore.multiPut(putReqs), 1.minute)
119122
putResults.length shouldBe putReqs.length
120123
putResults.foreach(r => r shouldBe true)
121124

122125
// call list - first call is only for 10 elements
123126
val listReq1 = ListRequest(dataset, Map(listLimit -> 10))
124-
val listResults1 = Await.result(kvStore.list(listReq1), 1.second)
127+
val listResults1 = Await.result(kvStore.list(listReq1), 1.minute)
125128
listResults1.resultProps.contains(continuationKey) shouldBe true
126129
validateExpectedListResponse(listResults1.values, 10)
127130

128131
// call list - with continuation key
129132
val listReq2 =
130133
ListRequest(dataset, Map(listLimit -> 100, continuationKey -> listResults1.resultProps(continuationKey)))
131-
val listResults2 = Await.result(kvStore.list(listReq2), 1.second)
134+
val listResults2 = Await.result(kvStore.list(listReq2), 1.minute)
132135
listResults2.resultProps.contains(continuationKey) shouldBe false
133136
validateExpectedListResponse(listResults2.values, 100)
134137
}
@@ -148,17 +151,17 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
148151
val putReq2 = buildModelPutRequest(model2, dataset)
149152
val putReq3 = buildModelPutRequest(model3, dataset)
150153

151-
val putResults = Await.result(kvStore.multiPut(Seq(putReq1, putReq2, putReq3)), 1.second)
154+
val putResults = Await.result(kvStore.multiPut(Seq(putReq1, putReq2, putReq3)), 1.minute)
152155
putResults shouldBe Seq(true, true, true)
153156

154157
// let's try and read these
155158
val getReq1 = buildModelGetRequest(model1, dataset)
156159
val getReq2 = buildModelGetRequest(model2, dataset)
157160
val getReq3 = buildModelGetRequest(model3, dataset)
158161

159-
val getResult1 = Await.result(kvStore.multiGet(Seq(getReq1)), 1.second)
160-
val getResult2 = Await.result(kvStore.multiGet(Seq(getReq2)), 1.second)
161-
val getResult3 = Await.result(kvStore.multiGet(Seq(getReq3)), 1.second)
162+
val getResult1 = Await.result(kvStore.multiGet(Seq(getReq1)), 1.minute)
163+
val getResult2 = Await.result(kvStore.multiGet(Seq(getReq2)), 1.minute)
164+
val getResult3 = Await.result(kvStore.multiGet(Seq(getReq3)), 1.minute)
162165

163166
validateExpectedModelResponse(model1, getResult1)
164167
validateExpectedModelResponse(model2, getResult2)
@@ -178,13 +181,13 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
178181

179182
// write to the kv store and confirm the writes were successful
180183
val putRequests = points.map(p => buildTSPutRequest(p, dataset))
181-
val putResult = Await.result(kvStore.multiPut(putRequests), 1.second)
184+
val putResult = Await.result(kvStore.multiPut(putRequests), 1.minute)
182185
putResult.length shouldBe tsRange.length
183186
putResult.foreach(r => r shouldBe true)
184187

185188
// query in time range: 10/05/24 00:00 to 10/10
186189
val getRequest1 = buildTSGetRequest(points.head, dataset, 1728086400000L, 1728518400000L)
187-
val getResult1 = Await.result(kvStore.multiGet(Seq(getRequest1)), 1.second)
190+
val getResult1 = Await.result(kvStore.multiGet(Seq(getRequest1)), 1.minute)
188191
validateExpectedTimeSeriesResponse(points.head, 1728086400000L, 1728518400000L, getResult1)
189192
}
190193

@@ -231,7 +234,7 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
231234
private def validateExpectedListResponse(response: Try[Seq[ListValue]], maxElements: Int): Unit = {
232235
response match {
233236
case Success(mSeq) =>
234-
mSeq.length should be <= maxElements
237+
mSeq.length <= maxElements shouldBe true
235238
mSeq.foreach { modelKV =>
236239
val jsonStr = new String(modelKV.valueBytes, StandardCharsets.UTF_8)
237240
val returnedModel = decode[Model](jsonStr)

tools/build_rules/dependencies/maven_repository.bzl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ maven_repository = repository(
8383
# Hadoop
8484
"org.apache.hadoop:hadoop-client-api:3.3.4",
8585

86+
# AWS
87+
"software.amazon.awssdk:dynamodb:2.30.13",
88+
"software.amazon.awssdk:regions:2.30.13",
89+
"software.amazon.awssdk:aws-core:2.30.13",
90+
"software.amazon.awssdk:sdk-core:2.30.13",
91+
"software.amazon.awssdk:utils:2.30.13",
92+
"software.amazon.awssdk:auth:2.30.13",
93+
"software.amazon.awssdk:identity-spi:2.30.13",
94+
"com.amazonaws:DynamoDBLocal:2.4.0",
95+
8696
# Google Cloud
8797
"com.google.cloud:google-cloud-bigquery:2.42.0",
8898
"com.google.cloud:google-cloud-bigtable:2.41.0",
@@ -123,6 +133,7 @@ maven_repository = repository(
123133
"io.circe:circe-core_2.12:0.14.9",
124134
"io.circe:circe-generic_2.12:0.14.9",
125135
"io.circe:circe-parser_2.12:0.14.9",
136+
"com.chuusai:shapeless_2.12:2.3.12",
126137
],
127138
excluded_artifacts = [
128139
"org.pentaho:pentaho-aggdesigner-algorithm",

0 commit comments

Comments
 (0)