feat: use spark bq connector v1 #664
Conversation
Walkthrough

This update introduces enhancements to table reading and filtering logic across multiple Scala modules. A new `table` method on `Format` accepts a combined predicate string, and `TableUtils.loadTable` now takes optional range predicates that are applied when the table is read.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant TableUtils
    participant Format
    participant SparkSession
    Caller->>TableUtils: loadTable(tableName, rangeWheres)
    TableUtils->>Format: table(tableName, combinedPredicates)
    Format->>SparkSession: read.table(tableName).where(combinedPredicates)
    SparkSession-->>Format: DataFrame
    Format-->>TableUtils: DataFrame
    TableUtils-->>Caller: DataFrame
```
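To make the diagram concrete, here is a minimal sketch of the default path it describes; the trait and parameter names follow the diagram and the review snippets below, so treat this as an illustration rather than the verbatim Chronon code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of a Format whose default table() reads the table and applies the
// pre-combined predicate string, as in the sequence diagram above.
trait Format {
  def table(tableName: String, combinedPredicates: String)(sparkSession: SparkSession): DataFrame = {
    val df = sparkSession.read.table(tableName)
    // Only filter when predicates were actually supplied.
    if (combinedPredicates.nonEmpty) df.where(combinedPredicates) else df
  }
}
```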
Warning: Review ran into problems.
🔥 Problems: GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.

📜 Recent review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
🧰 Additional context used
🧬 Code Graph Analysis (1): spark/src/main/scala/ai/chronon/spark/GroupBy.scala (2)
⏰ Context from checks skipped due to timeout of 90000ms (35)
🔇 Additional comments (1)
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)
124-135: Consider using DEBUG level for table column logs.
Logging at INFO may be too verbose for regular operation.
```diff
-      logger.info("Table columns:")
-      logger.info(f"${nativeTable.columns()}")
+      logger.debug("Table columns:")
+      logger.debug(f"${nativeTable.columns()}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- api/python/ai/chronon/resources/gcp/zipline-cli-install.sh (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (22)
- GitHub Check: service_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: hub_tests
- GitHub Check: hub_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: flink_tests
- GitHub Check: orchestration_tests
- GitHub Check: aggregator_tests
- GitHub Check: orchestration_tests
- GitHub Check: python_tests
- GitHub Check: aggregator_tests
- GitHub Check: api_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
api/python/ai/chronon/resources/gcp/zipline-cli-install.sh (1)
52-53: Good improvement moving trap earlier.
Moving the cleanup trap after the download ensures file removal on any exit.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
27-27: LGTM: Added SLF4J import.
Import needed for logging functionality.
51-52: LGTM: Added logger instantiation.
Standard pattern for SLF4J logger in Scala.
Actionable comments posted: 1
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)
118-124: Update error message for consistency.
The error message mentions "partition filter" but the parameter is named "rangeWheres".
```diff
   def loadTable(tableName: String, rangeWheres: Seq[String] = List.empty[String]): DataFrame = {
     tableFormatProvider
       .readFormat(tableName)
       .map(_.table(tableName, combinePredicates(rangeWheres))(sparkSession))
       .getOrElse(
-        throw new RuntimeException(s"Could not load table: ${tableName} with partition filter: ${rangeWheres}"))
+        throw new RuntimeException(s"Could not load table: ${tableName} with range predicates: ${rangeWheres}"))
   }
```
571-575: Remove duplicate logging.
The combined predicate string is logged both here and in the scanDfBase method.
```diff
   private def combinePredicates(predicates: Seq[String]): String = {
     val whereStr = predicates.map(p => s"($p)").mkString(" AND ")
-    logger.info(s"""Where str: $whereStr""")
     whereStr
   }
```
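For reference, the combined string this helper produces looks like the following (the predicate inputs are hypothetical):

```scala
// Same transformation as combinePredicates: wrap each predicate and AND them together.
val combined = Seq("ds >= '2021-04-01'", "ds <= '2021-04-10'").map(p => s"($p)").mkString(" AND ")
// combined == "(ds >= '2021-04-01') AND (ds <= '2021-04-10')"
```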
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/format/Format.scala (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
- spark/src/main/scala/ai/chronon/spark/format/Format.scala (1)
  - table (11-15)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1)
  - readFormat (24-46)
- spark/src/main/scala/ai/chronon/spark/format/DefaultFormatProvider.scala (1)
  - readFormat (15-23)
- spark/src/main/scala/ai/chronon/spark/format/Format.scala (1)
  - table (11-15)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (1)
  - table (16-27)
⏰ Context from checks skipped due to timeout of 90000ms (35)
- GitHub Check: service_commons_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: flink_tests
- GitHub Check: service_tests
- GitHub Check: online_tests
- GitHub Check: hub_tests
- GitHub Check: api_tests
- GitHub Check: aggregator_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: join_tests
- GitHub Check: hub_tests
- GitHub Check: groupby_tests
- GitHub Check: online_tests
- GitHub Check: fetcher_tests
- GitHub Check: flink_tests
- GitHub Check: analyzer_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: batch_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: api_tests
- GitHub Check: aggregator_tests
- GitHub Check: orchestration_tests
- GitHub Check: orchestration_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/format/Format.scala (1)
5-5: Import added for new method return type.
Adding the DataFrame import supports the new table method's return type.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)
9-9: Import added for method return type.
Adding the DataFrame import supports the overridden table method's return type.
16-27: Good implementation with proper filter handling.
The implementation correctly handles empty filters with different approaches and properly formats the filter string for BigQuery.
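As a reference point, here is a hedged sketch of what v1-connector filter handling can look like; the `filter` option is a documented spark-bigquery-connector option, but the exact shape of this PR's implementation is an assumption here.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reads a BigQuery table through the v1 DataSource, pushing predicates down via
// the connector's "filter" option when any were supplied; plain load otherwise.
def readBigQueryTable(tableName: String, partitionFilters: String)(implicit spark: SparkSession): DataFrame = {
  val reader = spark.read.format("bigquery")
  val configured =
    if (partitionFilters.isEmpty) reader              // no predicates: read the whole table
    else reader.option("filter", partitionFilters)    // predicate pushdown happens in BigQuery
  configured.load(tableName)
}
```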
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
599-603: Filter logic refactoring looks good.
Moving the rangeWheres filtering to loadTable and using combinePredicates for wheres improves code organization.
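A usage-level sketch of the refactored path (the table name and predicates below are hypothetical):

```scala
// Predicates are handed to loadTable and applied as part of the read,
// instead of being filtered after the DataFrame is materialized.
val df = tableUtils.loadTable(
  "my_namespace.my_table",                          // hypothetical table
  Seq("ds >= '2021-04-01'", "ds <= '2021-04-10'")   // rangeWheres
)
```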
Force-pushed from 935b614 to 9f3b47f (Compare)
Force-pushed from 9f3b47f to b0668d8 (Compare)
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/ResultValidationAbilityTest.scala (1)
63-75: Consider testing partition filtering functionality.
The test verifies basic validation but doesn't test the new partition filtering capability introduced by the updated loadTable method.
Consider adding a test case that verifies partition predicates are properly applied:
it should "successful validation" in { val args = new TestArgs(Seq("--conf-path", confPath, "--expected-result-table", "a_table").toArray) // simple testing, more comprehensive testing are already done in CompareTest.scala val leftData = Seq((1, Some(1), 1.0, "a", "2021-04-10"), (2, Some(2), 2.0, "b", "2021-04-10")) val columns = Seq("serial", "value", "rating", "keyId", "ds") val rdd = args.sparkSession.sparkContext.parallelize(leftData) val df = args.sparkSession.createDataFrame(rdd).toDF(columns: _*) when(mockTableUtils.loadTable(any(), any())).thenReturn(df) assertTrue(args.validateResult(df, Seq("keyId", "ds"), mockTableUtils)) } + + it should "apply partition filters when validating" in { + val args = new TestArgs(Seq("--conf-path", confPath, "--expected-result-table", "a_table").toArray) + val columns = Seq("serial", "value", "rating", "keyId", "ds") + val data = Seq((1, Some(1), 1.0, "a", "2021-04-10")) + val rdd = args.sparkSession.sparkContext.parallelize(data) + val df = args.sparkSession.createDataFrame(rdd).toDF(columns: _*) + + // Verify that partition filters are passed correctly + when(mockTableUtils.loadTable(any(), any())).thenAnswer(invocation => { + val filters = invocation.getArguments()(1).asInstanceOf[Seq[String]] + assert(filters.contains("ds = '2021-04-10'")) + df + }) + + assertTrue(args.validateResult(df, Seq("keyId", "ds"), mockTableUtils)) + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/ResultValidationAbilityTest.scala (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
spark/src/test/scala/ai/chronon/spark/test/ResultValidationAbilityTest.scala (1)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  - loadTable (118-124)
⏰ Context from checks skipped due to timeout of 90000ms (35)
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: service_commons_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: hub_tests
- GitHub Check: service_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: hub_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: online_tests
- GitHub Check: orchestration_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: join_tests
- GitHub Check: aggregator_tests
- GitHub Check: python_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: orchestration_tests
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/ResultValidationAbilityTest.scala (2)
72-72: Updated mocking to match new TableUtils.loadTable signature.
Correctly updated to use two parameters to align with the enhanced loadTable method that now accepts partition filters.
88-88: Mock updated correctly for failed validation test case.
Properly updated the mock to use two parameters for the loadTable method.
Force-pushed from 00ee4b2 to ea92661 (Compare)
```diff
@@ -564,6 +568,12 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     }
   }

+  private def andPredicates(predicates: Seq[String]): String = {
+    val whereStr = predicates.map(p => s"($p)").mkString(" AND ")
+    logger.info(s"""Where str: $whereStr""")
```
can remove as you're printing on line 601 right?
I'll remove that one actually.
```diff
@@ -655,7 +655,7 @@ object GroupBy {
           |""".stripMargin)
       metaColumns ++= timeMapping

-      val partitionConditions = tableUtils.whereClauses(intersectedRange)
+      val partitionConditions = tableUtils.whereClauses(intersectedRange, source.partitionColumn(tableUtils))
```
cc @varant-zlai
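For context, here is a rough illustration of the kind of clauses this call could yield; `whereClauses` itself is not shown in this PR, so the strings below are assumptions.

```scala
// Hypothetical: for an intersected range of 2021-04-01..2021-04-10 on partition column "ds",
// the partition conditions would look roughly like:
val partitionConditions = Seq("ds >= '2021-04-01'", "ds <= '2021-04-10'")
```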
## Summary
- We need to bring back the v1 version of Datasource for the spark bigquery connector, since it supports partition pushdown and alternative project_id's. The catalog version in the spark bigquery connector does not support that.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Enhanced table reading capabilities with support for applying partition filters and combining multiple predicates for more flexible data queries.
- **Refactor**
  - Improved internal handling of predicate filters and table loading logic for more consistent and maintainable data access.
  - Refined data filtering by explicitly incorporating partition column information for more precise queries.
- **Chores**
  - Updated script to ensure temporary files are cleaned up more reliably during installation processes.

Co-authored-by: Thomas Chow <[email protected]>
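To illustrate the two capabilities called out in the summary (partition pushdown and alternative project ids), here is a hedged example of a v1 read; `parentProject` and `filter` are documented connector options, and the project and table names are made up.

```scala
// Hypothetical ids and table; not taken from this repository.
val df = spark.read
  .format("bigquery")
  .option("parentProject", "billing-project-id")  // project billed for the read
  .option("filter", "ds = '2021-04-10'")          // pushed down to BigQuery as a partition filter
  .load("data-project.dataset.events")            // the table can live in a different project
```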