feat: stagingquery on GCP #225
Conversation
Force-pushed from 6ca603a to f4fc0ee
Actionable comments posted: 1
```scala
val df = if (formats.map { case (_, fmtName) => fmtName.toUpperCase }.contains("BIGQUERY")) {
  val firstDataset = formats
    .collectFirst {
      case (mpi, _) => mpi(1)
```
Ensure safe access to `mpi(1)`

Accessing `mpi(1)` may cause an `IndexOutOfBoundsException` if `mpi` has fewer than two elements. Verify that `mpi` always has at least two elements before accessing it.
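A minimal sketch of that guard, assuming `mpi` is the multipart identifier (`Seq[String]`) carried by `UnresolvedRelation`:

```scala
// Sketch: only yield a dataset when the identifier is fully qualified
// as Seq(project, dataset, table); anything shorter falls through to None.
def datasetOf(mpi: Seq[String]): Option[String] =
  mpi match {
    case Seq(_, dataset, _) => Some(dataset) // project.dataset.table
    case _                  => None
  }
```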
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)
Lines 40-44: LGTM! The new imports support SQL parsing functionality.
Lines 411-440: ⚠️ Potential issue: Fix potential array index out of bounds.

The access to `mpi(1)` at line 428 could throw an `IndexOutOfBoundsException` if `mpi` has fewer than 2 elements.

```diff
- case (mpi, _) if mpi.size == 3 => mpi(1)
+ case (mpi, _) if mpi.size >= 3 => mpi(1)
```

Verify BigQuery table name resolution.
The code assumes table names will resolve to valid BigQuery datasets. Add validation for table name format.
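A sketch of what such validation could look like, assuming the fully qualified `project.dataset.table` form; the regex is illustrative and looser than BigQuery's actual naming rules:

```scala
// Illustrative pattern for project.dataset.table; not BigQuery's official grammar.
val BqTableName = raw"([\w-]+)\.(\w+)\.(\w+)".r

def validateBqTableName(name: String): Either[String, (String, String, String)] =
  name match {
    case BqTableName(project, dataset, table) => Right((project, dataset, table))
    case _ => Left(s"Not a fully qualified BigQuery table name: $name")
  }
```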
Actionable comments posted: 0
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
Lines 411-438: ⚠️ Potential issue: Enhanced query execution with format-specific handling.
The new implementation:
- Parses query to LogicalPlan
- Identifies table formats
- Applies BigQuery-specific configuration when needed
However, there's a potential `IndexOutOfBoundsException` (not an NPE) when accessing `mpi(1)`.

Adjust the guard:

```diff
- case (mpi, _) if mpi.size == 3 => mpi(1)
+ case (mpi, _) if mpi.size >= 3 => mpi(1)
```
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
Line 424: Address TODO comment. Consider using a configuration value for the global materializationDataset.
Would you like me to implement a configuration-based solution for the global materializationDataset?
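A sketch of the configuration-based fallback being offered here, assuming `sparkSession` and `formats` are in scope as in the PR's `sql` method; the `spark.chronon.bigquery.materializationDataset` key is invented for illustration:

```scala
// Prefer a hypothetical global setting; fall back to the dataset parsed
// from the query's multipart identifiers.
val materializationDataset: String =
  sparkSession.conf
    .getOption("spark.chronon.bigquery.materializationDataset")
    .orElse(formats.collectFirst { case (mpi, _) if mpi.size == 3 => mpi(1) })
    .getOrElse(
      throw new IllegalArgumentException("Could not find candidate dataset for materializationDataset"))
```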
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1)
Lines 40-42: Configuration handling simplified. Direct configuration setting is cleaner than the previous save/restore pattern.
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)
Lines 40-42: New imports for query parsing. Added necessary imports for logical plan analysis.
Lines 425-437: Verify BigQuery format handling across the codebase. Let's ensure consistent BigQuery configuration handling.
✅ Verification successful
BigQuery configuration is correctly implemented
The different materializationDataset sources are intentional:
- Ad-hoc queries use format-provided dataset
- Partition queries use table's dataset
- Write operations use configured output dataset
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Find all BigQuery format usage
rg -l 'format\s*=\s*"bigquery"'
# Find viewsEnabled configuration
rg 'viewsEnabled'
# Find materializationDataset usage
rg 'materializationDataset'
```

Length of output: 1041
Script:
```bash
#!/bin/bash
# Check BigQueryFormat implementation
cat cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
# Check GcpFormatProvider implementation
cat cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
```

Length of output: 7471
```scala
val logicalPlan: LogicalPlan = sparkSession.sessionState.sqlParser.parsePlan(query)

// Load tables through DataPointer resolution.
val formats = logicalPlan
  .collect {
    case ur @ UnresolvedRelation(mpi, _, _) => {
      DataPointer.from(ur.tableName, sparkSession).readFormat match {
        case Some(fmtName) => (mpi, fmtName)
      }
    }
  }

// todo(tchow): Use a global materializationDataset value.
val df = if (formats.map { case (_, fmtName) => fmtName.toUpperCase }.contains("BIGQUERY")) {
  val firstDataset = formats
    .collectFirst {
      case (mpi, _) if mpi.size == 3 => mpi(1)
    }
    .getOrElse(throw new IllegalArgumentException("Could not find candidate dataset for materializationDataset"))
  val df = sparkSession.read
    .option("viewsEnabled", true.toString)
    .option("materializationDataset", firstDataset)
    .format("bigquery")
    .load(query)
  df
} else sparkSession.sql(query)
df.coalesce(partitionCount)
```
break this out into a function checkAndRunBigQuery
ack will do
actually this will trigger a BigQuery run even on GroupBy selects, which is probably not the desired behavior.
we should move this into staging query - or an opportunisticallyRunOnBigQuery func.
@nikhil-zlai can you show me where that is? AFAICT this will be used to run setups in GroupBy and Join, plus some other Spark SQL stuff which will mostly stay in Spark based on this logic.
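For reference, a rough sketch of what an `opportunisticallyRunOnBigQuery` helper might look like if gated by an explicit caller opt-in, so GroupBy/Join setup statements keep going through plain Spark SQL; the helper name comes from the comment above, but the shape is an assumption, not the merged design:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: StagingQuery would call this explicitly; GroupBy/Join setups
// keep using sparkSession.sql. Assumes a sparkSession in scope, as in TableUtils.
def opportunisticallyRunOnBigQuery(query: String, materializationDataset: String): DataFrame =
  sparkSession.read
    .option("viewsEnabled", "true")
    .option("materializationDataset", materializationDataset)
    .format("bigquery")
    .load(query)
```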
Force-pushed from 17641f0 to 22a3d70
Actionable comments posted: 2
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
Lines 422-435: Consider extracting BigQuery handling to a separate method. The BigQuery-specific logic would be more maintainable as a separate method.
```diff
+ private def handleBigQueryExecution(query: String, firstDataset: String): DataFrame = {
+   sparkSession.read
+     .option("viewsEnabled", true.toString)
+     .option("materializationDataset", firstDataset)
+     .format("bigquery")
+     .load(query)
+ }
+
  def sql(query: String): DataFrame = {
    // ... existing code ...
    val df = if (formats.map { case (_, fmtName) => fmtName.toUpperCase }.contains("BIGQUERY")) {
      val firstDataset = formats
        .collectFirst {
          case (mpi, _) if mpi.size >= 2 => mpi(1)
        }
        .getOrElse(throw new IllegalArgumentException("Could not find candidate dataset for materializationDataset"))
-     val df = sparkSession.read
-       .option("viewsEnabled", true.toString)
-       .option("materializationDataset", firstDataset)
-       .format("bigquery")
-       .load(query)
-     df
+     handleBigQueryExecution(query, firstDataset)
    } else sparkSession.sql(query)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: other_spark_tests
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)
Lines 40-43: LGTM: Required imports for SQL parsing.
Lines 424-428: ⚠️ Potential issue: Add bounds check for `mpi` array access.

Accessing `mpi(1)` could throw an `IndexOutOfBoundsException` if `mpi` has fewer than 2 elements.

```diff
- case (mpi, _) if mpi.size == 3 => mpi(1)
+ case (mpi, _) if mpi.size >= 2 => mpi(1)
```

Likely invalid or redundant comment.
```scala
case ur @ UnresolvedRelation(mpi, _, _) => {
  DataPointer.from(ur.tableName, sparkSession).readFormat match {
    case Some(fmtName) => (mpi, fmtName)
  }
}
```
Add a missing-case check for format extraction.

The format extraction could throw a `MatchError` if `DataPointer.from(ur.tableName, sparkSession).readFormat` returns `None`.

```diff
- DataPointer.from(ur.tableName, sparkSession).readFormat match {
-   case Some(fmtName) => (mpi, fmtName)
- }
+ DataPointer.from(ur.tableName, sparkSession).readFormat match {
+   case Some(fmtName) => Some((mpi, fmtName))
+   case None => None
+ }
```
Committable suggestion skipped: line range outside the PR's diff.
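As a side note, even with the suggested change, `collect` would yield `Option` values rather than a flat list; a sketch of one way to keep the result flat, under the same assumptions as the surrounding snippet (`logicalPlan`, `DataPointer`, `sparkSession` in scope):

```scala
// Collect relations first, then flatMap over the optional read format so a
// missing format is skipped instead of raising a MatchError.
val formats: Seq[(Seq[String], String)] = logicalPlan
  .collect { case ur @ UnresolvedRelation(mpi, _, _) => (mpi, ur.tableName) }
  .flatMap { case (mpi, tableName) =>
    DataPointer.from(tableName, sparkSession).readFormat.map(fmt => (mpi, fmt))
  }
```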
```scala
// todo(tchow): Use a global materializationDataset value.
val df = if (formats.map { case (_, fmtName) => fmtName.toUpperCase }.contains("BIGQUERY")) {
```
💡 Codebase verification
Consider implementing materialization dataset configuration now
Current implementation uses brittle position-based access (mpi(1)) and is tightly coupled with format-specific logic. A global configuration would improve maintainability.
🔗 Analysis chain
Consider using configuration for materialization dataset.
The TODO comment suggests using a global value. Consider implementing this now to avoid technical debt.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Check if there's already a configuration for materialization dataset
rg -l "materializationDataset.*=.*" --type scala
```
Length of output: 50
Script:
```bash
#!/bin/bash
# Search for the TODO comment and surrounding context
rg -B 5 -A 5 "todo.*Use a global materializationDataset" --type scala
# Search for BigQuery-related configuration patterns
rg -l "bigquery.*config" --type scala
# Search for similar configuration patterns
ast-grep --pattern 'val $_ = config.getString($_)'
```
Length of output: 1148
Script:
```bash
#!/bin/bash
# Search for configuration-related files
fd -e scala config
# Search for configuration patterns in scala files
rg -l "extends.*Config" --type scala
```
Length of output: 132
Script:
```bash
#!/bin/bash
# Check content of Config.scala
cat orchestration/src/main/scala/ai/chronon/orchestration/utils/Config.scala
```
Length of output: 1796
```scala
// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
// and: https://cloud.google.com/bigquery/docs/information-schema-intro#limitations
```
do you want to kill these lines too?
these are still valid, we're just setting them in a different way.
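For context, a sketch of the two ways these settings can reach the spark-bigquery connector: per-read options (as in the PR's `sql()` path) and session-level conf. The dataset name here is a placeholder, and `query` is assumed to be the SQL string in scope:

```scala
// Per-read options, scoped to a single DataFrameReader:
val perRead = sparkSession.read
  .option("viewsEnabled", "true")
  .option("materializationDataset", "my_dataset") // placeholder dataset
  .format("bigquery")
  .load(query)

// Session-level equivalents, which the connector also honors globally:
sparkSession.conf.set("viewsEnabled", "true")
sparkSession.conf.set("materializationDataset", "my_dataset")
```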
Force-pushed from 22a3d70 to a3bdc42
Force-pushed from a3bdc42 to c3dd039
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
Lines 433-438: Consider using DataFrameReader options consistently. The implementation correctly supports staging queries by inferring from the project ID.
Consider moving viewsEnabled and materializationDataset to a configuration object for better maintainability.
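One possible shape for such a configuration object, as a sketch with invented names:

```scala
import org.apache.spark.sql.DataFrameReader

// Sketch: bundle the two connector options so call sites can't set one
// without the other.
case class BigQueryReadSettings(materializationDataset: String, viewsEnabled: Boolean = true) {
  def applyTo(reader: DataFrameReader): DataFrameReader =
    reader
      .option("viewsEnabled", viewsEnabled.toString)
      .option("materializationDataset", materializationDataset)
}

// Usage: settings.applyTo(sparkSession.read).format("bigquery").load(query)
```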
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3)
Lines 413-415: LGTM! Clean implementation of logical plan parsing.
Lines 428-432: ⚠️ Potential issue: Add bounds check for `mpi` access.

Direct access to `mpi(1)` could throw an `IndexOutOfBoundsException`.

```diff
- .collectFirst {
-   case (mpi, _) if mpi.size == 3 => mpi(1)
- }
+ .collectFirst {
+   case (mpi, _) if mpi.size >= 2 => mpi(1)
+ }
```

Likely invalid or redundant comment.
Lines 416-424: ⚠️ Potential issue: Add error handling for format extraction.

The missing `None` case in the pattern match could cause a `MatchError`.

```diff
- case ur @ UnresolvedRelation(mpi, _, _) => {
-   DataPointer.from(ur.tableName, sparkSession).readFormat match {
-     case Some(fmtName) => (mpi, fmtName)
-   }
- }
+ case ur @ UnresolvedRelation(mpi, _, _) =>
+   DataPointer.from(ur.tableName, sparkSession).readFormat match {
+     case Some(fmtName) => Some((mpi, fmtName))
+     case None => None
+   }
```

Likely invalid or redundant comment.
Force-pushed from 8c32c96 to a14078c
Force-pushed from a14078c to 0e35c87
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
Lines 426-427: Consider implementing global configuration now. Move the materialization dataset configuration to a global setting to improve maintainability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: mutation_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4)
Lines 413-415: LGTM! Clean and standard approach to SQL parsing.
Lines 433-438: LGTM! Clean BigQuery read configuration.
Lines 426-431: ⚠️ Potential issue: Add bounds check for array access.

Accessing `mpi(1)` could throw an `IndexOutOfBoundsException`. Apply this diff:

```diff
- .collectFirst {
-   case (mpi, _) if mpi.size == 3 => mpi(1)
- }
+ .collectFirst {
+   case (mpi, _) if mpi.size >= 2 => mpi(1)
+ }
```

Likely invalid or redundant comment.
Lines 416-424: ⚠️ Potential issue: Add error handling for format resolution.

The pattern matching is incomplete and could fail if `readFormat` returns `None`. Apply this diff:

```diff
- case ur @ UnresolvedRelation(mpi, _, _) => {
-   DataPointer.from(ur.tableName, sparkSession).readFormat match {
-     case Some(fmtName) => (mpi, fmtName)
-   }
- }
+ case ur @ UnresolvedRelation(mpi, _, _) =>
+   DataPointer.from(ur.tableName, sparkSession).readFormat match {
+     case Some(fmtName) => Some((mpi, fmtName))
+     case None => None
+   }
```

Likely invalid or redundant comment.
Force-pushed from 0e35c87 to 04c50dc
Force-pushed from 04c50dc to 6b14f76
Force-pushed from 4bd578b to 2ad3039
Force-pushed from 22fb501 to 7dab021
Co-authored-by: Thomas Chow <[email protected]>
Force-pushed from 7dab021 to 98dc781
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryStagingQuery.scala (1)
Lines 6-11: Class currently just proxies to the parent implementation. This new class extends StagingQuery but currently only calls the parent implementation without adding BigQuery-specific functionality.
Consider adding documentation explaining the purpose of this specialized class and its expected future behavior for BigQuery staging queries.
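A sketch of the kind of scaladoc the comment is asking for; the constructor parameters are assumptions based on typical `StagingQuery` usage, since only the class name is visible in this review:

```scala
/** BigQuery-specific staging query.
  *
  * Currently delegates to the parent [[StagingQuery]] implementation; it exists
  * so that BigQuery-only behavior (e.g. materialization settings) has a home
  * as it lands.
  */
class BigQueryStagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tableUtils: TableUtils)
    extends StagingQuery(stagingQueryConf, endPartition, tableUtils) {
  // Sketch: parameters above are illustrative, not the actual signature.
}
```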
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryStagingQuery.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryStagingQuery.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
Summary
Checklist
Summary by CodeRabbit
- New Features
  - New `BigQueryStagingQuery` class for staging queries.
- Bug Fixes