-
Notifications
You must be signed in to change notification settings - Fork 0
chore: add bazel build file for cloud_aws #343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
f61610b
ed6c3e5
1e649c9
9940037
4a0c6b8
1eed697
835f50c
697e78b
38de36d
dff7219
d237756
ad255db
ec77c30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
scala_library( | ||
name = "cloud_aws_lib", | ||
srcs = glob(["src/main/**/*.scala"]), | ||
visibility = ["//visibility:public"], | ||
format = True, | ||
deps = [ | ||
maven_artifact("software.amazon.awssdk:dynamodb"), | ||
maven_artifact("software.amazon.awssdk:regions"), | ||
maven_artifact("software.amazon.awssdk:aws-core"), | ||
maven_artifact("software.amazon.awssdk:sdk-core"), | ||
maven_artifact("software.amazon.awssdk:utils"), | ||
maven_artifact("com.google.guava:guava"), | ||
maven_artifact("org.slf4j:slf4j-api"), | ||
maven_scala_artifact("org.scala-lang.modules:scala-collection-compat"), | ||
"//spark:lib", | ||
"//online:lib", | ||
"//api:lib", | ||
"//api:thrift_java", | ||
], | ||
) | ||
|
||
test_deps = [ | ||
":cloud_aws_lib", | ||
"//online:lib", | ||
maven_artifact("software.amazon.awssdk:dynamodb"), | ||
maven_artifact("software.amazon.awssdk:regions"), | ||
maven_artifact("software.amazon.awssdk:aws-core"), | ||
maven_artifact("software.amazon.awssdk:sdk-core"), | ||
maven_artifact("software.amazon.awssdk:utils"), | ||
maven_artifact("software.amazon.awssdk:auth"), | ||
maven_artifact("software.amazon.awssdk:identity-spi"), | ||
maven_scala_artifact("org.typelevel:cats-core"), | ||
maven_artifact("com.amazonaws:DynamoDBLocal"), | ||
maven_scala_artifact("com.chuusai:shapeless"), | ||
] + _CIRCE_DEPS + _SCALA_TEST_DEPS | ||
|
||
scala_library( | ||
name = "test_lib", | ||
srcs = glob(["src/test/**/*.scala"]), | ||
format = True, | ||
visibility = ["//visibility:public"], | ||
deps = test_deps, | ||
) | ||
|
||
|
||
scala_test_suite( | ||
name = "tests", | ||
srcs = glob(["src/test/**/*.scala"]), | ||
# defined in prelude_bazel file | ||
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainVerify _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES is defined. 🏁 Script executed: #!/bin/bash
# Search for JVM flags definition
rg "_JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES" --type bazel Length of output: 459 Undefined _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES identified: |
||
visibility = ["//visibility:public"], | ||
deps = test_deps + [":test_lib"], | ||
) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -8,20 +8,21 @@ import ai.chronon.online.LoggableResponse | |||||
import ai.chronon.online.Serde | ||||||
import software.amazon.awssdk.regions.Region | ||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient | ||||||
import software.amazon.awssdk.core.SdkClient | ||||||
|
||||||
import java.net.URI | ||||||
|
||||||
/** | ||||||
* Implementation of Chronon's API interface for AWS. This is a work in progress and currently just covers the | ||||||
/** Implementation of Chronon's API interface for AWS. This is a work in progress and currently just covers the | ||||||
* DynamoDB based KV store implementation. | ||||||
*/ | ||||||
class AwsApiImpl(conf: Map[String, String]) extends Api(conf) { | ||||||
@transient lazy val ddbClient: DynamoDbClient = { | ||||||
var builder = DynamoDbClient | ||||||
.builder() | ||||||
|
||||||
sys.env.get("AWS_DEFAULT_REGION").foreach { region => | ||||||
try { | ||||||
builder = builder.region(Region.of(region)) | ||||||
builder // = builder.region(Region.of(region)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Restore or remove commented code. The region setting is commented out which could affect AWS operations. Either restore it or remove it completely. - builder // = builder.region(Region.of(region))
+ builder = builder.region(Region.of(region)) 📝 Committable suggestion
Suggested change
tchow-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} catch { | ||||||
case e: IllegalArgumentException => | ||||||
throw new IllegalArgumentException(s"Invalid AWS region format: $region", e) | ||||||
|
@@ -43,21 +44,18 @@ class AwsApiImpl(conf: Map[String, String]) extends Api(conf) { | |||||
new DynamoDBKVStoreImpl(ddbClient) | ||||||
} | ||||||
|
||||||
/** | ||||||
* The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before | ||||||
/** The stream decoder method in the AwsApi is currently unimplemented. This needs to be implemented before | ||||||
* we can spin up the Aws streaming Chronon stack | ||||||
*/ | ||||||
override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): Serde = ??? | ||||||
|
||||||
/** | ||||||
* The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up | ||||||
/** The external registry extension is currently unimplemented. We'll need to implement this prior to spinning up | ||||||
* a fully functional Chronon serving stack in Aws | ||||||
* @return | ||||||
*/ | ||||||
override def externalRegistry: ExternalSourceRegistry = ??? | ||||||
|
||||||
/** | ||||||
* The logResponse method is currently unimplemented. We'll need to implement this prior to bringing up the | ||||||
/** The logResponse method is currently unimplemented. We'll need to implement this prior to bringing up the | ||||||
* fully functional serving stack in Aws which includes logging feature responses to a stream for OOC | ||||||
*/ | ||||||
override def logResponse(resp: LoggableResponse): Unit = ??? | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,15 @@ | ||||||||||||||||||||||||||
package ai.chronon.integrations.aws | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
import ai.chronon.spark.{JobSubmitter, JobType} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
class LivySubmitter extends JobSubmitter { | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add AWS Livy client initialization. Initialize Livy client in constructor or companion object. -class LivySubmitter extends JobSubmitter {
+class LivySubmitter(
+ livyEndpoint: String,
+ awsRegion: String
+) extends JobSubmitter {
+ private val livyClient = LivyClient.builder()
+ .endpoint(livyEndpoint)
+ .region(awsRegion)
+ .build() 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
override def submit(jobType: JobType, | ||||||||||||||||||||||||||
jobProperties: Map[String, String], | ||||||||||||||||||||||||||
files: List[String], | ||||||||||||||||||||||||||
args: String*): String = ??? | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
Comment on lines
+7
to
+11
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implement required methods. All three overridden methods are unimplemented. Please provide implementations for:
Would you like me to help implement these methods with proper AWS Livy integration? Also applies to: 12-13, 14-15 |
||||||||||||||||||||||||||
override def status(jobId: String): Unit = ??? | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
override def kill(jobId: String): Unit = ??? | ||||||||||||||||||||||||||
} |
Uh oh!
There was an error while loading. Please reload this page.