feat: Use BigQuery exports as part of table loading in spark #738
Conversation
Walkthrough: The code updates BigQuery integration logic by introducing a method to generate BigQuery EXPORT DATA statements.
Actionable comments posted: 1
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

Lines 20-31: Add validation for SQL export parameters. The template correctly formats the BigQuery EXPORT DATA statement but lacks input validation.

```diff
- private def exportDataTemplate(uri: String, format: String, sql: String) =
+ private def exportDataTemplate(uri: String, format: String, sql: String) = {
+   require(uri != null && uri.startsWith("gs://"), "URI must be a valid GCS path")
+   require(format != null && Seq("PARQUET", "CSV", "JSON", "AVRO").contains(format.toUpperCase), "Format must be valid")
+   require(sql != null && !sql.isEmpty, "SQL query cannot be empty")
    f"""
       |EXPORT DATA
       |  OPTIONS (
       |    uri = '${uri}',
       |    format = '${format}',
       |    overwrite = true
       |  )
       |AS (
       |  ${sql}
       |);
       |""".stripMargin
+ }
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(3 hunks)
🔇 Additional comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Line 4: Imports added for BigQuery job API. Added necessary imports for new export functionality.
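For reference, a plausible shape for those imports is sketched below; the exact list and grouping in the PR may differ.

```scala
// Sketch only: imports the export path plausibly relies on (actual PR imports may differ).
import com.google.cloud.bigquery.{BigQueryOptions, Job, JobInfo, QueryJobConfiguration}
import scala.util.{Failure, Success, Try}
```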
Actionable comments posted: 1
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 42-67: Add DataFrame return in case of None branch. The implementation now properly returns a DataFrame after exporting to GCS.
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(4 hunks)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)
Line 4: Import changes look good. Clean import optimization.
Also applies to: 6-7, 8-8
Lines 20-31: New exportDataTemplate method is well-structured. Cleanly formats BigQuery EXPORT DATA statements.
Lines 51-55: Robust export job execution with Try. Good error handling approach.
Lines 57-67: Export job result handling looks good. Proper DataFrame return and error handling.
Line 89: Explicitly referencing scala.Option is clearer. Good practice for avoiding ambiguity.
Also applies to: 121-125
Actionable comments posted: 0
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 46-91: 🛠️ Refactor suggestion: Add unique identifier to export path. Export path needs uniqueness for concurrent job safety.

```diff
- val destPath = s"${warehouseLocation}/export/${tableName.sanitize}/*.${formatStr}"
+ val uniqueId = java.util.UUID.randomUUID().toString
+ val destPath = s"${warehouseLocation}/export/${tableName.sanitize}_${uniqueId}/*.${formatStr}"
```
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)
Lines 74-91: Consider cleanup of exported data. Exported files remain in GCS after reading.

```diff
 exportJobTry.flatMap { job =>
   scala.Option(job.getStatus.getError) match {
     case Some(err) => Failure(new RuntimeException(s"BigQuery export job failed: $err"))
     case None      => Success(job)
   }
 } match {
   case Success(_) =>
     val internalLoad = sparkSession.read.format(formatStr).load(destPath)
-    pColOption
+    val result = pColOption
       .map { case (nativeColumn) =>
         internalLoad
           .withColumnRenamed(internalBQPartitionCol, nativeColumn.colName)
       }
       .getOrElse(internalLoad)
+    // Consider cleanup of exported files here if needed
+    result
   case Failure(e) =>
     throw e
```
Lines 68-72: Add better error context. Enhance the exception message with more details.

```diff
 val exportJobTry: Try[Job] = Try {
   val exportConf = QueryJobConfiguration.of(exportSQL)
   val job = bigQueryClient.create(JobInfo.of(exportConf))
-  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException(s"Export job for table ${tableName} returned null"))
 }
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3)
spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (3)
- Format (88-111)
- table (30-45)
- getCatalog (100-109)

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3)
- TableUtils (42-574)
- TableUtils (576-578)
- sql (272-300)

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/SparkBQUtils.scala (2)
- SparkBQUtils (7-24)
- toTableId (9-16)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)
Lines 3-12: Updated imports to support new BigQuery export functionality. Added necessary imports for the Scala Try/Success/Failure pattern and Extensions.
Line 14: Good addition of NativePartColumn case class. Captures partition column metadata with a system-defined flag.
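For context, a minimal sketch of what such a case class could look like, inferred from the fields referenced elsewhere in this review (`colName` and the system-defined flag); the actual definition in the PR may differ.

```scala
// Hypothetical shape of the partition-column metadata holder discussed in this review.
case class NativePartColumn(colName: String, isSystemDefined: Boolean)
```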
Lines 24-35: Clean implementation of exportDataTemplate. Well-structured SQL generation with proper string interpolation.
Lines 94-127: Improved partition column handling. Added system-defined flag detection with proper error handling.
Lines 56-60: ❓ Verification inconclusive
Verify warehouse location is properly configured.
Current implementation assumes warehouse location is correctly set.
🏁 Script executed:
```bash
#!/bin/bash
# Verify how warehouse location is configured in the codebase
rg "spark\.sql\.catalog\..*\.properties\..*\.location" --type scala
```

Length of output: 69
🏁 Script executed:
```bash
#!/bin/bash
# find GCP location property key
rg "PROPERTIES_KEY_GCP_LOCATION" -n --type scala
```

Length of output: 245
🏁 Script executed:
```bash
#!/bin/bash
# Locate BigQueryMetastoreCatalog definition and GCP location key
rg "object BigQueryMetastoreCatalog" -n --type scala
rg "PROPERTIES_KEY_GCP_LOCATION" -n -C3 --type scala
```

Length of output: 1069
Verify warehouse location is properly set and non-empty
The code assumes `spark.sql.catalog.<catalogName>.<PROPERTIES_KEY_GCP_LOCATION>` is always defined. Please confirm:
- `BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_LOCATION` is declared and points to a valid GCS path.
- `warehouseLocation` returned by `getConfString` isn't empty; add a default or fail fast if it is (a minimal sketch follows below).

Locations to check:
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (lines 57-60)
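A minimal fail-fast sketch along the lines suggested above, assuming the catalog property key and `getConfString` usage shown in the script output; this is illustrative, not the PR's actual code.

```scala
// Illustrative fail-fast check on the configured warehouse location (not the PR's code).
val warehouseLocation = sparkSession.sessionState.conf
  .getConfString(s"spark.sql.catalog.${catalogName}.${BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_LOCATION}", "")
  .stripSuffix("/")
require(warehouseLocation.nonEmpty && warehouseLocation.startsWith("gs://"),
        s"Warehouse location for catalog ${catalogName} must be a non-empty GCS path")
```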
Actionable comments posted: 1
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3)
Line 60: Sanitized table name needs documentation. The `.sanitize` method's behavior is unclear.

```diff
- val destPath = s"${warehouseLocation}/export/${tableName.sanitize}/*.${formatStr}"
+ // Sanitize table name to ensure valid GCS path
+ val destPath = s"${warehouseLocation}/export/${tableName.sanitize}/*.${formatStr}"
```
Lines 68-92: Consider cleanup of temporary exported files. The current implementation leaves exported files in GCS.
Consider adding a cleanup step or implementing a TTL policy for exports.
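One possible shape for that cleanup step, sketched with Hadoop's FileSystem API; `destPath` and its glob layout are taken from the surrounding diffs, and the helper name is hypothetical.

```scala
// Hypothetical cleanup helper: removes the temporary export directory behind a glob like
// gs://bucket/warehouse/export/<table>_<uuid>/*.parquet once the DataFrame has been loaded.
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

def cleanupExport(destPath: String)(implicit sparkSession: SparkSession): Unit = {
  val exportDir = destPath.substring(0, destPath.lastIndexOf('/')) // strip the trailing "*.<format>" glob
  val fs = FileSystem.get(new URI(exportDir), sparkSession.sparkContext.hadoopConfiguration)
  fs.delete(new Path(exportDir), true) // recursive delete of the temporary export directory
}
```

Alternatively, a GCS object lifecycle rule on the export prefix would implement the TTL option without any code.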
Lines 117-122: Enhance the error message for unknown partition values. The current message lacks context for debugging.

```diff
- throw new IllegalArgumentException(s"Unknown partition column system definition: ${isSystemDefined}")
+ throw new IllegalArgumentException(s"Unknown partition column system definition for column ${name}: ${isSystemDefined}")
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(4 hunks)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)
Lines 3-11: Import changes align with the new implementation approach. Imports now include Scala Try/Success/Failure for better error handling and BigQueryMetastoreCatalog for warehouse location configuration.
Line 14: Good addition of the type-safe NativePartColumn case class. Clear representation of partition columns with system-defined status.
Lines 24-35: Well-structured exportDataTemplate utility. Clean SQL template generation with proper parameter interpolation.
Lines 46-92: ❓ Verification inconclusive
Verify error handling for BigQuery export job
The export functionality is entirely new and might have edge cases.
🏁 Script executed:
```bash
#!/bin/bash
# Search for similar BigQuery export job error handling patterns
rg -A 5 "bigQueryClient.create\(JobInfo.of\(" --type scala
```

Length of output: 794
Verify export job error handling
Only found in cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (lines 46-92). You wrap `create` and `waitFor` in a `Try` and check `status.getError`, but please confirm:
- `job.waitFor()` won't block forever; use a timeout overload.
- Transient BigQuery errors are retried or surfaced clearly.
- Partial export files are cleaned up on failure.
Lines 90-91: 🛠️ Refactor suggestion: Error handling improvements needed. Direct exception throwing loses stack trace context.

```diff
- case Failure(e) =>
-   throw e
+ case Failure(e) =>
+   logger.error(s"BigQuery export failed for table ${bqFriendlyName}", e)
+   throw new RuntimeException(s"Failed to export BigQuery table: ${bqFriendlyName}", e)
```

⛔ Skipped due to learnings

Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
```scala
  sql = select
)

logger.info(s"Export job: ${exportSQL}")
```
🛠️ Refactor suggestion
Potential security concern in logging
The full SQL in logs may expose sensitive data.
```diff
- logger.info(s"Export job: ${exportSQL}")
+ logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+ logger.debug(s"Export job SQL: ${exportSQL}")
```
Actionable comments posted: 1
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Line 67: Potential security concern in logging. The full SQL in logs may expose sensitive data.

```diff
- logger.info(s"Export job: ${exportSQL}")
+ logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+ logger.debug(s"Export job SQL: ${exportSQL}")
```
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)
Lines 46-92: Missing cleanup of exported files. Exported files remain in GCS after loading.

```diff
 case Success(_) =>
   val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+  // TODO: Implement cleanup of exported files after successful load
   pColOption
     .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
```
Lines 68-72: Consider adding a timeout to the export job. Long-running jobs could block indefinitely.

```diff
 val exportJobTry: Try[Job] = Try {
   val exportConf = QueryJobConfiguration.of(exportSQL)
+  // Set timeout configuration if available
+  val timeoutMs = sparkSession.sessionState.conf.getConfString("spark.sql.execution.bigquery.export.timeoutMs", "3600000").toLong
   val job = bigQueryClient.create(JobInfo.of(exportConf))
-  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+  scala.Option(job.waitFor(RetryOption.totalTimeout(org.threeten.bp.Duration.ofMillis(timeoutMs))))
+    .getOrElse(throw new RuntimeException("Export job returned null or timed out"))
 }
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(4 hunks)
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3)
Line 14: Good addition of NativePartColumn class. Clean representation of partition columns with system flag.
Lines 24-35: LGTM for exportDataTemplate implementation. Clear template function for generating SQL export statements.
Lines 94-128: Good implementation of getPartitionColumn. Well-structured method with proper error handling.
Actionable comments posted: 1
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 51-52: Make the format string configurable. The hardcoded "parquet" format could be made configurable.

```diff
- val formatStr = "parquet"
+ val formatStr = sparkSession.sessionState.conf
+   .getConfString(s"spark.sql.catalog.${catalogName}.export.format", "parquet")
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(3 hunks)
🔇 Additional comments (7)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (7)
Lines 25-36: Clean implementation for export SQL generation. Simple and clear template function.
Lines 52-56: Good handling of system-defined partition columns. Clever approach to alias system partition columns during export.
Lines 65-66: Improved logging security. No longer logging full SQL with potentially sensitive data.
Lines 66-70: Proper error handling for the export job. Using Try for robust error handling.
Lines 79-86: Clean dataframe handling with partition column renaming. Appropriately handles the renamed system-defined columns.
Lines 99-100: Good implementation of unique paths for exports. Using UUID prevents concurrent export conflicts.
Lines 104-138: ❌ Incorrect review comment
Consider handling multiple partition columns.
Method only returns first partition column found.
🏁 Script executed:
```bash
#!/bin/bash
# Check if BigQuery supports multiple partition columns in a single table
rg -A 3 "multiple.*partition.*column" --type scala --type python cloud_gcp/
```

Length of output: 108
Ignore multiple-partition suggestion
BigQuery tables support only a single partitioning column, so returning the first (and only) partition column via `headOption` is correct.

Likely an incorrect or invalid review comment.
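For illustration, a hedged sketch of how the single partitioning column could be looked up via BigQuery's INFORMATION_SCHEMA; the metadata view and column names are standard BigQuery, but the query shape and the `projectId`/`datasetId`/`tableId` placeholders are assumptions, not the PR's exact SQL.

```scala
// Illustrative query (placeholders, not the PR's code): find the partitioning column, if any.
// BigQuery permits at most one partitioning column per table, so headOption on the result is enough.
val partColSQL =
  s"""SELECT column_name, is_system_defined
     |FROM `${projectId}.${datasetId}.INFORMATION_SCHEMA.COLUMNS`
     |WHERE table_name = '${tableId}' AND is_partitioning_column = 'YES'
     |""".stripMargin
```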
```scala
private[cloud_gcp] def destPrefix(catalogName: String,
                                  tableName: String,
                                  formatStr: String,
                                  uniqId: Option[String] = None)(implicit sparkSession: SparkSession) = {
  val warehouseLocation = sparkSession.sessionState.conf
    .getConfString(s"spark.sql.catalog.${catalogName}.${BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_LOCATION}")
    .stripSuffix("/")
  val uniqueId = uniqId.getOrElse(UUID.randomUUID().toString)
  s"${warehouseLocation}/export/${tableName.sanitize}_${uniqueId}/*.${formatStr}"
}
```
🛠️ Refactor suggestion
Consider adding cleanup of exported files.
Current implementation leaves exported files in GCS which may accumulate over time.
```diff
 exportJobTry.flatMap { job =>
   scala.Option(job.getStatus.getError) match {
     case Some(err) => Failure(new RuntimeException(s"BigQuery export job failed: $err"))
     case None      => Success(job)
   }
 } match {
   case Success(_) =>
     val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+    // TODO: Add cleanup of exported files after successful load
     pColOption
       .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 0
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 91-101: 🛠️ Refactor suggestion: Consider a file cleanup strategy.

Export files will accumulate in GCS over time. Add a cleanup mechanism after successful DataFrame creation:

```diff
 exportJobTry.flatMap { job =>
   scala.Option(job.getStatus.getError) match {
     case Some(err) => Failure(new RuntimeException(s"BigQuery export job failed: $err"))
     case None      => Success(job)
   }
 } match {
   case Success(_) =>
     val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+    // TODO: Add logic to clean up export files after successful read
     pColOption
       .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
```
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 64-65: Consider adding the job ID to the log message. Would help with troubleshooting.

```diff
-logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName} (export path: ${destPath})")
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(3 hunks)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)
Line 14: Good addition of the `NativePartColumn` case class. Clean representation of partition column metadata.
Lines 24-36: SQL template looks good. Well-structured template for BigQuery EXPORT DATA statements.
Lines 47-89: Export implementation is robust. Appropriately handles system-defined partition columns and error conditions. Job status is properly checked.
Lines 98-99: Deterministic export path management. Good pattern using an optional uniqueId for testability.
Lines 103-137: Partition column detection looks good. Correctly handles system-defined vs regular partition columns.
Actionable comments posted: 1
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 46-89: 🛠️ Refactor suggestion: Add cleanup for exported files.

Implementation successfully exports and loads data but doesn't clean up temporary files, which may accumulate in GCS. Add a cleanup step after successful DataFrame loading:

```diff
 case Success(_) =>
   val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+  // Clean up exported files after successful load
+  try {
+    val fs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(destPath.split("\\*")(0)), sparkSession.sparkContext.hadoopConfiguration)
+    fs.delete(new org.apache.hadoop.fs.Path(destPath.split("\\*")(0)), true)
+  } catch {
+    case e: Exception => logger.warn(s"Failed to clean up exported files at $destPath: ${e.getMessage}")
+  }
   pColOption
     .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
```
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)
Lines 64-65: Good logging, but consider more details. The log message could provide more context about the operation.

```diff
-logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName} with ${formatStr} format to ${destPath}")
```
Lines 65-69: Add an export job timeout. The current implementation waits indefinitely for the job to complete.

```diff
 val exportJobTry: Try[Job] = Try {
   val exportConf = QueryJobConfiguration.of(exportSQL)
+  // Set a timeout for the export job
+  exportConf.toBuilder().setJobTimeoutMs(1800000L).build() // 30 minute timeout
   val job = bigQueryClient.create(JobInfo.of(exportConf))
   scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
 }
```
📒 Files selected for processing (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
(3 hunks)
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (6)
Lines 3-12: Import organization looks good. Clean update to imports to support Try, UUID, and other additions.
Line 14: Well-structured case class for the partition column. The case class clearly separates system-defined from user-defined partition columns.
Lines 24-35: Clean SQL template implementation. The template properly formats the EXPORT DATA statement.
Lines 91-101: Well-designed destination path helper. The method elegantly handles path creation and supports test determinism via an optional UUID parameter.
Lines 103-137: Robust partition column detection. Properly handles system-defined partition columns, with clear error messages for unknown types.
Lines 56-57: ✅ Verification successful
Verify usage of Format.getCatalog.
Confirm that Format.getCatalog is properly implemented to handle all catalog scenarios.
🏁 Script executed:
```bash
#!/bin/bash
# Check implementation of Format.getCatalog
rg -A 10 "def getCatalog" --type scala
```

Length of output: 1295
🏁 Script executed:
```bash
#!/bin/bash
# Locate the BigQueryNative.scala file and show its imports and implicit context
file=$(fd BigQueryNative.scala)
echo "Inspecting $file"
sed -n '1,60p' "$file"
```

Length of output: 2910
Format.getCatalog usage validated
- Supports 3-part, 2-part, and 1-part identifiers; defaults to current catalog for 2- and 1-part names.
- Implicit SparkSession is in scope in BigQueryNative.table.
- Throws on invalid patterns.
No changes required.
Actionable comments posted: 1
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Line 49: Potential SQL injection vulnerability. Direct concatenation of partition filters into the SQL query.

```diff
-val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
+val partitionWheres = if (partitionFilters.nonEmpty) {
+  // Basic validation to prevent SQL injection
+  if (partitionFilters.contains(";") || partitionFilters.toLowerCase.contains("union")) {
+    throw new IllegalArgumentException(s"Invalid partition filter: $partitionFilters")
+  }
+  s"WHERE ${partitionFilters}"
+} else partitionFilters
```
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
Lines 71-88: Consider adding file cleanup after export. Exported files remain in GCS indefinitely.

```diff
 case Success(_) =>
   val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+  // TODO: Add cleanup logic for exported files
   pColOption
     .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
```
📒 Files selected for processing (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3 hunks)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)
Line 14: Good addition of NativePartColumn case class. Cleanly separates system-defined partition columns.
Lines 24-36: LGTM: SQL template is well structured. Clean template function for the BigQuery EXPORT DATA statement.
Lines 51-55: Good handling of system-defined partition columns. Correctly aliases system-defined columns to avoid missing data.
Lines 91-101: Good design for deterministic testing. The optional uniqueId parameter enables deterministic testing.
Lines 103-137: Well-structured partition column detection. Properly handles system-defined vs. regular partition columns.
```scala
logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
val exportJobTry: Try[Job] = Try {
  val exportConf = QueryJobConfiguration.of(exportSQL)
  val job = bigQueryClient.create(JobInfo.of(exportConf))
  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
}
```
🛠️ Refactor suggestion
Add timeout to waitFor()
Prevent job from hanging indefinitely.
```diff
-scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+scala.Option(job.waitFor(RetryOption.totalTimeout(org.threeten.bp.Duration.ofMinutes(30))))
+  .getOrElse(throw new RuntimeException("Export job returned null"))
```
## Summary

- The BigQuery connector doesn't support reading large datasets well. Instead, we'll leverage [BigQuery Exports](https://cloud.google.com/bigquery/docs/exporting-data#sql), which is an official approach to getting data out of BigQuery. As part of table loading, we'll first export the data to a GCS warehouse location, which should be a quick operation. Upon testing this on production data, it seems very quick (<10 seconds to extract 100GB).
- There are some nuances in handling partition columns, particularly system-defined pseudocolumns. Since they don't show up in the projection if you do a `SELECT * ...`, we'll need to [alias them](https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table). The logic is as follows:
  1. Given a table, we check the information schema to see if it is partitioned.
  2. If partitioned, check if it's a system-defined partition column.
  3. (a) If it's a system-defined partition column, we'll alias that column to an internal chronon reserved name. If it's not, we'll simply do a `SELECT *` with no alias. (b) If not partitioned (e.g. in the case of a view), we'll just do a simple `SELECT *` and apply the "partition" filters requested by the reader.
  4. After the data gets exported with the possible alias in (3a), we'll read it back as a Spark dataframe and rename the aliased column to the system-defined partition column name. The rename is a noop if the internal column alias is not present (in the absence of a system-defined partition column).

We'll use the reserved catalog conf `spark.sql.catalog.<catalog_name>.warehouse` as the root location to do exports, which is configured per project.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added support for exporting BigQuery table data as Parquet files to Google Cloud Storage, improving data loading into Spark.
- **Refactor**
  - Replaced partition-based BigQuery reads with an export-to-GCS approach for enhanced performance and reliability.
  - Centralized catalog retrieval logic for table formats, removing deprecated methods and improving consistency.
  - Updated test cases to align with the new catalog retrieval method.
  - Cleaned up import statements for better code organization.

---------

Co-authored-by: thomaschow <[email protected]>
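To make the export-then-load flow described above concrete, here is a hedged end-to-end sketch; the reserved alias name, the helper name, and the argument wiring are assumptions for illustration, not the PR's exact code. In the PR itself the export path is derived from the configured catalog warehouse location (see `destPrefix` earlier in the review).

```scala
// Illustrative sketch of the flow described above: export a BigQuery table to GCS as Parquet,
// read it back with Spark, and undo the pseudocolumn alias if one was applied.
import com.google.cloud.bigquery.{BigQueryOptions, JobInfo, QueryJobConfiguration}
import org.apache.spark.sql.{DataFrame, SparkSession}

def exportAndLoad(spark: SparkSession,
                  bqTable: String,
                  destPath: String,
                  systemPartitionCol: Option[String]): DataFrame = {
  val internalAlias = "__chronon_internal_bq_partition_col__" // hypothetical reserved name
  val projection = systemPartitionCol
    .map(c => s"SELECT *, $c AS $internalAlias FROM $bqTable")
    .getOrElse(s"SELECT * FROM $bqTable")

  val exportSQL =
    s"""EXPORT DATA OPTIONS (uri = '$destPath', format = 'PARQUET', overwrite = true)
       |AS ($projection);""".stripMargin

  // Run the export as a BigQuery query job and wait for completion.
  val bigQuery = BigQueryOptions.getDefaultInstance.getService
  bigQuery.create(JobInfo.of(QueryJobConfiguration.of(exportSQL))).waitFor()

  val loaded = spark.read.parquet(destPath)
  // Renaming is a no-op when the alias column is absent, matching step 4 above.
  systemPartitionCol.map(c => loaded.withColumnRenamed(internalAlias, c)).getOrElse(loaded)
}
```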