# feat: support providing additional confs as yaml file for Driver.scala #164
Conversation
**Walkthrough**

This pull request introduces comprehensive updates across multiple files in the Chronon project, focusing on dependency management, configuration handling, and testing infrastructure. The changes primarily involve updating Google Cloud Dataproc and GCS connector versions, adding JSON and YAML processing libraries, enhancing configuration parsing, and refactoring job submission and authentication mechanisms.
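To make the mechanism concrete, here is a minimal sketch of the pattern the PR threads through `Driver.scala` (the helper name and exact plumbing are illustrative, not lifted from the diff): read a YAML file of string key/value pairs with snakeyaml and hand the result to the Spark session as extra confs.

```scala
import scala.collection.JavaConverters._
import scala.io.Source
import org.yaml.snakeyaml.Yaml

// Hypothetical helper: parse a flat YAML file into Spark conf key/value pairs.
def loadAdditionalConfs(path: String): Map[String, String] = {
  val source = Source.fromFile(path)
  try {
    new Yaml()
      .load(source.mkString)
      .asInstanceOf[java.util.Map[String, Any]]
      .asScala
      .map { case (k, v) => k -> v.toString } // Spark confs are string-valued
      .toMap
  } finally source.close()
}
```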
Force-pushed 6110df3 to 8cca9cc
Force-pushed 8cca9cc to 569bd8c
Minor version nit, but otherwise looks good
Actionable comments posted: 1
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
- **155-177**: Improve error handling in YAML parsing. The current error handling only throws the parse error. Consider providing more context.

Apply this diff:

```diff
 .map {
-  case Right(v) => v
-  case Left(e) => throw e
+  case Right(v) => v
+  case Left(e) => throw new RuntimeException(s"Failed to parse YAML config from $additionalConfPath: ${e.getMessage}", e)
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (8)
- build.sbt (2 hunks)
- cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
- spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- cloud_gcp/src/main/resources/additional-confs.yaml
- spark/src/test/resources/test-driver-additional-confs.yaml
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
- build.sbt
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/Driver.scala (3)
- **72-73**: LGTM! Required imports for YAML parsing support.
- **92-94**: LGTM! Well-documented optional parameter for additional configurations.
- **152-152**: LGTM! Good encapsulation by making `isLocal` protected.
Actionable comments posted: 3
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
- **156-168**: Enhance error handling and simplify YAML parsing. Consider these improvements:

- Add try-catch for YAML parsing errors
- Use type-safe conversions
- Chain map operations

```diff
-    val additionalConfs = additionalConfPath.toOption
-      .map(Source.fromFile)
-      .map((src) =>
-        try { src.mkString }
-        finally { src.close })
-      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
-      .map((map) => Extraction.decompose(map.asScala.toMap))
-      .map((v) => render(v))
-      .map(compact)
-      .map((str) => parse(str).extract[Map[String, String]])
+    val additionalConfs = additionalConfPath.toOption.map { path =>
+      try {
+        val src = Source.fromFile(path)
+        try {
+          val yaml = yamlLoader.load(src.mkString).asInstanceOf[java.util.Map[String, Any]]
+          parse(compact(render(Extraction.decompose(yaml.asScala.toMap)))).extract[Map[String, String]]
+        } catch {
+          case e: Exception =>
+            logger.error(s"Failed to parse YAML config from $path", e)
+            throw e
+        } finally {
+          src.close()
+        }
+      }
+    }
```

spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (1)
- **88-91**: Extract test constants. Move hardcoded values to constants for better maintainability.

```diff
+  private val ExpectedFormatProvider = "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
+  private val FormatProviderKey = "spark.chronon.table.format_provider.class"
+
   @Test
   def additionalConfsParsedCorrectly(): Unit = {
     ...
-    val confKey = "spark.chronon.table.format_provider.class"
-    assertEquals(confs.get(confKey), sparkSession.conf.getOption(confKey))
-    assertEquals(Some("ai.chronon.integrations.cloud_gcp.GcpFormatProvider"), sparkSession.conf.getOption(confKey))
+    assertEquals(confs.get(FormatProviderKey), sparkSession.conf.getOption(FormatProviderKey))
+    assertEquals(Some(ExpectedFormatProvider), sparkSession.conf.getOption(FormatProviderKey))
   }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (7)
- build.sbt (3 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
🚧 Files skipped from review as they are similar to previous changes (1)
- build.sbt
🔇 Additional comments (11)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (4)
- **9-9**: Good import. Helps to manage FS configuration.
- **15-15**: Clean addition. No concerns.
- **16-16**: Looks fine. Ensures property-type checks.
- **40-46**: Nice coverage. Verifies critical GCP classes.

spark/src/main/scala/ai/chronon/spark/Driver.scala (3)
- **72-74**: LGTM: Required imports for YAML support added.
- **93-94**: LGTM: Well-documented optional configuration parameter.
- **171-178**: Verify SparkSession configuration handling. Ensure that SparkSession correctly applies the additional configurations.

✅ Verification successful

SparkSession configuration handling is properly implemented. The implementation correctly:

- Merges additional configs with format-specific configs
- Applies all configs to the SparkSession builder (see the sketch after the scripts below)
- Handles both local and non-local session configurations
🏁 Scripts executed
The following scripts were executed for the analysis:
```bash
#!/bin/bash
# Check if SparkSessionBuilder handles configuration correctly
ast-grep --pattern 'SparkSessionBuilder.build($_, $_, $_, $_, $_)'
```

Length of output: 745

```bash
#!/bin/bash
# Find SparkSessionBuilder implementation
ast-grep --pattern 'object SparkSessionBuilder { $$$ }'

# Check for additional configuration handling
rg -A 10 "additionalConfig"
```

Length of output: 21728
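As a rough picture of what the verified behavior looks like in code, the `SparkSessionBuilder.build` shape below is an assumption inferred from the ast-grep pattern above, not the actual Chronon implementation:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical shape of SparkSessionBuilder.build with an additionalConfig parameter.
def build(name: String,
          local: Boolean = false,
          additionalConfig: Option[Map[String, String]] = None): SparkSession = {
  var builder = SparkSession.builder().appName(name)
  if (local) builder = builder.master("local[*]")
  // Fold every extra conf (e.g. parsed from the YAML file) into the builder.
  additionalConfig.getOrElse(Map.empty[String, String]).foreach { case (k, v) =>
    builder = builder.config(k, v)
  }
  builder.getOrCreate()
}
```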
spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (2)
- **25-30**: LGTM: Required imports added for YAML parsing.
- **39-41**: LGTM: Simplified implementation using parent class.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

- **31-31**: LGTM: Clean transition to JobSubmitter.
- **110-110**: LGTM: Consistent with the JobSubmitter transition.
Resolved (outdated) review thread on spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala.
test("Used to iterate locally. Do not enable this in CI/CD!") { | ||
|
||
val submitter = DataprocSubmitter() | ||
val submittedJobId = | ||
submitter.submit(List("gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/training_set.v1"), | ||
"join", | ||
"--end-date=2024-12-10", | ||
"--conf-path=training_set.v1") | ||
submitter.submit( | ||
List("gs://zipline-jars/training_set.v1", "gs://zipline-jars/dataproc-submitter-conf.yaml"), | ||
"join", | ||
"--end-date=2024-12-10", | ||
"--additional-conf-path=dataproc-submitter-conf.yaml", | ||
"--conf-path=training_set.v1" | ||
) |
Add @Ignore annotation and parameterize test configuration:

- Add @Ignore to prevent CI runs
- Move GCS paths to test configuration
- Consider converting to a proper integration test
```diff
+@Ignore("Local development only - Do not enable in CI/CD")
 test("Used to iterate locally. Do not enable this in CI/CD!") {
+  val config = TestConfig.load() // Add test configuration class
   val submitter = DataprocSubmitter()
   val submittedJobId =
     submitter.submit(
-      List("gs://zipline-jars/training_set.v1", "gs://zipline-jars/dataproc-submitter-conf.yaml"),
+      List(config.trainingJarPath, config.configPath),
       "join",
       "--end-date=2024-12-10",
-      "--additional-conf-path=dataproc-submitter-conf.yaml",
-      "--conf-path=training_set.v1"
+      s"--additional-conf-path=${config.configFileName}",
+      s"--conf-path=${config.trainingFileName}"
     )
```
Committable suggestion skipped: line range outside the PR's diff.
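One caveat on the suggestion above: if this suite uses ScalaTest's funsuite style (the `test("...")` syntax suggests it does), the idiomatic way to disable a test is the `ignore` keyword rather than a JUnit-style annotation. A minimal sketch, assuming the suite extends `AnyFunSuite`:

```scala
import org.scalatest.funsuite.AnyFunSuite

class DataprocSubmitterTest extends AnyFunSuite {
  // `ignore` keeps the test compiling but skips it at runtime, which is what
  // we want for a local-iteration-only test.
  ignore("Used to iterate locally. Do not enable this in CI/CD!") {
    val submitter = DataprocSubmitter()
    // ... local-only submission code ...
  }
}
```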
```scala
private[cloud_gcp] def loadConfig: SubmitterConf = {
  val isO = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
  val yamlLoader = new Yaml()
  implicit val formats: Formats = DefaultFormats
  isO
    .map(Source.fromInputStream)
    .map((is) =>
      try { is.mkString }
      finally { is.close })
    .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
    .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
    .map((jVal) => render(jVal))
    .map(compact)
    .map(parse(_).extract[SubmitterConf])
    .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml"))
}
```
🛠️ Refactor suggestion
Improve error handling and resource management
- Use try-with-resources for Source
- Add specific error handling for YAML parsing
- Consider using safer type conversion
```diff
 private[cloud_gcp] def loadConfig: SubmitterConf = {
   val isO = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
   val yamlLoader = new Yaml()
   implicit val formats: Formats = DefaultFormats
   isO
-    .map(Source.fromInputStream)
-    .map((is) =>
-      try { is.mkString }
-      finally { is.close })
+    .map { inputStream =>
+      try {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          val content = source.mkString
+          val yaml = yamlLoader.load(content)
+          Option(yaml).map(_.asInstanceOf[java.util.Map[String, Any]])
+            .getOrElse(throw new IllegalArgumentException("Invalid YAML structure"))
+        } catch {
+          case e: Exception => throw new IllegalArgumentException(s"YAML parsing failed: ${e.getMessage}")
+        } finally {
+          source.close()
+          inputStream.close()
+        }
+      }
+    }
     .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
     .map((jVal) => render(jVal))
     .map(compact)
     .map(parse(_).extract[SubmitterConf])
-    .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml"))
+    .getOrElse(throw new IllegalArgumentException("Configuration file not found"))
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
private[cloud_gcp] def loadConfig: SubmitterConf = {
  val isO = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
  val yamlLoader = new Yaml()
  implicit val formats: Formats = DefaultFormats
  isO
    .map { inputStream =>
      try {
        val source = Source.fromInputStream(inputStream)
        try {
          val content = source.mkString
          val yaml = yamlLoader.load(content)
          Option(yaml).map(_.asInstanceOf[java.util.Map[String, Any]])
            .getOrElse(throw new IllegalArgumentException("Invalid YAML structure"))
        } catch {
          case e: Exception => throw new IllegalArgumentException(s"YAML parsing failed: ${e.getMessage}")
        } finally {
          source.close()
          inputStream.close()
        }
      }
    }
    .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
    .map((jVal) => render(jVal))
    .map(compact)
    .map(parse(_).extract[SubmitterConf])
    .getOrElse(throw new IllegalArgumentException("Configuration file not found"))
}
```
Will handle this in a follow-up.
@tchow-zlai Understood. Would you like me to create a GitHub issue to track these improvements for the follow-up PR?
Force-pushed e18a31f to 2d9ac20
Actionable comments posted: 0
🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala (1)
- **27-41**: Reflective instantiation. Consider handling exceptions from reflection or offering a fallback to avoid runtime failures.

api/src/main/scala/ai/chronon/api/DataPointer.scala (1)

- **12-17**: Naming detail. "URIDataPointer" is clear, but consider whether "FileDataPointer" better conveys intent.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

- **136-136**: Check overhead of loadTable usage.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (19)
- api/src/main/scala/ai/chronon/api/DataPointer.scala (2 hunks)
- api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (2 hunks)
- build.sbt (3 hunks)
- cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/Format.scala (6 hunks)
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (7 hunks)
- spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- spark/src/test/resources/test-driver-additional-confs.yaml
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
- cloud_gcp/src/main/resources/additional-confs.yaml
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
🔇 Additional comments (70)
spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala (1)
- **8-23**: Straightforward data pointer implementation. Looks well-structured with lazy fields and a clear approach.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)

- **11-11**: Case class additions. New constructor fields appear consistent with usage.
- **42-67** (line range hint): Partition metadata retrieval. Verify that multiple partition URIs work correctly; the single-URI assumption may limit some cases.

api/src/main/scala/ai/chronon/api/DataPointer.scala (2)

- **4-10**: Abstract class design. Good approach for extensibility.
- **40-59**: Parser edge cases. Double-check format extraction with dot notation in paths.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (3)

- **30-36**: resolveTableName clarity. Implementation is concise and matches typical usage.
- **37-41**: readFormat logic. Full reliance on BigQuery is okay, but ensure no fallback is needed.
- **67-68**: Return type. Cleanly distinguishes GCS vs BigQuery.

spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)

- **326-364**: Writer flow. Handles multiple formats nicely.
- **366-399**: Reader flow. Load logic is neatly compartmentalized by format.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (7)
- **27-27**: No issues.
- **141-141**: Looks fine.
- **224-224**: Schema fetch approach is okay.
- **487-487**: Consistent usage of DataPointer.
- **646-646**: Consider concurrency in rename.
- **735-736**: Reading via DataPointer looks clean. Also applies to: 741-741.
- **785-785**: Simpler apply signature.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)

- **1-1**: Package restructure is fine.
- **50-50**: Local iteration test is okay.
- **54-60**: Ensure these args are not used in CI.

api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (12)
- **4-4**: Good import.
- **13-13**: URIDataPointer usage is correct.
- **19-22**: BigQuery options look fine.
- **27-27**: No issues.
- **32-32**: Kafka pointer is correct.
- **37-37**: CSV pointer is correct.
- **42-43**: Key-value options parsing is good.
- **48-49**: Parquet pointer is correct.
- **54-54**: Dots pointer is okay.
- **59-59**: Prefixed format is okay.
- **64-64**: Glob partitions recognized.
- **69-69**: Fallback to no format is correct.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala (5)

- **1-14**: New test class is neat.
- **15-19**: SparkSession build looks fine.
- **20-38**: Partitions test checks out.
- **40-58**: Empty partitions test is fine.
- **60-88**: Date partition handling is correct.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (6)
- **1-1**: Package reorg is fine.
- **7-7**: Imports are okay. Also applies to: 13-13, 14-14.
- **28-28**: Service account flag is fine.
- **37-43**: Runtime class check test is good.
- **52-61**: BigQuery native table test is ignored safely.
- **63-74**: BigQuery external table test is ignored safely.

spark/src/main/scala/ai/chronon/spark/Format.scala (9)

- **13-14**: Good naming method.
- **50-50**: No concerns.
- **65-65**: Simple pass-through logic looks good.
- **142-142**: No specific feedback.
- **143-143**: Consistent naming for Hive.
- **177-177**: No specific feedback.
- **178-178**: Consistent naming for Iceberg.
- **228-228**: No specific feedback.
- **229-229**: Consistent naming for Delta.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5)
- **2-3**: Imports for new auth look fine.
- **6-8**: Imports for JSON4S and YAML are correct.
- **31-31**: Class extends the new JobSubmitter interface well.
- **91-105**: Properly closing streams and handling YAML.
- **110-110**: Switch to JobAuth is straightforward.

build.sbt (5)

- **192-196**: Additional Spark + JSON4S + YAML deps are in order.
- **218-218**: Dataproc version bump is fine.
- **221-227**: New GCP dependencies look correct.
- **415-418**: Log4j MergeStrategy changes seem proper.
- **424-425**: SLF4J implementation is consistent.

spark/src/main/scala/ai/chronon/spark/Driver.scala (8)

- **72-74**: Imports for YAML & JSON4S are correct.
- **93-95**: `additionalConfPath` is optional.
- **156-168**: YAML-based config reading is concise.
- **169-171**: Comment about serializers is clear.
- **172-178**: Using SparkSessionBuilder with extra config is good.
- **183-183**: LocalDataLoader usage is straightforward.
- **188-188**: Recursive data loading is well handled.
- **190-190**: Returning the session is correct.
Minor callout - you need to flip your dataproc test off, otherwise changes lgtm
Resolved (outdated) review thread on cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala.
Force-pushed e6fb9e6 to 37e326a
Force-pushed 2d9ac20 to d6b1d3b
Couple of minor things that Piyush called out. Let's put them in and then I can reapprove.
Resolved (outdated) review thread on cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala.
Resolved (outdated) review thread on cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala.
Force-pushed e468bd2 to 44f5142
Force-pushed 37e326a to 9b39ab2
Force-pushed 44f5142 to 6930112
Force-pushed 921c124 to b29605a
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
- **92-92**: Rename unclear variable. `isO` is cryptic. Consider `yamlStreamOpt` or `configStreamOpt`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (12)
- build.sbt (2 hunks)
- cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
- spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
- spark/src/test/resources/test-driver-additional-confs.yaml
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
- build.sbt
🔇 Additional comments (8)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)
- **2-3**: LGTM: Clean transition to generic job interfaces. The change from SparkSubmitter to JobSubmitter improves modularity. Also applies to: 31-31.
- **91-105**: Improve error handling and resource management. The previous suggestion about using try-with-resources and better error handling still applies.
- **110-110**: LGTM: Clean auth interface transition.
- **100-100**: ⚠️ Potential issue: Add type safety to YAML parsing. The unsafe type cast could fail at runtime. Consider a safer parsing approach:

```diff
-      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
+      .map { content =>
+        Option(yamlLoader.load(content)) match {
+          case Some(yaml: java.util.Map[String, Any] @unchecked) => yaml
+          case _ => throw new IllegalArgumentException("Invalid YAML structure")
+        }
+      }
```

Likely invalid or redundant comment.
cloud_gcp/src/main/resources/additional-confs.yaml (1)
- **1-2**: LGTM! Configuration entries are well-structured. The Spark configuration entries follow naming conventions and contain valid values.
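For reference, a guess at what those two entries look like: the format-provider key/value pair appears in this PR's tests, while the partition-format key is hypothetical (the release notes only say a partition-format option was introduced).

```yaml
spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
spark.chronon.partition.format: "yyyy-MM-dd"
```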
spark/src/main/scala/ai/chronon/spark/Driver.scala (3)
- **72-74**: LGTM! Required imports added. Necessary imports for JSON and YAML processing.
- **156-167**: Extract YAML parsing logic into a reusable function. As suggested in a past review, this complex transformation chain should be extracted into a reusable function.
- **156-167**: 🛠️ Refactor suggestion: Add error handling for YAML parsing. The YAML parsing chain could fail at multiple points. Consider wrapping it in a Try and providing meaningful error messages.

Apply this diff:

```diff
 val additionalConfs = additionalConfPath.toOption
-  .map((src) =>
-    try { src.mkString }
-    finally { src.close })
-  .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
-  .map((map) => Extraction.decompose(map.asScala.toMap))
-  .map((v) => render(v))
-  .map(compact)
-  .map((str) => parse(str).extract[Map[String, String]])
+  .map(path => Try {
+    val src = Source.fromFile(path)
+    try {
+      val content = src.mkString
+      val yamlMap = yamlLoader.load(content).asInstanceOf[java.util.Map[String, Any]]
+      val jsonValue = Extraction.decompose(yamlMap.asScala.toMap)
+      parse(compact(render(jsonValue))).extract[Map[String, String]]
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to parse YAML file: $path", e)
+    } finally {
+      src.close()
+    }
+  }.getOrElse(Map.empty))
```

Likely invalid or redundant comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
- **156-167**: Simplify the YAML parsing chain. The parsing chain with multiple transformations (YAML → Java Map → Scala Map → JSON → Map) is unnecessarily complex. Consider this simpler approach:

```diff
-    implicit val formats: Formats = DefaultFormats
-    val yamlLoader = new Yaml()
-    val additionalConfs = additionalConfPath.toOption
-      .map(Source.fromFile)
-      .map((src) =>
-        try { src.mkString }
-        finally { src.close })
-      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
-      .map((map) => Extraction.decompose(map.asScala.toMap))
-      .map((v) => render(v))
-      .map(compact)
-      .map((str) => parse(str).extract[Map[String, String]])
+    val additionalConfs = additionalConfPath.toOption.map { path =>
+      val yaml = new Yaml()
+      val source = Source.fromFile(path)
+      try {
+        yaml.load(source.mkString).asInstanceOf[java.util.Map[String, Any]]
+          .asScala.toMap.map { case (k, v) => k -> v.toString }
+      } finally {
+        source.close()
+      }
+    }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- build.sbt (2 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
- build.sbt
🔇 Additional comments (11)
spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (4)
- **21-22**: LGTM: Required imports added for YAML parsing. Also applies to: 27-31.
- **40-42**: LGTM: Clean implementation of TestArgs methods.
- **76-87**: Complex YAML parsing chain needs refactoring. This was previously identified and will be addressed in a follow-up task.
- **89-92**: LGTM: Thorough test assertions. Good coverage of both positive and negative cases.
spark/src/main/scala/ai/chronon/spark/Driver.scala (3)
- **52-53**: LGTM! Required imports for YAML and JSON processing.
- **93-95**: LGTM! Well-documented option for additional configurations.
- **171-178**: LGTM! Proper integration of additional configurations into SparkSession.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)
- **92-92**: Rename variable for clarity. Change `inputStreamOption` to `configFileStream` for better readability.
- **110-110**: LGTM! Clean transition from SparkAuth to JobAuth.
- **91-105**: 🛠️ Refactor suggestion: Improve error handling and resource management. The current implementation needs better resource cleanup and error handling.

```diff
 private[cloud_gcp] def loadConfig: SubmitterConf = {
   val configFileStream = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
   val yamlLoader = new Yaml()
   implicit val formats: Formats = DefaultFormats
   configFileStream
-    .map(Source.fromInputStream)
-    .map((is) =>
-      try { is.mkString }
-      finally { is.close })
+    .map { inputStream =>
+      try {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          source.mkString
+        } finally {
+          source.close()
+        }
+      } finally {
+        inputStream.close()
+      }
+    }
     .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
     .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
     .map((jVal) => render(jVal))
     .map(compact)
     .map(parse(_).extract[SubmitterConf])
-    .getOrElse(throw new IllegalArgumentException("Yaml conf not found or invalid yaml"))
+    .getOrElse(throw new IllegalArgumentException("Configuration file not found or invalid"))
```

Likely invalid or redundant comment.
- **2-3**: Verify backward compatibility with SparkSubmitter. The change from SparkSubmitter to JobSubmitter might affect existing implementations. Also applies to: 31-31.

✅ Verification successful

No backward compatibility concerns with the JobSubmitter migration. The change is safe because:

- No existing SparkSubmitter references were found in the codebase
- JobSubmitter is a simple trait with basic job management methods (see the sketch below)
- JobAuth maintains a minimal interface with an optional token method

🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Find usages of SparkSubmitter in the codebase
rg "SparkSubmitter" -l
```

Length of output: 22

```bash
#!/bin/bash
# Check for any references to SparkSubmitter or JobSubmitter in the codebase
rg "extends (SparkSubmitter|JobSubmitter)" -A 2

# Look for any class definitions containing Spark or Job
ast-grep --pattern 'class $_ extends $_Submitter { $$$ }'
```

Length of output: 517

```bash
#!/bin/bash
# Check historical references to SparkSubmitter
rg "SparkSubmitter" --type scala

# Look for any Auth related changes
rg "SparkAuth|JobAuth" --type scala
```

Length of output: 389

```bash
#!/bin/bash
# Check JobSubmitter implementation
cat spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
```

Length of output: 290
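Based on the small file the last script dumped and the description above, the interfaces are presumably tiny. A sketch of what they plausibly look like; the exact method names and signatures are assumptions, not the verified source:

```scala
// Hypothetical shape of the generic job interfaces that replaced
// SparkSubmitter/SparkAuth.
trait JobSubmitter {
  // Returns an id for the submitted job, as DataprocSubmitterTest's
  // `submittedJobId` usage suggests.
  def submit(files: List[String], jobType: String, args: String*): String
  def status(jobId: String): Unit
  def kill(jobId: String): Unit
}

trait JobAuth {
  // "Minimal interface with optional token method."
  def token(): Option[String] = None
}
```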
Force-pushed 1035250 to baa3ab8
Actionable comments posted: 0
♻️ Duplicate comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
- **91-105**: ⚠️ Potential issue: Fix resource management in YAML parsing. The Source stream needs proper cleanup. Apply this diff:

```diff
 private[cloud_gcp] def loadConfig: SubmitterConf = {
   val inputStreamOption = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
   val yamlLoader = new Yaml()
   implicit val formats: Formats = DefaultFormats
   inputStreamOption
-    .map(Source.fromInputStream)
-    .map((is) =>
-      try { is.mkString }
-      finally { is.close })
+    .map { inputStream =>
+      try {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          source.mkString
+        } finally {
+          source.close()
+        }
+      } finally {
+        inputStream.close()
+      }
+    }
     .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
     .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
     .map((jVal) => render(jVal))
     .map(compact)
     .map(parse(_).extract[SubmitterConf])
     .getOrElse(throw new IllegalArgumentException("Yaml conf not found or invalid yaml"))
 }
```

spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

- **156-167**: ⚠️ Potential issue: Fix resource management in YAML parsing. The Source stream needs proper cleanup. Apply this diff:

```diff
 val additionalConfs = additionalConfPath.toOption
-  .map(Source.fromFile)
-  .map((src) =>
-    try { src.mkString }
-    finally { src.close })
+  .map { path =>
+    val source = Source.fromFile(path)
+    try {
+      source.mkString
+    } finally {
+      source.close()
+    }
+  }
   .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
```
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
- **156-167**: Consider extracting YAML parsing logic. The YAML parsing logic is duplicated between `DataprocSubmitter` and `Driver`. Consider extracting it into a shared utility class (a sketch follows below).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (11)
- build.sbt (2 hunks)
- cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
- spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- cloud_gcp/src/main/resources/additional-confs.yaml
- spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
- spark/src/test/resources/test-driver-additional-confs.yaml
- spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: other_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: join_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
🔇 Additional comments (7)
build.sbt (2)
- **218-226**: Version updates look good. Updated versions for cloud dependencies and added matching JSON/YAML libraries.
- **192-196**: Verify dependency compatibility. New JSON and YAML dependencies added; ensure version compatibility with Spark 3.X.X. Run this script to check for potential conflicts:

✅ Verification successful

Dependencies are compatible with Spark 3.X.X. The json4s version is correctly pinned to match Spark's internal version, and all dependencies are properly scoped.

🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Check for dependency conflicts
rg -A 5 "json4s|snakeyaml|spark" | grep -E "version|libraryDependencies"
```

Length of output: 16105
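To show the shape of that pinning, a hypothetical build.sbt excerpt; the artifact list and version values are illustrative placeholders, not the PR's actual entries:

```scala
// Pin json4s to whatever version the target Spark 3.x distribution bundles,
// so the assembled jar stays binary-compatible with Spark's own json4s.
val json4sVersion = "3.7.0-M11" // placeholder: match your Spark distribution

libraryDependencies ++= Seq(
  "org.json4s" %% "json4s-core"    % json4sVersion,
  "org.json4s" %% "json4s-jackson" % json4sVersion,
  "org.yaml"    % "snakeyaml"      % "2.0" // placeholder version
)
```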
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

- **2-3**: LGTM! Good refactoring. The transition from `SparkSubmitter`/`SparkAuth` to `JobSubmitter`/`JobAuth` makes the code more generic and flexible. Also applies to: 31-31, 110-110.
- **92-92**: Good variable naming! The descriptive name `inputStreamOption` is much clearer than the previous `isO`.

spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1)

- **87-92**: LGTM! Improved readability. The formatting changes make the code more readable by breaking parameters into separate lines. Also applies to: 117-118, 145-146.

spark/src/main/scala/ai/chronon/spark/Driver.scala (2)

- **93-94**: LGTM! Good addition of YAML config support. The new `additionalConfPath` option aligns with the PR objective.
- **171-178**: LGTM! Good integration of additional configs. The SparkSession builder properly integrates the additional YAML configurations.
ptal @piyush-zlai @david-zlai
feat: support providing additional confs as yaml file for Driver.scala (#164)

## Summary
- We want to be able to configure the spark jobs with additional confs when running them via `Driver.scala`. Let's thread through some conf files.

## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **Dependencies**
  - Updated Google Cloud Dataproc dependency to 4.52.0
  - Added JSON processing and YAML support dependencies
- **Configuration**
  - Added support for additional configuration files
  - Introduced new configuration options for Spark table format and partition format
- **Testing**
  - Enhanced test cases for configuration parsing
  - Added tests for Google Cloud runtime classes
  - Improved BigQuery catalog and Dataproc submitter tests
- **Code Refactoring**
  - Renamed `SparkSubmitter` to `JobSubmitter`
  - Renamed `SparkAuth` to `JobAuth`
  - Updated configuration loading mechanisms
  - Streamlined visibility and access of methods in various classes

Co-authored-by: Thomas Chow <[email protected]>
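As a usage illustration, a hypothetical local invocation of the new flag: the entry point, jar name, and dates are placeholders, while the flags themselves (`--conf-path`, `--end-date`, `--additional-conf-path`) are the ones this PR wires through.

```bash
spark-submit --class ai.chronon.spark.Driver chronon-spark-assembly.jar \
  join \
  --conf-path=training_set.v1 \
  --end-date=2024-12-10 \
  --additional-conf-path=additional-confs.yaml
```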