feat: Use BigQuery exports as part of table loading in spark #738

Merged: 19 commits merged into main from tchow/export-bq on May 6, 2025

Conversation

@tchow-zlai tchow-zlai commented May 5, 2025

Summary

  • The BigQuery connector doesn't handle reading large datasets well. Instead, we'll leverage BigQuery Exports, which is the official approach to getting data out of BigQuery. As part of table loading, we'll first export the data to a GCS warehouse location, which should be a quick operation. In testing against production data this is very fast (<10 seconds to extract 100GB).
  • There are some nuances in handling partition columns, particularly system-defined pseudocolumns. Since they don't show up in the projection when you do a SELECT * ..., we'll need to alias them. The logic is as follows:
  1. Given a table, we check the information schema to see if it is partitioned.
  2. If partitioned, check whether the partition column is system defined.
  3.
(a) If it's a system-defined partition column, we'll alias that column to an internal Chronon reserved name. If it's not, we'll simply do a SELECT * with no alias.
(b) If the table is not partitioned (e.g. in the case of a view), we'll just do a simple SELECT * and apply the "partition" filters requested by the reader.
  4. After the data gets exported with the possible alias from (3a), we'll read it back as a Spark DataFrame and rename the aliased column to the system-defined partition column name. The rename is a no-op if the internal column alias is not present (i.e. when there is no system-defined partition column).

We'll use the reserved catalog conf spark.sql.catalog.<catalog_name>.warehouse as the root location for exports; it is configured per project.
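
For illustration, here is a minimal sketch of the kind of EXPORT DATA statement this flow generates. The GCS prefix, the filter, and the internal alias name below are hypothetical placeholders; the real values come from the catalog warehouse conf, the reader's partition filters, and the partition-column lookup.

```scala
// Sketch only: shape of the generated export statement for a table whose
// ingestion-time pseudocolumn (_PARTITIONTIME) must be aliased (case 3a above).
val destUri = "gs://my-warehouse/export/project_dataset_table_<uuid>/*.parquet" // hypothetical prefix
val select =
  "SELECT *, _PARTITIONTIME AS __chronon_internal_partition " +                 // alias name is illustrative
    "FROM `project.dataset.table` WHERE DATE(_PARTITIONTIME) = '2025-05-01'"    // hypothetical filter

val exportSQL =
  s"""
     |EXPORT DATA
     |  OPTIONS (
     |    uri = '$destUri',
     |    format = 'parquet',
     |    overwrite = true
     |    )
     |AS (
     |   $select
     |);
     |""".stripMargin
```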

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Added support for exporting BigQuery table data as Parquet files to Google Cloud Storage, improving data loading into Spark.
  • Refactor

    • Replaced partition-based BigQuery reads with export-to-GCS approach for enhanced performance and reliability.
    • Centralized catalog retrieval logic for table formats, removing deprecated methods and improving consistency.
    • Updated test cases to align with new catalog retrieval method.
    • Cleaned up import statements for better code organization.

coderabbitai bot commented May 5, 2025

Walkthrough

The code updates BigQuery integration logic by introducing a method to generate EXPORT DATA SQL statements and changing the table reading approach to export query results as Parquet files to Google Cloud Storage using the BigQuery client, replacing the previous Spark-based read. It also centralizes catalog retrieval by removing a local method and adding a new method in the Format object.

Changes

  • cloud_gcp/.../BigQueryNative.scala: Added NativePartColumn case class; added exportDataTemplate, getPartitionColumn, destPrefix; refactored table to export filtered data as Parquet to GCS via a BigQuery job and read it back with Spark; removed prior partition-based Spark BigQuery reads; added error handling.
  • cloud_gcp/.../GcpFormatProvider.scala: Changed readFormat to call Format.getCatalog instead of the local getCatalog.
  • spark/.../DefaultFormatProvider.scala: Removed the getCatalog method that parsed the table name for catalog extraction.
  • spark/.../Format.scala: Added a getCatalog method to parse the table name and determine the catalog using the Spark SQL parser and session.
  • spark/.../TableUtilsTest.scala: Updated test to use Format.getCatalog instead of FormatProvider.getCatalog.
  • spark/.../TableUtils.scala: Reorganized imports without logic changes.

Possibly related PRs

Suggested reviewers

  • nikhil-zlai
  • varant-zlai
  • piyush-zlai

Poem

To BigQuery we now export with flair,
Parquet files flying through GCP air.
No more Spark reads, just jobs that wait,
As data lands at its cloud-based fate.
A method new, a pathway bright—
Code evolves in the softest light!
🚀



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

20-31: Add validation for SQL export parameters.

Template correctly formats BigQuery EXPORT DATA statement but lacks input validation.

-  private def exportDataTemplate(uri: String, format: String, sql: String) =
+  private def exportDataTemplate(uri: String, format: String, sql: String) = {
+    require(uri != null && uri.startsWith("gs://"), "URI must be a valid GCS path")
+    require(format != null && Seq("PARQUET", "CSV", "JSON", "AVRO").contains(format.toUpperCase), "Format must be valid")
+    require(sql != null && !sql.isEmpty, "SQL query cannot be empty")
     f"""
       |EXPORT DATA
       |  OPTIONS (
       |    uri = '${uri}',
       |    format = '${format}',
       |    overwrite = true
       |    )
       |AS (
       |   ${sql}
       |);
       |""".stripMargin
+  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 742958d and c4b4acd.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3 hunks)
🔇 Additional comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

4-4: Imports added for BigQuery job API.

Added necessary imports for new export functionality.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

42-67: Add DataFrame return in case None branch

The implementation now properly returns a DataFrame after exporting to GCS.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between c4b4acd and 4ffc17f.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (4 hunks)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)

4-4: Import changes look good

Clean import optimization.

Also applies to: 6-7, 8-8


20-31: New exportDataTemplate method is well-structured

Cleanly formats BigQuery EXPORT DATA statements.


51-55: Robust export job execution with Try

Good error handling approach.


57-67: Export job result handling looks good

Proper DataFrame return and error handling.


89-89: Explicitly referencing scala.Option is clearer

Good practice for avoiding ambiguity.

Also applies to: 121-125

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

46-91: 🛠️ Refactor suggestion

Add unique identifier to export path.

Export path needs uniqueness for concurrent job safety.

-    val destPath = s"${warehouseLocation}/export/${tableName.sanitize}/*.${formatStr}"
+    val uniqueId = java.util.UUID.randomUUID().toString
+    val destPath = s"${warehouseLocation}/export/${tableName.sanitize}_${uniqueId}/*.${formatStr}"
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)

74-91: Consider cleanup of exported data.

Exported files remain in GCS after reading.

 exportJobTry.flatMap { job =>
   scala.Option(job.getStatus.getError) match {
     case Some(err) => Failure(new RuntimeException(s"BigQuery export job failed: $err"))
     case None      => Success(job)
   }
 } match {
   case Success(_) =>
     val internalLoad = sparkSession.read.format(formatStr).load(destPath)
-    pColOption
+    val result = pColOption
       .map { case (nativeColumn) =>
         internalLoad
           .withColumnRenamed(internalBQPartitionCol, nativeColumn.colName)
       }
       .getOrElse(internalLoad)
+    // Consider cleanup of exported files here if needed
+    result

   case Failure(e) =>
     throw e
 }

68-72: Add better error context.

Enhance exception message with more details.

 val exportJobTry: Try[Job] = Try {
   val exportConf = QueryJobConfiguration.of(exportSQL)
   val job = bigQueryClient.create(JobInfo.of(exportConf))
-  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException(s"Export job for table ${tableName} returned null"))
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between b355d8d and 61fa4b9.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3)
spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (3)
  • Format (88-111)
  • table (30-45)
  • getCatalog (100-109)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3)
  • TableUtils (42-574)
  • TableUtils (576-578)
  • sql (272-300)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/SparkBQUtils.scala (2)
  • SparkBQUtils (7-24)
  • toTableId (9-16)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)

3-12: Updated imports to support new BigQuery export functionality.

Added necessary imports for Scala Try/Success/Failure pattern and Extensions.


14-14: Good addition of NativePartColumn case class.

Captures partition column metadata with system-defined flag.


24-35: Clean implementation of exportDataTemplate.

Well-structured SQL generation with proper string interpolation.


94-127: Improved partition column handling.

Added system-defined flag detection with proper error handling.


56-60:

❓ Verification inconclusive

Verify warehouse location is properly configured.

Current implementation assumes warehouse location is correctly set.


🏁 Script executed:

#!/bin/bash
# Verify how warehouse location is configured in the codebase
rg "spark\.sql\.catalog\..*\.properties\..*\.location" --type scala

Length of output: 69


🏁 Script executed:

#!/bin/bash
# find GCP location property key
rg "PROPERTIES_KEY_GCP_LOCATION" -n --type scala

Length of output: 245


🏁 Script executed:

#!/bin/bash
# Locate BigQueryMetastoreCatalog definition and GCP location key
rg "object BigQueryMetastoreCatalog" -n --type scala
rg "PROPERTIES_KEY_GCP_LOCATION" -n -C3 --type scala

Length of output: 1069


Verify warehouse location is properly set and non-empty
The code assumes spark.sql.catalog.<catalogName>.<PROPERTIES_KEY_GCP_LOCATION> is always defined. Please confirm:

  • BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_LOCATION is declared and points to a valid GCS path.
  • warehouseLocation returned by getConfString isn’t empty—add a default or fail-fast if it is (see the sketch below).

Locations to check:

  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (lines 57–60)
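
As a minimal sketch of the fail-fast suggested above, assuming the same conf-key construction used by destPrefix; the error messages and the gs:// check are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: fail fast when the catalog's GCS location conf is missing or empty,
// instead of letting an empty warehouse location produce a confusing export path.
def requireWarehouseLocation(catalogName: String, locationKey: String)(implicit spark: SparkSession): String = {
  val confKey = s"spark.sql.catalog.${catalogName}.${locationKey}"
  val location = spark.sessionState.conf.getConfString(confKey, "").stripSuffix("/")
  require(location.nonEmpty, s"Catalog conf ${confKey} must be set to a GCS warehouse location")
  require(location.startsWith("gs://"), s"Catalog conf ${confKey} must be a gs:// URI, got: ${location}")
  location
}
```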

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3)

60-60: Sanitized table name needs documentation

The .sanitize method's behavior is unclear.

-  val destPath = s"${warehouseLocation}/export/${tableName.sanitize}/*.${formatStr}"
+  // Sanitize table name to ensure valid GCS path
+  val destPath = s"${warehouseLocation}/export/${tableName.sanitize}/*.${formatStr}"

68-92: Consider cleanup of temporary exported files

The current implementation leaves exported files in GCS.

Consider adding a cleanup step or implementing a TTL policy for exports.


117-122: Enhanced error message for unknown partition values

Current message lacks context for debugging.

-              throw new IllegalArgumentException(s"Unknown partition column system definition: ${isSystemDefined}")
+              throw new IllegalArgumentException(s"Unknown partition column system definition for column ${name}: ${isSystemDefined}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 61fa4b9 and 406e67f.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (4 hunks)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)

3-11: Import changes align with new implementation approach

Imports now include Scala Try/Success/Failure for better error handling and BigQueryMetastoreCatalog for warehouse location configuration.


14-14: Good addition of type-safe NativePartColumn case class

Clear representation of partition columns with system-defined status.


24-35: Well-structured exportDataTemplate utility

Clean SQL template generation with proper parameter interpolation.


46-92:

❓ Verification inconclusive

Verify error handling for BigQuery export job

The export functionality is entirely new and might have edge cases.


🏁 Script executed:

#!/bin/bash
# Search for similar BigQuery export job error handling patterns
rg -A 5 "bigQueryClient.create\(JobInfo.of\(" --type scala

Length of output: 794


Verify export job error handling

Only found in cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (lines 46–92). You wrap create & waitFor in a Try and check status.getError, but please confirm:

  • job.waitFor() won’t block forever—use a timeout overload (see the sketch after this list).
  • Transient BigQuery errors are retried or surfaced clearly.
  • Partial export files are cleaned up on failure.
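
A minimal sketch of the timeout point above, using the RetryOption overload of waitFor that later suggestions in this thread also reference; the 30-minute budget is an arbitrary illustrative value:

```scala
import com.google.cloud.RetryOption
import com.google.cloud.bigquery.{BigQuery, Job, JobInfo, QueryJobConfiguration}
import org.threeten.bp.Duration
import scala.util.Try

// Sketch only: bound how long we wait for the export job instead of blocking indefinitely.
def runExportWithTimeout(bigQueryClient: BigQuery, exportSQL: String): Try[Job] = Try {
  val exportConf = QueryJobConfiguration.of(exportSQL)
  val job = bigQueryClient.create(JobInfo.of(exportConf))
  // waitFor returns null if the job no longer exists; totalTimeout caps the overall polling time.
  Option(job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30))))
    .getOrElse(throw new RuntimeException("Export job returned null or timed out"))
}
```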

90-91: 🛠️ Refactor suggestion

Error handling improvements needed

Direct exception throwing loses stack trace context.

-      case Failure(e) =>
-        throw e
+      case Failure(e) =>
+        logger.error(s"BigQuery export failed for table ${bqFriendlyName}", e)
+        throw new RuntimeException(s"Failed to export BigQuery table: ${bqFriendlyName}", e)
⛔ Skipped due to learnings
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.

sql = select
)

logger.info(s"Export job: ${exportSQL}")

🛠️ Refactor suggestion

Potential security concern in logging

The full SQL in logs may expose sensitive data.

-  logger.info(s"Export job: ${exportSQL}")
+  logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+  logger.debug(s"Export job SQL: ${exportSQL}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.info(s"Export job: ${exportSQL}")
- logger.info(s"Export job: ${exportSQL}")
+ logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+ logger.debug(s"Export job SQL: ${exportSQL}")

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

67-67: Potential security concern in logging

The full SQL in logs may expose sensitive data.

-  logger.info(s"Export job: ${exportSQL}")
+  logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+  logger.debug(s"Export job SQL: ${exportSQL}")
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)

46-92: Missing cleanup of exported files

Exported files remain in GCS after loading.

      case Success(_) =>
        val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+       // TODO: Implement cleanup of exported files after successful load
        pColOption
          .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
            internalLoad
              .withColumnRenamed(internalBQPartitionCol, nativeColumn.colName)
          }
          .getOrElse(internalLoad)

68-72: Consider adding timeout to export job

Long-running jobs could block indefinitely.

    val exportJobTry: Try[Job] = Try {
      val exportConf = QueryJobConfiguration.of(exportSQL)
+     // Set timeout configuration if available
+     val timeoutMs = sparkSession.sessionState.conf.getConfString("spark.sql.execution.bigquery.export.timeoutMs", "3600000").toLong
      val job = bigQueryClient.create(JobInfo.of(exportConf))
-     scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+     scala.Option(job.waitFor(RetryOption.totalTimeout(org.threeten.bp.Duration.ofMillis(timeoutMs))))
+       .getOrElse(throw new RuntimeException("Export job returned null or timed out"))
    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 406e67f and 15a9151.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (4 hunks)
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3)

14-14: Good addition of NativePartColumn class

Clean representation of partition columns with system flag.


24-35: LGTM for exportDataTemplate implementation

Clear template function for generating SQL export statements.


94-128: Good implementation of getPartitionColumn

Well-structured method with proper error handling.

@tchow-zlai changed the title from "bigquery export" to "feat: Use BigQuery exports as part of table loading in spark" on May 6, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

51-52: Make format string configurable.

Hardcoded "parquet" format could be made configurable.

- val formatStr = "parquet"
+ val formatStr = sparkSession.sessionState.conf
+   .getConfString(s"spark.sql.catalog.${catalogName}.export.format", "parquet")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 10da4e1 and 7f04ac7.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3 hunks)
🔇 Additional comments (7)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (7)

25-36: Clean implementation for export SQL generation.

Simple and clear template function.


52-56: Good handling of system-defined partition columns.

Clever approach to alias system partition columns during export.


65-66: Improved logging security.

No longer logging full SQL with potential sensitive data.


66-70: Proper error handling for export job.

Using Try for robust error handling.


79-86: Clean dataframe handling with partition column renaming.

Appropriately handles the renamed system-defined columns.


99-100: Good implementation of unique paths for exports.

Using UUID prevents concurrent export conflicts.


104-138:

❌ Incorrect review comment

Consider handling multiple partition columns.

Method only returns first partition column found.


🏁 Script executed:

#!/bin/bash
# Check if BigQuery supports multiple partition columns in a single table
rg -A 3 "multiple.*partition.*column" --type scala --type python cloud_gcp/

Length of output: 108


Ignore multiple-partition suggestion
BigQuery tables support only a single partitioning column, so returning the first (and only) partition column via headOption is correct.

Likely an incorrect or invalid review comment.
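
For context, here is a sketch of how such a lookup against the information schema could look. The query shape, helper signature, and field types are assumptions for illustration, not the PR's exact getPartitionColumn implementation:

```scala
// Sketch only: recover the (single) partition column and its system-defined flag
// from BigQuery's INFORMATION_SCHEMA.COLUMNS view.
case class NativePartColumn(colName: String, isSystemDefined: Boolean)

def getPartitionColumn(project: String, dataset: String, table: String)(
    runQuery: String => Seq[Map[String, String]] // hypothetical helper: runs SQL, returns rows as column -> value maps
): Option[NativePartColumn] = {
  val sql =
    s"""
       |SELECT column_name, is_system_defined
       |FROM `${project}.${dataset}.INFORMATION_SCHEMA.COLUMNS`
       |WHERE table_name = '${table}' AND is_partitioning_column = 'YES'
       |""".stripMargin

  // headOption is enough: BigQuery tables have at most one partitioning column.
  runQuery(sql).headOption.map { row =>
    row("is_system_defined") match {
      case "YES" => NativePartColumn(row("column_name"), isSystemDefined = true)
      case "NO"  => NativePartColumn(row("column_name"), isSystemDefined = false)
      case other => throw new IllegalArgumentException(s"Unknown partition column system definition: ${other}")
    }
  }
}
```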

Comment on lines 92 to 102
private[cloud_gcp] def destPrefix(catalogName: String,
tableName: String,
formatStr: String,
uniqId: Option[String] = None)(implicit sparkSession: SparkSession) = {
val warehouseLocation = sparkSession.sessionState.conf
.getConfString(s"spark.sql.catalog.${catalogName}.${BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_LOCATION}")
.stripSuffix("/")
val uniqueId = uniqId.getOrElse(UUID.randomUUID().toString)
s"${warehouseLocation}/export/${tableName.sanitize}_${uniqueId}/*.${formatStr}"

}

🛠️ Refactor suggestion

Consider adding cleanup of exported files.

Current implementation leaves exported files in GCS which may accumulate over time.

  exportJobTry.flatMap { job =>
    scala.Option(job.getStatus.getError) match {
      case Some(err) => Failure(new RuntimeException(s"BigQuery export job failed: $err"))
      case None      => Success(job)
    }
  } match {
    case Success(_) =>
      val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+     // TODO: Add cleanup of exported files after successful load
      pColOption
        .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

91-101: 🛠️ Refactor suggestion

Consider file cleanup strategy

Export files will accumulate in GCS over time.

Add cleanup mechanism after successful DataFrame creation:

 exportJobTry.flatMap { job =>
   scala.Option(job.getStatus.getError) match {
     case Some(err) => Failure(new RuntimeException(s"BigQuery export job failed: $err"))
     case None      => Success(job)
   }
 } match {
   case Success(_) =>
     val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+    // TODO: Add logic to clean up export files after successful read
     pColOption
       .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

64-65: Consider adding job ID to log message

Would help with troubleshooting.

-logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName} (export path: ${destPath})")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 7f04ac7 and 368162d.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3 hunks)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)

14-14: Good addition of NativePartColumn case class

Clean representation of partition column metadata.


24-36: SQL template looks good

Well-structured template for BigQuery EXPORT DATA statements.


47-89: Export implementation is robust

Appropriately handles system-defined partition columns and error conditions. Job status is properly checked.


98-99: Deterministic export path management

Good pattern using optional uniqueId for testability.


103-137: Partition column detection looks good

Correctly handles system-defined vs regular partition columns.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

46-89: 🛠️ Refactor suggestion

Add cleanup for exported files.

Implementation successfully exports and loads data but doesn't clean up temporary files, which may accumulate in GCS.

Add a cleanup step after successful DataFrame loading:

 case Success(_) =>
   val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+  // Clean up exported files after successful load
+  try {
+    val fs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(destPath.split("\\*")(0)), sparkSession.sparkContext.hadoopConfiguration)
+    fs.delete(new org.apache.hadoop.fs.Path(destPath.split("\\*")(0)), true)
+  } catch {
+    case e: Exception => logger.warn(s"Failed to clean up exported files at $destPath: ${e.getMessage}")
+  }
   pColOption
     .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)

64-65: Good logging but consider more details.

The log message could provide more context about the operation.

-logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
+logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName} with ${formatStr} format to ${destPath}")

65-69: Add export job timeout.

The current implementation waits indefinitely for the job to complete.

val exportJobTry: Try[Job] = Try {
  val exportConf = QueryJobConfiguration.of(exportSQL)
+  // Set a timeout for the export job
+  exportConf.toBuilder().setJobTimeoutMs(1800000L).build() // 30 minute timeout
  val job = bigQueryClient.create(JobInfo.of(exportConf))
  scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 368162d and da3f35b.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3 hunks)
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (6)

3-12: Import organization looks good.

Clean update to imports to support Try, UUID, and other additions.


14-14: Well-structured case class for partition column.

The case class clearly separates system-defined from user-defined partition columns.


24-35: Clean SQL template implementation.

The template properly formats the EXPORT DATA statement.


91-101: Well-designed destination path helper.

Method elegantly handles path creation and supports test determinism via optional UUID parameter.


103-137: Robust partition column detection.

Properly handles system-defined partition columns, with clear error messages for unknown types.


56-57:

✅ Verification successful

Verify usage of Format.getCatalog.

Confirm that Format.getCatalog is properly implemented to handle all catalog scenarios.


🏁 Script executed:

#!/bin/bash
# Check implementation of Format.getCatalog
rg -A 10 "def getCatalog" --type scala

Length of output: 1295


🏁 Script executed:

#!/bin/bash
# Locate the BigQueryNative.scala file and show its imports and implicit context
file=$(fd BigQueryNative.scala)
echo "Inspecting $file"
sed -n '1,60p' "$file"

Length of output: 2910


Format.getCatalog usage validated

  • Supports 3-part, 2-part, and 1-part identifiers; defaults to current catalog for 2- and 1-part names.
  • Implicit SparkSession is in scope in BigQueryNative.table.
  • Throws on invalid patterns.

No changes required.
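
For reference, a minimal sketch of the general shape of such a getCatalog helper, based on the walkthrough description (3-part names yield the catalog, shorter names fall back to the current catalog). The exact implementation in Format.scala may differ, and spark.catalog.currentCatalog() assumes Spark 3.4+:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: parse a possibly-qualified table name and pick the catalog.
def getCatalog(tableName: String)(implicit spark: SparkSession): String = {
  // Spark's SQL parser splits "catalog.database.table"-style identifiers into parts.
  val parts = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
  parts match {
    case Seq(catalog, _, _) => catalog                        // catalog.db.table
    case Seq(_, _) | Seq(_) => spark.catalog.currentCatalog() // db.table or table: use current catalog
    case _ => throw new IllegalStateException(s"Invalid table name: ${tableName}") // message is illustrative
  }
}
```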

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

49-49: Potential SQL injection vulnerability

Direct concatenation of partition filters into SQL query.

-val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
+val partitionWheres = if (partitionFilters.nonEmpty) {
+  // Basic validation to prevent SQL injection
+  if (partitionFilters.contains(";") || partitionFilters.toLowerCase.contains("union")) {
+    throw new IllegalArgumentException(s"Invalid partition filter: $partitionFilters")
+  }
+  s"WHERE ${partitionFilters}" 
+} else partitionFilters
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)

71-88: Consider adding file cleanup after export

Exported files remain in GCS indefinitely.

 case Success(_) =>
   val internalLoad = sparkSession.read.format(formatStr).load(destPath)
+  // TODO: Add cleanup logic for exported files
   pColOption
     .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between da3f35b and 668486a.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (5)

14-14: Good addition of NativePartColumn case class

Cleanly separates system-defined partition columns.


24-36: LGTM: SQL template is well structured

Clean template function for BigQuery EXPORT DATA statement.


51-55: Good handling of system-defined partition columns

Correctly aliases system-defined columns to avoid missing data.


91-101: Good design for deterministic testing

Optional uniqueId parameter enables deterministic testing.


103-137: Well-structured partition column detection

Properly handles system-defined vs. regular partition columns.

Comment on lines +64 to +69
logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
val exportJobTry: Try[Job] = Try {
val exportConf = QueryJobConfiguration.of(exportSQL)
val job = bigQueryClient.create(JobInfo.of(exportConf))
scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
}

🛠️ Refactor suggestion

Add timeout to waitFor()

Prevent job from hanging indefinitely.

-scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+scala.Option(job.waitFor(RetryOption.totalTimeout(org.threeten.bp.Duration.ofMinutes(30))))
+  .getOrElse(throw new RuntimeException("Export job returned null"))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
val exportJobTry: Try[Job] = Try {
val exportConf = QueryJobConfiguration.of(exportSQL)
val job = bigQueryClient.create(JobInfo.of(exportConf))
scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
}
logger.info(s"Starting BigQuery export job for table: ${bqFriendlyName}")
val exportJobTry: Try[Job] = Try {
val exportConf = QueryJobConfiguration.of(exportSQL)
val job = bigQueryClient.create(JobInfo.of(exportConf))
- scala.Option(job.waitFor()).getOrElse(throw new RuntimeException("Export job returned null"))
+ scala.Option(job.waitFor(RetryOption.totalTimeout(org.threeten.bp.Duration.ofMinutes(30))))
+ .getOrElse(throw new RuntimeException("Export job returned null"))
}

@tchow-zlai tchow-zlai merged commit f08593b into main May 6, 2025
33 of 50 checks passed
@tchow-zlai tchow-zlai deleted the tchow/export-bq branch May 6, 2025 17:31
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
## Summary

- Bigquery connector doesn't support reading large datasets well.
Instead, we'll leverage [BigQuery
Exports](https://cloud.google.com/bigquery/docs/exporting-data#sql)
which is an official approach to getting data out of bigquery. As part
of table loading, we'll first export the data to a GCS warehouse
location, which should be a quick operation. Upon testing this on
production data, it seems very quick (<10 seconds to extract 100GB).
- There are some nuances in handling partition columns, particularly
system defined pseudocolumns. since they don't show up in the projection
if you do a `SELECT * ...`, we'll need to [alias
them](https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table).
The logic is as follows:

1. Given a table, we check the information schema to see if it is
partitioned
2. If partitioned, check if it's a system defined partition column.
3. 
(a) If it's a system defined partition column, we'll alias that column
to an internal chronon reserved name. If it's not, we'll simply just do
a `SELECT * ` with no alias.
(b) If not partitioned (eg in the case of a view), we'll just do a
simple `SELECT * ` and apply the "partition" filters requested by the
reader.
4. After the data gets exported with the possible alias in (3a), we'll
read it back as a spark dataframe and rename the aliased column to the
system defined partition column name. The rename is a noop if the
internal column alias is not present (in the absence of a system defined
partition column).


We'll use the reserved catalog conf:
`spark.sql.catalog.<catalog_name>.warehouse` as the root location to do
exports, which is configured per project.



## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added support for exporting BigQuery table data as Parquet files to
Google Cloud Storage, improving data loading into Spark.

- **Refactor**
- Replaced partition-based BigQuery reads with export-to-GCS approach
for enhanced performance and reliability.
- Centralized catalog retrieval logic for table formats, removing
deprecated methods and improving consistency.
  - Updated test cases to align with new catalog retrieval method.
  - Cleaned up import statements for better code organization.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: thomaschow <[email protected]>
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
3 participants