
feat: support providing additional confs as yaml file for Driver.scala #164


Merged
tchow-zlai merged 23 commits into main from tchow/spark-session-refactor on Jan 7, 2025

Conversation

tchow-zlai (Collaborator) commented Dec 30, 2024

Summary

  • We want to be able to configure the spark jobs with additional confs when running them via Driver.scala. Let's thread through some conf files.
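
For concreteness, here is a hedged sketch of the kind of file this enables, passed to Driver.scala via the new --additional-conf-path flag. The format-provider key and value appear in this PR's tests; the partition-format key name and value below are assumptions for illustration only.

```yaml
# additional-confs.yaml (illustrative sketch, not the exact file added in this PR)
spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
# Assumed key/value for the "partition format" option mentioned in the release notes:
spark.chronon.partition.format: "yyyy-MM-dd"
```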

Checklist

  • [x] Added Unit Tests
  • [x] Covered by existing CI
  • [ ] Integration tested
  • [ ] Documentation update

Summary by CodeRabbit

  • Dependencies

    • Updated Google Cloud Dataproc dependency to 4.52.0
    • Added JSON processing and YAML support dependencies
  • Configuration

    • Added support for additional configuration files
    • Introduced new configuration options for Spark table format and partition format
  • Testing

    • Enhanced test cases for configuration parsing
    • Added tests for Google Cloud runtime classes
    • Improved BigQuery catalog and Dataproc submitter tests
  • Code Refactoring

    • Renamed SparkSubmitter to JobSubmitter
    • Renamed SparkAuth to JobAuth
    • Updated configuration loading mechanisms
    • Streamlined visibility and access of methods in various classes

coderabbitai bot commented Dec 30, 2024

Walkthrough

This pull request introduces comprehensive updates across multiple files in the Chronon project, focusing on dependency management, configuration handling, and testing infrastructure. The changes primarily involve updating Google Cloud Dataproc and GCS connector versions, adding JSON and YAML processing libraries, enhancing configuration parsing, and refactoring job submission and authentication mechanisms.

Changes

  • build.sbt: updated Google Cloud Dataproc to 4.52.0; changed GCS connector to hadoop3-2.2.26; added JSON and YAML processing dependencies (json4s and snakeyaml)
  • cloud_gcp/src/main/resources/additional-confs.yaml: added configuration for format provider and partition format
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala: replaced SparkAuth and SparkSubmitter with JobAuth and JobSubmitter; updated YAML configuration parsing with snakeyaml
  • spark/src/main/scala/ai/chronon/spark/Driver.scala: added an optional configuration path; modified Spark session building logic to use additional configurations
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala: renamed SparkSubmitter to JobSubmitter and SparkAuth to JobAuth
  • spark/src/test/resources/test-driver-additional-confs.yaml: added a configuration entry for tests
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala: updated test for additional configuration parsing
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala: changed package structure and added new test cases
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala: enhanced job submission test with multiple URIs

Suggested Reviewers

  • nikhil-zlai
  • piyush-zlai
  • chewy-zlai

Poem

🚀 Dependencies dance and twirl,
Configs bloom like a code-driven pearl.
YAML whispers, Spark takes flight,
Refactoring code with pure delight!
A symphony of bytes and might! 🌟

Warning

Review ran into problems

🔥 Problems

GitHub Actions: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.



tchow-zlai changed the base branch from tchow/bq-support-10 to main on December 30, 2024 08:14
tchow-zlai force-pushed the tchow/spark-session-refactor branch from 6110df3 to 8cca9cc on December 30, 2024 08:14
tchow-zlai force-pushed the tchow/spark-session-refactor branch from 8cca9cc to 569bd8c on January 2, 2025 05:19
piyush-zlai (Contributor) left a comment:

Minor version nit, but otherwise looks good

coderabbitai bot left a comment:

Actionable comments posted: 1

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

155-177: Improve error handling in YAML parsing

The current error handling only throws the parse error. Consider providing more context.

Apply this diff:

    .map {
-     case Right(v) => v
-     case Left(e)  => throw e
+     case Right(v) => v
+     case Left(e)  => throw new RuntimeException(s"Failed to parse YAML config from $additionalConfPath: ${e.getMessage}", e)
    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 8cca9cc and 569bd8c.

📒 Files selected for processing (8)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
  • spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • cloud_gcp/src/main/resources/additional-confs.yaml
  • spark/src/test/resources/test-driver-additional-confs.yaml
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
  • build.sbt
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/Driver.scala (3)

72-73: LGTM!

Required imports for YAML parsing support.


92-94: LGTM!

Well-documented optional parameter for additional configurations.


152-152: LGTM!

Good encapsulation by making isLocal protected.

coderabbitai bot left a comment:

Actionable comments posted: 3

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

156-168: Enhance error handling and simplify YAML parsing.

Consider these improvements:

  1. Add try-catch for YAML parsing errors
  2. Use type-safe conversions
  3. Chain map operations
-      val additionalConfs = additionalConfPath.toOption
-        .map(Source.fromFile)
-        .map((src) =>
-          try { src.mkString }
-          finally { src.close })
-        .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
-        .map((map) => Extraction.decompose(map.asScala.toMap))
-        .map((v) => render(v))
-        .map(compact)
-        .map((str) => parse(str).extract[Map[String, String]])
+      val additionalConfs = additionalConfPath.toOption.map { path =>
+        try {
+          val src = Source.fromFile(path)
+          try {
+            val yaml = yamlLoader.load(src.mkString).asInstanceOf[java.util.Map[String, Any]]
+            parse(compact(render(Extraction.decompose(yaml.asScala.toMap)))).extract[Map[String, String]]
+          } catch {
+            case e: Exception =>
+              logger.error(s"Failed to parse YAML config from $path", e)
+              throw e
+          } finally {
+            src.close()
+          }
+        }
+      }
spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (1)

88-91: Extract test constants.

Move hardcoded values to constants for better maintainability.

+  private val ExpectedFormatProvider = "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
+  private val FormatProviderKey = "spark.chronon.table.format_provider.class"
+
   @Test
   def additionalConfsParsedCorrectly(): Unit = {
     ...
-    val confKey = "spark.chronon.table.format_provider.class"
-    assertEquals(confs.get(confKey), sparkSession.conf.getOption(confKey))
-    assertEquals(Some("ai.chronon.integrations.cloud_gcp.GcpFormatProvider"), sparkSession.conf.getOption(confKey))
+    assertEquals(confs.get(FormatProviderKey), sparkSession.conf.getOption(FormatProviderKey))
+    assertEquals(Some(ExpectedFormatProvider), sparkSession.conf.getOption(FormatProviderKey))
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 569bd8c and e18a31f.

📒 Files selected for processing (7)
  • build.sbt (3 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • build.sbt
🔇 Additional comments (11)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (4)

9-9: Good import
Helps to manage FS configuration.


15-15: Clean addition
No concerns.


16-16: Looks fine
Ensures property-type checks.


40-46: Nice coverage
Verifies critical GCP classes.

spark/src/main/scala/ai/chronon/spark/Driver.scala (3)

72-74: LGTM: Required imports for YAML support added.


93-94: LGTM: Well-documented optional configuration parameter.


171-178: Verify SparkSession configuration handling.

Ensure that SparkSession correctly applies the additional configurations.

✅ Verification successful

SparkSession configuration handling is properly implemented

The implementation correctly:

  • Merges additional configs with format-specific configs
  • Applies all configs to the SparkSession builder
  • Handles both local and non-local session configurations
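
For readers skimming the verification above, a minimal sketch of the pattern being described, assuming a plain SparkSession builder; the helper name and parameters here are illustrative and not the PR's actual SparkSessionBuilder code:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: fold an optional map of additional confs (e.g. parsed from a YAML file) onto a builder.
def buildSession(appName: String, additionalConfs: Option[Map[String, String]]): SparkSession = {
  val base = SparkSession.builder().appName(appName)
  val withConfs = additionalConfs
    .getOrElse(Map.empty)
    .foldLeft(base) { case (builder, (key, value)) => builder.config(key, value) }
  withConfs.getOrCreate()
}
```
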
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if SparkSessionBuilder handles configuration correctly
ast-grep --pattern 'SparkSessionBuilder.build($_, $_, $_, $_, $_)'

Length of output: 745


Script:

#!/bin/bash
# Find SparkSessionBuilder implementation
ast-grep --pattern 'object SparkSessionBuilder {
  $$$
}'

# Check for additional configuration handling
rg -A 10 "additionalConfig" 

Length of output: 21728

spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (2)

25-30: LGTM: Required imports added for YAML parsing.


39-41: LGTM: Simplified implementation using parent class.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

31-31: LGTM: Clean transition to JobSubmitter


110-110: LGTM: Consistent with JobSubmitter transition

Comment on lines 52 to 62
test("Used to iterate locally. Do not enable this in CI/CD!") {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(List("gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/training_set.v1"),
"join",
"--end-date=2024-12-10",
"--conf-path=training_set.v1")
submitter.submit(
List("gs://zipline-jars/training_set.v1", "gs://zipline-jars/dataproc-submitter-conf.yaml"),
"join",
"--end-date=2024-12-10",
"--additional-conf-path=dataproc-submitter-conf.yaml",
"--conf-path=training_set.v1"
)
coderabbitai bot commented:

⚠️ Potential issue

Add @ignore annotation and parameterize test configuration

  1. Add @ignore to prevent CI runs
  2. Move GCS paths to test configuration
  3. Consider converting to proper integration test
+@Ignore("Local development only - Do not enable in CI/CD")
 test("Used to iterate locally. Do not enable this in CI/CD!") {
+  val config = TestConfig.load() // Add test configuration class
   val submitter = DataprocSubmitter()
   val submittedJobId =
     submitter.submit(
-      List("gs://zipline-jars/training_set.v1", "gs://zipline-jars/dataproc-submitter-conf.yaml"),
+      List(config.trainingJarPath, config.configPath),
       "join",
       "--end-date=2024-12-10",
-      "--additional-conf-path=dataproc-submitter-conf.yaml",
-      "--conf-path=training_set.v1"
+      s"--additional-conf-path=${config.configFileName}",
+      s"--conf-path=${config.trainingFileName}"
     )

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 91 to 105
private[cloud_gcp] def loadConfig: SubmitterConf = {
    val isO = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
    val yamlLoader = new Yaml()
    implicit val formats: Formats = DefaultFormats
    isO
      .map(Source.fromInputStream)
      .map((is) =>
        try { is.mkString }
        finally { is.close })
      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
      .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
      .map((jVal) => render(jVal))
      .map(compact)
      .map(parse(_).extract[SubmitterConf])
      .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml"))
coderabbitai bot commented on Jan 2, 2025:

🛠️ Refactor suggestion

Improve error handling and resource management

  1. Use try-with-resources for Source
  2. Add specific error handling for YAML parsing
  3. Consider using safer type conversion
 private[cloud_gcp] def loadConfig: SubmitterConf = {
   val isO = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
   val yamlLoader = new Yaml()
   implicit val formats: Formats = DefaultFormats
   isO
-    .map(Source.fromInputStream)
-    .map((is) =>
-      try { is.mkString }
-      finally { is.close })
+    .map { inputStream =>
+      try {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          val content = source.mkString
+          val yaml = yamlLoader.load(content)
+          Option(yaml).map(_.asInstanceOf[java.util.Map[String, Any]])
+            .getOrElse(throw new IllegalArgumentException("Invalid YAML structure"))
+        } catch {
+          case e: Exception => throw new IllegalArgumentException(s"YAML parsing failed: ${e.getMessage}")
+        } finally {
+          source.close()
+          inputStream.close()
+        }
+      }
+    }
     .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
     .map((jVal) => render(jVal))
     .map(compact)
     .map(parse(_).extract[SubmitterConf])
-    .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml"))
+    .getOrElse(throw new IllegalArgumentException("Configuration file not found"))
 }

tchow-zlai (Collaborator, Author) replied:

will handle this in a followup.

coderabbitai bot replied:

@tchow-zlai Understood. Would you like me to create a GitHub issue to track these improvements for the follow-up PR?

tchow-zlai force-pushed the tchow/spark-session-refactor branch from e18a31f to 2d9ac20 on January 2, 2025 22:48
tchow-zlai changed the base branch from main to tchow/bq-support-10 on January 2, 2025 22:48
coderabbitai bot left a comment:

Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala (1)

27-41: Reflective instantiation
Consider handling exceptions from reflection or offering fallback to avoid runtime failures.
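
As a hedged illustration of this suggestion (the trait and method names below are placeholders, not the PR's actual DataPointer API), the reflective lookup can be wrapped in Try with a fallback so a bad class name fails gracefully:

```scala
import scala.util.{Failure, Success, Try}

// Placeholder trait standing in for whatever interface the reflectively-loaded class implements.
trait FormatProvider

// Instantiate a provider by class name, falling back to a default on any reflection failure.
def loadProvider(className: String, default: => FormatProvider): FormatProvider =
  Try(Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[FormatProvider]) match {
    case Success(provider) => provider
    case Failure(e) =>
      println(s"Could not instantiate $className (${e.getMessage}); using the default provider")
      default
  }
```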

api/src/main/scala/ai/chronon/api/DataPointer.scala (1)

12-17: Naming detail
"URIDataPointer" is clear, but consider if "FileDataPointer" better conveys intent.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

136-136: Check overhead of loadTable usage

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between e18a31f and 2d9ac20.

📒 Files selected for processing (19)
  • api/src/main/scala/ai/chronon/api/DataPointer.scala (2 hunks)
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (2 hunks)
  • build.sbt (3 hunks)
  • cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Extensions.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/Format.scala (6 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (7 hunks)
  • spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • spark/src/test/resources/test-driver-additional-confs.yaml
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
  • cloud_gcp/src/main/resources/additional-confs.yaml
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
🔇 Additional comments (70)
spark/src/main/scala/ai/chronon/spark/CatalogAwareDataPointer.scala (1)

8-23: Straightforward data pointer implementation
Looks well-structured with lazy fields and a clear approach.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)

11-11: Case class additions
New constructor fields appear consistent with usage.


Line range hint 42-67: Partition metadata retrieval
Verify that multiple partition URIs work correctly; single-URI assumption may limit some cases.

api/src/main/scala/ai/chronon/api/DataPointer.scala (2)

4-10: Abstract class design
Good approach for extensibility.


40-59: Parser edge cases
Double-check format extraction with dot notation in paths.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (3)

30-36: resolveTableName clarity
Implementation is concise and matches typical usage.


37-41: readFormat logic
Full reliance on BigQuery is okay, but ensure no fallback needed.


67-68: Return type
Cleanly distinguishes GCS vs BigQuery.

spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)

326-364: Writer flow
Handles multiple formats nicely.


366-399: Reader flow
Load logic is neatly compartmentalized by format.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (7)

27-27: No issues


141-141: Looks fine


224-224: Schema fetch approach is okay


487-487: Consistent usage of DataPointer


646-646: Consider concurrency in rename


735-736: Reading via DataPointer looks clean

Also applies to: 741-741


785-785: Simpler apply signature

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)

1-1: Package restructure is fine


50-50: Local iteration test is okay


54-60: Ensure these args are not used in CI

api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (12)

4-4: Good import


13-13: URIDataPointer usage is correct


19-22: BigQuery options look fine


27-27: No issues


32-32: Kafka pointer is correct


37-37: CSV pointer is correct


42-43: Key-value options parsing is good


48-49: Parquet pointer is correct


54-54: Dots pointer is okay


59-59: Prefixed format is okay


64-64: Glob partitions recognized


69-69: Fallback to no format is correct

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala (5)

1-14: New test class is neat


15-19: SparkSession build looks fine


20-38: Partitions test checks out


40-58: Empty partitions test is fine


60-88: Date partition handling is correct

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (6)

1-1: Package reorg is fine


7-7: Imports are okay

Also applies to: 13-13, 14-14


28-28: Service account flag is fine


37-43: Runtime class check test is good


52-61: BigQuery native table test is ignored safely


63-74: BigQuery external table test is ignored safely

spark/src/main/scala/ai/chronon/spark/Format.scala (9)

13-14: Good naming method.


50-50: No concerns.


65-65: Simple pass-through logic looks good.


142-142: No specific feedback.


143-143: Consistent naming for Hive.


177-177: No specific feedback.


178-178: Consistent naming for Iceberg.


228-228: No specific feedback.


229-229: Consistent naming for Delta.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5)

2-3: Imports for new auth look fine.


6-8: Imports for JSON4S and YAML are correct.


31-31: Class extends new JobSubmitter interface well.


91-105: Properly closing streams and handling YAML.


110-110: Switch to JobAuth is straightforward.

build.sbt (5)

192-196: Additional Spark + JSON4S + YAML deps are in order.


218-218: Dataproc version bump is fine.


221-227: New GCP dependencies look correct.


415-418: Log4j MergeStrategy changes seem proper.


424-425: SLF4J implementation is consistent.

spark/src/main/scala/ai/chronon/spark/Driver.scala (8)

72-74: Imports for YAML & JSON4S are correct.


93-95: additionalConfPath is optional.


156-168: YAML-based config reading is concise.


169-171: Comment about serializers is clear.


172-178: Using SparkSessionBuilder with extra config is good.


183-183: LocalDataLoader usage is straightforward.


188-188: Recursive data loading is well handled.


190-190: Returning the session is correct.

piyush-zlai (Contributor) left a comment:

Minor callout - you need to flip your dataproc test off, otherwise changes lgtm

tchow-zlai force-pushed the tchow/bq-support-10 branch 2 times, most recently from e6fb9e6 to 37e326a on January 3, 2025 06:00
tchow-zlai force-pushed the tchow/spark-session-refactor branch from 2d9ac20 to d6b1d3b on January 3, 2025 06:09
david-zlai (Contributor) left a comment:

couple of minor things that Piyush called out. let's put them in and then i can reapprove

tchow-zlai force-pushed the tchow/spark-session-refactor branch 2 times, most recently from e468bd2 to 44f5142 on January 4, 2025 00:55
tchow-zlai force-pushed the tchow/bq-support-10 branch from 37e326a to 9b39ab2 on January 4, 2025 00:55
tchow-zlai force-pushed the tchow/spark-session-refactor branch from 44f5142 to 6930112 on January 4, 2025 00:56
Base automatically changed from tchow/bq-support-10 to main on January 4, 2025 05:09
tchow-zlai force-pushed the tchow/spark-session-refactor branch from 921c124 to b29605a on January 4, 2025 05:10
coderabbitai bot left a comment:

Actionable comments posted: 0

🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

92-92: Rename unclear variable.

isO is cryptic. Consider yamlStreamOpt or configStreamOpt.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 2d9ac20 and b29605a.

📒 Files selected for processing (12)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
  • spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
  • spark/src/test/resources/test-driver-additional-confs.yaml
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
  • build.sbt
🔇 Additional comments (8)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

2-3: LGTM: Clean transition to generic job interfaces.

The change from SparkSubmitter to JobSubmitter improves modularity.

Also applies to: 31-31


91-105: Improve error handling and resource management.

Previous suggestion about using try-with-resources and better error handling still applies.


110-110: LGTM: Clean auth interface transition.


100-100: ⚠️ Potential issue

Add type safety to YAML parsing.

Unsafe type cast could fail at runtime. Consider using a safer parsing approach:

-      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
+      .map { content =>
+        Option(yamlLoader.load(content)) match {
+          case Some(yaml: java.util.Map[String, Any] @unchecked) => yaml
+          case _ => throw new IllegalArgumentException("Invalid YAML structure")
+        }
+      }

Likely invalid or redundant comment.

cloud_gcp/src/main/resources/additional-confs.yaml (1)

1-2: LGTM! Configuration entries are well-structured.

The Spark configuration entries follow naming conventions and contain valid values.

spark/src/main/scala/ai/chronon/spark/Driver.scala (3)

72-74: LGTM! Required imports added.

Necessary imports for JSON and YAML processing.


156-167: Extract YAML parsing logic into a reusable function.

As suggested in the past review, this complex transformation chain should be extracted into a reusable function.


156-167: 🛠️ Refactor suggestion

Add error handling for YAML parsing.

The YAML parsing chain could fail at multiple points. Consider wrapping it in a Try and providing meaningful error messages.

Apply this diff:

      val additionalConfs = additionalConfPath.toOption
-       .map((src) =>
-         try { src.mkString }
-         finally { src.close })
-       .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
-       .map((map) => Extraction.decompose(map.asScala.toMap))
-       .map((v) => render(v))
-       .map(compact)
-       .map((str) => parse(str).extract[Map[String, String]])
+       .map(path => Try {
+         val src = Source.fromFile(path)
+         try {
+           val content = src.mkString
+           val yamlMap = yamlLoader.load(content).asInstanceOf[java.util.Map[String, Any]]
+           val jsonValue = Extraction.decompose(yamlMap.asScala.toMap)
+           parse(compact(render(jsonValue))).extract[Map[String, String]]
+         } catch {
+           case e: Exception =>
+             throw new RuntimeException(s"Failed to parse YAML file: $path", e)
+         } finally {
+           src.close()
+         }
+       }.getOrElse(Map.empty))

Likely invalid or redundant comment.

coderabbitai bot left a comment:

Actionable comments posted: 0

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

156-167: Simplify the YAML parsing chain.

The parsing chain with multiple transformations (YAML → Java Map → Scala Map → JSON → Map) is unnecessarily complex.

Consider this simpler approach:

-      implicit val formats: Formats = DefaultFormats
-      val yamlLoader = new Yaml()
-      val additionalConfs = additionalConfPath.toOption
-        .map(Source.fromFile)
-        .map((src) =>
-          try { src.mkString }
-          finally { src.close })
-        .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
-        .map((map) => Extraction.decompose(map.asScala.toMap))
-        .map((v) => render(v))
-        .map(compact)
-        .map((str) => parse(str).extract[Map[String, String]])
+      val additionalConfs = additionalConfPath.toOption.map { path =>
+        val yaml = new Yaml()
+        val source = Source.fromFile(path)
+        try {
+          yaml.load(source.mkString).asInstanceOf[java.util.Map[String, Any]]
+            .asScala.toMap.map { case (k, v) => k -> v.toString }
+        } finally {
+          source.close()
+        }
+      }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 65d666a and c68b1f3.

📒 Files selected for processing (5)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
  • build.sbt
🔇 Additional comments (11)
spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (4)

21-22: LGTM: Required imports added for YAML parsing.

Also applies to: 27-31


40-42: LGTM: Clean implementation of TestArgs methods.


76-87: Complex YAML parsing chain needs refactoring.

This was previously identified and will be addressed in a follow-up task.


89-92: LGTM: Thorough test assertions.

Good coverage of both positive and negative cases.

spark/src/main/scala/ai/chronon/spark/Driver.scala (3)

52-53: LGTM!

Required imports for YAML and JSON processing.


93-95: LGTM!

Well-documented option for additional configurations.


171-178: LGTM!

Proper integration of additional configurations into SparkSession.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

92-92: Rename variable for clarity.

Change inputStreamOption to configFileStream for better readability.


110-110: LGTM!

Clean transition from SparkAuth to JobAuth.


91-105: 🛠️ Refactor suggestion

Improve error handling and resource management.

Current implementation needs better resource cleanup and error handling.

 private[cloud_gcp] def loadConfig: SubmitterConf = {
   val configFileStream = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
   val yamlLoader = new Yaml()
   implicit val formats: Formats = DefaultFormats
   configFileStream
-    .map(Source.fromInputStream)
-    .map((is) =>
-      try { is.mkString }
-      finally { is.close })
+    .map { inputStream =>
+      try {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          source.mkString
+        } finally {
+          source.close()
+        }
+      } finally {
+        inputStream.close()
+      }
+    }
     .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
     .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
     .map((jVal) => render(jVal))
     .map(compact)
     .map(parse(_).extract[SubmitterConf])
-    .getOrElse(throw new IllegalArgumentException("Yaml conf not found or invalid yaml"))
+    .getOrElse(throw new IllegalArgumentException("Configuration file not found or invalid"))

Likely invalid or redundant comment.


2-3: Verify backward compatibility with SparkSubmitter.

The change from SparkSubmitter to JobSubmitter might affect existing implementations.

Also applies to: 31-31

✅ Verification successful

No backward compatibility concerns with JobSubmitter migration

The change is safe as:

  • No existing SparkSubmitter references found in codebase
  • JobSubmitter is a simple trait with basic job management methods
  • JobAuth maintains minimal interface with optional token method
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find usages of SparkSubmitter in the codebase
rg "SparkSubmitter" -l

Length of output: 22


Script:

#!/bin/bash
# Check for any references to SparkSubmitter or JobSubmitter in the codebase
rg "extends (SparkSubmitter|JobSubmitter)" -A 2

# Look for any class definitions containing Spark or Job
ast-grep --pattern 'class $_ extends $_Submitter { $$$  }'

Length of output: 517


Script:

#!/bin/bash
# Check historical references to SparkSubmitter
rg "SparkSubmitter" --type scala

# Look for any Auth related changes
rg "SparkAuth|JobAuth" --type scala

Length of output: 389


Script:

#!/bin/bash
# Check JobSubmitter implementation
cat spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala

Length of output: 290
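
As an aside for readers following the rename, a rough sketch of the shape implied by the test usage and the notes above; only submit is directly evidenced in this thread, so the remaining method names and signatures are assumptions:

```scala
// Hedged sketch inferred from `submitter.submit(List(...), "join", "--end-date=...", ...)`
// returning a job id, plus the review note that JobAuth exposes an optional token.
trait JobSubmitter {
  def submit(files: List[String], args: String*): String // returns a submitted job id
  def status(jobId: String): Unit                         // assumed "basic job management" method
  def kill(jobId: String): Unit                           // assumed "basic job management" method
}

trait JobAuth {
  def token(): Option[String] = None
}
```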

tchow-zlai changed the title from "feat: support providing additional confs as yaml file" to "feat: support providing additional confs as yaml file for Driver.scala" on Jan 6, 2025
tchow-zlai changed the base branch from main to tchow/fix-catalog-integration on January 6, 2025 17:51
tchow-zlai force-pushed the tchow/spark-session-refactor branch from 1035250 to baa3ab8 on January 6, 2025 17:51
coderabbitai bot left a comment:

Actionable comments posted: 0

♻️ Duplicate comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

91-105: ⚠️ Potential issue

Fix resource management in YAML parsing.

The Source stream needs proper cleanup.

Apply this diff:

  private[cloud_gcp] def loadConfig: SubmitterConf = {
    val inputStreamOption = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
    val yamlLoader = new Yaml()
    implicit val formats: Formats = DefaultFormats
    inputStreamOption
-      .map(Source.fromInputStream)
-      .map((is) =>
-        try { is.mkString }
-        finally { is.close })
+      .map { inputStream =>
+        try {
+          val source = Source.fromInputStream(inputStream)
+          try {
+            source.mkString
+          } finally {
+            source.close()
+          }
+        } finally {
+          inputStream.close()
+        }
+      }
      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
      .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
      .map((jVal) => render(jVal))
      .map(compact)
      .map(parse(_).extract[SubmitterConf])
      .getOrElse(throw new IllegalArgumentException("Yaml conf not found or invalid yaml"))
  }
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

156-167: ⚠️ Potential issue

Fix resource management in YAML parsing.

The Source stream needs proper cleanup.

Apply this diff:

      val additionalConfs = additionalConfPath.toOption
-        .map(Source.fromFile)
-        .map((src) =>
-          try { src.mkString }
-          finally { src.close })
+        .map { path =>
+          val source = Source.fromFile(path)
+          try {
+            source.mkString
+          } finally {
+            source.close()
+          }
+        }
        .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

156-167: Consider extracting YAML parsing logic.

The YAML parsing logic is duplicated between DataprocSubmitter and Driver. Consider extracting it into a shared utility class.
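
A minimal sketch of what such a shared helper could look like, reusing the json4s/snakeyaml round-trip already present in both call sites; the object name and the choice of json4s-jackson methods are assumptions:

```scala
import scala.collection.JavaConverters._
import scala.io.Source
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.yaml.snakeyaml.Yaml

object YamlConfLoader {
  implicit val formats: Formats = DefaultFormats

  // Parse a flat YAML document of string keys/values into a Map[String, String],
  // mirroring the transformation chain used in Driver.scala and DataprocSubmitter.
  def fromSource(src: Source): Map[String, String] =
    try {
      val raw = new Yaml().load(src.mkString).asInstanceOf[java.util.Map[String, Any]]
      parse(compact(render(Extraction.decompose(raw.asScala.toMap)))).extract[Map[String, String]]
    } finally src.close()
}
```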

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 1035250 and bbf6383.

📒 Files selected for processing (11)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/resources/additional-confs.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (6 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2 hunks)
  • spark/src/test/resources/test-driver-additional-confs.yaml (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • cloud_gcp/src/main/resources/additional-confs.yaml
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
  • spark/src/test/resources/test-driver-additional-confs.yaml
  • spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: other_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: join_spark_tests
  • GitHub Check: table_utils_delta_format_spark_tests
🔇 Additional comments (7)
build.sbt (2)

218-226: Version updates look good

Updated versions for cloud dependencies and added matching JSON/YAML libraries.


192-196: Verify dependency compatibility

New JSON and YAML dependencies added. Ensure version compatibility with Spark 3.X.X.

Run this script to check for potential conflicts:

✅ Verification successful

Dependencies are compatible with Spark 3.X.X

The json4s version is correctly pinned to match Spark's internal version, and all dependencies are properly scoped.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for dependency conflicts
rg -A 5 "json4s|snakeyaml|spark" | grep -E "version|libraryDependencies"

Length of output: 16105
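
For context, a hedged sbt sketch of the kind of pinning being discussed; the version numbers below are placeholders, not the values used in this PR:

```scala
// build.sbt sketch: keep json4s aligned with the version Spark ships, and add snakeyaml.
val json4sVersion = "3.7.0-M11" // assumption: whatever matches the Spark build in use
libraryDependencies ++= Seq(
  "org.json4s" %% "json4s-core"    % json4sVersion,
  "org.json4s" %% "json4s-jackson" % json4sVersion,
  "org.yaml"    % "snakeyaml"      % "2.0" // assumption: any recent snakeyaml release
)
```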

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

2-3: LGTM! Good refactoring.

The transition from SparkSubmitter/SparkAuth to JobSubmitter/JobAuth makes the code more generic and flexible.

Also applies to: 31-31, 110-110


92-92: Good variable naming!

The descriptive name inputStreamOption is much clearer than the previous isO.

spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1)

87-92: LGTM! Improved readability.

The formatting changes make the code more readable by breaking parameters into separate lines.

Also applies to: 117-118, 145-146

spark/src/main/scala/ai/chronon/spark/Driver.scala (2)

93-94: LGTM! Good addition of YAML config support.

The new additionalConfPath option aligns with the PR objective.


171-178: LGTM! Good integration of additional configs.

The SparkSession builder properly integrates the additional YAML configurations.

tchow-zlai (Collaborator, Author) commented:

ptal @piyush-zlai | @david-zlai

tchow-zlai merged commit d649c02 into main on Jan 7, 2025 (10 checks passed)
tchow-zlai deleted the tchow/spark-session-refactor branch on January 7, 2025 16:29
coderabbitai bot mentioned this pull request on Feb 4, 2025
coderabbitai bot mentioned this pull request on Mar 27, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
feat: support providing additional confs as yaml file for Driver.scala (#164)

## Summary

- We want to be able to configure the spark jobs with additional confs
when running them via `Driver.scala`. Let's thread through some conf
files.

## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Dependencies**
	- Updated Google Cloud Dataproc dependency to 4.52.0
	- Added JSON processing and YAML support dependencies

- **Configuration**
	- Added support for additional configuration files
- Introduced new configuration options for Spark table format and
partition format

- **Testing**
	- Enhanced test cases for configuration parsing
	- Added tests for Google Cloud runtime classes
	- Improved BigQuery catalog and Dataproc submitter tests

- **Code Refactoring**
	- Renamed `SparkSubmitter` to `JobSubmitter`
	- Renamed `SparkAuth` to `JobAuth`
	- Updated configuration loading mechanisms
	- Streamlined visibility and access of methods in various classes
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
feat: support providing additional confs as yaml file for Driver.scala (#164)
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
feat: support providing additional confs as yaml file for Driver.scala (#164)
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
feat: support providing additional confs as yaml file for Driver.scala (#164)